vlambda博客
学习文章列表

解读RocketMQ对RPC的设计实现原理

01
背景


基于RokcetMQ可以实现异步处理、流量削锋、业务解耦,通常是依赖RocketMQ的发布订阅模式。今天分享RocketMQ的新特性,基于Request/Reply模式来实现RPC的功能。该模式在4.6版本作为RocketMQ的新特性引入,但在4.7.0版本上是有bug存在的,所以使用的话需要依赖于4.7.1以上的版本。


02
设计思路


从整个数据流的角度上来说,发布/订阅模式中生产者和消费者之间是异步处理的,生产者负责把消息投递到RocketMQ上,消费者负责处理数据,如果把生产者当作客户端,消费者看成服务端,那么上下游之间是没有任何的状态交流的。而我们知道,RPC客户端和服务端之间是有状态交互的,所以要想实现RPC功能,在整个数据链路上,需要把原先的异步模式改成同步模式。


异步模式:

把异步模式换成同步模式,需要在生产者发送消息到MQ之后,保持原有的状态,比如可以用一个Map集合去统一维护,等到消费者处理完数据返回响应后,再从Map集合中拿到对应的请求进行处理。其中涉及到怎么去标识一个请求,这里可以用UUID或者雪花id去标记。


同步模式:

解读RocketMQ对RPC的设计实现原理

RocketMQ整体的处理思路跟上面是类似的,DefaultMQProducerImpl#request负责RPC消息的下发,而DefaultMQPushConsumer负责消息的消费。具体用法可以看RocketMQ源码example中的RPC部分。

解读RocketMQ对RPC的设计实现原理


03
结构定义


RocketMQ是依赖于Message的Properties来区分不同的请求,在调用DefaultMQProducerImpl#request进行消息下发之间会先给消息设置不同的属性,通过属性来保证上下游之间的处理是同一个请求。


设置的属性有:

CORRELATION_ID:消息的标识Id,这里对应是一个UUIDREPLY_TO_CLIENT:消息下发的客户端IdTTL:消息下发的超时时间,单位ms

其实就类似于HTTP请求中的头部内容一样,通过这些属性来维护一个请求。

解读RocketMQ对RPC的设计实现原理

之后还会校验一下消息中对应Topic的一个合法性。


04
请求下发


RocketMQ将下发的客户端封装成RequestResponseFuture,包含客户端Id,请求超时时间,同时根据客户端Id维护在ConcurrentHashMap,调用DefaultMQProducerImpl#sendDefaultImpl下发消息,根据下发消息的回调函数确认消息下发的状态。

解读RocketMQ对RPC的设计实现原理

消息下发后会调用waitResponse,waitResponse调用CountDownLatch进入阻塞状态,等待消息消费之后的响应。

解读RocketMQ对RPC的设计实现原理

CountDownLatch中的计数器是1,每个请求都是独立隔离阻塞。

解读RocketMQ对RPC的设计实现原理


05
请求响应


当服务端(消费者)收到消息处理完返回响应时,会调用ReplyMessageProcessor#pushReplyMessage封装响应的内容,处理响应的头部信息和返回body的参数,最终封装成一个PUSH_REPLY_MESSAGE_TO_CLIENT的请求命令重新投递回MQ。

解读RocketMQ对RPC的设计实现原理

客户端收到请求响应后,会调用ClientRemotingProcessor#processRequest,如果指令PUSH_REPLY_MESSAGE_TO_CLIENT调用receiveReplyMessage,将接收到的数据封装成新的消息,接着交给响应处理器去处理。

ClientRemotingProcessor#processReplyMessage中主要做的从消息中获取消息的Id,从ConcurrentHashMap中定位到具体的请求,将返回消息封装到RequestResponseFuture中,同时CountDownLatch的计数值减1,此时线程阻塞状态被释放,之后便将消息响应回给到客户端。


06
总结


所以整体上看来,RocketMQ的Request/Reply模式,其实是利用客户端线程阻塞来换取请求异步变同步以及RocketMQ的回调机制从而间接的实现了RPC效果,但是相比直接RPC调用,数据的链路更长,性能肯定是会有损耗,但是请求会持久化,所以给了重复下发提供了可能。