vlambda博客
学习文章列表

消息中间件之kafka

    确切的说kafka除了消息中间件这个角色之外,还可以作为能够存储数据的系统,这类似于我们常见的非关系型数据库,例如说MongoDB啊、ArangoDB啊等等。除此之外kafka能支持持续变化,不断增长的数据流, 可以发布和订阅数据流,还可以对于这些数据进行保存,也就是说kafka的本质是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。而且kafka在设计之初就是采用分布式架构设计的, 基于集群的方式工作,且可以自由伸缩,所以kafka构建集群so easy。


老规矩,上步骤

1.kafka的安装

windows下:

 所需的安装包:

①kafka基于java开发,所以jdk是必须的(官网需要登陆才可以下载):

官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

或者

链接:https://pan.baidu.com/s/1lHcRz5Y7b2x_cyqoj9JznA

提取码:6666

②kafka依赖zookeeper,所以zookeeper也是必须的(bin文件):

https://zookeeper.apache.org/releases.html#download

kafka安装包

http://kafka.apache.org/downloads


步骤:

①jdk的安装,略。不要忘了配置Java环境

②zookeeper安装

Ⅰ.进入解压目录的conf目录,将zoo_sample.cfg改为zoo.cfg

消息中间件之kafka

Ⅱ.打开zoo.cfg修改dataDir,分隔符一定要\\,如

消息中间件之kafka

Ⅲ.添加系统变量:ZOOKEEPER_HOME=D:\apache-zookeeper-3.7.0-bin

消息中间件之kafka

Ⅳ.编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin

消息中间件之kafka

Ⅴ.管理员身份运行cmd,输入“zkServer“,运行Zookeeper,别关闭黑框

消息中间件之kafka

③kafka的安装

    Ⅰ.进入kafka的解压目录,编辑server.properties文件,修改log.dirs

消息中间件之kafka

修改listeners

消息中间件之kafka

    Ⅱ.如果Zookeeper与Kafka不在同一台电脑上则修改zookeeper.connect

消息中间件之kafka

    Ⅲ.需要说明一点的是:Kafka默认端口9092,zookeeper默认端口:2181,kafka会链接到zookeeper

     Ⅳ.进入到kafka的解压根目录,空白处右击,点击 "在此处打开Power..."

消息中间件之kafka

然后输入

.\bin\windows\kafka-server-start.bat .\config\server.properties或者bin\kafka-server-start.sh config\server.properties

如下画面表示ok了

消息中间件之kafka


Linux(CentOS8为例)下:

    准备的安装包;

    ①JDK

    官网:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

    或者

    百度盘链接:https://pan.baidu.com/s/1yzhtIOdDLYvXwkxk1hvo6g

    提取码:6666

    ②zookeeper

    https://zookeeper.apache.org/releases.html#download

    ③kafka

    https://zookeeper.apache.org/releases.html#download

安装:

①jdk的安装

Ⅰ.下载好的JDK上传centos(我这里用的是MobaXterm),并cd到所在目录

消息中间件之kafka

Ⅱ.执行命令

rpm -ivh jdk-8u281-linux-x64.rpm

消息中间件之kafka

我这里已经安装了,所以就不执行了

Ⅲ.配置环境变量,执行命令

vim /etc/profile

在末尾输入以下内容

JAVA_HOME=/usr/java/jdk1.8.0_281-amd64  #自己的安装目录JRE_HOME=$JAVA_HOME/jrePATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/binCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/libexport JAVA_HOME JRE_HOME PATH CLASSPATH

然后按esc建以及输入 : 和 wq 按enter保存退出

消息中间件之kafka

Ⅳ.使配置文件生效,输入以下命令

source /etc/profile

②zookeeper的安装

Ⅰ.上传zeekeeper安装文件

或者进入自己喜欢的目录执行以下命令

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

 

消息中间件之kafka

