vlambda博客
学习文章列表

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

技术文章第一时间送达!

前言

不知道说什么好,直接开始吧。本来想采用最新版本的,一想到生产和测试必须版本保持一致,不能随便升级,就只好去下载指定版本的rabbitmq的rpm。

RabbitMQ概念

Broker :消息中间件的服务节点,RabbitMQ的一个服务实例,也可以看做是RabbitMQ的一台服务器

Queue 队列:用于存储消息。kafka不一样,它的消息存在在topic逻辑层面,而队列存储的只是topic中实际存储文件中的编译标识。多个消费者可以同时订阅一个队列,平均分摊(Round-robin轮询)处理消息

Exchange 交换器:生产者将消息发送到交换器,由交换器路由到一个或者多个队列中

  • direct exchange和queue进行bingding时会设置相应的routingkey。生产者发送消息到交换器时会设定相应的routingkey,如果这两个routingkey相同,消息都会投放到绑定的队列上。

  • topic 和direct一样,但是支持routingkey的通配符模式,可以有通配符:* , # 其中 * 表示匹配一个单词, #则表示匹配没有或者多个单词

  • fanout 直接将发送到该交换器的消息路由到它绑定的一个或者多个队列

  • header 根据添加的header来判断

    • x-match == all,匹配所有header

    • x-match == any, 只需要匹配其中的一个header的值

Routingkey 路由键: 生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey ,用 来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 合起来使用才能最终生效。在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时, 通过指定 RoutingKey 来决定消息流向哪里

Bindingkey 绑定:通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一绑定键 BindingKey ,这样 RabbitMQ 就知何正确将消息路由到队列了。BindingKey只针对特定交换器才有效。

Producer:消息生产者

Consumer:消息消费者

安装条件

环境

Centos 7.4 3台虚机8c16g

用户权限

需要有sudo权限

安装文件

下载的文件统一在/home/lazasha/download目录下, rabbitmq和erlang对应的版本关系可以参考:

https://www.rabbitmq.com/which-erlang.html

epel: epel-release-7-12.noarch.rpm

erlang:erlang-22.1.8-1.el7.x86_64.rpm

rabbitmq: rabbitmq-server-3.8.2-1.el7.noarch.rpm

key: rabbitmq-release-signing-key.asc (我好像后面没有用到)

步骤

epel安装

sudo yum -y install epel-release-7-12.noarch.rpm

erlang安装

sudo yum -y install erlang-22.1.8-1.el7.x86_64.rpm


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

检查是否安装成功:

输入:erl


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

rabbitmq安装

sudo yum -y install rabbitmq-server-3.8.2-1.el7.noarch.rpm 


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

验证是否成功:

sudo systemctl start rabbitmq-server 
sudo systemctl status rabbitmq-server


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

停止服务:

sudo systemctl stop rabbitmq-server

在他两台机器上同样操作. 服务缺省端口是5672.

集群搭建

在3台机器上/etc/hosts文件中添加IP和节点名称的对应

10.156.13.92 lchod1392
10.156.13.93 lchod1393
10.156.13.94 lchod1394

把lchod1392上的 cookie文件,赋值到lchod1393、lchod1394节点上,集群环境下各个节点的cookie必须一致。rpm安装的cookie 文件默认路 径为 /var/lib/rabbitmq/.erlang.cookie

注意:.erlang.cookie可能有权限问题,可以使用下面的操作:

sudo chmod -R 600 /var/lib/rabbitmq/.erlang.cookie

注意: 拷贝到另外两台机器上后,不管怎么样执行一下下面的命令,改一下.erlang.cookie的owner:

sudo chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

通过Rabbitmqctl来配置集群,集群内部通讯端口是25672

1.首先启动3个节点上的RabbitMQ服务

sudo systemctl start rabbitmq-server

可以使用rabbitmqctl cluster_status 查看各个节点的集群状态

2.以lchod1392为基准,将lchod1393、lchod1394加入到集群中,把3个节点都设置为硬盘节点了。

lchod1393:

    sudo rabbitmqctl stop_app                    //只关闭rabbitmq服务,不关闭erlang服务
    sudo rabbitmqctl reset                       //这个命令我在加集群时没有执行
    sudo rabbitmqctl join_cluster rabbit@lchod1392   //--ram这个参数是内存节点模式,不是就是硬盘节点
    sudo rabbitmqctl start_app

lchod1394:

    sudo rabbitmqctl stop_app                    //只关闭rabbitmq服务,不关闭erlang服务
    sudo rabbitmqctl reset                       //这个命令我在加集群时没有执行
    sudo rabbitmqctl join_cluster rabbit@lchod1392   //--ram这个参数是内存节点模式,不是就是硬盘节点
    sudo rabbitmqctl start_app


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

