在微服务中使用Apache Kafka进行异步通信
为什么选择Apache Kafka?
Kafka是LinkedIn于2011年创建的分布式流媒体平台,用于处理高吞吐量,低延迟传输以及实时处理记录流。它的三大功能使其非常适合此用例:
发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。
以容错方式存储记录流。
处理发生的记录流。
设置Apache Kafka
开始本教程之前,需要满足以下条件:
Mac版Docker或Windows版Docker
Docker Compose的知识
Node.js的知识
我们将使用Wurstmeister Kafka Docker映像。请注意,Kafka使用Zookeeper在不同的Kafka节点之间进行协调。
docker-compose.yml
类似docker-compose.yml
用于为Kafka和Zookeeper提取图像。Kafka服务所需的配置选项之一是KAFKA_ZOOKEEPER_CONNECT
,它告诉Kafka在哪里可以找到Zookeeper实例。
version: '2.1'
services:
zookeeper:
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
ports:
- "9092"
depends_on:
- "zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
将数据发布到Kafka主题
要将数据发布到Kafka主题,我们将创建一个提供两个端点的用户服务:
/api/register
–将用户详细信息存储在内存中存储节点缓存中,并将用户数据发布到Kafka主题user_account_created
。/api/verify
–验证提供的代码正确,并将用户数据发布到Kafka主题user_account_verified
。
我们使用node-rdkafka
NPM软件包创建一个生产者,该生产者从我们的节点应用程序连接到Kafka:
let producerReady;
producer = new kafka.Producer({
debug: 'all',
'client.id': 'user-api',
'metadata.broker.list': KAFKA_BROKER_LIST,
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
dr_cb: true
});
producer.connect({}, err => {
if (err) {
logger.error('connect', err);
}
});
producerReady = new Promise((resolve, reject) => {
producer.on('ready', () => {
logger.info('producer ready');
resolve(producer);
});
});
我们创建一个新的Promise对象,该对象解析为准备开始发布数据的生产者。这在我们的sendMessage
函数中使用,该函数将数据发布到Kafka主题分区:
KafkaService.prototype.sendMessage = function sendMessage(
topic,
payload,
partition = 0
) {
return producerReady
.then(producer => {
const message = Buffer.from(JSON.stringify(payload));
producer.produce(topic, partition, message);
})
.catch(error => logger.error('unable to send message', error));
};
使用Kafka主题中的数据
为了使用Kafka主题中的数据,我们将创建一个通知服务,以监听来自我们主题的数据,并根据从中获取数据的主题发送带有验证码或成功消息的电子邮件。
我们创建一个连接到Kafka的使用者,其中KAFKA_BROKER_LIST
是所有Kafka实例的逗号分隔列表。
process.stdin.resume(); // keep process alive
require('dotenv').config();
const Kafka = require('node-rdkafka');
const logger = require('./logger');
const sendMail = require('./email');
const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;
const consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': KAFKA_BROKER_LIST,
'group.id': 'notification-service',
'enable.auto.commit': false
});
node-rdkafka
返回的使用者对象是可读流的实例。我们等待ready
事件订阅我们的主题user_account_created
和user_account_verified
,并侦听这些主题中的数据:
const topics = [
'user_account_created',
'user_account_verified'
];
//counter to commit offsets every numMessages are received
let counter = 0;
let numMessages = 5;
consumer.on('ready', function(arg) {
logger.info('consumer ready.' + JSON.stringify(arg));
consumer.subscribe(topics);
//start consuming messages
consumer.consume();
});
consumer.on('data', function(metadata) {
counter++;
//committing offsets every numMessages
if (counter % numMessages === 0) {
logger.info('calling commit');
consumer.commit(metadata);
}
// Output the actual message contents
const data = JSON.parse(metadata.value.toString());
logger.info('data value', data);
if(metadata.topic === 'user_account_created'){
const to = data.email;
const subject = 'Verify Account';
const content = `Hello ${data.first_name},
Please use this code ${data.code} to complete your verification`;
sendMail(subject, content,to);
}else if(metadata.topic === 'user_account_verified') {
const to = data.email;
const subject = 'Account Verified';
const content = `Hello ${data.first_name},
You have successfully been verified`;
sendMail(subject, content,to);
}
});
consumer.on('disconnected', function(arg) {
logger.info('consumer disconnected. ' + JSON.stringify(arg));
});
//logging all errors
consumer.on('event.error', function(err) {
logger.error('Error from consumer', err, 'code: ', err.code);
});
//starting the consumer
consumer.connect();
当消息发布到我们正在侦听的任何主题时,将调用data
事件处理程序。在这里,我们解析传入的消息并检查元数据对象,以了解接收到的数据所针对的主题,因此我们可以执行适当的操作。
结论
我们的两因素身份验证应用程序演示了使用Apache Kafka(还有其他系统,例如RabbitMQ , ZeroMQ )的两个微服务之间的通信模式,相对于Feign增加了未来的灵活性。
例如,假设我们将来会添加一个推荐服务,当新用户登录时,该服务需要发送推荐;它仅订阅user_account_verified
主题,因此无需更改用户服务。