2020黑马——消息中间件RabbitMQ
MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程

一、MQ引言

1. 什么是MQ

MQ(Message Quene) : 是在消息的传输过程中保存消息的容器,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

分布式系统有两种通信方式:直接远程调用和借助第三方完成间接通信。MQ属于第二种

image.png

1.1 优势:

  • 应用解耦:提高了系统容错性和可维护性

image.png
image.png
各模块开发人员只需要关注各自负责的模块,消费者模块从消息队列中获取信息,增减模块或者出现故障都不会影响生产者模块。

  • 异步提速:提升了用户体验和系统吞吐量

image.png
image.png

  • 削峰填谷:提高了系统稳定性

削峰
image.png
image.png
填谷
image.png
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。

1.2 劣势:

image.png

  • 系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用?

  • 系统复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费(幂等性)?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

  • 一致性问题

A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?

1.3 使用MQ需要满足什么条件:

  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • 容许短暂的不一致性。
  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

    2. MQ有哪些

    当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。也有公司直接使用redis充当消息队列。

    3. 不同MQ特点

    |
    | RabbitMQ | ActiveMQ | RocketMQ | Kafka | | —- | —- | —- | —- | —- | | 公司/社区 | Rabbit | Apache | 阿里 | Apache | | 开发语言 | Erlang | Java | Java | Scala&Java | | 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 | | 客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 | | 单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) | | 消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 | | 功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |

二、RabbitMQ 的引言

1. AMQP

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

AMQP 协议 AMQP(advanced message queuing protocol)是一种网络协议,是应用层协议的一个开放标准,可以类比HTTP去理解,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

image.png

2. RabbitMQ基础架构:

image.png
相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

3. JMS

  • JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
  • JMS是JavaEE规范中的一种,类比JDBC
  • 很多消息中间件都实现了JMS规范,例如ActiveMQ。RabbitMQ官方没有提供JMS的实现包,开源社区有

    4. RabbitMQ 的安装

    4.1 下载

    image.png

    4.2 下载的安装包

    image.png

注意: 这里的安装包是centos7安装的包

4.3 安装步骤

  1. # 1.将rabbitmq安装包上传到linux系统中
  2. erlang-22.0.7-1.el7.x86_64.rpm
  3. rabbitmq-server-3.7.18-1.el7.noarch.rpm
  4. # 2.安装Erlang依赖包
  5. rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
  6. # 3.安装RabbitMQ安装包(需要联网)
  7. yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
  8. 注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要
  9. 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
  10. # 4.复制配置文件
  11. cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
  12. # 5.查看配置文件位置
  13. ls /etc/rabbitmq/rabbitmq.config
  14. # 6.修改配置文件(参见下图:)
  15. vim /etc/rabbitmq/rabbitmq.config

image.png
将上图中配置文件中红色部分去掉%%,以及最后的,逗号 修改为下图:
image.png

# 7.执行如下命令,启动rabbitmq中的插件管理
    rabbitmq-plugins enable rabbitmq_management

    出现如下说明:
        Enabling plugins on node rabbit@localhost:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@localhost...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch

    set 3 plugins.
    Offline change; changes will take effect at broker restart.

# 8.启动RabbitMQ的服务
    systemctl start rabbitmq-server
    systemctl restart rabbitmq-server
    systemctl stop rabbitmq-server


# 9.查看服务状态(见下图:)
    systemctl status rabbitmq-server
  ● rabbitmq-server.service - RabbitMQ broker
     Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
     Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
   Main PID: 2904 (beam.smp)
     Status: "Initialized"
     CGroup: /system.slice/rabbitmq-server.service
             ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
             MBlmbcs...
             ├─3220 erl_child_setup 32768
             ├─3243 inet_gethost 4
             └─3244 inet_gethost 4
      .........

image.png

# 10.关闭防火墙服务
    systemctl disable firewalld
    Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
    Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
    systemctl stop firewalld   

  # 11.访问web管理界面
    http://10.15.0.8:15672/

  # 12.登录管理界面
    username:  guest
    password:  guest

三、RabbitMQ 配置

1. RabbitMQ 管理命令行