3.检查集群状态

sudo rabbitmqctl cluster_status


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

注意点: 如果关闭了集群中的所有节点,确保启动时最后一个关闭的节点第一个启动,否则会有问题。

创建远程访问用户

sudo rabbitmqctl add_user rabbitmq ******
sudo rabbitmqctl set_user_tags rabbitmq administrator
sudo rabbitmqctl set_permissions -p "/" rabbitmq ".*" ".*" ".*"
//查看新增加的用户
sudo rabbitmqctl list_users


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

注意:不用在启动后台管理插件了,使用systemctl start rabbitmq-server就已经启动了,端口是15672

Mirror Queue 镜像队列搭建

针对每一个镜像队列都包含一个master节点 和 多个slave节点,需求确保队列的master节点均匀分散的落在集群的各个broker中。如果master不工作,那么假如镜像队列最早的salve升级为master.

镜像队列的配置主要是通过添加相应的 Policy 来完成 :

rabbitmqctl set_policy [-p vhost) [--priority
priority) [--apply- to apply- to) {name) {pattern) {definition)

definition 要包含 个部分 ha-mode、 ha-params、 ha-sync-mode

  • ha-mode 指明镜像队列的模式,有效值为 all/exactly/nodes默认为 all
        all 表示在集群中所有的节点上进行镜像
        exactly 表示在指定个数的节点上进行镜像,节点个数由 ha-params 指定;
        nodes 表示在指定节点上进行镜像,节点名称通ha-params 指定,节点的名称通常类似于 rabbit@hostname ,可以通过rabbitmqctl cluster status 命令查看到

  • ha-params 不同的 hamode 配置中需要用到的参数。

  • ha-sync-mode 队列中消息的同步方式,有效值为 automatic 、manual

命令样例

  • 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的两个节点上完 成镜像

    rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_"
    ' {"ha-mode ":"exactly","ha-params ":2, "ha-sync-mode ": "automatic" }'
  • 对队列名称以 queue_" 开头的所有队列进行镜像,并在集群的所有节点上完 成镜像

    rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_"
    ' {"ha-mode ":"all","ha-sync-mode ":"automatic" }'

    rabbitmqctl set_policy ha-all “^” ‘{“ha-mode”:“all”}’ 可以把队列设置为镜像队列

命令执行

   sudo rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue " ^queue_"
   ' {"ha-mode ":"all","ha-sync-mode ":"automatic" }'

验证

使用新建的rabbitmq用户从本机登录远程的机器

lchod1392: 创建一个队列,以queue开头

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

lchod1393: 已经有了这个队列

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

lchod1394: 有了这个队列

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

队列知识

mandatory、 immediate 参数 channel.basicPublish 方法中的两个参数

  • mandatory参数 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监昕器实现。

  • immediate参数 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

  • 概括来说 mandatory 参数告诉服务器至少将该消息路由到一个队中, 将消息返 回给生产者。 imrnediate 参数告诉服务器 如果该消息关联的队列上有消费者, 立刻投递; 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

  • RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此RabbitMQ官方解释是 immediate 参数会影响镜像队列的性能,增加代码码复杂性,建议采用 TTL、 DLX 的方法

TTL time to live 过期时间

  • 设置方式:通过队列属性设置,整个队列的消息都有同样的过期时间;也可以对单条消息单独设置,则一个队列中消息有不同的过期时间。如果两种都设置了,以时间小的为准

  • 设置队列消息的TTL代码

    Map<String,Object> argss = new HashMap<String, Object>();
    argss.put("x-message-ttl " , 5000);
    channel.queueDeclare(queueName , durable , exclusive , autoDelete , argss) ; 

    这种方式, 一旦消息过期,就会从队列中抹去

    针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration 的属性参数,单位为毫秒:

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder deliveryMode(2); 持久化消息
    builder expiration50000 );/ 设置 TTL=50000ms
    AMQP.BasicProperties properties = builder. build() ;
    channel.basicPublish(exchangeName , routingKey, mandatory, properties,
    "test ttl".getBytes());

    这种方式, 即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的

  • 如果不设置 TTL.则表示此消息不会过期 ;如果将 TTL 设置为 0,则表示除非此时可以直 接将消息投递到消费者,否则该消息会被立即丢弃

  • 设置队列的TTL

    通过 channel.queueDeclare 方法中的 expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并 且在过期时间段内也未调用过 Basic.Get 命令。

    Map<String , Object> args =口ew HashMap<String, Object>{) ;
    args . put( "x-expires" , 100000); 
    channel . queueDeclare("queuesleb " , false , false , false , args) ; 

