vlambda博客
学习文章列表

我理解的RocketMQ:主从复制HA(High Availability)原理

我理解的RocketMQ:主从复制HA(high availability)的机制分析

1 概述

主从消息复制方式:RocketMQ中主从同步采用的是主节点主动向从节点发送同步消息,是由一个后台不断运行的线程执行。注意是后台。并不是生产者给主Broker发送消息,主Broker处理接收消息时进行显示调用同步消息给从Broker。

从Broker向主Broker反馈主从消息复制进度:从Broker定时的向主Broker反馈复制消息进度。主Broker便知道从Broker的消息复制进度。这个反馈主要是用来实现主从同步复制。

发送消息同步的方式进行主从消息复制的实现:主Broker在处理接收消息时,去询问从Broker复制消息的进度是否已经到达当前消息所在的偏移量,如果是就返回,否则就进行等待。

下图是整个主从同步时,从Broker和主Broker的交互过程

HA

初始化

  • 副节点:启动HAServiceHAService创建HAClientHAClient是一个不断运行的RocketMQ服务线程,其作用就是与主节点建立连接、接收主节点的同步消息、向主节点同步确认其本地消息偏移量。

  • 主节点:启动HAServiceHAService创建并启动AcceptSocketService,顾名思义,此服务主要就是接受Socket连接。

主从复制消息步骤

  • 第一步:从Broker与主Broker之间建立网络连接,由从Broker主动发起连接 ,从Broker通过HAClient向主节点请求建立TCP连接,主Broker的AcceptSocketService接受连接请求,并建立TCP连接通道SocketChannel,并用HAConnection来进行包装。HAConnection表示主Broker与某个从Broker之间的连接关系,处理了主Broker与从Broker之间的主从同步消息。它里面维护了两个服务:ReadSocketServiceWriteSocketService,前者主要处理网络通道的读事件(从Broker同步复制进度偏移量),后者主要用来处理写事件,也即主Broker向从Broker发送的复制消息。通过此步后,从Broker与主Broker之间就建立网络通道。图中的1、2两个步骤

  • 第二步:从Broker向主Broker发送复制进度。从Broker通过HAClient向主Broker汇报其已经复制的消息偏移量。主Broker通过HAConnection中的 ReadSocketService处理。即图中的3、4、5。

  • 第三步:主Broker向从Broker发送复制消息。主Broker通过WriteSocketService向从Broker发送消息。从Broker通过HAClient处理。即6、7、8、9、10

二三步不断执行下去,便实现了主从复制。

关键点

  • 主Broker向从Broker发送消息的时候都会带上物理偏移量。从Broker接收消息并读取物理偏移量,并与其本地的物理偏移量进行比较,如果相等则存储并继续与主Broker进行通信。如果不相等,则表示主从复制出现了混乱,此时会主动断开与主Broker之间的TCP连接,重新建立一条TCP连接,再次开始主从复制。
  • 使用原生Java NIO网络通信编程。

2 源码分析

2.1 主要类及其作用

HAService:主从复制模块对外的类,开启主从复制功能时,首先创建该类实例,然后通过它创建相关组件。然后通过启动其来启动整个主从复制模块。

AcceptSocketServiceHAService的内部类,主Broker使用,主要用来监听从Broker的网络连接。

GroupTransferServiceHAService的内部类,主Broker使用,主要用来实现发送消息时的主从"同步"复制功能。

HAClientHAService的内部类,从Broker使用,用来向主Broker发起网络连接,处理与主Broker复制消息的事情。

HAConnection:主Broker使用,此类是在主Broker接受从Broker网络连接时,对主从连接主端SocketChannel的封装,一对一的处理与从Broker之间的复制消息事情。

ReadSocketServiceHAConnection的内部类,主Broker使用,向从Broker发送消息。

WriteSocketServiceHAConnection的内部类,主Broker使用,接受从Broker反馈的消息偏移量。

HAService

