vlambda博客
学习文章列表

高性能分布式消息中间件—RocketMQ(一)


我是这样规划的,想从以下的技术栈记录:

1:分布式系统消息中间件。

2:分布式搜索引擎。

3:分布式JOB。

4:分布式缓存。

5:分库分表。

6:手把手教你打造一个第三方公众平台。

7:分布式系统性能监控技术。

8:Spring生态源码解读。

9:Spring生态实战技术。

10:并发技术汇总。

11:jvm调优。

12:netty。

13:架构实战。

14:java基础。

15:Mysql & TIDB。

简介:

RocketMQ作为一款纯java、是由阿里巴巴公司开发的分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

由开源社区killme2008维护,开源社区非常活跃。https://github.com/killme2008/Metamorphosis。Metaq 2.x。于2012年10月份上线,在淘宝内部被广泛使用 。Metaq 3.0发布时,产品名称改为RocketMQ。基于公司内部开源共建原则,RocketMQ项目只维护核心功能,且去除了所有其他运行时的依赖,核心功能最简化。每个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这个jar包即可,可通过API进行交互,如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。如今的rocketmq孵化成了Apache的顶级开源项目。

特点:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 (RocketMQ可以保证严格的消息顺序,而ActiveMQ无法保证)
  • 支持拉(pull)和推(push)两种消息模式 (Push好理解,比如在消费者端设置Listener回调;而Pull,控制权在于应用,即应用需要主动的调用拉消息方法从Broker获取消息,这里面存在一个消费位置记录的问题(如果不记录,会导致消息重复消费))
  • 单一队列百万消息的堆积能力       (RocketMQ提供亿级消息的堆积能力,这不是重点,重点是堆积了亿级的消息后,依然保持写入低延迟)
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义    (RocketMQ原生就是支持分布式的,而ActiveMQ原生存在单点性)
  • 提供 docker 镜像用于隔离测试和云集群部署
  • 提供配置、指标和监控等功能丰富的 Dashboard

在Metaq1.x/2.x的版本中,分布式协调采用的是Zookeeper,而RocketMQ自己实现了一个NameServer,更加轻量级,性能更好!

  • **组(Group)**有Producer/Consumer Group。ActiveMQ中并没有Group这个概念,而在RocketMQ中理解Group的机制很重要。通过Group机制,让RocketMQ天然的支持消息负载均衡!比如某个Topic有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息!(注意RocketMQ只有一种模式,即发布订阅模式。)
  • 消息失败重试机制、高效的订阅者水平扩展能力、强大的API、事务机制等等。

专业术语

Producer

消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。

Consumer

消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

Consumer Group

消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。

Topic

Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

Message

Tag

标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

Broker

Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

Name Server

Name Server 为 producer 和 consumer 提供路由信息。

rocketmq基于Dledger构建高可用

构建集群需要至少3台服务器

本人的RocketMq集群是基于阿里云的centos7.9版本。网上很多教程写的基于虚拟机的本地环境。我认为要玩儿就玩儿的正式一点,真正可以接近生产环境的玩法。

构建安装DLedger

git clone https://github.com/openmessaging/openmessaging-storage-dledger.git

cd openmessaging-storage-dledger

mvn clean install -DskipTests

另外系统中还要安装一个git,用作拉取rocketmq控制台的源码工程。

下载RocketMQ

wget https://github.com/apache/rocketmq/releases

unzip rocketmq-all-4.7.1-source-release.zip

cd rocketmq-all-4.7.1/

mvn -Prelease-all -DskipTests clean install -U

cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1

这里注意,rocketmq只有4.5以上版本支持dledger集群构建

修改配置文件

服务器说明:(生产中应该将 NameServer 部署到其他服务器中,在这为了方便,与Broker部署在一起)

安装Maven

cd /opt
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
tar -zxvf apache-maven-3.5.4-bin.tar.gz

# 修改环境变量
vim /etc/profile
# 最下面添加
export MAVEN_HOME=/opt/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH
# 保存退出
source /etc/profile
# 建立软连接
ln -s /opt/apache-maven-3.5.4/bin/mvn /usr/bin/mvn

安装JDK

这里需要自己前往oracle官方下载

配置java环境变量

环境变量配置 配置
"" export JAVA_HOME=/opt/jdk1.8.0_261
vi /etc/profile export CLASSPATH=${JAVA_HOME}/lib
"" export PATH= {JAVA_HOME}/bin
source /etc/profile 刷新配置

修改配置文件

服务器 ip 安装的服务
服务器1-主 172.30.134.189 DLedger,Broker,NameServer
服务器2-从 172.30.134.190 DLedger,Broker,NameServer
服务器3-从 172.30.134.191 DLedger,Broker,NameServer

服务器1配置-Master

进入rocketmq安装目录找到

vim conf/dledger/broker-n0.conf

修改Broker配置

## 集群名
brokerClusterName = RaftCluster
### broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
### 监听的端口
listenPort=10911
### 你设置的NameServer地址和端口
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
### n0 n1 n2 分别是broker1,broker2,broker3 的 dLegerSelfId
### 例如:dLegerPeers=n0-服务器1的IP:40911;n1-服务器2的IP:40912;n2-服务器3的IP:40913
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
### must be unique
autoCreateTopicEnable=true
### 这个值必须是在同一个RaftClusterGroup内唯一的
dLegerSelfId=n0
sendMessageThreadPoolNums=4
### 由于我的虚拟机配置了多个网卡,所以会绑定ip错误,因此我配置了这项,
brokerIP1=master主机的外网IP地址