可能提示-bash: wget: command not found,执行以下命令

yum -y install wget

Ⅱ.解压zookeeper压缩包

tar -zxvf apache-zookeeper-3.7.0-bin

Ⅲ.进入解压目录的conf目录,修改zoo_sample.cfg名称

cp zoo_sample.cfg zoo.cfg

消息中间件之kafka

Ⅳ.测试

cd ../bin/./zkServer.sh start

如下代表成功了

消息中间件之kafka

Ⅴ.zookeeper系统启动配置

方式一:

#输入命令,修改profile文件vim /etc/profile#在末尾添加,这个要根据自己的目录做相应修改export PATH=$PATH:/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/#使修改生效source /etc/profile

    zookeeper启动/停止方式

zkServer.sh startzkServer.sh stop

方式二:

#输入命令vim /etc/systemd/system/zookeeperd.service#在该新建文件中添加如下内容#特别注意以下目录路径要根据自己的做相应修改[Unit]Description=zookeeperd.serviceAfter=network.targetConditionPathExists=/usr/zookeeper/apache-zookeeper-3.7.0-bin/conf/zoo.cfg[Service]Type=forkingUser=rootGroup=rootExecStart=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh startExecStop=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh stopExecReload=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh restartExecStatus=/usr/zookeeper/apache-zookeeper-3.7.0-bin/bin/zkServer.sh status[Install]WantedBy=multi-user.target

 使修改生效

systemctl daemon-reload

     启动方式(启动/停止/重启/开机启动)


systemctl start zookeeperdsystemctl stop zookeeperdsystemctl restart zookeeperdsystemctl enable zookeeperd

③kafka的安装

Ⅰ.解压kafka

tar -zxvf kafka_2.13-2.8.0.tgz

Ⅱ.移动kafka的解压包到指定目录

mv kafka_2.13-2.8.0 /usr/

Ⅲ.配置kafka的server.properties文件

vim /usr/kafka_2.13-2.8.0/config/server.propertie

修改

log.dirs=/usr/kafka_2.13-2.8.0/kafka-logs

Ⅳ.创建systemctl服务

 vim /etc/systemd/system/kafkad.service

按i 输入

[Unit]Description=kafkad.serviceAfter=network.target zookeeperd.service[Service]Type=simpleUser=rootGroup=rootEnvironment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/java/jdk1.8.0_281-amd64/bin/"ExecStart=/usr/kafka_2.13-2.8.0/bin/kafka-server-start.sh /usr/kafka_2.13-2.8.0/config/server.propertiesExecStop=/usr/kafka_2.13-2.8.0/bin/kafka-server-stop.shRestart=on-failure[Install]WantedBy=multi-user.target

2.kafka的术语

    为了更利于学习,先举个栗子

    有生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。

    鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https什么的),也称为报文,也叫“消息”。

消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。

    后面会遇到kafka的一些名词,例如topic、producer、consumer、broker,partition我来简单解释解释:

    producer:生产者,就是它来生产“鸡蛋”的。

    consumer:消费者,生出的“鸡蛋”它来消费。

    topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。

    broker:就是那个装鸡蛋的篮子

    从技术的角度,topic标签其实就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。

    Kafka中,包含多个producer(生产者)、多个consumer(消费者)、多个broker(kafka服务器)、一个zookeeper集群。

kafka运行在一个或者多个服务器上作为集群运行,集群存储的消息记录的文件夹称为topic,每一条记录包含:键(key)、值(value)、时间戳(Timestamp)

zooKeeper 在Kafka集群中是负责管理集群元数据、控制器选举等操作的分布式协调器。

3.实例

用idea创建一个SpringBoot项目,messaging选 kafka

yml配置