死信队列 DLX(Dead Letter Message) 当 消息在一个队列中变成死信 (dea message) 之后,它能被重新被发送到另一个交换器中,这个 交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。

  • 消息被拒绝 (Basic.Reject/Basic .Na ck) ,井且设置 requeue 参数为 false

  • 消息过期

  • 队列达到最大长度

  • 可以创建消费者监听这个队列的消息进行处理

  • 通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这 个队列添加 DLX

    channel.exchangeDeclare("dlx_exchange " , "direct "); // 创建 DLX: dlx_exchange
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange" , " dlx-exchange ");
    //为队列 myqueue 添加 DLX
    channel.queueDeclare("myqueue" , false , false , false , args); 

    //也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键, 如果指定了,则消费者需要使用
    //的路由键才能消费这个队列的消息:
    args.put("x-dead-letter-routing-key" , "dlx-routing-key"); 

延迟队列

  • 场景:一个订单在30分钟内支付有效,否则自动取消

  • 利用上面的TTL和DLX来达到延迟队列的功能

优先级队列

通过设置队列的 x-max-priority 参数来实现:

    Map<String, Object> args = new HashMap<String, Object>() ;
    args.put( "x-max-priority" , 10) ;
    channel.queueDeclare( "queue.priority" , true , fa1se , false , args) ; 

在生产者速度大于消费者速度且broker中有积压的消息的时候,才有效果

持久化

  • 交换器的持久化、队列的持久化和消息的持久化 ,才能真正的持久化

  • 交换器的持久化:设置durable = true

  • 队列的持久化: durable = true

  • 消息的持久化:通过将消息的投递模式 (BasicPropertes 中的 deliveryMode 属性)设置为2( DeliveryMode.PERSISTENT) 即可实现消息的持久化 )

发送方确认机制 publisher confirm

publisher-confirms: true #确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,只要正确的到达exchange中,broker即可确认该消息返回给客户端

ackpublisher-returns: true #确认消息是否正确到达queue,如果没有则触发,如果有则不触发

ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    CorrelationDataEx c = (CorrelationDataEx)correlationData;
                    System.out.println("发送消息: " + c.getMsg());
                    System.out.println("HelloSender 消息发送成功 :" + correlationData.toString() );
                    /**
                     * 通过设置correlationData.id为业务主键,消息发送成功后去继续做候选业务。
                     */

                } else {
                    System.out.println("HelloSender消息发送失败" + cause);
                }
            });

ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调

     rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                 //Users users1 = (Users)message.getBody().toString();
                 //String correlationId = message.getMessageProperties().getCorrelationId();

                 System.out.println("Message : " + new String(message.getBody()));
                 //System.out.println("Message : " + new String(message.getBody()));
                 System.out.println("replyCode : " + replyCode);
                 System.out.println("replyText : " + replyText);  //错误原因
                 System.out.println("exchange : " + exchange);
                 System.out.println("routingKey : " + routingKey);//queue名称

             });


     /**
              * CorrelationDataEx继承CorrelationData, 把需要发送消息的关键字段加入
              * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
              */

             CorrelationDataEx c = new CorrelationDataEx();
             c.setId(users.getId().toString());
             c.setMsg(users.toString());

             /**
              * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
              * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
              */

             rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

             rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);

消息消费

1.配置

        listener:
              simple:
                prefetch: 1               #设置一次处理一个消息
                acknowledge-mode: manual  #设置消费端手动 ack
                concurrency: 3            #设置同时有3个消费者消费,需要3个消费者实例

2.代码

        @RabbitHandler
            @RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = "rabbitListenerContainerFactory", concurrency = "2")
            public void process(Users users, Channel channel, Message message) throws IOException {
                System.out.println("HelloReceiver收到  : " + users.toString() + "收到时间" + new Date());

                try {
                    //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
                    // 否则消息服务器以为这条消息没处理掉 后续还会在发
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    System.out.println("receiver success");
                } catch (IOException e) {
                    e.printStackTrace();
                    //丢弃这条消息,则不会重新发送了
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    System.out.println("receiver fail");
                }
            }

验证

创建消息生产者和消费者

生产者

集群配置:

    spring:
      application:
        name: rabbitmq-producer-demo
      rabbitmq:
        # 单点配置
        #host: localhost
        #port: 5672
        # 集群的配置
        addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672
        username: rabbitmq  #guest是缺省,只能localhost网络访问,要访问远程网络,需要创建用户
        password: 123456
        # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
        # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
        # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
        # Virtual Name一般以/开头
        virtual-host: /
        # 确认消息是否正确到达queue,如果没有则触发,如果有则不触发
        publisher-returns: on
        # 确认消息是否到达broker服务器,也就是只确认是否正确到达exchange中即可,
        # 只要正确的到达exchange中,broker即可确认该消息返回给客户端ack
        # 如果是simple就不会回调
        publisher-confirm-type: correlated
        template:
          #设置为 on 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
          mandatory: on

队列设置: 设置了queue_sleb_accept队列

    @Configuration
    public class RabbitConfig {
        /**
         * 投保消息交换机的名字
         */

        public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept";

        /**
         * 投保消息队列
         */

        public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept";
        /**
         * 投保消息路由键
         */

        public static final String ROUTING_KEY_ACCEPT = "routing_key_accept";
        /**
         *  投保消息死信交换机
         */

        public static final String DLX_EXCHANGE_SLEB_ACCEPT = "exchange_dlx_sleb_accept";
        /**
         * 投保消息死信队列
         */

        public static final String DLX_QUEUE_SLEB_ACCEPT = "queue_dlx_sleb_accept";
        /**
         *  常用交换器类型如下:
         *       Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送".
         *       即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
         *       Topic(TopicExchange):按规则转发消息(最灵活)。
         *       Headers(HeadersExchange):设置header attribute参数类型的交换机。
         *       Fanout(FanoutExchange):转发消息到所有绑定队列。
         *
         * 下面都是采用direct, 必须严格匹配exchange和queue
         * 投保消息交换机
         */

        @Bean("slebAcceptExchange")
        DirectExchange slebAcceptExchange() {
            return ExchangeBuilder.directExchange(EXCHANGE_SLEB_ACCEPT).durable(true).build();

        }
        /**
         * 第二个参数 durable: 是否持久化,如果true,则此种队列叫持久化队列(Durable queues)。此队列会被存储在磁盘上,
         *                 当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
         * 第三个参数 execulusive: 表示此对应只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * 第四个参数 autoDelete: 当没有生成者/消费者使用此队列时,此队列会被自动删除。(即当最后一个消费者退订后即被删除)
         *
         * 这儿是(queue)队列持久化(durable=true),exchange也需要持久化
         * ********************死信队列**********************************************************
         *            x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
         *            x-dead-letter-routing-key  这里声明当前队列的死信路由key
         *            采用死信队列,才会用到下面的参数
         *            Map<String, Object> args = new HashMap<>(2);
         *            args.put("x-dead-letter-exchange", DLX_EXCHANGE_SLEB_ACCEPT);
         *            args.put("x-dead-letter-routing-key", ROUTING_KEY_ACCEPT);
         *            return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).withArguments(args).build();
         * ********************死信队列**********************************************************
         * 投保消息队列
         */

        @Bean("slebAcceptQueue")
        public Queue slebAcceptQueue() {
            return QueueBuilder.durable(QUEUE_SLEB_ACCEPT).build();
        }

        /**
         * 交换机、队列、绑定
         */

        @Bean("bindingSlebAcceptExchange")
        Binding bindingSlebAcceptExchange(@Qualifier("slebAcceptQueue") Queue queue,
                                          @Qualifier("slebAcceptExchange") DirectExchange directExchange) 
{
            return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
        }
        /**
         * 投保死信交换机
         */

        @Bean("slebDlxAcceptExchange")
        DirectExchange slebDlxAcceptExchange() {
            return ExchangeBuilder.directExchange(DLX_EXCHANGE_SLEB_ACCEPT).durable(true).build();
        }
        /**
         * 投保死信队列
         */

        @Bean("slebDlxAcceptQueue")
        public Queue slebDlxAcceptQueue() {
            return QueueBuilder.durable(DLX_QUEUE_SLEB_ACCEPT).build();
        }
        /**
         * 死信交换机、队列、绑定
         */

        @Bean("bindingDlxSlebAcceptExchange")
        Binding bindingDlxSlebAcceptExchange(@Qualifier("slebDlxAcceptQueue") Queue     queue, @Qualifier("slebDlxAcceptExchange") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY_ACCEPT);
        }