# 1.服务启动相关
    systemctl start|restart|stop|status rabbitmq-server

# 2.管理命令行  用来在不使用web管理界面情况下命令操作RabbitMQ
    rabbitmqctl  help  可以查看更多命令

# 3.插件管理命令行
    rabbitmq-plugins enable|list|disable

2. web管理界面介绍

2.1 overview概览

20201030175840800.png

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由
  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

2.2 Admin用户和虚拟主机管理

2.2.1 添加用户

20201030175840800.png
上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

  • 超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  • 策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  • 普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

  • 其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

2.2.2 创建虚拟主机

虚拟主机 为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

20201030175840800.png

2.2.3 绑定虚拟主机和用户

创建好虚拟主机,我们还要给用户添加访问权限:
点击添加好的虚拟主机:
20201030180151290.png
进入虚拟机设置界面
20201030180151290.png

四、RabbitMQ 工作模式

1. 简单模式

image.png
一个生产者对应一个消费者

  1. 创建两个module,分别作为生产者和消费者

image.png

  1. 分别添加rabbitmq客户端依赖

     <dependencies>
         <dependency>
             <groupId>com.rabbitmq</groupId>
             <artifactId>amqp-client</artifactId>
             <version>5.6.0</version>
         </dependency>
     </dependencies>
    
  2. 生产者 ```java package me.nic.rabbitmq;

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException; import java.util.concurrent.TimeoutException;

/**

  • 生产者,发送消息 */ public class MyProducer { public static void main(String[] args) throws IOException, TimeoutException {
     // 1. 创建连接工厂
     ConnectionFactory factory = new ConnectionFactory();
     // 2. 设置参数
     factory.setHost("192.168.31.136");
     factory.setPort(5672);
     factory.setVirtualHost("nic-vh");
     factory.setUsername("nic");
     factory.setPassword("nc970728");
     // 3. 创建连接
     Connection connection = factory.newConnection();
     // 4. 创建channel
     Channel channel = connection.createChannel();
     // 5. 创建队列
     /*
     queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         queue: 队列名称
         durable: 是否持久化,mq重启后还在
         exclusive: 是否独占。只能有一个消费者监听该队列
         autoDelete: 是否自动删除。当没有consumer时,自动删除掉
         arguments: 参数
     如果没有名为"nic-vh"的队列,则会创建该队列,如果有则不会
      */
     channel.queueDeclare("nic-vh", true, false, false, null);
     // 6. 发送消息
     /*
     basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         exchange: 交换机名称,简单模式下会使用默认的“”
         routingKey: 路由名称
         props: 配置信息
         body: 发送消息数据
      */
     String msg = "hello world~";
     channel.basicPublish("", "nic-vh", null, msg.getBytes());
     // 7. 释放资源
    
    // channel.close(); // connection.close(); } }

4. 消费者
```java
package me.nic.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MyConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("nic");
        factory.setPassword("nc970728");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 接收消息
        // 回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            /*
                回调方法,当收到消息后,会自动执行该方法
                consumerTag:标识
                envelope:获取一些信息,交换机、路由key等
                properties:配置信息
                body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                System.out.println("Exchange: " + envelope.getExchange());
                System.out.println("DeliveryTag: " + envelope.getDeliveryTag());
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        };
        /*
        basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
            queue: 队列名称
            autoAck: 是否自动确认
            cancelCallback: 回调对象
         */
        channel.basicConsume("nic-vh", true, consumer);
        // 消费者需要一直监听,所以一般不会释放资源
    }
}
  1. 效果

生产者向队列中写入数据”hello world~”,消费者从队列中取出数据:

consumerTag: amq.ctag-iFu2t9ubshlTvnq8vd6Xtw
Exchange: 
DeliveryTag: 1
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: hello world~

2. Work queues

image.png

  • 与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息,消费者之间对于同一个消息的关系是竞争的关系。
  • 应用场景:对于任务过重或任务较多的情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。
  • 假如生产者往队列里写入1-10十条消息,有两个消费者进行消费,那么C1消费的是1、3、5、7、9,C2消费的是2、4、6、8、10。

    3. Publish/Subscribe发布与订阅模式image.png

    在订阅模型中,多了一个Exchange角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X (交换机)

  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key的队列
    • Topic:通配符,把消息交给符合routing pattern (路由模式)的队列

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合 路由规则的队列,那么消息会丢失!