spring: kafka: /*     * kafka服务器地址     */ bootstrap-servers: localhost:9092 producer: /** * 参数acks * 0 表示producer只push消息,而不等待kafka服务器的ack确认消息,就认为push成功 * 1 表示producer在push消息后,等待leader成功接收后回复的ack确认,才认为push成功 * all 表示producer在push消息后,等待leader以及followers都成功接收并恢复ack确认,才认为push成功         */ acks: 1       /*       * 出现问题后,消息重发的次数,MAX 表示无限重发次数       */ retries: 3 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer      /*       *批处理消息字节数,达到就发送,默认16K。只要满足batch.size&&linger.ms就发生批处理       */ batch-size: 16384 /*       * 生产者用于缓存数据的内存大小       */ buffer-memory: 33554432 properties: linger: /*           * 延迟10s发送给broker           */          ms: 10000 consumer: auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: com.example.test.domain listener: missing-topics-fatal: falselogging: level: org: springframework: kafka: ERROR apache: kafka: ERROR

生产者

package com.example.test.producer;
import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;
import java.util.Random;import java.util.concurrent.ExecutionException;
/** * @author DeanYang * @date 2021/05/09 19:18:02 **/@Componentpublic class Producer { @Autowired private KafkaTemplate<Object,Object> kafkaTemplate;
//同步发送 public SendResult sendMsgSync() throws ExecutionException, InterruptedException { Integer id = new Random().nextInt(100); MyMessage myMessage = new MyMessage(id,"测试消息 "+id); //同步等待 return kafkaTemplate.send(Topic.TOPIC,myMessage).get(); }
//异步发送 public ListenableFuture<SendResult<Object,Object>> sendMsgAsync(){ Integer id = new Random().nextInt(100); MyMessage myMessage = new MyMessage(id,"测试消息 "+id); ListenableFuture<SendResult<Object,Object>> result = kafkaTemplate.send(Topic.TOPIC,myMessage); return result; }}

消费者A

package com.example.test.consumer;
import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;
import java.util.List;

/** * @author DeanYang * @date 2021/05/09 19:17:47 **/@Componentpublic class ConsumerA {
private Logger log = LoggerFactory.getLogger(getClass()); private static final String CONSUMER_GROUP_PREFIX = "My_A_";
@KafkaListener(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC) public void msg(MyMessage myMessage){ log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage); }}

消费者B

package com.example.test.consumer;
import com.example.test.constants.Topic;import com.example.test.domain.MyMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;
/** * @author DeanYang * @date 2021/05/09 20:15:37 **/@Componentpublic class ConsumerB {
private Logger log = LoggerFactory.getLogger(getClass()); private static final String CONSUMER_GROUP_PREFIX = "My_B_";
@KafkaListener(topics = Topic.TOPIC,groupId = CONSUMER_GROUP_PREFIX+Topic.TOPIC) public void msg(MyMessage myMessage){ log.info("接收到的消息: 线程->{},内容->{}",Thread.currentThread().getName(),myMessage); }}

测试

package com.example.test;
import com.example.test.producer.Producer;import org.junit.jupiter.api.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.support.SendResult;import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;
@SpringBootTestclass TestApplicationTests {
private Logger log = LoggerFactory.getLogger(getClass()); @Autowired private Producer producer;
@Test public void AsyncSend() throws InterruptedException { log.info("开始发送消息"); for(int i = 0;i < 2;i++){ producer.sendMsgAsync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() { @Override public void onFailure(Throwable throwable) { log.error("发送错误 {}",throwable); }
@Override public void onSuccess(SendResult<Object, Object> objectObjectSendResult) { log.info("回调结果 Result = topic->{},partition->{},offset->{}", objectObjectSendResult.getRecordMetadata().topic(), objectObjectSendResult.getRecordMetadata().partition(), objectObjectSendResult.getRecordMetadata().offset()); } }); TimeUnit.SECONDS.sleep(5); //两次刚好 liner:ms: 10000 } new CountDownLatch(1).await(); //阻塞等待,保证消费 }}

回复暗号:03 ,获取本篇文章源码