生产消息

    @Service
    public class AcceptProducerServiceImpl implements AcceptProducerService {
        private final Logger logger = LoggerFactory.getLogger(AcceptProducerServiceImpl.class);


        private final RabbitTemplate rabbitTemplate;

        public AcceptProducerServiceImpl(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }

        @Override
        public void sendMessage(PolicyModal policyModal) {
            logger.info("开始发送时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                    + ",保单号: " + policyModal.getPolicyNo()
                    + ",发送内容: " + policyModal.toString());
            /*
             * policyDataEx继承CorrelationData, 把需要发送消息的关键字段加入
             * 这样confirmcallback可以返回带有关键字段的correlationData,我们可以通过这个来确定发送的是那条业务记录
             * policyno为唯一的值
             */

            PolicyDataEx policyDataEx = new PolicyDataEx();
            policyDataEx.setId(policyModal.getPolicyNo());
            policyDataEx.setMessage(policyModal.toString());

            /*
             * 加上这个,可以从returncallback参数中读取发送的json消息,否则是二进制bytes
             * 比如:如果returncallback触发,则表明消息没有投递到队列,则继续业务操作,比如将消息记录标志位未投递成功,记录投递次数
             */

            //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。

            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_SLEB_ACCEPT, RabbitConfig.ROUTING_KEY_ACCEPT, policyModal, policyDataEx);

        }

运行验证

http://localhost:9020/sendsing


带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

查看3台服务器控制台:看到已经创建了镜像队列,并且有一个消息在队列里面:

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

消费者

配置

    spring:
      application:
        name: rabbitmq-consumer-demo
      rabbitmq:
        # 单点配置
        #host: localhost
        #port: 5672
        # 集群的配置
        addresses: 10.156.13.92:5672,10.156.13.93:5672,10.156.13.94:5672
        username: rabbitmq
        password: 123456
        # 像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?RabbitMQ也有类似的权限管理。
        # 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当于一个相对独立的RabbitMQ服务器,
        # 每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。
        # Virtual Name一般以/开头
        virtual-host: /
        listener:
          simple:
            prefetch: 1               #设置一次处理一个消息
            acknowledge-mode: manual  #设置消费端手动 ack
            concurrency: 3            #设置同时有3个消费者消费
            #消息接收确认,可选模式:NONE(不确认)、AUTO(自动确认)、MANUAL(手动确认)

配置队列名称,主要名称和生产者里面的名称一样

    public class RabbitMQConfigInfo {
        /**
         * 投保消息队列
         */

        public static final String QUEUE_SLEB_ACCEPT = "queue_sleb_accept";
        /**
         * 投保消息交换机的名字
         */

        public static final String EXCHANGE_SLEB_ACCEPT = "exchange_sleb_accept";

        /**
         * 投保消息路由键
         */

        public static final String ROUTING_KEY_ACCEPT = "routing_key_accept";
    }

消费

    @Service
    public class RabbitConsumerServiceImpl implements RabbitConsumerService {

        private final Logger logger = LoggerFactory.getLogger(RabbitConsumerServiceImpl.class);

        @RabbitHandler
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = QUEUE_SLEB_ACCEPT, durable = "true"),
                exchange = @Exchange(name = EXCHANGE_SLEB_ACCEPT,
                        ignoreDeclarationExceptions = "true"),
                key = {ROUTING_KEY_ACCEPT}
        ))
        @Override
        public void process(Channel channel, Message message) throws IOException {
            String jsonStr = new String(message.getBody());
            logger.info("接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                    + "\n,消息:" + jsonStr);
            //PolicyModal类的全限名称(包名+类名)会带入到mq, 所以消费者服务一边必须有同样全限名称的类,否则接收会失败。
            PolicyModal policyModal = JsonUtils.JSON2Object(jsonStr, PolicyModal.class);
            assert policyModal != null;
            try {
                //将message中的body获取出来, 转换为PolicyModal,再获取policyno
                //更根据policyno新数据库里面的标志,
                // todo

                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
                // 否则消息服务器以为这条消息没处理掉 后续还会在发
                //throw new IOException("myself");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                /*logger.info("接收处理成功:\n"
                        + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                        + ",保单号: " + policyModal.getPolicyNo()
                        + "\n,消息:" + new String(message.getBody()));
    */

            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息,则不会重新发送了
                //一般不丢弃,超时后mq自动会转到死信队列(如果设置了超时时间和死信交换机和队列后)
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                logger.info("接收处理失败:\n"
                        + "接收信息时间: " + DateUtils.localDateTimeToString(LocalDateTime.now())
                        + ",保单号: " + policyModal.getPolicyNo()
                        + "\n,消息:" + new String(message.getBody()));
            }
        }

    }

启动验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

在看各个服务器控制台:消息已经被消费,队列里面消息为0

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

带你从头进行RabbitMQ安装、集群搭建、镜像队列配置和代码验证

结束

技术文章难写,这个花了前后一个礼拜的时间,希望对大家有帮助。有要验证代码的,可以发邮件:[email protected]联系我,我给你发。懒,没空上github,回来再说。

END

Java面试题专栏











我知道你 “在看