vlambda博客
学习文章列表

消息队列漫谈:消息丢失,消息重复,消息积压一些处理方式

说到消息队列对于不太熟悉它的原理的一些同学经常会遇到这样那样的坑,面试也会遇到一些,总结下来也就是消息丢失,消息重复,消息积压这三大类,只要了解它的原理这些问题我们就能处理个七七八八了。我们知道一条消息大致都要经历这么几个过程:生产者产生消息-->消息队列存储消息-->消费者消费消息。这个很重要,我们后面会围绕这三个过程来分析这些问题发生的原因与解决办法。

消息丢失

关于消息丢失这个问题,现在的这些消息队列中间件只要你正确配置基本都不会丢消息,但是万一出现了这种情况,我们需要知道从哪些地方着手来解决问题。一般消息丢失我们可以分为两大类:一类是网络丢失,还有一类是磁盘丢失。那么我们还是从上面的消息队列三个主要过程来分析:

  1. 生产者把消息从Producer生产出来,经过网络发送给Broker端(生产阶段):在这个阶段我们常用的保障手段就是,当Broker收到消息后给Producer一个确认响应(就是大家常说的ACK),如果Producer没有收到正确的响应,Producer直到收到Broker的确认响应后才会停止重试消息发送。这里需要注意的点是,根据自己业务场景有时候要对ACK做一些处理,消息队列大都提供了自动ACK,再需要手动确认ACK的时候我们需要关闭掉这一默认设置。还有就是需要在我们的代码处做好异常处理,尤其要注意在异步发送的回调中检查发送结果。这一就可以保证消息队列在这个阶段数据不会丢失。
  2. Broker收到来自Producer的消息,持久化到磁盘(存储阶段):在这个阶段只要Broker正常工作基本上都能序列化正常,但是在集群化的Broker中发生概率还是比较高的。因此我们要对刷盘机制根据情况做一些调整,在单节点的情况下,我们可以设置当Broker接收到消息后先刷盘,当刷盘成功后再给Producer成功响应。这样即使Broker宕机,由于消息已经写入磁盘,重启后会继续处理,这里就会引入一个新的问题我们接下来会说到(消息重复)。例如设置RocketMQ中flushDiskType=SYNC_FLUSH(同步刷盘)
  3. 消费者通过网络,从Broker上读取消息(消费阶段):在这个阶段我们需要注意的一定要当消费者处理完你自身业务逻辑后给Broker发送消费确认。否则当你收到消息就给Broker确认消费,此时Broker认为消费者消费成功,将消息从Broker队列中移除,当本地逻辑处理异常时就很有可能丢掉这个消息。

消息重复

上面我们说到了消息丢失的处理方式,大家有没有发现有好几个地方是有问题呢?问题就是:当我们保证了不丢消息的同时,又引入了消息重复的问题。在一定情况下当消费者消费成功后再ack时异常或者超,Broker此时会认为此消息未消费成功,进入重复消费阶段。其实现在解决这个问题就显得非常简单了,我们不管是生产者重复,还是Broker重复我们只要在消费端保证幂等性就可以(任意多次执行所产生的影响均与一次执行的影响相同称为幂等操作),当然我们这里说的是少量的因为系统异常导致的消息重复,因为业务异常出现大量重复消息,虽然也可以通过这个办法来解决,但是并不是最有效的办法。我们在设计系统时候不管是否会有重复消息,在消费端保证幂等性是我们必须要做的,防患于未然嘛。下面我就介绍几种常用的保证幂等性的方法:

  1. 通过数据库的为唯一键实现幂等:我们可以在设计消息结构的时候设置一个对应数据库唯一键的列字段,业务成功后将此字段作为唯一键报错入数据库。当同样的ID做保存的时候就会出现违反数据库唯一约束异常,这里的主键可以是单独的,也可以是组合的列。这种方式可以在任何支持“INSERT IF NOT EXIST”的存储系统中适用。
  2. 通过版本号/数据快照实现幂等:其实这种方式有点类似于乐观锁的实现方式,就是需要消息中带有此业务当前一个瞬时状态的值,通过这个值与业务当前数据比较来判断是否执行更新操作。比如有一条重复的消息是这样的:将订单号为00001的状态从01变更为02(当订单00001的状态为01就改为02),这样一条重复的消息进来是不会对00001的订单做任何影响的。此时状态就是一个前置条件。再比如:将商品A的库存从500中减1(当商品A的总库存为500则减1)。类似这些需要在设计消息结构的时候带上一些业务属性活数据。另外一种办法就是给消息增加一个类似数据库的version字段,在每次消费更新的时候比较当前数据的版本号是否与消息中带的版本号一致,来判断是否执行消费。3.全局唯一ID:当一条重复的消息发送到不同的消费者时候,貌似上面的办法都不怎么管用的。这就是所谓的分布式集群中出现的重复消息并行问题,你得保证一个健壮的全局唯一发号器,然后在每次操作之前判断此ID是否已经被别的消费者消费过。当然你要没有一个健壮的全局唯一ID发号器,那么我建议你可以通过消息路由规则。将某也业务的消息存储在同一个队列主题中。(比如通过user_id,order_id将消息业务消息分发到同一个Broker的队列中,这样就既利用了集群资源,又将问题回归到上面两张处理方式了)

