vlambda博客
学习文章列表

在微服务中使用Apache Kafka进行异步通信


尽管微服务架构可能并不是所有系统的灵丹妙药,但它无疑具有其优势,尤其是在构建具有许多不同组件的复杂系统时。如果您正在考虑微服务,则考虑不同服务的通信方式。
在本文中,我们将研究如何设置Apache Kafka实例,创建用户服务以将数据发布到主题,以及构建通知服务以使用这些主题中的数据。具体来说,我们将构建一个两步验证应用程序,用户可以在该程序中进行注册,接收带有验证码的邮件,并使用该代码完成注册。源代码可以在这里找到。

为什么选择Apache Kafka?

Kafka是LinkedIn于2011年创建的分布式流媒体平台,用于处理高吞吐量,低延迟传输以及实时处理记录流。它的三大功能使其非常适合此用例:

  • 发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。

  • 以容错方式存储记录流。

  • 处理发生的记录流。

设置Apache Kafka

开始本教程之前,需要满足以下条件:

  • Mac版Docker或Windows版Docker

  • Docker Compose的知识

  • Node.js的知识

我们将使用Wurstmeister Kafka Docker映像。请注意,Kafka使用Zookeeper在不同的Kafka节点之间进行协调。

docker-compose.yml类似docker-compose.yml用于为Kafka和Zookeeper提取图像。Kafka服务所需的配置选项之一是KAFKA_ZOOKEEPER_CONNECT ,它告诉Kafka在哪里可以找到Zookeeper实例。


  
    
    
  
  1. version: '2.1'

  2. services:

  3. zookeeper:

  4. container_name: zookeeper

  5. image: wurstmeister/zookeeper

  6. ports:

  7. - "2181:2181"

  8. kafka:

  9. container_name: kafka

  10. image: wurstmeister/kafka

  11. ports:

  12. - "9092"

  13. depends_on:

  14. - "zookeeper"

  15. environment:

  16. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

将数据发布到Kafka主题

要将数据发布到Kafka主题,我们将创建一个提供两个端点的用户服务:

  • /api/register –将用户详细信息存储在内存中存储节点缓存中,并将用户数据发布到Kafka主题user_account_created 。

  • /api/verify –验证提供的代码正确,并将用户数据发布到Kafka主题user_account_verified 。

我们使用node-rdkafka NPM软件包创建一个生产者,该生产者从我们的节点应用程序连接到Kafka:


  
    
    
  
  1. let producerReady;

  2. producer = new kafka.Producer({

  3. debug: 'all',

  4. 'client.id': 'user-api',

  5. 'metadata.broker.list': KAFKA_BROKER_LIST,

  6. 'compression.codec': 'gzip',

  7. 'retry.backoff.ms': 200,

  8. 'message.send.max.retries': 10,

  9. 'socket.keepalive.enable': true,

  10. 'queue.buffering.max.messages': 100000,

  11. 'queue.buffering.max.ms': 1000,

  12. 'batch.num.messages': 1000000,

  13. dr_cb: true

  14. });

  15. producer.connect({}, err => {

  16. if (err) {

  17. logger.error('connect', err);

  18. }

  19. });

  20. producerReady = new Promise((resolve, reject) => {

  21. producer.on('ready', () => {

  22. logger.info('producer ready');

  23. resolve(producer);

  24. });

  25. });

我们创建一个新的Promise对象,该对象解析为准备开始发布数据的生产者。这在我们的sendMessage函数中使用,该函数将数据发布到Kafka主题分区:


  
    
    
  
  1. KafkaService.prototype.sendMessage = function sendMessage(

  2. topic,

  3. payload,

  4. partition = 0

  5. ) {

  6. return producerReady

  7. .then(producer => {

  8. const message = Buffer.from(JSON.stringify(payload));

  9. producer.produce(topic, partition, message);

  10. })

  11. .catch(error => logger.error('unable to send message', error));

  12. };

使用Kafka主题中的数据

为了使用Kafka主题中的数据,我们将创建一个通知服务,以监听来自我们主题的数据,并根据从中获取数据的主题发送带有验证码或成功消息的电子邮件。

我们创建一个连接到Kafka的使用者,其中KAFKA_BROKER_LIST是所有Kafka实例的逗号分隔列表。


  
    
    
  
  1. process.stdin.resume(); // keep process alive


  2. require('dotenv').config();


  3. const Kafka = require('node-rdkafka');


  4. const logger = require('./logger');


  5. const sendMail = require('./email');


  6. const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;


  7. const consumer = new Kafka.KafkaConsumer({

  8. //'debug': 'all',

  9. 'metadata.broker.list': KAFKA_BROKER_LIST,

  10. 'group.id': 'notification-service',

  11. 'enable.auto.commit': false

  12. });

node-rdkafka返回的使用者对象是可读流的实例。我们等待ready事件订阅我们的主题user_account_createduser_account_verified ,并侦听这些主题中的数据:


  
    
    
  
  1. const topics = [

  2. 'user_account_created',

  3. 'user_account_verified'

  4. ];


  5. //counter to commit offsets every numMessages are received

  6. let counter = 0;

  7. let numMessages = 5;


  8. consumer.on('ready', function(arg) {

  9. logger.info('consumer ready.' + JSON.stringify(arg));


  10. consumer.subscribe(topics);

  11. //start consuming messages

  12. consumer.consume();

  13. });


  14. consumer.on('data', function(metadata) {

  15. counter++;


  16. //committing offsets every numMessages

  17. if (counter % numMessages === 0) {

  18. logger.info('calling commit');

  19. consumer.commit(metadata);

  20. }


  21. // Output the actual message contents

  22. const data = JSON.parse(metadata.value.toString());

  23. logger.info('data value', data);


  24. if(metadata.topic === 'user_account_created'){

  25. const to = data.email;

  26. const subject = 'Verify Account';

  27. const content = `Hello ${data.first_name},

  28. Please use this code ${data.code} to complete your verification`;

  29. sendMail(subject, content,to);

  30. }else if(metadata.topic === 'user_account_verified') {

  31. const to = data.email;

  32. const subject = 'Account Verified';

  33. const content = `Hello ${data.first_name},

  34. You have successfully been verified`;

  35. sendMail(subject, content,to);

  36. }


  37. });


  38. consumer.on('disconnected', function(arg) {

  39. logger.info('consumer disconnected. ' + JSON.stringify(arg));

  40. });


  41. //logging all errors

  42. consumer.on('event.error', function(err) {

  43. logger.error('Error from consumer', err, 'code: ', err.code);

  44. });


  45. //starting the consumer

  46. consumer.connect();

当消息发布到我们正在侦听的任何主题时,将调用data事件处理程序。在这里,我们解析传入的消息并检查元数据对象,以了解接收到的数据所针对的主题,因此我们可以执行适当的操作。

结论

我们的两因素身份验证应用程序演示了使用Apache Kafka(还有其他系统,例如RabbitMQ , ZeroMQ )的两个微服务之间的通信模式,相对于Feign增加了未来的灵活性。
例如,假设我们将来会添加一个推荐服务,当新用户登录时,该服务需要发送推荐;它仅订阅user_account_verified主题,因此无需更改用户服务。

系统架构结论
(1)数据管道场景,MQ比cache更合适;
(2)服务化架构,不应该绕过service读取其后端的cache/db,而应该通过RPC接口访问;
(3)MQ通信可扩展