示例:
生产者发布日志,两个消费者分别将日志打印与存入数据库:

/**
 * 发布订阅模式,生产者,发送消息
 */
public class MyProducerPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 创建交换机
        /*
        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
            exchange:交换机名称
            type:交换机类型,枚举类型
                DIRECT:定向,指定路由
                FANOUT:广播,每一个队列都可以收到
                TOPIC:通配符的方式
                HEADERS:参数匹配
            durable:是否持久化
            autoDelete:是否自动删除
            internal:内部使用
            arguments:参数
         */
        String exchangeName = "testFanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
        // 6. 创建两个队列
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
            queue: 队列名称
            durable: 是否持久化,mq重启后还在
            exclusive: 是否独占。只能有一个消费者监听该队列
            autoDelete: 是否自动删除。当没有consumer时,自动删除掉
            arguments: 参数
        如果没有名为"nic-vh"的队列,则会创建该队列,如果有则不会
         */
        String queueName1 = "testFanoutQueue1";
        String queueName2 = "testFanoutQueue2";
        channel.queueDeclare(queueName1, true, false, false, null);
        channel.queueDeclare(queueName2, true, false, false, null);
        // 7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
            queue:队列名称
            exchange:交换机名称
            routingKey:路由键,绑定规则。如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queueName1, exchangeName, "");
        channel.queueBind(queueName2, exchangeName, "");
        // 6. 发送消息
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            exchange: 交换机名称,简单模式下会使用默认的“”
            routingKey: 路由名称
            props: 配置信息
            body: 发送消息数据
         */
        String msg = "日志:张三登录 ";
        channel.basicPublish(exchangeName, "", null, msg.getBytes());
    }
}

消费者1:

/*
消费者1,从队列中取出数据,写入数据库
 */
public class MyConsumerPubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 接收消息
        // 回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            /*
                回调方法,当收到消息后,会自动执行该方法
                consumerTag:标识
                envelope:获取一些信息,交换机、路由key等
                properties:配置信息
                body:数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body) + "写入数据库");
            }
        };
        /*
        basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
            queue: 队列名称
            autoAck: 是否自动确认
            cancelCallback: 回调对象
         */
        String queueName1 = "testFanoutQueue1";
        channel.basicConsume(queueName1, true, consumer);
        // 消费者需要一直监听,所以一般不会释放资源
    }
}

消费者2:

/*
消费者2,从队列中取出数据,打印到控制台
 */
public class MyConsumerPubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 接收消息
        // 回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body) + "打印到控制台");
            }
        };
        String queueName2 = "testFanoutQueue2";
        channel.basicConsume(queueName2, true, consumer);
        // 消费者需要一直监听,所以一般不会释放资源
    }
}

结果:

# MyConsumerPubSub2
body: 日志:张三登录 写入数据库

# MyConsumerPubSub2
body: 日志:张三登录 打印到控制台

4. Routing路由模式

Routing模式要求队列在绑定交换机时要指定RoutingKey,消息会转发到符合RoutingKey的队列。
image.png

  • 队列和交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey
  • 消息的发送方向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息。

示例:
生产者发布日志,消费者1将error级别日志存入数据库,消费者2将info/error/warning级别日志打印到控制台:

/**
 * 路由模式,生产者,发送消息
 * 交换机绑定队列时指定路由
 */
public class MyProducerRouting {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 创建交换机
        String exchangeName = "testDirect";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 6. 创建两个队列
        String queueName1 = "testDirectQueue1";
        String queueName2 = "testDirectQueue2";
        channel.queueDeclare(queueName1, true, false, false, null);
        channel.queueDeclare(queueName2, true, false, false, null);
        // 7. 绑定队列和交换机,指定RoutingKey
        // 绑定队列1 error
        channel.queueBind(queueName1, exchangeName, "error");
        // 绑定队列2 info/error/warning
        channel.queueBind(queueName2, exchangeName, "info");
        channel.queueBind(queueName2, exchangeName, "error");
        channel.queueBind(queueName2, exchangeName, "warning");
        // 6. 发送消息,指定RoutingKey
        String msg = "日志:张三调用delete方法失败 日志级别:error";
        channel.basicPublish(exchangeName, "error", null, msg.getBytes());
    }
}

消费者1,绑定队列1:

/*
消费者1,从队列中取出数据,写入数据库,只有error级别才执行
 */
public class MyConsumerRouting1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 接收消息
        // 回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body) + "写入数据库");
            }
        };
        String queueName1 = "testDirectQueue1";
        channel.basicConsume(queueName1, true, consumer);
    }
}

消费者2,绑定队列2:

/*
消费者2,从队列中取出数据,打印到控制台,info/error/warning级别都执行
 */
public class MyConsumerRouting2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 设置参数
        factory.setHost("192.168.31.136");
        factory.setPort(5672);
        factory.setVirtualHost("nic-vh");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 3. 创建连接
        Connection connection = factory.newConnection();
        // 4. 创建channel
        Channel channel = connection.createChannel();
        // 5. 接收消息
        // 回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: " + new String(body) + "打印到控制台");
            }
        };
        String queueName2 = "testDirectQueue2";
        channel.basicConsume(queueName2, true, consumer);
    }
}

结果:

// MyConsumerRouting1
body: 日志:张三调用delete方法失败 日志级别:error写入数据库

// MyConsumerRouting2
body: 日志:张三调用findAll方法 日志级别:info打印到控制台
body: 日志:张三调用delete方法失败 日志级别:error打印到控制台

5. Topics通配符模式

image.png
Topic模式可以实现Pub/Sub发布与订阅模式和Routing路由模式的功能,只是Topic在配置routing key时可以使用通配符,使得路由的匹配更加灵活,*表示一个字符,#表示0到多个字符:

  • *.orange.*:包含.orange.的路由均可匹配
  • lazy.#:以lazy.开头的路由均可匹配

    6. RPC远程调用模式

五、Spring整合RabbitMQ

1. 生产者

1.1 添加maven依赖

spring-context、rabbit、junit、spring-test

<dependencies>
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.1.7.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.8.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>

  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>5.1.7.RELEASE</version>
  </dependency>
</dependencies>

1.2 配置文件

rabbitmq.properties连接参数等配置:

rabbitmq.host=106.15.72.229
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

spring-rabbitmq.xml整合配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
    <!--定义管理交换机、队列-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
    默认交换机类型为direct,名字为:"",路由键为队列的名称
    -->
    <!--
        id:bean的名称
        name:queue的名称
        auto-declare:自动创建
        auto-delete:自动删除。 最后一个消费者和该队列断开连接后,自动删除队列
        exclusive:是否独占
        durable:是否持久化
    -->

    <rabbit:queue id="spring_queue" name="spring_queue"    auto-declare="true"/>

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>

    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>

    <!--定义广播类型交换机;并绑定上述两个队列-->
    <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange"  auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding  queue="spring_fanout_queue_1"  />
            <rabbit:binding queue="spring_fanout_queue_2"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--<rabbit:direct-exchange name="aa" >
        <rabbit:bindings>
            &lt;!&ndash;direct 类型的交换机绑定队列  key :路由key  queue:队列名称&ndash;&gt;
            <rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
        </rabbit:bindings>

    </rabbit:direct-exchange>-->

    <!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star"  auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
    <!--定义广播交换机中的持久化队列,不存在则自动创建-->
    <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>

    <rabbit:topic-exchange id="spring_topic_exchange"  name="spring_topic_exchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding pattern="heima.*"  queue="spring_topic_queue_star"/>
            <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
            <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>

1.3 测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class ProducerTest {

    // 注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testHelloWorld() {
        // 路由、消息
        rabbitTemplate.convertAndSend("spring_queue", "简单模式消息");
    }

    @Test
    public void testFanout() {
        // 交换机、路由、消息
        rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发布与订阅模式消息");
    }

    @Test
    public void testTopic() {
        // 交换机、路由、消息
        rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.hehe.haha", "topic模式消息");
    }
}

2. 消费者

2.1 添加maven依赖(同上)

2.2 配置文件

rabbitmq.properties连接参数等配置

rabbitmq.host=106.15.72.229
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

spring-rabbitmq.xml整合配置:
声明监听器,并绑定队列

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
    <bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
    <bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
    <bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
    <bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
    <bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
        <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
        <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
        <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
        <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
        <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
        <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>
</beans>

2.3 消息监听器

例:com.itheima.rabbitmq.listener.SpringQueueListener

public class SpringQueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String msg = new String(message.getBody(), "utf-8");

            System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
                    message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(),
                    message.getMessageProperties().getConsumerQueue(),
                    msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.4 测试

需要保证程序一直运行

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        boolean flag = true;
        while (true){

        }
    }
}

六、Springboot整合RabbitMQ

1. 生产者

1.1 添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
</dependency>

1.2 配置文件

application.yml

# 配置RabbitMQ的基本信息  ip 端口 username  password..
spring:
  rabbitmq:
    host: 192.168.31.136
    port: 5672
    username: admin
    password: admin
    virtual-host: nic-vh

1.3 配置类

定义交换机、队列、绑定关系。以定义路由模式为例

@Configuration
public class RabbitMQConfig {

    // 1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange("boot_topic_exchange").durable(true).build();
    }

    // 2.队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable("boot_queue").build();
    }

    // 3.队列和交换机绑定
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

1.4 测试

发送消息

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsg() {
        rabbitTemplate.convertAndSend("boot_topic_exchange", "boot.ttt", "boot_topic_exchange消息");
    }
}

2 消费者

2.1 添加依赖(同上)

2.2 配置文件(同上)

2.3 消息监听处理类

@Component
public class RabbimtMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message) {
        System.out.println(new String(message.getBody()));
    }

}

app启动之后就会一直监听指定队列

七、高级特性

1. 消息的可靠投递

1.1 生产者 -> 中间件(生产方确认/退回)

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:
producer—->exchange—->queue—->consumer

  • 消息从 producer 到 exchange 设置一个监听,不管消息能否成功到达交换机,回调函数confirmCallback都会执行。
  • 消息从 exchange—>queue 投递失败则会执行returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递,如果投递失败,RabbitMQ可以监听到投递失败,然后通过业务处理保证消息再次发送能够成功。

  1. confirm模式

如果是spring项目,xml配置的方式:设置ConnectionFactory的publisher-confirms=”true” 开启 确认模式。
如果是springboot项目,yaml配置的方式:设置spring:rabbitmq:publisher-confirms: true开启 确认模式。

spring:
  rabbitmq:
    #消息发送到交换机确认机制,是否确认回调
    #如果没有本条配置信息,当消费者收到生产者发送的消息后,生产者无法收到确认成功的回调信息
    publisher-confirms: true

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsg() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息发送成功");
                } else {
                    System.out.println("消息发送失败");
                    System.out.println(cause);
                }
            }
        });
        rabbitTemplate.convertAndSend("boot_topic_exchange", "boot.ttt", "boot_topic_exchange消息");
    }
}

消息发送成功,则打印 “消息发送成功”
消息发送失败,则打印 “消息发送失败”,以及失败原因

  1. return模式

如果是spring项目,xml配置的方式:设置ConnectionFactory的publisher-returns=”true” 开启 回退模式。
如果是springboot项目,yaml配置的方式:设置spring:rabbitmq:publisher-returns: true开启 回退模式。

spring:
  rabbitmq:
    publisher-returns: true

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

    @Test
    public void testReturn() {
        // 只有设置为true,消息才会退回给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 回调函数,只有消息从exchange路由到queue失败才会执行
             * @param message 消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange 交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });
        rabbitTemplate.convertAndSend("boot_topic_exchange", "111boot.ttt", "boot_topic_exchange消息");

    }

由于路由失败,会打印相关信息:

(Body:'boot_topic_exchange消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
312
NO_ROUTE
boot_topic_exchange
111boot.ttt

1.2 中间件 -> 消费者(消费方ack)

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:

  • 自动确认:acknowledge=”none”,默认
  • 手动确认:acknowledge=”manual”
  • 根据异常情况确认:acknowledge=”auto”,(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

@Component
public class RabbimtMQListener {

    @RabbitListener(queues = "boot_queue")
    @RabbitHandler
    public void ListenerQueue(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 1.接收转换信息
            System.out.println(new String(message.getBody()));
            // 2.处理业务逻辑
            System.out.println("处理业务逻辑");
            // 出现异常
            int a = 1 / 0;
            // 3.手动签收
            /*
                deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。
                    手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
                multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
                举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,
                   当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
             */
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 4.拒绝签收
            /*
                deliveryTag:表示消息投递序号。
                multiple:是否批量确认。
                requeue:值为 true 消息将重新回到队列。
             */
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

以上方法为消费者对于队列”boot_queue”的监听器,spring项目中可以实现ChannelAwareMessageListener以重写带有Channel参数的onMessage方法。
由于设置了requeue=true,上面方法将会无限循环,无法正常签收。

1.3 消息可靠性总结

  1. 持久化
    1. exchange要持久化
    2. queue要持久化
    3. message要持久化
  2. 生产方确认Confirm
  3. 消费方确认Ack
  4. Broker高可用(集群)

    2. 消费端限流

    image.png
  • spring配置需要在 中配置 prefetch属性设置消费端一次拉取多少消息
  • springboot配置可以在application.yml文件中设定spring.rabbitmq.listener.prefetch进行全局配置, 这会影响到本Spring Boot应用中所有使用默认SimpleRabbitListenerContainerFactory的消费者。也可以在消费者的配置中自定义一个SimpleRabbitListenerContainerFactory,然后在消费者上声明使用该ContainerFactory即可达到对特定消费者配置prefetch的作用
  • 消费端的确认模式一定为手动确认。acknowledge=”manual”
  • 消费端限流,就保证了系统的稳定性 ```java @Component public class RabbimtMQListener {

    // 特定消费者设置 @Bean public SimpleRabbitListenerContainerFactory myContainerFactory(

          SimpleRabbitListenerContainerFactoryConfigurer configurer,
          ConnectionFactory connectionFactory) {
    
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setPrefetchCount(1); 
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      configurer.configure(factory, connectionFactory);
      return factory;
    

    }

@RabbitListener(queues = "boot_queue", containerFactory = "myContainerFactory")
@RabbitHandler
public void ListenerQueue(Message message, Channel channel) throws Exception {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // 1.接收转换信息
        // 2.处理业务逻辑
        System.out.println(new String(message.getBody()));
        // 3.手动签收
        // channel.basicAck(deliveryTag, true);
    } catch (Exception e) {
        // 4.拒绝签收
        channel.basicNack(deliveryTag, true, true);
    }
}

}


- 上面的代码中将手动签收注释掉,则消费者只会取一条消息,然后停下等待确认,也就是说消费者每次取prefetch条消息进行处理
- 但是如果不设置prefetch,尽管同样将手动签收注释掉,消费者还是会一次性取出所有消息。
<a name="oNCmi"></a>
## 3. TTL

- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

![image.png](https://cdn.nlark.com/yuque/0/2021/png/12909202/1621861048474-193b24c0-25aa-4ab8-8265-2640c83c69e2.png#clientId=u48d23761-1360-4&from=paste&height=378&id=u43be4b88&margin=%5Bobject%20Object%5D&name=image.png&originHeight=378&originWidth=992&originalType=binary&size=27567&status=done&style=none&taskId=u8a60c09f-e6a8-4225-8db6-196af899844&width=992)

1. **设置队列的统一过期时间:**

springboot可以rabbitmq的配置类里设置队列的`x-message-ttl`参数
```java
    @Bean("bootQueue")
    public Queue bootQueue() {
        // 队列中的消息未被消费则5秒后过期
        return QueueBuilder.durable("boot_queue").withArgument("x-message-ttl",5000).build();
    }
  1. 单独设置消息的过期时间:

通过匿名内部类的方式设置messagePostProcessor对象的消息过期时间,作为参数传入convertAndSend方法

    @Test
    public void testTtl() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("3000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend("boot_topic_exchange", "boot.ttt", "boot_topic_exchange消息", messagePostProcessor);
    }
  1. 注意
  • 如果设置了消息的过期时间,也设置了队列的过期时间,则会以时间短的为准
  • 队列过期后,会将队列所有消息全部移除
  • 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除),即 即使消息过期,也不会立即移除掉,只有即将被消费的消息才会判断是否过期

    4. 死信队列

    死信队列,英文缩写:DLX ,也叫Dead Letter Exchange(死信交换机)。当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
    image.png
    消息成为死信的三种情况:
    1. 队列消息长度到达限制;
    2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
    3. 原队列存在消息过期设置,消息到达超时时间未被消费;
    死信队列声明:

    @Configuration
    public class RabbitMQConfig {
    
      //=========================================声明死信队列,和普通交换机、队列没有区别======================================
      @Bean("dlxExchange")
      public Exchange dlxExchange() {
          return ExchangeBuilder.directExchange("dlx_direct_exchange").durable(true).build();
      }
    
      @Bean("dlxQueue")
      public Queue dlxQueue() {
          return QueueBuilder.durable("dlx_queue").build();
      }
    
      @Bean
      public Binding dlxBindQueueExchange(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {
          return BindingBuilder.bind(queue).to(exchange).with("dlx.Order").noargs();
      }
    
      //=========================================声明正常队列,设置死信队列======================================
      @Bean("orderExchange")
      public Exchange orderExchange() {
          return ExchangeBuilder.topicExchange("order_topic_exchange").durable(true).build();
      }
    
      @Bean("orderQueue")
      public Queue orderQueue() {
          Map<String, Object> args = new HashMap<>();
          args.put("x-dead-letter-exchange", "dlx_direct_exchange");
          args.put("x-dead-letter-routing-key", "dlx.Order");
          // 设置参数
          return QueueBuilder.durable("order_queue").withArguments(args).build();
      }
    
      @Bean
      public Binding orderBindQueueExchange(@Qualifier("orderExchange") Exchange exchange, @Qualifier("orderQueue") Queue queue) {
          return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
      }
    }
    

    测试过期消息:

      @Test
      public void testDlx() {
          MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  message.getMessageProperties().setExpiration("10000");
                  return message;
              }
          };
          rabbitTemplate.convertAndSend("order_topic_exchange", "order.ttt", "我是10秒过期消息", messagePostProcessor);
      }
    

    效果:
    发送一条10秒过期的消息到正常队列order_queue,10秒后,消息从正常队列删除,到达死信队列dlx_queue
    image.png

    5. 延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
    需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器
  2. 延迟队列

image.png
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
image.png

  1. 生产者

声明正常队列,死期队列,并设置队列过期时间

@Configuration
public class RabbitMQConfig {

    //=========================================声明死信队列,和普通交换机、队列没有区别======================================
    @Bean("orderDlxExchange")
    public Exchange orderDlxExchange() {
        return ExchangeBuilder.directExchange("order_dlx_exchange").durable(true).build();
    }

    @Bean("orderDlxQueue")
    public Queue orderDlxQueue() {
        return QueueBuilder.durable("order_dlx_queue").build();
    }

    @Bean
    public Binding dlxBindQueueExchange(@Qualifier("orderDlxExchange") Exchange exchange, @Qualifier("orderDlxQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("dlx.Order").noargs();
    }

    //=========================================声明正常队列,设置死信队列======================================
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return ExchangeBuilder.directExchange("order_exchange").durable(true).build();
    }

    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>();
        // 队列中消息的过期时间
        args.put("x-message-ttl", 10000);
        // 死信队列
        args.put("x-dead-letter-exchange", "order_dlx_exchange");
        args.put("x-dead-letter-routing-key", "dlx.Order");
        return QueueBuilder.durable("order_queue").withArguments(args).build();
    }

    @Bean
    public Binding orderBindQueueExchange(@Qualifier("orderExchange") Exchange exchange, @Qualifier("orderQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("order").noargs();
    }
}

发送消息:

    @Test
    public void testDelay() throws InterruptedException {
        // 发送订单消息,30分钟后进入死信队列,判断是否支付,决定是否减少库存
        String msg = "订单id:0001 时间:2021-5-25 11:00";
        rabbitTemplate.convertAndSend("order_exchange", "order", msg);
        for (int i = 0; i < 10; i++) {
            System.out.println(i + 1);
            Thread.sleep(1000);
        }
    }
  1. 消费者

    @Component
    public class RabbimtMQListener {
    
     @Bean
     public SimpleRabbitListenerContainerFactory myContainerFactory(
             SimpleRabbitListenerContainerFactoryConfigurer configurer,
             ConnectionFactory connectionFactory) {
         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
         configurer.configure(factory, connectionFactory);
         return factory;
     }
    
     // 注意!!!监听的是死信队列,而是不正常队列
     @RabbitListener(queues = "order_dlx_queue", containerFactory = "myContainerFactory")
     @RabbitHandler
     public void ListenerOrderDlxQueue(Message message, Channel channel) throws Exception {
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         try {
             // 1.接收转换信息
             System.out.println(new String(message.getBody()));
             // 2.处理业务逻辑
             System.out.println("处理业务逻辑");
             System.out.println("根据订单id查询其状态");
             System.out.println("订单未支付");
             System.out.println("取消订单,回滚仓库");
             // 3.手动签收
             channel.basicAck(deliveryTag, true);
         } catch (Exception e) {
             // 4.拒绝签收
             channel.basicNack(deliveryTag, true, true);
         }
     }
    }
    

    生产者发送的消息首先会到达普通队列,此时消费者监听的死信队列没有消息。
    达到设定的时间之后,即TTL过期之后,消费者监听到死信队列中的消息。

    6. 日志与监控

    6.1 RabbitMQ日志

    RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
    日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

    6.2 rabbitmqctl管理和监控

    ```bash 查看队列

    rabbitmqctl list_queues

查看exchanges

rabbitmqctl list_exchanges

查看用户

rabbitmqctl list_users

查看连接

rabbitmqctl list_connections

查看消费者信息

rabbitmqctl list_consumers

查看环境变量

rabbitmqctl environment

查看未被确认的队列

rabbitmqctl list_queues name messages_unacknowledged

查看单个队列的内存使用

rabbitmqctl list_queues name memory

查看准备就绪的队列

rabbitmqctl list_queues name messages_ready

<a name="MWC57"></a>
## 7. 消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。<br />在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
> firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
```bash
rabbitmqctl trace_on:开启Firehose命令
rabbitmqctl trace_off:关闭Firehose命令

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

启用插件:

rabbitmq-plugins enable rabbitmq_tracing

八、应用问题

1. 消息可靠性保障-消息补偿机制

100%确保消息发送成功
image.png

  • 基本情况是生产者发送消息到Q1,消费者监听Q1并消费Q1的消息
  • 在此基础上生产者延迟发送消息到Q3,消费者接收到Q1消息后发送确认消息到Q2
  • 回调检查服务消费Q2的确认消息,并将消息写入数据库
  • 当延迟消息到达Q3后,回调检查服务消费Q3的延迟消息,与确认消息对比。如果一致,说明消费者收到了消息;如果不一致,调用消费者重新发送消息
  • 考虑最坏情况,步骤2和3均发送失败。定时检查服务会定期查看MDB中的确认消息和DB中的业务数据是否一致,以保证消息重发

    2. 消息幂等性保障-乐观锁解决方案

    幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
    在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
    image.png
    例:生产者下订单,消费者将对应用户的余额减500。可能由于一些特殊原因,消费端重复消费这条消息,使得余额减去500*N,这样就不满足消息的幂等性。可以使用乐观锁来解决这个问题: ```bash 第一次执行:version=1 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1

第二次执行:version=2 update account set money = money - 500 , version = version + 1 where id = 1 and version = 1 ``` 由于第二次执行,已经不满足version = 1,所以不会重复消费