像上面说到的通过某一种消息发送路由规则这样既能解决消息并行中存在多重问题,而且还能保住消息的业务局部有序性,这种方式值得大家思考下。

消息积压

其实对于一个原本正常的消息系统来说消息积压,只会出现两种情况:要么生产者消息数量增加导致的积压;要么就是消费者消费变慢导致的消息积压。对于一个消息队列我们肯定在上线前就预估好,单节点最大承受流量与系统目前最大峰值流量的数据,一般情况下消息队列收发性能是远大于业务处理性能的,一旦出现的话问题也很显而易见:要么就是流量突然增加,要么就是业务逻辑异常。我能应该从三个方面来查找问题:

  1. 生产端:一般当生产端发生积压(Broker正常的情况下)就要查看你的业务逻辑是否有异常的耗时步骤导致的。是否需要改并行化操作等。
  2. Broker端:当Broker端发生积压我们首先要查看,消息队列内存使用情况,如果有分区的的话还得看每个分区积压的消息数量差异。当每个分区的消息积压数据量相对均匀的话,我们大致可以认为是流量激增。需要在消费端做优化,或者同时需要增加Broker节点(相当于存储扩容),如果分区加压消息数量差异很大的话(有的队列满了,有的队列可能还是空闲状态),我们这时候就要检查我们的路由转发规则是否合理,
  3. 消费端:在使用消息队列的时候大部分的问题都出在消费端,当消费速度小于生产速度很快就会出现积压,导致消息延迟,以至于丢失。这里需要重点说明一点的是,当消费速度小于生产速度的时候,仅增加消费者是没有用处的,因为多个消费者在同一个分区上实际是单线程资源竞争关系(当然还有一些冒险的单队列多消费者并行方式就是:消费者接到消息就ack成功再去处理业务逻辑,这样你就要承受消息丢失的代价),我们需要同时增加Broker上的分区数量才能解决这一问题。

那么上面我们说到消息积压的问题所在,那么遇到这样问题我们怎么能够快速的解决呢?我们需要查看是否有无限重发的消息或者有进入死锁的程序等等,当确定是流量激增的话,我们需要评估是否需要增加资源还是通过限流的方式解决,当短时间大量消息需要处理时,在资源允许的情况下,我们可以新启一批消费者与消息队列,将原来的消费者中的消息直接作为生产者转发到临时应急队列中,这样大概率的能够快速解决消息积压。与其事后处理不如我们在设计之初就要把积压考虑进来,对于数据量非常大,但是实时性要求不高的场景,可以设计出批量消息发送,当队列积累到一定阀值再做批量消费消费,这里需要注意的就是重复消费带来的影响,设计不好就是一场灾难。

总结

这一节我们就说到这里,我们说了一些处理与分析问题的方法,这里有一个最重要的点就是我们需要有一套实用的监控发现工具或者方式,在问题第一时间发现才是王道,不然我们上面所说的都空谈,当问题发现的时候损失已经无法挽回。所以我们要在设计系统之初需要要为监控系统或者程序提供完备或者必须的日志,接口,数据等,这要才是一个合理的设计。当没有监控系统的情况下我们必须自己设计一套简单分析接口。

题外话:RocketMQ消息何时删除?

今天写到这里我突然想到一个漏掉的问题。在RocketMQ中存在多个消费组时,消费者只记录自己的消费位置offset,那么过消息啥时候过期?过期后啥时候删除?怎么删除呢?前面在《什么是消息模型?》中我们说到了RocketMQ通过一个主题包含多个队列的方式实现并行生产与消费,并且一个队列给每个消费者提供offset标示消费位置。我们知道它的消息是顺序写在在一个叫CommitLog的文件中,然后会把里面的消息分发对应到主题上的多个Consume Queue,消费端根据offset跳过历史记录按次序获取每个Queue上最新的历史记录。默认情况下CommitLog在每天4点删除超过48小时的文件或者当磁盘水位线达到75%。