服务器2配置-Slave

vim conf/dledger/broker-n1.conf

修改Broker配置

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10921
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
dLegerSelfId=n1
autoCreateTopicEnable=true
sendMessageThreadPoolNums=4
brokerIP1=内网IP

这里有个坑需要注意一下,如果说,你的master主机的brokerIP1设置了外网的ip那么你的slave节点的brokerIP1就要设置为内网的IP

服务器3配置-Slave

vim conf/dledger/broker-n2.conf 修改Broker配置

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=10931
namesrvAddr=172.30.134.189:9876;172.30.134.190:9876;172.30.134.191:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
## must be unique
autoCreateTopicEnable=true
dLegerSelfId=n2
sendMessageThreadPoolNums=4
brokerIP1=172.30.134.191

启动集群

在服务器1 执行

nohup sh bin/mqnamesrv -n 172.30.134.189 & > nohubNameserv
nohup sh bin/mqbroker  > nohubBroker -c conf/dledger/broker-n0.conf autoCreateTopicEnable=true &

启动集群

在服务器2 执行

nohup sh bin/mqnamesrv -n 172.30.134.190 & > nohubNameserv
nohup sh bin/mqbroker  > nohubBroker -c conf/dledger/broker-n1.conf autoCreateTopicEnable=true &

启动集群

在服务器3 执行

nohup sh bin/mqnamesrv -n 172.30.134.191 & > nohubNameserv
nohup sh bin/mqbroker  > nohubBroker -c conf/dledger/broker-n2.conf autoCreateTopicEnable=true &

查看集群情况

sh bin/mqadmin clusterList -n 127.0.0.1:9876

备注

Broker 配置

Broker 配置

参考文档:

参数名 默认值 说明
listenPort 10911 接受客户端连接的监听端口
namesrvAddr null nameServer 地址
brokerIP1 网卡的 InetAddress 当前 broker 监听的 IP
brokerIP2 跟 brokerIP1 一样 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步
brokerName null broker 的名称
brokerClusterName DefaultCluster 本 broker 所属的 Cluser 名称
brokerId 0 broker id, 0 表示 master, 其他的正整数表示 slave
storePathCommitLog $HOME/store/commitlog/ 存储 commit log 的路径
storePathConsumerQueue $HOME/store/consumequeue/ 存储 consume queue 的路径
mappedFileSizeCommitLog 1024 * 1024 * 1024(1G) commit log 的映射文件大小
deleteWhen 04 在每天的什么时间删除已经超过文件保留时间的 commit log
fileReservedTime 72 以小时计算的文件保留时间
brokerRole ASYNC_MASTER SYNC_MASTER/ASYNC_MASTER/SLAVE
flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。
enableDLegerCommitLog 是否启动 DLedger true
dLegerGroup DLedger Raft Group的名字,建议和 brokerName 保持一致 RaftNode00
dLegerPeers DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 n0-172.30.134.189:40911;n1-172.30.134.190:40912;n2-172.30.134.191:40913
dLegerSelfId 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 n0
sendMessageThreadPoolNums 发送线程个数,建议配置成 Cpu 核数 16

启动内存不够

修改 bin/runbroker.sh 和 bin/runserver.sh 中的
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

报连接超时

查看防火墙服务状态 systemctl status firewalld

将防火墙关闭 systemctl stop firewalld

安装控制台

1.下载文件(/usr/local目录下)

2.解压(/usr/local目录下)

yum install -y unzip zip 前提是:unzip解压文件无法使用 unzip rocketmq-externals-master.zip 解压文件

3.修改配置文件(usr/local/rocketmq-externals-master/目录下)

find -name application.properties 可以查看到两个文件都在rocketmq-console文件目录下

vi application.properties

4.编译(usr/local/rocketmq-externals-master/rocketmq-console/目录下)

mvn clean package -Dmaven.test.skip=true 如果失败多编译几次--可能是网络问题

编译成功后,在rocketmq-console目录下会生成一个目录:target目录,该目录下有启动rocketmq界面的jar文件

5.启动web(usr/local/rocketmq-externals-master/rocketmq-console/target目录下)

java -jar rocketmq-console-ng-1.0.0.jar 启动 ---当终端断了该服务就会停止 nohup java -jar rocketmq-console-ng-1.0.0.jar >>/usr/logs/log.out 2>&1 & 后台启动 --当终端断了也不会停止服务

6.添加端口并在阿里云服务器上开通端口

firewall-cmd --zone=public --add-port=8080/tcp --permanen 永久添加端口并重启防火墙

在阿里云服务器添加出入规则。

查看web监控:

高性能分布式消息中间件—RocketMQ(一)
高性能分布式消息中间件—RocketMQ(一)

消费者组:

高性能分布式消息中间件—RocketMQ(一)

本篇文章就介绍到这里,读者们首先要学会安装rocketmq、以及rocketmq中的基础概念。后续的文章会对rocketmq做具体的介绍以及使用,以及如何集成Springboot和spring cloud stream。