2.2 主从Broker在启动阶段对主从复制做的准备工作

主/从Broker其实是一样的启动步骤:

  • 实例化主要作用类的对象
  • 启动相应的服务。

2.2.1 实例化主要作用类的对象

创建HAService对象,HAService实例化时创建了AcceptSocketService,GroupTransferServiceHAClient。前两者是主Broker用的,最后者是从Broker用的。

创建HAService对象实例:HAService是在实例化DefaultMessageStore的对象时创建的,后者的构造函数:

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
                            final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig)
 throws IOException 
{
    ...省略
    // 判断是否启用了Dleger模式,也即Dleger模式与传统的主从模式采用的是不同的实现方式
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    ...省略
}

HAService的构造函数

  public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
      this.defaultMessageStore = defaultMessageStore;
      // 主Broker用来接受从Broker连接的后台线程服务
      // 也即Server中的accept
      this.acceptSocketService =
              new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
      // 提供Broker设置“同步复制”时,发送消息时判断是否已将消息复制给Broker,一个后台服务线程
      this.groupTransferService = new GroupTransferService();
      // 启动从Broker实现主从复制的关键类
      // 作用:与主Broker建立连接;报告从Broker消息复制偏移量;接收处理主Broker发送的消息落盘存储
      this.haClient = new HAClient();
  }

2.2.2 启动主从服务的相关的服务类

主从复制相关服务的启动都是通过HAService来启动的。HAService的启动函数start()是在DefaultMessageStore的启动函数start()调用的。

启动的最终结果:

  • 主Broker:开启监听从Broker的连接, AcceptSocketService服务线程启动
  • 从Broker:HAClient服务线程启动

HAService#start()

public void start() throws Exception {
    // 打开主Broker监听端口进行监听,创建ServerSocketChannel
    // 从Broker将通过此端口来与主Broker进行通信
    this.acceptSocketService.beginAccept();
    // 启动接受连接服务
    this.acceptSocketService.start();
    this.groupTransferService.start();

    // 启动从Broker使用的主从复制的服务客户端
    this.haClient.start();
}

下面主要分析:this.acceptSocketService.beginAccept()this.acceptSocketService.start()this.haClient.start()

主Broker启动监听acceptSocketService#beginAccept(),NIO启动服务端的标准写法。创建ServerSocketChannel,绑定监听端口,设置为非阻塞模式,注册感兴趣的事件OP_ACCEPT。至此主Broker就开始监听从Broker的连接。

