从源码分析RocketMQ不保证幂等的三个原因
阅读本文大概需要 5 分钟
-
前言 -
1.场景 -
1.1. 疑问点 -
1.2. 日志追踪 -
1.3.定位问题 -
2.问题点 -
2.1. 批量消费消息问题
-
2.2. 更新位点(offset)问题 -
2.3.消费超时时间问题 -
3.总结 -
总结
前言
目前公司主要业务是跨境物流方面的,我们部门主要负责包裹全中心对接的业务,通俗点讲就是实现包裹在各个系统的流转。而我负责的模块主要是包裹全链路路径的需求,这部分更注重业务上的处理,因此对于业务上的产生的数据要保证正确性。
1. 场景
最近生产环境中测试发现包裹的链路数据出现了一些重复的数据,主要体现在一个事件产生了多条重复的数据,从这个问题点开始追踪问题所在。
追踪问题如下:
1.1. 疑问点
首先怀疑的问题点就是幂等问题,因此我们业务为了保证时效性都大部分都通过MQ做了异步处理,因此可能会由于MQ出现重复消费导致的。
1.2. 日志追踪
通过查看线上日志,可以看到同一个messageId被消费了多次,日志记录出现重复消费。
1.3. 定位问题
问题找到了就是由于mq重复消费导致,然后通过mq控制台查看该消息的轨迹,知道该消息的消费时间超过了消息消费默认的时常(默认15分钟),因此消息服务器认为消费未成功,又重新推送了消息。
因此问题点定位到了接下来就是做业务上的幂等处理。
但是这里我还是刨根问底的去翻了源码,看了mq如何处理幂等问题的,真被我找到了RocketMQ不保证幂等的原因。
2. 问题点
通过查看源码知道RocketMQ处理消息的消费情况主要通过一个叫做位点(offset)来实现的,首先看一下RocketMQ的消息在broker中的情况。
broker中都会存在一个CommitLog用来绑定每个消息的消费情况,每个消息在CommitLog上都是通过一个offset进行控制状态的,因此RocketMQ消费成功一条消息后就会在CommitLog中将该消息的offset进行移位。因此RocketMQ主要通过offset进行控制消息的消费。
那么通过源码找到不保证幂等的问题点在哪里呢?
主要有两个方面:
2.1. 批量消费消息
源码如下:
源码中Action action = listener.consume(msgList, context);该源码朔源可以看到是操作的消息列表,因此这批消息的消费情况只通过一个action标识,因此批量消息更新消费状态时可能出现牵一发而动全身的问题,假设一批消息中有10条消息,其中一条消息消费失败,那么action的返回值依然是RECONSUME_LATER,因此导致这批10条消息都会被重推,从而导致重复消费。
这种批量消费消息的场景在我们实际业务中并没有使用,实际业务中我们最终调用的是ConsumeMessageConcurrentlyService实现类!
2.2. 更新位点(offset)问题
源码如下:
因此通过源码可以看到由于RocketMQ取未消费成功的offset最小值,那么如果有10条消息,1-6消费成功,但是6消费失败,7-10消费成功,那么RocketMQ更新offset时会更新为6,导致7-10消息也被重新推送,造成消息重复消费。
这样的问题出现从我的理解上为了保证消息不丢失做的兼容。
2.3. 消费超时问题
消费时间超时应该是比较常见的消息重复消费的原因,消费超时最根本的问题还是在于业务逻辑的处理,因此如果出现消费超时第一时间就应该从业务逻辑上做优化,而不是改动消息消费的超时时间。
我负责的模块出现消费超时主要原因是业务逻辑中处理大批量数据,在处理大批量数据时并没有做多线程的优化以及mq的异步优化,目前已优化mq异步的方式,减少消费超时的问题。
那么RocketMQ的消费超时时间是如何设定的?
源码如下:
RocketMQ官网也提供了消费者的超时时间,因此在消费消息时若超过默认的15分钟,那么RocketMQ就会把该消息的状态设置为TIMEOUT,在RocketMQ中只要消息状态不是SUCCESS,那么都是需要消息服务器重推消息,从而造成消息的重复消费。
3. 总结
以上就是探寻RocketMQ的过程,也了解了RocketMQ的消费过程以及如何控制消息状态的逻辑。
知其然知其所以然!!
参考资料:
1、RocketMQ源码