消息中间件之kafka
确切的说kafka除了消息中间件这个角色之外,还可以作为能够存储数据的系统,这类似于我们常见的非关系型数据库,例如说MongoDB啊、ArangoDB啊等等。除此之外kafka能支持持续变化,不断增长的数据流, 可以发布和订阅数据流,还可以对于这些数据进行保存,也就是说kafka的本质是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。而且kafka在设计之初就是采用分布式架构设计的, 基于集群的方式工作,且可以自由伸缩,所以kafka构建集群so easy。
老规矩,上步骤
1.kafka的安装
windows下:
所需的安装包:
①kafka基于java开发,所以jdk是必须的(官网需要登陆才可以下载):
官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
或者
链接:https://pan.baidu.com/s/1lHcRz5Y7b2x_cyqoj9JznA
提取码:6666
②kafka依赖zookeeper,所以zookeeper也是必须的(bin文件):
https://zookeeper.apache.org/releases.html#download
③kafka安装包
http://kafka.apache.org/downloads
步骤:
①jdk的安装,略。不要忘了配置Java环境
②zookeeper安装
Ⅰ.进入解压目录的conf目录,将zoo_sample.cfg改为zoo.cfg
Ⅱ.打开zoo.cfg修改dataDir,分隔符一定要\\,如
Ⅲ.添加系统变量:ZOOKEEPER_HOME=D:\apache-zookeeper-3.7.0-bin
Ⅳ.编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
Ⅴ.管理员身份运行cmd,输入“zkServer“,运行Zookeeper,别关闭黑框
③kafka的安装
Ⅰ.进入kafka的解压目录,编辑server.properties文件,修改log.dirs
修改listeners
Ⅱ.如果Zookeeper与Kafka不在同一台电脑上则修改zookeeper.connect
Ⅲ.需要说明一点的是:Kafka默认端口9092,zookeeper默认端口:2181,kafka会链接到zookeeper
Ⅳ.进入到kafka的解压根目录,空白处右击,点击 "在此处打开Power..."
然后输入
.\bin\windows\kafka-server-start.bat .\config\server.properties或者bin\kafka-server-start.sh config\server.properties
如下画面表示ok了
Linux(CentOS8为例)下:
准备的安装包;
①JDK
官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
或者
百度盘链接:https://pan.baidu.com/s/1yzhtIOdDLYvXwkxk1hvo6g
提取码:6666
②zookeeper
https://zookeeper.apache.org/releases.html#download
③kafka
https://zookeeper.apache.org/releases.html#download
安装:
①jdk的安装
Ⅰ.下载好的JDK上传centos(我这里用的是MobaXterm),并cd到所在目录
Ⅱ.执行命令
rpm -ivh jdk-8u281-linux-x64.rpm
我这里已经安装了,所以就不执行了
Ⅲ.配置环境变量,执行命令
vim /etc/profile
在末尾输入以下内容
JAVA_HOME=/usr/java/jdk1.8.0_281-amd64 #自己的安装目录JRE_HOME=$JAVA_HOME/jrePATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/binCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/libexport JAVA_HOME JRE_HOME PATH CLASSPATH
然后按esc建以及输入 : 和 wq 按enter保存退出
Ⅳ.使配置文件生效,输入以下命令
source /etc/profile
②zookeeper的安装
Ⅰ.上传zeekeeper安装文件
或者进入自己喜欢的目录执行以下命令
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
可能提示-bash: wget: command not found,执行以下命令
yum -y install wget
Ⅱ.解压zookeeper压缩包
tar -zxvf apache-zookeeper-3.7.0-bin
Ⅲ.进入解压目录的conf目录,修改zoo_sample.cfg名称
cp zoo_sample.cfg zoo.cfg
Ⅳ.测试
cd ../bin/./zkServer.sh start
如下代表成功了
Ⅴ.zookeeper系统启动配置
方式一:
#输入命令,修改profile文件vim /etc/profile#在末尾添加,这个要根据自己的目录做相应修改export PATH=$PATH:/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/#使修改生效source /etc/profile
zookeeper启动/停止方式
zkServer.sh startzkServer.sh stop
方式二:
#输入命令vim /etc/systemd/system/zookeeperd.service#在该新建文件中添加如下内容#特别注意以下目录路径要根据自己的做相应修改[Unit]Description=zookeeperd.serviceAfter=network.targetConditionPathExists=/usr/zookeeper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg[Service]Type=forkingUser=rootGroup=rootExecStart=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh startExecStop=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh stopExecReload=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh restartExecStatus=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh status[Install]WantedBy=multi-user.target
使修改生效
systemctl daemon-reload
启动方式(启动/停止/重启/开机启动)
systemctl start zookeeperdsystemctl stop zookeeperdsystemctl restart zookeeperdsystemctl enable zookeeperd
③kafka的安装
Ⅰ.解压kafka
tar -zxvf kafka_2.13-2.8.0.tgz
Ⅱ.移动kafka的解压包到指定目录
mv kafka_2.13-2.8.0 /usr/
Ⅲ.配置kafka的server.properties文件
vim /usr/kafka_2.13-2.8.0/config/server.propertie
修改
log.dirs=/usr/kafka_2.13-2.8.0/kafka-logs
Ⅳ.创建systemctl服务
vim /etc/systemd/system/kafkad.service
按i 输入
[Unit]Description=kafkad.serviceAfter=network.target zookeeperd.service[Service]Type=simpleUser=rootGroup=rootEnvironment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/java/jdk1.8.0_281-amd64/bin/"ExecStart=/usr/kafka_2.13-2.8.0/bin/kafka-server-start.sh /usr/kafka_2.13-2.8.0/config/server.propertiesExecStop=/usr/kafka_2.13-2.8.0/bin/kafka-server-stop.shRestart=on-failure[Install]WantedBy=multi-user.target
2.kafka的术语
为了更利于学习,先举个栗子
有生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。
鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。
消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
后面会遇到kafka的一些名词,例如topic、producer、consumer、broker,partition我来简单解释解释:
producer:生产者,就是它来生产“鸡蛋”的。
consumer:消费者,生出的“鸡蛋”它来消费。
topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
broker:就是那个装鸡蛋的篮子
从技术的角度,topic标签其实就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
Kafka中,包含多个producer(生产者)、多个consumer(消费者)、多个broker(kafka服务器)、一个zookeeper集群。
kafka运行在一个或者多个服务器上作为集群运行,集群存储的消息记录的文件夹称为topic,每一条记录包含:键(key)、值(value)、时间戳(Timestamp)
zooKeeper 在Kafka集群中是负责管理集群元数据、控制器选举等操作的分布式协调器。
3.实例
用idea创建一个SpringBoot项目,messaging选 kafka
yml配置
spring:kafka:/** kafka服务器地址*/bootstrap-servers: localhost:9092producer:/*** 参数acks* 0 表示producer只push消息,而不等待kafka服务器的ack确认消息,就认为push成功* 1 表示producer在push消息后,等待leader成功接收后回复的ack确认,才认为push成功* all 表示producer在push消息后,等待leader以及followers都成功接收并恢复ack确认,才认为push成功*/acks: 1/** 出现问题后,消息重发的次数,MAX 表示无限重发次数*/retries: 3key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializer/**批处理消息字节数,达到就发送,默认16K。只要满足batch.size&&linger.ms就发生批处理*/batch-size: 16384/** 生产者用于缓存数据的内存大小*/buffer-memory: 33554432properties:linger:/** 延迟10s发送给broker*/ms: 10000consumer:auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:packages: com.example.test.domainlistener:missing-topics-fatal: falselogging:level:org:springframework:kafka: ERRORapache:kafka: ERROR
生产者
package com.example.test.producer;import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import java.util.Random;import java.util.concurrent.ExecutionException;/*** @author DeanYang* @date 2021/05/09 19:18:02**/public class Producer {private KafkaTemplate<Object,Object> kafkaTemplate;//同步发送public SendResult sendMsgSync() throws ExecutionException, InterruptedException {Integer id = new Random().nextInt(100);MyMessage myMessage = new MyMessage(id,"测试消息 "+id);//同步等待return kafkaTemplate.send(Topic.TOPIC,myMessage).get();}//异步发送public ListenableFuture<SendResult<Object,Object>> sendMsgAsync(){Integer id = new Random().nextInt(100);MyMessage myMessage = new MyMessage(id,"测试消息 "+id);ListenableFuture<SendResult<Object,Object>> result = kafkaTemplate.send(Topic.TOPIC,myMessage);return result;}}
消费者A
package com.example.test.consumer;import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import java.util.List;/*** @author DeanYang* @date 2021/05/09 19:17:47**/@Componentpublic class ConsumerA {private Logger log = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "My_A_";@KafkaListener(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC)public void msg(MyMessage myMessage){log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage);}}
消费者B
package com.example.test.consumer;import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/*** @author DeanYang* @date 2021/05/09 20:15:37**/public class ConsumerB {private Logger log = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "My_B_";(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC)public void msg(MyMessage myMessage){log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage);}}
测试
package com.example.test;import com.example.test.producer.Producer;import org.junit.jupiter.api.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.support.SendResult;import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;class TestApplicationTests {private Logger log = LoggerFactory.getLogger(getClass());private Producer producer;public void AsyncSend() throws InterruptedException {log.info("开始发送消息");for(int i = 0;i < 2;i++){producer.sendMsgAsync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {public void onFailure(Throwable throwable) {log.error("发送错误 {}",throwable);}public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {log.info("回调结果 Result = topic->{},partition->{},offset->{}",objectObjectSendResult.getRecordMetadata().topic(),objectObjectSendResult.getRecordMetadata().partition(),objectObjectSendResult.getRecordMetadata().offset());}});TimeUnit.SECONDS.sleep(5); //两次刚好 liner:ms: 10000}new CountDownLatch(1).await(); //阻塞等待,保证消费}}
回复暗号:03 ,获取本篇文章源码
