消息中间件之kafka
确切的说kafka除了消息中间件这个角色之外,还可以作为能够存储数据的系统,这类似于我们常见的非关系型数据库,例如说MongoDB啊、ArangoDB啊等等。除此之外kafka能支持持续变化,不断增长的数据流, 可以发布和订阅数据流,还可以对于这些数据进行保存,也就是说kafka的本质是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。而且kafka在设计之初就是采用分布式架构设计的, 基于集群的方式工作,且可以自由伸缩,所以kafka构建集群so easy。
老规矩,上步骤
1.kafka的安装
windows下:
所需的安装包:
①kafka基于java开发,所以jdk是必须的(官网需要登陆才可以下载):
官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
或者
链接:https://pan.baidu.com/s/1lHcRz5Y7b2x_cyqoj9JznA
提取码:6666
②kafka依赖zookeeper,所以zookeeper也是必须的(bin文件):
https://zookeeper.apache.org/releases.html#download
③kafka安装包
http://kafka.apache.org/downloads
步骤:
①jdk的安装,略。不要忘了配置Java环境
②zookeeper安装
Ⅰ.进入解压目录的conf目录,将zoo_sample.cfg改为zoo.cfg
Ⅱ.打开zoo.cfg修改dataDir,分隔符一定要\\,如
Ⅲ.添加系统变量:ZOOKEEPER_HOME=D:\apache-zookeeper-3.7.0-bin
Ⅳ.编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
Ⅴ.管理员身份运行cmd,输入“zkServer“,运行Zookeeper,别关闭黑框
③kafka的安装
Ⅰ.进入kafka的解压目录,编辑server.properties文件,修改log.dirs
修改listeners
Ⅱ.如果Zookeeper与Kafka不在同一台电脑上则修改zookeeper.connect
Ⅲ.需要说明一点的是:Kafka默认端口9092,zookeeper默认端口:2181,kafka会链接到zookeeper
Ⅳ.进入到kafka的解压根目录,空白处右击,点击 "在此处打开Power..."
然后输入
.\bin\windows\kafka-server-start.bat .\config\server.properties
或者
bin\kafka-server-start.sh config\server.properties
如下画面表示ok了
Linux(CentOS8为例)下:
准备的安装包;
①JDK
官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
或者
百度盘链接:https://pan.baidu.com/s/1yzhtIOdDLYvXwkxk1hvo6g
提取码:6666
②zookeeper
https://zookeeper.apache.org/releases.html#download
③kafka
https://zookeeper.apache.org/releases.html#download
安装:
①jdk的安装
Ⅰ.下载好的JDK上传centos(我这里用的是MobaXterm),并cd到所在目录
Ⅱ.执行命令
rpm -ivh jdk-8u281-linux-x64.rpm
我这里已经安装了,所以就不执行了
Ⅲ.配置环境变量,执行命令
vim /etc/profile
在末尾输入以下内容
JAVA_HOME=/usr/java/jdk1.8.0_281-amd64 #自己的安装目录
JRE_HOME=$JAVA_HOME/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
然后按esc建以及输入 : 和 wq 按enter保存退出
Ⅳ.使配置文件生效,输入以下命令
source /etc/profile
②zookeeper的安装
Ⅰ.上传zeekeeper安装文件
或者进入自己喜欢的目录执行以下命令
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
可能提示-bash: wget: command not found,执行以下命令
yum -y install wget
Ⅱ.解压zookeeper压缩包
tar -zxvf apache-zookeeper-3.7.0-bin
Ⅲ.进入解压目录的conf目录,修改zoo_sample.cfg名称
cp zoo_sample.cfg zoo.cfg
Ⅳ.测试
cd ../bin/
./zkServer.sh start
如下代表成功了
Ⅴ.zookeeper系统启动配置
方式一:
#输入命令,修改profile文件
vim /etc/profile
#在末尾添加,这个要根据自己的目录做相应修改
export PATH=$PATH:/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/
#使修改生效
source /etc/profile
zookeeper启动/停止方式
zkServer.sh start
zkServer.sh stop
方式二:
#输入命令
vim /etc/systemd/system/zookeeperd.service
#在该新建文件中添加如下内容
#特别注意以下目录路径要根据自己的做相应修改
[Unit]
Description=zookeeperd.service
After=network.target
ConditionPathExists=/usr/zookeeper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
ExecStop=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh stop
ExecReload=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh restart
ExecStatus=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh status
[Install]
WantedBy=multi-user.target
使修改生效
systemctl daemon-reload
启动方式(启动/停止/重启/开机启动)
systemctl start zookeeperd
systemctl stop zookeeperd
systemctl restart zookeeperd
systemctl enable zookeeperd
③kafka的安装
Ⅰ.解压kafka
tar -zxvf kafka_2.13-2.8.0.tgz
Ⅱ.移动kafka的解压包到指定目录
mv kafka_2.13-2.8.0 /usr/
Ⅲ.配置kafka的server.properties文件
vim /usr/kafka_2.13-2.8.0/config/server.propertie
修改
log.dirs=/usr/kafka_2.13-2.8.0/kafka-logs
Ⅳ.创建systemctl服务
vim /etc/systemd/system/kafkad.service
按i 输入
[Unit]
Description=kafkad.service
After=network.target zookeeperd.service
[Service]
Type=simple
User=root
Group=root
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/java/jdk1.8.0_281-amd64/bin/"
ExecStart=/usr/kafka_2.13-2.8.0/bin/kafka-server-start.sh /usr/kafka_2.13-2.8.0/config/server.properties
ExecStop=/usr/kafka_2.13-2.8.0/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
2.kafka的术语
为了更利于学习,先举个栗子
有生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。
消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
后面会遇到kafka的一些名词,例如topic、producer、consumer、broker,partition我来简单解释解释:
producer
:生产者,就是它来生产“鸡蛋”的。
consumer
:消费者,生出的“鸡蛋”它来消费。
topic
:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
broker
:就是那个装鸡蛋的篮子
从技术的角度,topic标签其实就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
Kafka中,包含多个producer(生产者)、多个consumer(消费者)、多个broker(kafka服务器)、一个zookeeper集群。
kafka运行在一个或者多个服务器上作为集群运行,集群存储的消息记录的文件夹称为topic,每一条记录包含:键(key)、值(value)、时间戳(Timestamp)
zooKeeper 在Kafka集群中是负责管理集群元数据、控制器选举等操作的分布式协调器。
3.实例
用idea创建一个SpringBoot项目,messaging选 kafka
yml配置
spring:
kafka:
/*
* kafka服务器地址
*/
bootstrap-servers: localhost:9092
producer:
/**
* 参数acks
* 0 表示producer只push消息,而不等待kafka服务器的ack确认消息,就认为push成功
* 1 表示producer在push消息后,等待leader成功接收后回复的ack确认,才认为push成功
* all 表示producer在push消息后,等待leader以及followers都成功接收并恢复ack确认,才认为push成功
*/
acks: 1
/*
* 出现问题后,消息重发的次数,MAX 表示无限重发次数
*/
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
/*
*批处理消息字节数,达到就发送,默认16K。只要满足batch.size&&linger.ms就发生批处理
*/
batch-size: 16384
/*
* 生产者用于缓存数据的内存大小
*/
buffer-memory: 33554432
properties:
linger:
/*
* 延迟10s发送给broker
*/
ms: 10000
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.example.test.domain
listener:
missing-topics-fatal: false
logging:
level:
org:
springframework:
kafka: ERROR
apache:
kafka: ERROR
生产者
package com.example.test.producer;
import com.example.test.constants.Topic;
import com.example.test.domain.MyMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* @author DeanYang
* @date 2021/05/09 19:18:02
**/
public class Producer {
private KafkaTemplate<Object,Object> kafkaTemplate;
//同步发送
public SendResult sendMsgSync() throws ExecutionException, InterruptedException {
Integer id = new Random().nextInt(100);
MyMessage myMessage = new MyMessage(id,"测试消息 "+id);
//同步等待
return kafkaTemplate.send(Topic.TOPIC,myMessage).get();
}
//异步发送
public ListenableFuture<SendResult<Object,Object>> sendMsgAsync(){
Integer id = new Random().nextInt(100);
MyMessage myMessage = new MyMessage(id,"测试消息 "+id);
ListenableFuture<SendResult<Object,Object>> result = kafkaTemplate.send(Topic.TOPIC,myMessage);
return result;
}
}
消费者A
package com.example.test.consumer;
import com.example.test.constants.Topic;
import com.example.test.domain.MyMessage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author DeanYang
* @date 2021/05/09 19:17:47
**/
@Component
public class ConsumerA {
private Logger log = LoggerFactory.getLogger(getClass());
private static final String CONSUMER_GROUP_PREFIX = "My_A_";
@KafkaListener(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC)
public void msg(MyMessage myMessage){
log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage);
}
}
消费者B
package com.example.test.consumer;
import com.example.test.constants.Topic;
import com.example.test.domain.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author DeanYang
* @date 2021/05/09 20:15:37
**/
public class ConsumerB {
private Logger log = LoggerFactory.getLogger(getClass());
private static final String CONSUMER_GROUP_PREFIX = "My_B_";
(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC)
public void msg(MyMessage myMessage){
log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage);
}
}
测试
package com.example.test;
import com.example.test.producer.Producer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
class TestApplicationTests {
private Logger log = LoggerFactory.getLogger(getClass());
private Producer producer;
public void AsyncSend() throws InterruptedException {
log.info("开始发送消息");
for(int i = 0;i < 2;i++){
producer.sendMsgAsync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
public void onFailure(Throwable throwable) {
log.error("发送错误 {}",throwable);
}
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
log.info("回调结果 Result = topic->{},partition->{},offset->{}",
objectObjectSendResult.getRecordMetadata().topic(),
objectObjectSendResult.getRecordMetadata().partition(),
objectObjectSendResult.getRecordMetadata().offset());
}
});
TimeUnit.SECONDS.sleep(5); //两次刚好 liner:ms: 10000
}
new CountDownLatch(1).await(); //阻塞等待,保证消费
}
}
回复暗号:03 ,获取本篇文章源码