面试官:说说RabbitMQ中间件吧!
一期一句
前言
我们来看看流行的RabbitMQ消息系统以及它是如何让你的系统之间进行解耦的
本文提纲
正文
RabbitMQ是部署最广泛的开源消息代理。它接收生产者发布的消息并发送给消费者。它扮演中转站的角色,可以用来降低web服务器因发送消息带来的负载以及延时。
消息队列
消息与消息队列?
消息(Message)是指应用于应用之间传送的数据,消息的类型包括文本字符串、JSON、XML、内嵌对象等。
消息队列(Message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
是进程之间或者线程之间用来通信的一个队列组件,也就是我们常说的消息中间件,但是它的存在并不仅仅只是为了解决通信的问题。
为什么需要消息队列
因为互联网的扩张发展,业务不断扩展,导致由以前的单体架构升级到现在的微服务架构,很多服务之间相互调用依赖,所以我们需要消息队列来进行服务之间的解耦、控制资源合理使用以及缓冲流量洪峰等等。
消息队列三个核心作用:异步处理、服务解耦、流量削峰。
异步处理:随着业务的不断扩展,你会发现请求链路越来越长,请求链长了,响应速度也就慢了,所以需要用消息队列实现异步处理,从而减少处理请求的等待时间,让服务异步并发处理,提升系统的总体性能。
下面以一个库存、下单、短信、积分服务为例,用流程图的方式让你更好的理解一下什么是异步处理
异步处理场景解释:
一般是每个系统都会有自己单独的消息队列,用户连续发送不同的业务消息后,进入不同业务的消息队列,处理时间在异步的业务上是“同步”的。
服务解耦:看上面的例子,订单服务下游还有短信服务和积分服务,如果现在还要加营销服务和数据分析服务,甚至更多的服务,为了迎合下游服务的改动,订单服务需要经常修改,这订单服务项目组也太难受了吧!
所以就需要使用消息队列来进行对服务之间关系的解耦,订单服务把相关信息放到消息队列里面,下游的服务谁需要谁就订阅订单服务主题,这样订单服务项目组就会轻松很多了啦!
MQ解耦场景解释:
产生一条数据放到消息队列中->哪个服务需要数据自己去MQ里消费,如果新服务需要数据,自己直接从MQ里消费即可->如果哪个服务不需要这条数据就取消MQ消息的消费即可->生成订单后不需要考虑给谁发送数据,不需要维护代码,不需要考虑是否调用成功,失败,超时。
流量控制:在并发量级很大的情况下,比如秒杀活动爆发式的流量打入后台,后台很可能会顶不住,所以就需要一个中间件,先将请求全部放入消息队列,然后后台尽自己最大的能力去处理消息队列中的请求,等待超时的请求可以直接返回错误。这种就属于生产者生产过快的情况。
有一些请求不需要实时响应,但是业务复杂度很高,逻辑流程链很长,如果实时处理需要耗费很多时间。那么就可以将请求放入消息队列,然后后台服务根据自己的节奏对消息队列中的请求进行处理。这种就属于消费者消费过慢的情况。
上面这两种情况,消息队列都可以在其中发挥很好的缓冲作用。
队列模型,点对点,不可重复消费
生产者向队列中发送消息,一个队列可以有多个生产者生产的消息,也可以有多个消费者。
消息被消费以后,MQ中不再有存储,所以消息消费者不可能消费到已经被消费的消息。但是每个消费者之间存在竞争关系,每一条消息只能由一个消费者消费。
发布/订阅模型,Topic,可重复消费
如果想让一条消息被多个消费者消费,该怎么办呢?
那么发布/订阅模式派上用场了,该模型是将一个消息发布到主题(topic)中,所有订阅了这个主题的消费者都可以消费这个主题的消息。
这里所说的一条消息被多个消费者消费,有一个前提,这多个消费者不属于同一个消费者,同一个消息只能由同一个消费组中的某一个消费者去消费。
这里说的多个消费者消费同一个消息,是因为可以有多个消费组都订阅了这个主题。
比如一个主题下面有5个队列,那么这个主题的并发度就是5,也就是同时可以有5个消费者并行消费该主题下的消息。一般可以采用轮询或者key hash取余的策略把同一个主题下的消息分配到不同的队列中。
消费者还有组的概念,也就是消费组(Consumer Group),所有的消费者都是属于某个消费组的,同一条消息会发送给订阅了这个主题的多个消费组。
比如现在有两个消费组,Group 1和Group 2,他们都订阅了主题Topic a,此时有一条消息发送到了topic a,那么Group 1和Group 2都可以接收到这条消息。这条消息是写入Topic a中的某个队列的,消费组中会有一个消费者去消费这条消息。
每个消费组都会有自己的一个消费点(offset)来表示消费到的位置,在消费点之前的消息说明这个消息已经被同一个消费组和其他消费者消费过了,如果这个offset是队列级别的,每个消费者都会维护订阅的Topic下的每个队列的offset。
生产者轮询往Topic a中的队列中发送消息。
每一个消费者组(consumer group)可以看做是一个服务。消费组中的消费者(comsumer;)可以看做是集群中的多台机器,或者一台机器上的多个线程或进程。
RabbitMQ如何工作的?
重要概念
生产者(Producer):发送消息的应用。
消费者(Consumer):接收消息的应用。
队列(Queue):存储消息的缓存。
消息(Message):由生产者通过RabbitMQ发送给消费者的信息。
连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。
交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须到绑定一个交换机。当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机(Exchange)来发送。
然后,消息会被消费者从队列里读取并消费,这就是“消费”。
交换机类型
直接(Direct):直接交换机通过消息上的路由键直接对消息进行分发。
扇出(Fanout):一个扇出交换机会将消息发送到所有和它进行绑定的队列上。
主题(Topic):这个交换机会将路由键和绑定上的模式进行通配符匹配。
消息头(Headers):消息头交换机使用消息头的属性进行消息路由。
绑定(Binding):绑定是队列和交换机的一个关联连接。
RabbitMQ消息流程
生产者(producer)把消息发送给交换机。当创建交换机的时候,你需要指定类型。
交换机(exchange)接收消息并且负责对消息进行路由。根据交换机的类型,消息的多个属性会被使用,例如路由键。
绑定(binding)需要从交换机到队列的这种方式来进行创建。在这个例子里,我们可以看到交换机有到两个不同队列的绑定。交换机根据消息的属性来把消息分发到不同的队列上。
消息(message)消息会一直留在队列里直到被消费。
消费者(consumer)处理消息。
面试四连问?!
如何保证消息不丢失
常见的队列,逻辑关系如下:
一共有生产消息、存储消息、消费消息三个阶段,下面就以这三个阶段入手看看如何保证消息不丢失。
生产消息
生产者发送消息至Borker,需要对Broket的响应进行处理,不管是同步发送还是异步发送消息,都需要做好try-catch对Broket的响应做出妥善的处理
如果Broket返回写入消息失败等其他错误信息,生产者需要重新发送消息至Broket,如果多次发送失败则需要报警并且记录日志。这样可以保证消息在生产阶段不会丢失。
存储消息
存储消息阶段需要等待消息写入磁盘(消息刷盘)之后再给生产者做出响应。因为如果消息仅仅写入了内存就给生产者响应的话,这个时候如果断电导致机器停了,那么消息也就没了,但是生产者却以为消息已经存储成功了。
如果Borket是集群部署,有多副本机制,那么消息不仅仅要写入当前Broket,还需要写入副本机器中,那么必须等待消息写入两台机器之后再给生产者做出相应,这样就可以保证消息在存储阶段不丢失了。
消费消息
消费者拿到消息之后,等他真正执行完逻辑之后,也就是处理完消息之后,再给Borket做出相应,如果在消费者刚拿到消息就做相应的话,消费者宕机了,那这消息就没了!这样就可以保证消息在消费阶段不会丢失了。
如何处理重复消息
消息重复是怎么出现的?
为了保证消息不丢失,在生产消息阶段生产者需要等待Broket的响应,如果在Broket做出相应的时候,网络出现问题,导致生产者没有收到相应,那么生产者会再次重复发送,这样Broket中就会有两条重复的消息了!
再看消费消息阶段,如果一个消费者已经处理完了消息,业务逻辑已经处理完,事物也提交了,准备更新Consumer offset了,就在这时消费者挂了,也就是更新Consumer offset还没更新成功,所以此时另一个消费者还是会拿到刚才那条消息再重复执行一次,这就造成了消息重复消费!
从正常的业务流程来看,消息重复好像是不可避免的,因为我们总不能为了解决消息重复的问题又导致消息丢失的问题吧!那么我们换个角度去思考这个问题。既然我们不能避免消息的重复,那么我们就对因为消息重复带来的业务上面的影响进行处理,关键点就在于幂等。
什么叫做幂等?
这是一个数学的概念。但是在代码的角度来思考,何为幂等,其实我们可以这样简单理解,幂等就是使用同样的参数多次调用同一个接口和一次调用同一个接口执行结果是一样的、举个例子:
执行一条SQL语句:
update table money = 100 where id = 1 and money = 50;
对于这条SQL语句,不管执行多少次,money的值都是150,这个就叫做幂等。
几个常用的实现幂等的套路:
我们现在要做的,就是对业务逻辑进行改造处理,实现幂等,从而保证就算消息重复了也不会影响最终的结果。怎么做呢?常见的套路主要有以下这些:
提交结果的时候,做一个前置的判断,就比如上面的money=50。
加一个version,利用版本号机制,对比消息中的版本号和数据库中的版本号是否相等。
使用数据库的约束列,比如唯一键。
记录关键的key,比如保存订单处理数据的时候,如果有重复消息,就先判断一下这个ID的订单是否已经被处理过了,如果没有被处理过再进行后面的业务逻辑。
方法绝不仅仅只有这些,如何实现还是要看业务的具体细节,根据业务逻辑而定。
如何保证消息的有序性
有序性分为全局有序和部分有序。
全局有序
必须只有一个生产者往Topic中发送消息,并且Topic中只有一个队列(分区),消费者必须单线程消费这个队列(分区)中的消息,这样消息就是全局有序的,但是一般我们不需要全局有序。
部分有序
绝大多数时候我们都是使用部分有序,将Topic内部划分成我们指定数量的队列(分区),然后通过特定的策略将消息发送给指定的队列,然后每个队列对应一个单线程的消费者去消费队列中的信息,这样既可以实现消息的部分有序,还可以通过Topic中队列的数量提高并发处理消息的效率。
如何处理堆积消息
消息堆积往往是因为生产者的生产能力和消费者的消费能力不匹配造成的,有可能是因为消费者消费消息出现错误反复重试导致,也有可能是消费者消费能力太弱导致。
要解决消息堆积的问题,首先要确定消费慢的原因:
如果是因为消费错误导致反复重试,那么就先解决代码中的bug。
如果是因为消费者消费能力太弱,就对消息处理的业务逻辑代码进行优化
如果优化之后还是消费慢,那么就进行扩容,也就是增加Topic中队列的数量和消费者的数量。
注:增加一个消费者就必须增加一个队列,否则消费者是没有东西消费的,在同一个Topic中,一个队列只会分配给一个消费者。
巨人的肩膀:
https://www.rabbitmq.com
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
感谢各位阅读,如果你觉得“小乙文摘”还有点东西,点赞,关注,分享三连,2021,与您共同成长。