public void beginAccept() throws Exception {
    // 打开一个服务端监听通道
    this.serverSocketChannel = ServerSocketChannel.open();
    // 创建一个选择器
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    // 绑定监听端口号
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    // 设置为非阻塞模式
    this.serverSocketChannel.configureBlocking(false);
    // 注册selector, 感兴趣的事件为OP_ACCEPT
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

主Broker启动AcceptSocketService服务线程,acceptSocketService.start(),此方法是RocketMQ中标准的服务线程的使用模式,在该方法中会创建一个线程,并使用该线程运行acceptSocketServicerun()方法,所以只需关注其run()方法。

run()方法是不断循环执行的,主要逻辑就是接受从Broker向主Broker发起的网络连接,获得一个SocketChannel,并将此用HAConnnection进行包装,一对一的处理与从Broker之间的事情。

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 轮询,超时设置为1000毫秒
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();

            if (selected != null) {
                for (SelectionKey k : selected) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {

                        // 获得网络通信通道
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());

                            try {
                                // 对网络通道进行封装,创建HAConnection
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                // 启动连接
                                conn.start();
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

HAClient的启动,其也是RocketMQ中的服务线程,同样只需关注其run()方法。run()方法的主要逻辑就是发起与主Broker之间网络连接,发送从Broker的消息偏移量,接收主Broker的消息。

public void run() {
    log.info(this.getServiceName() + " service started");

    // 一个不停止的线程是会始终向master建立连接
    while (!this.isStopped()) {
        try {
            // 发起与主Broker的连接
            if (this.connectMaster()) {
                // 判断是否到了向主Beoker报告消息偏移量的时间
                if (this.isTimeToReportOffset()) {// 向主Broker报告当前从Broker的消息偏移量
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        // 如果报告不成功,则直接关闭与主Broker之间的网络连接
                        this.closeMaster();
                    }
                }
                // 轮询1000秒
                this.selector.select(1000);

                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval =
                        HAService.this.getDefaultMessageStore().getSystemClock().now()
                                - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                            + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

2.3 从Broker与主Broker之间建立连接

从Broker在启动的时候启动了HAClient这个后台线程,从主从复制中,从Broker与主Broker的通信都是由HAClient来完成的,下面是HAClientrun方法

2.3.1 从Broker向主Broker发起Socket连接

从Broker通过HAClient#connectMaster()方法向主Broker发起Socket连接

private boolean connectMaster() throws ClosedChannelException {
    // 判断当前的网络通道为空
    if (null == socketChannel) {
        // 获取主Broker的ip:port
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 向主Broker发起Socket连接
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // 给Socket通道注册读事件,这个socketChannel主要是用来处理主Broker向从Broker传送的【复制消息】
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // 当前从Broker的已存储消息的物理偏移量
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

2.3.2 主Broker接受从Broker的Socket连接

主Broker接受从Broker的连接在AcceptSocketServerrun方法中。主要逻辑接受连接得到一个SocketChannel,并用HAConnection进行包装,然后启动HAConnction。启动HAConnection其实就是启动两个服务线程:ReadSocketServiceWriteSocketService

public void run() {
    while (!this.isStopped()) {
      ... 省略代码
      SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
              // 对网络通道进行封装,创建HAConnection
              HAConnection conn = new HAConnection(HAService.this, sc);
              // 启动连接
              conn.start();
    
    ... 省略代码
    }
}

HAConnction的构造函数

public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
    this.haService = haService;
    // 主Broker与从Broker网络通信用的Channel
    this.socketChannel = socketChannel;
    this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
    // 参数配置
    this.socketChannel.configureBlocking(false);
    this.socketChannel.socket().setSoLinger(false, -1);
    this.socketChannel.socket().setTcpNoDelay(true);
    this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
    this.socketChannel.socket().setSendBufferSize(1024 * 64);

    // 创建两个服务线程,分别监听读事件和写事件
    this.writeSocketService = new WriteSocketService(this.socketChannel);
    this.readSocketService = new ReadSocketService(this.socketChannel);
    this.haService.getConnectionCount().incrementAndGet();
}

HAConnection的启动函数start(),启动连个服务线程。

public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}

2.4 从Broker向主Broker反馈消息消费进度

从Broker通过HAClientreportSlaveMaxOffset()方法向主Broker报告期消息偏移量

private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // 放进消息偏移量
    this.reportOffset.putLong(maxOffset);
    // 切换回读模式
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // 向网络通道之中写入数据
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                    + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // 判断是否还有字节没有写过去
    return !this.reportOffset.hasRemaining();
}

主Broker接收从Broker发送的消息偏移量。主Broker主要通过  ReadSocketService服务线程来处理的。更新HAConnection.this.slaveAckOffset的值。

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // 空间是否足够?[java的ByteBuffer的使用]
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }
    // 判断空间是否足够
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 将通道中的数据读到缓冲区里面
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                  // 网络通信协议,即固定长度为8,此处采用这种固定的长度来解决TCP网络传输的粘包与拆包问题
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    // 读取最后的8个字节
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;

                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }
                    // 通知已经转移成功了一部分消息,且物理偏移量为ackOffset
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

2.5 主Broker向从Broker发送同步消息

同步发送消息采用发送一个消息长度来解决TCP的粘包和拆包的问题。

其消息的格式为:

2.5.1 主Broker向从Broker发送消息

主从复制时,主Broker主动向从Broker发送消息,一个后台运行的服务线程,即HAConnction中的WriteSocketService。其run()放法中执行了些消息的逻辑。

创建消息头,网络发送;获取消息,发送消息。消息时一条一条发送的,每次发送消息的时候都携带了当前的物理偏移量。

public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 轮询1秒
            this.selector.select(1000);

            // 副节点的消息偏移量,等于-1表示刚初始化好,从Broker还没有报告其偏移量
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            // 下一次从什么位置转移消息
            if (-1 == this.nextTransferFromWhere) {
                // 如果副节点的物理偏移量为0
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // 获取主节点的物理偏移量
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                            masterOffset
                                    - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                                    .getMappedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    this.nextTransferFromWhere = masterOffset;
                } else {
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                        + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }
            // 上一次写是否已经终止了
            if (this.lastWriteOver) {
                // 上一次拉取的时间间隔
                long interval =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                // 上一次同步的间隔时间是否大于
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaSendHeartbeatInterval()) {

                    // Build Header
                    // 创建请求头
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    // 首先放入的是位移偏移量
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    // 消息长度为0
                    this.byteBufferHeader.putInt(0);
                    // 跳转
                    this.byteBufferHeader.flip();
                    // 传输数据
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
                // 上一次写还没有终止了
            } else {
                // 传输数据
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // 获取到偏移量处的消息
            SelectMappedBufferResult selectResult =
                    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header
                this.byteBufferHeader.position(0);
                // 消息头大小
                this.byteBufferHeader.limit(headerSize);
                // 偏移量
                this.byteBufferHeader.putLong(thisOffset);
                // 消息总大小
                this.byteBufferHeader.putInt(size);
                // 跳转
                this.byteBufferHeader.flip();
                // 传输数据
                this.lastWriteOver = this.transferData();
            } else {

                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

2.5.2 从Broker接收消息

从Broker接收消息,是通过HAClient中的run()方法

public void run() {
    log.info(this.getServiceName() + " service started");

    // 一个不停止的线程是会始终向master建立连接
    while (!this.isStopped()) {
        try {
            // 发起与主Broker的连接
            if (this.connectMaster()) {
                // 判断是否到了向主Beoker报告消息偏移量的时间
                if (this.isTimeToReportOffset()) {// 向主Broker报告当前从Broker的消息偏移量
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        // 如果报告不成功,则直接关闭与主Broker之间的网络连接
                        this.closeMaster();
                    }
                }
                // 轮询1000秒
                this.selector.select(1000);

                // 真正的处理读事件
                boolean ok = this.processReadEvent();
                // 没有写消息成功直接关闭连接
                // 这里很重要,没有写读成功将就直接关闭了
                if (!ok) {
                    this.closeMaster();
                }
                ...省略代码
    }

    log.info(this.getServiceName() + " service end");
}
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // 判断是否还有空间可写
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 通道中的数据写到缓冲区
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // 分发的结果
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}
private boolean dispatchReadRequest() {
    // 消息头大小
    final int msgHeaderSize = 8 + 4// phyoffset + size
    // 读的位置
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // 接收的数据差
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // 收到的数据长度大于拟定的消息头大小 8
        // 此if中的读缓冲区的消息都没有重置position
        if (diff >= msgHeaderSize) {
            // 读取主的物理偏移量, 主的偏移量必须和副节点的偏移量必须相等,否则就不写入消息 8个字节
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // 消息体的大小 4 个字节 Int类型
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // 副节点的最大物理偏移量
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            // 当副节点的最大物理偏移量不等于主节点的物理偏移量时,就返回false
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            if (diff >= (msgHeaderSize + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                // 重置缓冲区的大小
                this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
                // 读物数据
                this.byteBufferRead.get(bodyData);
                // 存储消息,开始位置为masterPhyOffset
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                // 重置到读的位置
                this.byteBufferRead.position(readSocketPos);
                // 设置已经分发的位置
                this.dispatchPosition += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}