vlambda博客
学习文章列表

精选文章|RocketMQ源码分析


RocketMQ源码分析

概要

本文主要做了RocketMQ4.8版本的源码主流程分析。

  1. 一二三四五节介绍文件存储部分。

  2. 六七节介绍消息生产流程,包含client端和server端。

  3. 八节介绍消息消费流程。

  4. 九节介绍了传统的主从消息复制模式。

  5. 最后一节介绍了相对较新的基于raft的dleger模式消息处理流程。


一.存储设计

RocketMQ主要的存储文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。


RocketMQ将所有主题的消息存储在同一个文件中。但由于消息中间件一般是基于主题进行订阅,这样做不方便按照消息主题检索消息。

为了方便消息消费,RocketMQ引入了ConsumeQueue消息队列文件,每个Topic包含多个ConsumeQueue,每一个ConsumeQueue有一个文件。该文件可以看成是Commitlog关于消息消费的“索引”。


此外还有IndexFile索引文件,主要就是为了加速消息的检索,根据消息的属性快速从Commitlog文件中检索消息。

  1. CommitLog:消息存储文件,所有Topic的消息都存储在CommitLog文件中。

  2. ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQueue,供消费者消费。

  3. IndexFile:消息索引文件,主要存储消息Key与Offset(消息在CommitLog中的偏移)的对应关系。


二. 存储文件组织

RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumeQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就是该文件第一条消息对应的全局物理偏移量。


以下为commitlog实际文件截图:

精选文章|RocketMQ源码分析

存储目录

精选文章|RocketMQ源码分析

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。


MappedFile文件映射使用方式如下:

精选文章|RocketMQ源码分析

