消息队列系列(三)producer生产消息源码解析
前边又一段时间没有继续深入消息队列这个模块了,今天这篇文章我们将深入研究下RocketMQ底层源码是如何启动生产者的。
首先我们来看RocketMQ启动一个消息生产者应用的demo代码:
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(GROUP_NAME,false);
defaultMQProducer.setNamesrvAddr(NAME_SERVER);
defaultMQProducer.setSendMsgTimeout(SEND_TIME_OUT);
defaultMQProducer.setVipChannelEnabled(false);
//启动的核心方法在这里
defaultMQProducer.start();
System.out.println("==============请选择需要发送堆消息==============");
while (true){
Scanner scanner = new Scanner(System.in);
String nextLine = scanner.nextLine();
if("A".equals(nextLine)){
sendTopicA();
}
}
}
注意这里我所使用的RocketMQ版本是4.8.0版本。下边我们开始深入到源代码进行分析:
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
//这里是核心的方法区,深入到其中
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
接着是第二部分的深入环节:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
//首先这里会将状态先设置为启动失败,等到服务完全启动之后才修改为启动成功
this.serviceState = ServiceState.START_FAILED;
//一些配置的检查,例如groupName的一些长度限制,是否有非法字符等
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//创建一个MQ的客户端工厂,主要会和nameserver打交道,
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//将一些topic信息存放到这个map集合中
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
//真正启动客户端类
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//将之前的状态同步修改
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000);
}
接下来又是一个start函数了,这次是客户端的启动函数:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
//又是先声明当前的服务状态为启动失败
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
//这里是启动netty的应用,让本地的服务和远程的netty server建立一个channel通道
this.mQClientAPIImpl.start();
// Start various schedule tasks
//启动了一个定时任务,内部包含了许多额外的定时任务
this.startScheduledTask();
// Start pull service
//启动拉取消息的后台线程
this.pullMessageService.start();
// Start rebalance service
// 启动Rebalance负载均衡服务
this.rebalanceService.start();
// Start push service
//这里又一次触发了start函数,但是入参事false,所以不会重复执行启动核心部分,但是看不懂为什么要这么设计
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
//修改状态为已经启动
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
可以看到这个start函数的内部将许多个实现步骤都抽象成了一个个独立的函数,所以很方便我们开发者们去阅读。下边我们将重点抽取几个函数进行介绍。
this.mQClientAPIImpl.start()
底层代码如下:
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
//这里是启动netty应用
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
this.timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
这个函数整体风格和普通的Netty应用书写风格没有太多差别,目前来看似乎只是和nameserver建立了链接的样子。
this.startScheduledTask();
这个函数的内部包含的内容就非常多了。
private void startScheduledTask() {
if (null == this.clientConfig.getNamesrvAddr()) {
//启动定时任务,每隔2分钟去校验nameserver的地址
//思考:什么条件下nameserver地址为空?
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
//每间30秒就会从服务器上拉取下topic的路有信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
//每隔30秒清理下线的broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
//每隔5秒会去同步一下所有队列的消费进度
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//每隔1分钟检测线程池大小是否需要调整
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
从源代码中引出了一些可以深入思考的点:
nameserver配置会有一个读取的优先顺序,首先是我们的配置文件中是否有设置。如果没有则会进入到系统变量中进行查询。关于这部分的先后顺序可以自行查阅该位置的代码:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
这个场景可以通过debug来进行判断。
this.pullMessageService.start();
这里是开启一个拉消息的服务,关于拉取消息的细节模块,我会在下一讲解中深入探讨不同的消息拉取模式的特点。
this.rebalanceService.start();
这个是客户端重平衡算法的一个核心步骤,细节点也是比较多,后边会专门整理一个章节进行讲解。
小结
整体流程图如下所示: