我理解的RocketMQ:主从复制HA(High Availability)原理
我理解的RocketMQ:主从复制HA(high availability)的机制分析
1 概述
主从消息复制方式:RocketMQ中主从同步采用的是主节点主动向从节点发送同步消息,是由一个后台不断运行的线程执行。注意是后台。并不是生产者给主Broker发送消息,主Broker处理接收消息时进行显示调用同步消息给从Broker。
从Broker向主Broker反馈主从消息复制进度:从Broker定时的向主Broker反馈复制消息进度。主Broker便知道从Broker的消息复制进度。这个反馈主要是用来实现主从同步复制。
发送消息同步的方式进行主从消息复制的实现:主Broker在处理接收消息时,去询问从Broker复制消息的进度是否已经到达当前消息所在的偏移量,如果是就返回,否则就进行等待。
下图是整个主从同步时,从Broker和主Broker的交互过程
初始化
-
副节点:启动
HAService
,HAService
创建HAClient
,HAClien
t是一个不断运行的RocketMQ服务线程,其作用就是与主节点建立连接、接收主节点的同步消息、向主节点同步确认其本地消息偏移量。 -
主节点:启动
HAService
,HAService
创建并启动AcceptSocketService
,顾名思义,此服务主要就是接受Socket连接。
主从复制消息步骤
-
第一步:从Broker与主Broker之间建立网络连接,由从Broker主动发起连接 ,从Broker通过
HAClient
向主节点请求建立TCP连接,主Broker的AcceptSocketService
接受连接请求,并建立TCP连接通道SocketChannel
,并用HAConnection
来进行包装。HAConnection
表示主Broker与某个从Broker之间的连接关系,处理了主Broker与从Broker之间的主从同步消息。它里面维护了两个服务:ReadSocketService
和WriteSocketService
,前者主要处理网络通道的读事件(从Broker同步复制进度偏移量),后者主要用来处理写事件,也即主Broker向从Broker发送的复制消息。通过此步后,从Broker与主Broker之间就建立网络通道。图中的1、2两个步骤 -
第二步:从Broker向主Broker发送复制进度。从Broker通过
HAClient
向主Broker汇报其已经复制的消息偏移量。主Broker通过HAConnection
中的ReadSocketService
处理。即图中的3、4、5。 -
第三步:主Broker向从Broker发送复制消息。主Broker通过
WriteSocketService
向从Broker发送消息。从Broker通过HAClient
处理。即6、7、8、9、10
二三步不断执行下去,便实现了主从复制。
关键点
-
主Broker向从Broker发送消息的时候都会带上物理偏移量。从Broker接收消息并读取物理偏移量,并与其本地的物理偏移量进行比较,如果相等则存储并继续与主Broker进行通信。如果不相等,则表示主从复制出现了混乱,此时会主动断开与主Broker之间的TCP连接,重新建立一条TCP连接,再次开始主从复制。 -
使用原生Java NIO网络通信编程。
2 源码分析
2.1 主要类及其作用
HAService
:主从复制模块对外的类,开启主从复制功能时,首先创建该类实例,然后通过它创建相关组件。然后通过启动其来启动整个主从复制模块。
AcceptSocketService
:HAService
的内部类,主Broker使用,主要用来监听从Broker的网络连接。
GroupTransferService
:HAService
的内部类,主Broker使用,主要用来实现发送消息时的主从"同步"复制功能。
HAClient
:HAService
的内部类,从Broker使用,用来向主Broker发起网络连接,处理与主Broker复制消息的事情。
HAConnection
:主Broker使用,此类是在主Broker接受从Broker网络连接时,对主从连接主端SocketChannel
的封装,一对一的处理与从Broker之间的复制消息事情。
ReadSocketService
:HAConnection
的内部类,主Broker使用,向从Broker发送消息。
WriteSocketService
:HAConnection
的内部类,主Broker使用,接受从Broker反馈的消息偏移量。
2.2 主从Broker在启动阶段对主从复制做的准备工作
主/从Broker其实是一样的启动步骤:
-
实例化主要作用类的对象 -
启动相应的服务。
2.2.1 实例化主要作用类的对象
创建HAService
对象,HAService
实例化时创建了AcceptSocketService
,GroupTransferService
和HAClient
。前两者是主Broker用的,最后者是从Broker用的。
创建HAService对象实例:HAService是在实例化DefaultMessageStore
的对象时创建的,后者的构造函数:
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
...省略
// 判断是否启用了Dleger模式,也即Dleger模式与传统的主从模式采用的是不同的实现方式
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
...省略
}
HAService
的构造函数
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// 主Broker用来接受从Broker连接的后台线程服务
// 也即Server中的accept
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
// 提供Broker设置“同步复制”时,发送消息时判断是否已将消息复制给Broker,一个后台服务线程
this.groupTransferService = new GroupTransferService();
// 启动从Broker实现主从复制的关键类
// 作用:与主Broker建立连接;报告从Broker消息复制偏移量;接收处理主Broker发送的消息落盘存储
this.haClient = new HAClient();
}
2.2.2 启动主从服务的相关的服务类
主从复制相关服务的启动都是通过HAService
来启动的。HAService
的启动函数start()
是在DefaultMessageStore
的启动函数start()
调用的。
启动的最终结果:
-
主Broker:开启监听从Broker的连接, AcceptSocketService
服务线程启动 -
从Broker:HAClient服务线程启动
HAService#start()
:
public void start() throws Exception {
// 打开主Broker监听端口进行监听,创建ServerSocketChannel
// 从Broker将通过此端口来与主Broker进行通信
this.acceptSocketService.beginAccept();
// 启动接受连接服务
this.acceptSocketService.start();
this.groupTransferService.start();
// 启动从Broker使用的主从复制的服务客户端
this.haClient.start();
}
下面主要分析:this.acceptSocketService.beginAccept()
,this.acceptSocketService.start()
和this.haClient.start()
主Broker启动监听acceptSocketService#beginAccept()
,NIO启动服务端的标准写法。创建ServerSocketChannel
,绑定监听端口,设置为非阻塞模式,注册感兴趣的事件OP_ACCEPT
。至此主Broker就开始监听从Broker的连接。
public void beginAccept() throws Exception {
// 打开一个服务端监听通道
this.serverSocketChannel = ServerSocketChannel.open();
// 创建一个选择器
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定监听端口号
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 设置为非阻塞模式
this.serverSocketChannel.configureBlocking(false);
// 注册selector, 感兴趣的事件为OP_ACCEPT
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
主Broker启动AcceptSocketService
服务线程,acceptSocketService.start()
,此方法是RocketMQ中标准的服务线程的使用模式,在该方法中会创建一个线程,并使用该线程运行acceptSocketService
的run()
方法,所以只需关注其run()
方法。
其run()
方法是不断循环执行的,主要逻辑就是接受从Broker向主Broker发起的网络连接,获得一个SocketChannel
,并将此用HAConnnection
进行包装,一对一的处理与从Broker之间的事情。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 轮询,超时设置为1000毫秒
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// 获得网络通信通道
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 对网络通道进行封装,创建HAConnection
HAConnection conn = new HAConnection(HAService.this, sc);
// 启动连接
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
HAClient的启动,其也是RocketMQ中的服务线程,同样只需关注其run()
方法。run()
方法的主要逻辑就是发起与主Broker之间网络连接,发送从Broker的消息偏移量,接收主Broker的消息。
public void run() {
log.info(this.getServiceName() + " service started");
// 一个不停止的线程是会始终向master建立连接
while (!this.isStopped()) {
try {
// 发起与主Broker的连接
if (this.connectMaster()) {
// 判断是否到了向主Beoker报告消息偏移量的时间
if (this.isTimeToReportOffset()) {// 向主Broker报告当前从Broker的消息偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
// 如果报告不成功,则直接关闭与主Broker之间的网络连接
this.closeMaster();
}
}
// 轮询1000秒
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
2.3 从Broker与主Broker之间建立连接
从Broker在启动的时候启动了HAClient
这个后台线程,从主从复制中,从Broker与主Broker的通信都是由HAClient
来完成的,下面是HAClient
的run
方法
2.3.1 从Broker向主Broker发起Socket
连接
从Broker通过HAClient#connectMaster()
方法向主Broker发起Socket
连接
private boolean connectMaster() throws ClosedChannelException {
// 判断当前的网络通道为空
if (null == socketChannel) {
// 获取主Broker的ip:port
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 向主Broker发起Socket连接
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 给Socket通道注册读事件,这个socketChannel主要是用来处理主Broker向从Broker传送的【复制消息】
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 当前从Broker的已存储消息的物理偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
2.3.2 主Broker接受从Broker的Socket
连接
主Broker接受从Broker的连接在AcceptSocketServer
的run
方法中。主要逻辑接受连接得到一个SocketChannel
,并用HAConnection
进行包装,然后启动HAConnction
。启动HAConnection
其实就是启动两个服务线程:ReadSocketService
和WriteSocketService
。
public void run() {
while (!this.isStopped()) {
... 省略代码
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
// 对网络通道进行封装,创建HAConnection
HAConnection conn = new HAConnection(HAService.this, sc);
// 启动连接
conn.start();
... 省略代码
}
}
HAConnction
的构造函数
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
// 主Broker与从Broker网络通信用的Channel
this.socketChannel = socketChannel;
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
// 参数配置
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
this.socketChannel.socket().setSendBufferSize(1024 * 64);
// 创建两个服务线程,分别监听读事件和写事件
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
HAConnection
的启动函数start()
,启动连个服务线程。
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
2.4 从Broker向主Broker反馈消息消费进度
从Broker通过HAClient
的reportSlaveMaxOffset()
方法向主Broker报告期消息偏移量
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
// 放进消息偏移量
this.reportOffset.putLong(maxOffset);
// 切换回读模式
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
// 向网络通道之中写入数据
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
// 判断是否还有字节没有写过去
return !this.reportOffset.hasRemaining();
}
主Broker接收从Broker发送的消息偏移量。主Broker主要通过 ReadSocketService
服务线程来处理的。更新HAConnection.this.slaveAckOffset的值。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 空间是否足够?[java的ByteBuffer的使用]
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
// 判断空间是否足够
while (this.byteBufferRead.hasRemaining()) {
try {
// 将通道中的数据读到缓冲区里面
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 网络通信协议,即固定长度为8,此处采用这种固定的长度来解决TCP网络传输的粘包与拆包问题
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 读取最后的8个字节
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 通知已经转移成功了一部分消息,且物理偏移量为ackOffset
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
2.5 主Broker向从Broker发送同步消息
同步发送消息采用发送一个消息长度来解决TCP的粘包和拆包的问题。
其消息的格式为:
2.5.1 主Broker向从Broker发送消息
主从复制时,主Broker主动向从Broker发送消息,一个后台运行的服务线程,即HAConnction
中的WriteSocketService
。其run()
放法中执行了些消息的逻辑。
创建消息头,网络发送;获取消息,发送消息。消息时一条一条发送的,每次发送消息的时候都携带了当前的物理偏移量。
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 轮询1秒
this.selector.select(1000);
// 副节点的消息偏移量,等于-1表示刚初始化好,从Broker还没有报告其偏移量
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 下一次从什么位置转移消息
if (-1 == this.nextTransferFromWhere) {
// 如果副节点的物理偏移量为0
if (0 == HAConnection.this.slaveRequestOffset) {
// 获取主节点的物理偏移量
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 上一次写是否已经终止了
if (this.lastWriteOver) {
// 上一次拉取的时间间隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 上一次同步的间隔时间是否大于
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
// 创建请求头
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
// 首先放入的是位移偏移量
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
// 消息长度为0
this.byteBufferHeader.putInt(0);
// 跳转
this.byteBufferHeader.flip();
// 传输数据
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 上一次写还没有终止了
} else {
// 传输数据
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 获取到偏移量处的消息
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
// 消息头大小
this.byteBufferHeader.limit(headerSize);
// 偏移量
this.byteBufferHeader.putLong(thisOffset);
// 消息总大小
this.byteBufferHeader.putInt(size);
// 跳转
this.byteBufferHeader.flip();
// 传输数据
this.lastWriteOver = this.transferData();
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
2.5.2 从Broker接收消息
从Broker接收消息,是通过HAClient
中的run()
方法
public void run() {
log.info(this.getServiceName() + " service started");
// 一个不停止的线程是会始终向master建立连接
while (!this.isStopped()) {
try {
// 发起与主Broker的连接
if (this.connectMaster()) {
// 判断是否到了向主Beoker报告消息偏移量的时间
if (this.isTimeToReportOffset()) {// 向主Broker报告当前从Broker的消息偏移量
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
// 如果报告不成功,则直接关闭与主Broker之间的网络连接
this.closeMaster();
}
}
// 轮询1000秒
this.selector.select(1000);
// 真正的处理读事件
boolean ok = this.processReadEvent();
// 没有写消息成功直接关闭连接
// 这里很重要,没有写读成功将就直接关闭了
if (!ok) {
this.closeMaster();
}
...省略代码
}
log.info(this.getServiceName() + " service end");
}
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 判断是否还有空间可写
while (this.byteBufferRead.hasRemaining()) {
try {
// 通道中的数据写到缓冲区
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// 分发的结果
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
// 消息头大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
// 读的位置
int readSocketPos = this.byteBufferRead.position();
while (true) {
// 接收的数据差
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 收到的数据长度大于拟定的消息头大小 8
// 此if中的读缓冲区的消息都没有重置position
if (diff >= msgHeaderSize) {
// 读取主的物理偏移量, 主的偏移量必须和副节点的偏移量必须相等,否则就不写入消息 8个字节
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 消息体的大小 4 个字节 Int类型
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// 副节点的最大物理偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 当副节点的最大物理偏移量不等于主节点的物理偏移量时,就返回false
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
// 重置缓冲区的大小
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
// 读物数据
this.byteBufferRead.get(bodyData);
// 存储消息,开始位置为masterPhyOffset
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// 重置到读的位置
this.byteBufferRead.position(readSocketPos);
// 设置已经分发的位置
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}