MappedFileQueue是MappedFile的管理容器,也是对存储目录的封装,例如CommitLog文件的存储路径${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件(MappedFile)。


逻辑关系

精选文章|RocketMQ源码分析

数据结构

精选文章|RocketMQ源码分析


三.存储文件格式

3.1 Commitlog文件格式

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析

我们来看下按照这种格式来查找指定消息的逻辑。

根据offset查找消息。

  • 根据offset定位到文件

精选文章|RocketMQ源码分析
  • 用offset与文件长度取余得到在文件内的偏移量,从该偏移量读取size长度的内容返回即可

精选文章|RocketMQ源码分析


3.2 Consumequeue文件格式

精选文章|RocketMQ源码分析


为了加速ConsumeQueue消息条目的检索,每一个Consumequeue条目不会存储消息的全量信息,其存储格式如图:

精选文章|RocketMQ源码分析

ConsumeQueue即为Commitlog文件的索引文件,其构建机制是当消息存储到Commitlog文件后,由专门的线程转发消息,从而构建ConsumeQueue。


如下图,将consumeQueue想象成一个元素大小为20字节的大数组的话,startIndex就是这个数组的索引,而真正在文件中查找的时候则需要换算成文件内的字节偏移量(直接乘20)。


以下为根据消费者消费的逻辑偏移量查找consumeQueue条目的逻辑。

精选文章|RocketMQ源码分析


四.消费队列文件的实时更新

ConsumeQueue、IndexFile都是基于CommitLog文件构建的,当生产者提交的消息存储在Commitlog文件中时,ConsumeQueue文件需要及时更新,否则消息无法及时被消费。


RocketMQ通过开启一个线程ReputMessageServcie来实时读取CommitLog文件新增内容,使用reputFromOffset来标记已经追踪到的位置。


主要流程如下:

  • 从当前记录已追踪到的位置reputFromOffset开始读取commitLog数据

  • 将读取到的buffer解析成携带关键信息的DispatchRequest(包含commitLogOffset,msgSize,tagsCode等consumeQueue的必要信息,以用来构建consumeQueue)

精选文章|RocketMQ源码分析
  • doDispatch方法最终会调用putMessagePositionInfo方法,将读取到的信息存入consumeQueue对应文件。

精选文章|RocketMQ源码分析


五.文件刷盘机制

RocketMQ的存储与读写基于内存映射机制(MappedByteBuffer),消息存储时首先将消息追加到内存,再根据配置的刷盘策略在一定时间进行刷写磁盘。


如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法。


如果是异步刷盘,在消息追加到内存后立刻返回给发送端。


RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。

精选文章|RocketMQ源码分析

参数解析:

  1. flushIntervalCommitLog:FlushRealTimeService线程任务运行间隔(默认500ms)。

  2. flushPhysicQueueLeastPages:一次刷写任务至少包含的页数,如果脏页数量不足,小于该参数配置的值,将忽略本次刷写任务,默认4页。

  3. flushPhysicQueueThoroughInterval:两次真实刷写任务最大间隔,默认10s。

注:每间隔10S会把flushPhysicQueueLeastPages强制设置为0,意为强制刷盘,无论脏页数量是否达到。


5.1 异步刷盘主流程

精选文章|RocketMQ源码分析


  1. 单线程死循环,每间隔500ms调用一次flush方法。

  2. flush方法的入参是一次刷写任务至少包含的页数, 如果脏页页数不足,则啥都不做。

  3. flush方法最终会走到 mappedByteBuffer.force()方法。

注:force方法详解

1. jdk 源码中显示force方法最终会调用msync系统调用,并传入MS_SYNC标志位。

精选文章|RocketMQ源码分析

2. MS_SYNC标志位的含义为同步等待刷盘结果。

精选文章|RocketMQ源码分析


六.消息生产 client端流程

生产者消息发送流程主要的步骤:校验消息、查找路由、消息发送(包含异常处理机制)。

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析


6.1 消息生产总流程

精选文章|RocketMQ源码分析


  1. 获取topic路由信息

  2. 根据策略选择一个messageQueue进行消息发送,默认使用轮询策略

  3. 调用sendKernelImpl方法发送消息,如果发送失败,默认最多重试三次

  4. sendKernelImpl 会调用到mqClient,使用netty发送请求给mq broker

注:

1. 路由信息既指定topic共有多少队列,这些队列分布在哪些broker节点上,方便发送消息时选择具体的broker节点发送rpc请求。

2. 路由信息数据结构如下:

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析


6.2 消息发送路由选择详解

精选文章|RocketMQ源码分析


  1. 获取路由信息

    1. 如果本地缓存中包含指定topic的路由信息,则直接返回;

    2. 如本地map中不包含指定topic的路由信息,则发送rpc请求到nameserver获取。

  2. 消息队列选择

    1. 查询到的路由信息(队列分布信息)结构如下图。

    2. 选择策略默认使用轮询模式,依次来做消息发送的负载均衡。

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析


七.消息生产 broker端流程

消息存储代码入口:org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage。

精选文章|RocketMQ源码分析
  1. 检查消息合法性

  2. 调用CommitLog类的putMessage方法

  3. 展开CommitLog类的putMessage

    1. 获取当前可以写入的Commitlog文件

    2. put之前先上独占锁

    3. 调用MappedFile#appendMessage方法

    4. finally块中解锁

精选文章|RocketMQ源码分析

4. 展开MappedFile#appendMessage(核心就是调用MappedByteBuffer.put)

精选文章|RocketMQ源码分析


八.消费者消费

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。


集群模式,Topic下的同一条消息只允许被其中一个消费者消费。


广播模式,Topic下的同一条消息将被集群内的所有消费者消费一次。


集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循的原则:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。


8.1 客户端消息拉取

精选文章|RocketMQ源码分析


RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

a.从pullRequestQueue中获取一个PullRequest(消息拉取任务),如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入。

b.调用pullMessage方法处理拿到的PullRequest进行消息拉取,PullRequest包含将要消费的messageQueue信息。

精选文章|RocketMQ源码分析

c. 调用pullKernelImpl,传入需要拉取的具体队列、offset、maxNums,拉取完成后需要执行的回调。

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析

e. 收到响应,执行回调。

  • 将foundList(收到的消息列表)放入processQueue。

  • 调用submitConsumeRequest处理收到的消息体(实际上是将拉取到的消息丢入线程池,做异步化处理)。

  • 将pullRequest重新放回pullRequestQueue,启动下一次pull流程。

注:下图312行,pullRequest的nextOffseet已更新,所以下次拉取的时候会从新的offset开始拉取。

精选文章|RocketMQ源码分析

f. submitConsumeRequest处理流程最终会走到ConsumeRequest的run方法

  • 调用业务方注册的listener

  • 在processConsumeResult中处理消息消费Result、消费失败重试等逻辑

精选文章|RocketMQ源码分析


8.2 服务端处理消息拉取

精选文章|RocketMQ源码分析


入口在PullMessageProcessor#processRequest,该方法核心是DefaultMessageStore#getMessage。

  • 展开DefaultMessageStore#getMessage方法,传入topic、queueId、offset、maxMsgNum。

  • 根据topic,queueId确定consumeQueue文件。

  • 从找到的consumeQueue文件指定offset位置找到第一条‘索引’,读出对应在commitLog中的offset以及size。

  • 用上述读取到的offset和size从commitLog中读到目标消息(此处有相对较多的随机读取)。

  • 依次循环,直到消息数量达到maxMsgNum。

精选文章|RocketMQ源码分析


九. 主从同步(HA)机制

为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费甚至丢失,RocketMQ引入了Broker主备机制。

即消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。


9.1 RocketMQ HA的实现原理

精选文章|RocketMQ源码分析


  1. master启动,并在特定端口上监听slave的连接。

  2. slave主动连接master,master接收客户端的连接,并建立相关TCP连接。

  3. slave主动向master发送待拉取消息偏移量,master解析请求并返回消息给从服务器。

  4. slave保存消息并继续发送新的消息同步请求。

具体实现(HAService)

  • master启动,并在特定端口上监听从服务器的连接。

精选文章|RocketMQ源码分析
  • slave启动主动连接master(connectMaster)

  • slave默认固定每隔5s向master会报当前拉取到的消息偏移量(reportSlaveMaxOffset)

  • 处理master发过来的消息副本(processReadEvent)

  • 继续发送将要拉取的 消息偏移量,等到master响应(reportSlaveMaxOffsetPlus)

精选文章|RocketMQ源码分析

部分步骤详解

  • slave -> master请求详解

    内容很简单,只有将要拉取的offset

精选文章|RocketMQ源码分析
  • master 处理 slave请求详解

    • 读取要拉取的offset

    • 更新slaveAckOffset,用于主从同步复制消息时及时响应给slave

    • 如果是初次收到slave请求,更新slaveRequestOffset

精选文章|RocketMQ源码分析
    • master发送header给slave(调用transferData)(包含接下来将要发送的offset)

    • master从本地commitLog中查询指定offset处的消息,填充进入selectMappedBufferResult

    • master调用transferData写出刚才查到的mq消息给到slave节点

精选文章|RocketMQ源码分析
  • 由于上述流程更新了nextTransferFromWhere,之后就可以不断循环,发送消息给到slave。


十.Dledger模式

从4.5版本开始,rocket引入了一种新的高可用机制,即rocketMq的Dleger模式。


Dleger模式遵从Raft协议http://thesecretlivesofdata.com/raft/


我们以一个集群三台机器为例(包含一主两从)。

精选文章|RocketMQ源码分析
精选文章|RocketMQ源码分析


10.1 dleger的不同之处

精选文章|RocketMQ源码分析


引入dledger后,broker端处理消息发送的流程略有不同:

commitLog存储实现类换成了DledgerCommitLog,核心方法也从mappedFile.appendMessage换成了dLedgerServer.handleAppend,也就是说核心方法不再是把消息put进mappedByteBuffer就ok了。

精选文章|RocketMQ源码分析


10.2 dLedger消息存储主流程

精选文章|RocketMQ源码分析


剖析下dLedgerServer.handleAppend:

判断从节点是不是压力太大响应太慢,如果是则直接拒绝producer发过来的请求

  • 因为理论上三节点组成的cluster至少要有一个从节点响应成功消息才算正常落地。

  • 如果两个从节点都响应慢,则master也无法接口请求,只能报错。

如果从节点正常,则存入master本地的mappedByteBuffer(dLedgerStore.appendAsLeader)。

将刚存入本地的消息 交给dLedgerEntryPusher来负责发送给slave,并等待从节点的响应(waitAck)。

精选文章|RocketMQ源码分析

详解下waitAck:

  • 核心就一条,把msg在commitLog中的索引以及对应的future放入一个map。

注:方便slave响应回来之后,根据响应的index,调用对应index的future,以发送响应给到producer,表示消息发送成功。

精选文章|RocketMQ源码分析


10.3 消息从master到 slave的复制流程

精选文章|RocketMQ源码分析

    master启动时同时启动了2个dispatcher,各自对应一个slave节点。

    精选文章|RocketMQ源码分析

    在dispatcher的主方法中,循环调用doAppend方法,不断复制消息发送到slave。

    精选文章|RocketMQ源码分析

    doAppend负责根据当前已经到达的位置writeIndex,查询并发送消息到从节点(doAppendInner),并更新writeIndex,循环往复。

    精选文章|RocketMQ源码分析

    doAppendInner 先查询index位置的消息,再rpc push消息到从节点。

    在从节点ack回调中,更新对应从节点所达到的水位(updatePeerWaterMark)。

    注:水位即对应从节点已经追加到的index位置。

    精选文章|RocketMQ源码分析

    更新水位时,peerid为从节点的id,三个节点的水位信息统一放在peerWaterMarksByTerm。

    精选文章|RocketMQ源码分析

    master-> slave push消息,收到响应后更新水位,就是这段的总结。


    10.4 消息复制成功后及时响应producer

    精选文章|RocketMQ源码分析


    消息复制成功后及时响应producer,以表示消息发送成功。

    master启动时同样启动了一个QuorumAckChecker线程。

    精选文章|RocketMQ源码分析

    QuorumAckChecker主方法中进行水位(peerWaterMarksByTerm)的处理。

    • checker计算水位map中的数据,算出当前多数节点(两个节点就够了)已经达到的水位(quorumIndex),标志着这些消息已经收到slave的ack,低于这个水位的消息全部都可以给producer响应了。

    注:举例子:

    master append到了index 5,slave 1到了 4, slave 2 到了 3,则quorumIndex计算为4。

    • 处理的内容很简单,从pendingResponse的map中找出低于共识水位(quorumIndex)的消息发送response给到producer(调用future.complete)。

    • 这个future正是上边消息发送流程写完master本地后放入pendingResponse这个map的。

    • 至此整个producer的请求,响应流程已经结束。

    精选文章|RocketMQ源码分析





    精选文章|RocketMQ源码分析

    关注得物技术,携手走向技术的云端

    文|zmm