
Message Queue Series (3): Source Code Analysis of How a Producer Produces Messages



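It has been a while since I last dug into the message queue module. In this article we take a close look at how RocketMQ's underlying source code starts a producer.

First, here is demo code that starts a message producer application with RocketMQ: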

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(GROUP_NAME, false);
        defaultMQProducer.setNamesrvAddr(NAME_SERVER);
        defaultMQProducer.setSendMsgTimeout(SEND_TIME_OUT);
        defaultMQProducer.setVipChannelEnabled(false);
        // The core startup method is here
        defaultMQProducer.start();
        System.out.println("==============Choose the message to send==============");
        while (true) {
            Scanner scanner = new Scanner(System.in);
            String nextLine = scanner.nextLine();
            if ("A".equals(nextLine)) {
                sendTopicA();
            }
        }
    }

Note that the RocketMQ version I am using here is 4.8.0. Now let's dig into the source code:

    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        // This is the core call; we step into it next
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

Next we go one level deeper, into the start method of defaultMQProducerImpl:

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Mark the state as START_FAILED first; it is changed to RUNNING only after startup has fully completed
                this.serviceState = ServiceState.START_FAILED;
                // Configuration checks, e.g. length limits on the group name and illegal characters
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // Create (or reuse) the MQ client factory, which mainly talks to the nameserver
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // Store topic information in this map
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    // This is what actually starts the client instance
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // Update the state that was set pessimistically above
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    RequestFutureTable.scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
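
The "mark as failed first, mark as running last" pattern in the method above can be isolated into a small, self-contained sketch. This is not RocketMQ code: the class `LifecycleSketch`, its `State` enum, and the `startupSteps` parameter are hypothetical names chosen for illustration. The point it tries to show is that no catch block is needed to record a failed startup, because the pessimistic state is already in place when an exception propagates.

```java
// A minimal sketch of the pessimistic start-state pattern; all names are hypothetical.
class LifecycleSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final Runnable startupSteps;

    LifecycleSketch(Runnable startupSteps) {
        this.startupSteps = startupSteps;
    }

    State state() {
        return state;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure up front: if a step below throws,
                // the state simply stays START_FAILED.
                state = State.START_FAILED;
                startupSteps.run();
                // Reached only when every startup step succeeded.
                state = State.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                // A second start attempt is rejected, whatever the outcome of the first.
                throw new IllegalStateException("service state not OK: " + state);
            default:
                break;
        }
    }

    public static void main(String[] args) {
        LifecycleSketch healthy = new LifecycleSketch(() -> { });
        healthy.start();
        System.out.println("healthy: " + healthy.state());

        LifecycleSketch broken = new LifecycleSketch(() -> {
            throw new RuntimeException("simulated startup failure");
        });
        try {
            broken.start();
        } catch (RuntimeException e) {
            // Startup failed; the state was never advanced to RUNNING.
        }
        System.out.println("broken: " + broken.state());
    }
}
```

One difference from the real method is worth noting: when registerProducer fails, the RocketMQ code shown above explicitly resets the state to CREATE_JUST before throwing, whereas this sketch leaves every failure in START_FAILED.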

Next comes yet another start function, this time the startup function of the client instance:

    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    // Again, declare the service state as START_FAILED first
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // Start the Netty client so that the local service can open channels to the remote Netty servers
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    // Start the scheduled tasks; this method registers several separate periodic jobs
                    this.startScheduledTask();
                    // Start pull service
                    // Start the background thread that pulls messages
                    this.pullMessageService.start();
                    // Start rebalance service
                    // Start the rebalance (load balancing) service
                    this.rebalanceService.start();
                    // Start push service
                    // start is triggered once more here, but the argument is false, so the factory startup
                    // is not executed again. Note that this.defaultMQProducer is the client instance's own
                    // internal producer (group CLIENT_INNER_PRODUCER_GROUP), not the producer created in the
                    // demo; the reason for this design was not obvious to me.
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    // Change the state to RUNNING
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

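As you can see, this start function splits its work into separate functions, one per step, which makes it easy for us developers to read. Below we pick out a few of these functions and look at them in more detail.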

this.mQClientAPIImpl.start()

The underlying code is as follows:

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });
        // This is where the Netty client is set up
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }
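
The timer at the end of this method calls scanResponseTable once per second, but the body of that method is not shown in this article. As a rough illustration of what such a periodic scan typically does, here is a minimal sketch that assumes its job is to drop pending requests whose timeout has elapsed. The names `PendingRequestScanSketch`, `Pending`, and `scanExpired` are hypothetical, and the real RocketMQ implementation may differ in its details.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of a timeout scan over in-flight requests; not the RocketMQ implementation.
class PendingRequestScanSketch {

    // One in-flight request: when it was sent and how long it may wait for a response.
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isExpired(long nowMillis) {
            return nowMillis - beginMillis > timeoutMillis;
        }
    }

    // Request id -> pending request; concurrent because senders and the timer thread both touch it.
    final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    // Removes every expired entry and returns how many were removed.
    int scanExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().isExpired(nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        PendingRequestScanSketch sketch = new PendingRequestScanSketch();
        sketch.table.put(1, new Pending(0, 100));   // expires after t=100
        sketch.table.put(2, new Pending(0, 1000));  // expires after t=1000
        System.out.println("removed at t=500: " + sketch.scanExpired(500));
        System.out.println("still pending: " + sketch.table.keySet());
    }
}
```

Passing the current time in as a parameter, instead of reading the clock inside the scan, keeps the sketch deterministic and easy to test.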

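Overall, this function is written in much the same style as an ordinary Netty application. At first glance it looks as if it simply establishes the connection to the nameserver. Note, however, that the code shown here only configures the Bootstrap and its pipeline and never calls connect, so the actual channels are presumably created later, when a request is first sent.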


this.startScheduledTask();

This function contains a good deal more.

    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            // Start a scheduled task that re-fetches the nameserver address every 2 minutes
            // Question: under what conditions is the nameserver address null?
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        // Every 30 seconds (by default), pull the topic route information from the nameserver
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        // Every 30 seconds (by default), clean up offline brokers and send a heartbeat to all brokers
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        // Every 5 seconds (by default), persist the consume offsets of all queues
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // Every minute, check whether the thread pool size needs adjusting
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
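
Every task registered above wraps its body in try/catch. A likely reason is the documented behavior of scheduleAtFixedRate: if one execution throws, all subsequent executions are suppressed. The following self-contained sketch demonstrates that behavior with a task that always fails. The class `FixedRateSketch` and its methods are hypothetical illustration names, and the short delays are chosen only to keep the demo fast.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates why periodic tasks should catch their own exceptions; names are hypothetical.
class FixedRateSketch {

    static void failingWork() {
        throw new IllegalStateException("simulated failure");
    }

    // Schedules an always-failing task and returns how many times it ran within durationMillis.
    static int countRuns(boolean guarded, long durationMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            if (guarded) {
                try {
                    failingWork();
                } catch (Exception e) {
                    // Swallow (or log) so that the next execution still happens.
                }
            } else {
                // The exception escapes the task, which cancels all future executions.
                failingWork();
            }
        }, 10, 20, TimeUnit.MILLISECONDS);
        Thread.sleep(durationMillis);
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unguarded runs: " + countRuns(false, 500));
        System.out.println("guarded runs:   " + countRuns(true, 500));
    }
}
```

The unguarded variant runs once and then goes silent, while the guarded variant keeps firing for the whole interval. For a heartbeat or route-refresh job, a silent stop of that kind would be hard to notice in production.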

The source code raises a few points that are worth thinking about more deeply:


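  1. The nameserver address is read in a priority order. An address set explicitly in our configuration comes first; if none is set, the client falls back to looking it up in system variables. The exact order can be checked in the source code at this location:

[Figure: source code location of the nameserver address lookup]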


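If no nameserver address is configured anywhere, the client fetches one over HTTP from an address server, whose default endpoint is:

http://jmenv.tbsite.net:8080/rocketmq/nsaddr

This scenario can be verified by stepping through the code in a debugger.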
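
The lookup order described above can be sketched as a simple fallback chain. This is an illustration, not the RocketMQ implementation: the class `NamesrvAddrResolutionSketch` and the `resolve` method are hypothetical. The system property name `rocketmq.namesrv.addr` and the environment variable `NAMESRV_ADDR` are, to my knowledge, the ones the 4.x client reads, but treat them as assumptions and confirm them against the source. The address-server step is represented by a placeholder supplier, so no HTTP request is made.

```java
import java.util.function.Supplier;

// A sketch of the nameserver address fallback chain; names are hypothetical.
class NamesrvAddrResolutionSketch {

    // Returns the first non-null candidate, consulting the address server only as a last resort.
    static String resolve(String explicit, String systemProperty, String envVar,
                          Supplier<String> addressServer) {
        if (explicit != null) {
            return explicit;        // e.g. set via setNamesrvAddr(...)
        }
        if (systemProperty != null) {
            return systemProperty;  // JVM system property
        }
        if (envVar != null) {
            return envVar;          // environment variable
        }
        return addressServer.get(); // remote lookup, deferred until actually needed
    }

    public static void main(String[] args) {
        String resolved = resolve(
            null,                                      // nothing configured in code
            System.getProperty("rocketmq.namesrv.addr"), // assumed property name
            System.getenv("NAMESRV_ADDR"),               // assumed variable name
            () -> "<address fetched from address server>");
        System.out.println(resolved);
    }
}
```

Taking the last step as a Supplier means the remote lookup runs only when every local source is empty, which mirrors the null check guarding fetchNameServerAddr in the start function shown earlier.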

this.pullMessageService.start();

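This starts the service that pulls messages. I will cover the details of message pulling, and the characteristics of the different pull modes, in the next article.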

this.rebalanceService.start();

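This is a core step of the client-side rebalancing algorithm. It also involves quite a few details, so I will devote a separate chapter to it later.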


Summary

The overall flow is shown in the diagram below: