一、AMQP高级消息队列协议

1. 什么是AMQP

  1. AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

2. AMQP工作流程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

RabbitMQ:基于AMQP的开源消息代理软件 - 图1

3. 什么是消息队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进后出。其中一侧负责进数据,另一次负责出数据。

MQ(消息队列)很多功能都是基于此队列结构实现的,队列也是解决高并发下排队问题的解决方案,即使恰巧出现同一时刻向队列添加的两个数据,也会有CPU帮助判断,放入到队列后,一定会有先后顺序。

RabbitMQ:基于AMQP的开源消息代理软件 - 图2

二、RabbitMQ开源消息代理软件

1. RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。所有主要的编程语言均有与代理接口通讯的客户端库。

消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

2. RabbitMQ核心特性

RabbitMQ两大核心特性:异步消息、队列。<br />    异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间。所有需要这种效果场景都可以使用MQ。<br />    队列:进入队列的数据一定有先后之分。只要应用程序要对内容分先后的场景都可以使用MQ。

3. RabbitMQ适用场景

基于RabbitMQ核心特性,可以在下述具体场景中使用:

3.1 应用解耦

如:微信朋友圈和QQ说说的同时发布问题

当不使用MQ时:

这种情况下,QQ和微信中关于说说和朋友圈发布的业务逻辑、数据内容等任何代码发生了任何变化,都可能影响到另外一个系统。属于强耦合。不利于应用服务的升级维护。

RabbitMQ:基于AMQP的开源消息代理软件 - 图3

使用MQ后:

只要QQ和微信定义一个通用的数据载体(消息),当需要同时发布时,只要把通用的数据发送到队列,另一方消费处理这个消息,就可以实现同时发布说说和朋友圈了。这种情况下,任何系统的升级,只要可处理这个通用的数据载体(消息),就可以保持同时发布的功能,且不会对另外一个系统造成任何影响。

RabbitMQ:基于AMQP的开源消息代理软件 - 图4

3.2 排队算法

使用队列特性。把数据发送给MQ,进入到队列就有了排队的效果

3.3 秒杀活动

使用队列特性。例如:抢红包、限时秒杀、直播卖货时抢商品。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束。

3.4 消息分发

在程序中同时向多个其他程序发送消息。应用了AMQP中交换机,实现消息分发。

3.5 异步处理

利用MQ异步消息特性。大大提升主线程效率。

3.6 数据同步

利用异步特性。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步。

3.7 处理耗时任务

利用异步特性。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间。

3.8 流量削峰

在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(双11、品牌促销,10点抢购),如果使用监控工具,会发现这段时间访问出现顶峰。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋。是利用RabbitMQ中交换机实现的。

4. 目前市场上常见的MQ中间件

4.1 RabbitMQ

功能强大、适合分布式项目。Spring提供了对RabbitMQ的支持。Spring AMQP框架。在Spring Cloud中尤其Spring Cloud Netflix中无缝整合。

4.2 ActiveMQ

小巧。以前的项目中使用比较多。由Apache推出的。

4.3 RocketMQ

阿里的MQ。如果使用Spring Cloud Alibaba,推荐使用这个MQ。

4.4 Kafka

主要应用在大数据流处理上。也具备MQ相关功能。

三、RabbitMQ核心原理(常见面试题)

1. 结构原理图

RabbitMQ:基于AMQP的开源消息代理软件 - 图5

发送者Publisher向RabbitMQ发送消息Message,在Message会包含路由键Routing Key、交换器名称、消息内容。交换器Exchange接收到消息Message后会根据交换器类型Exchange Type判断把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中。放入到队列成功后会返回给发送者Publisher一个ACK确认消息,表示消息发送成功了。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理,处理完成后会返回给RabbitMQ一个ACK,表示消息处理完成,RabbitMQ会删除这个消息。以上这些就是RabbitMQ的运行原理。

2. 核心概念

2.1 Message

消息。它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。

2.2 Publisher

消息的生产者。也是一个向交换器发布消息的客户端应用程序。<br />    通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher

2.3 Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出。使用MQ做耗时任务时,耗时任务就交给Consumer进行完成。

2.4 Exchange

交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。一共支持四种的交换器类型:
  1. direct(发布与订阅 完全匹配)
  2. fanout(广播)
  3. topic(主题,规则匹配)
  4. header(使用较少,相比direct就多了一些头信息)

2.5 Binding

绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。<br />    一个交换器里面可以绑定多个队列。一个队列一般都是只绑定到一个交换器上。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列。

2.6 Queue

消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

2.7 Routing-key

路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的映射,路由键是key,队列是value)。队列通过路由键绑定到交换器。<br />    消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。<br />    如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞(相当于丢弃)。<br />    通俗理解:队列绑定到交换器时有路由键,这个路由键就相当于key-value中的key,value则是队列。当Publisher发送消息时一定会携带路由键(即使路由键是Null),有了路由键就让交换器知道了这个消息要发送给哪个队列。

2.8 Connection

链接。指Rabbit服务器和客户端建立的TCP链接。

2.9 Channel

Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。<br />    在RabbitMQ中,TCP链接一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。

2.10 Virtual Host

虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 '/'。<br />    通俗理解:一个RabbitMQ可以包含多个虚拟主机,每个虚拟主机都是一个RabbitMQ。平时我们没有去创建虚拟主机,都是使用RabbitMQ里面默认的'/'虚拟主机,单实际上一个RabbitMQ可以包含多个虚拟主机主机的,也就是说一个RabbitMQ可以包含多个实例。就像MySQL可以创建多个数据库一样。

2.11 Borker

表示消息队列服务器实体。就是RabbitMQ服务器进程。

2.12 交换器和队列的关系

交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟交换器中绑定的路由键匹配,那么消息就会被路由到该绑定的队列中。<br />    也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。<br />    路由键可以理解为匹配的规则。

2.13 RabbitMQ为什么需要信道?为什么不是TCP直接通信?

  1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。
  2. 如果不用信道,那应用程序就会以TCP链接RabbitMQ,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
  3. 信道的原理是一条线程一条通道,多条线程多条通道共用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

这也是为什么使用RabbitMQ去处理秒杀、流量削锋、海量请求时依然对RabbitMQ比较有信心的原因。

四、基于Docker安装RabbitMQ

1. 拉取镜像

docker pull rabbitmq:management

2. 创建并启动容器

创建容器时,需要提供默认用户及密码。课上使用bjsxt作为用户名及密码。
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=bjsxt -e DEFAULT_PASS=bjsxt rabbitmq:management

3. 访问管理界面

访问地址:
http://Docker宿主机IP:15672
显示下属登录界面,即为安装成功。

RabbitMQ:基于AMQP的开源消息代理软件 - 图6

4. 管理平台简介

4.1 Overview

此面板为RabbitMQ基础信息展示面板,列举了服务器的信息,如:节点名称、内存占用、磁盘占用等。

RabbitMQ:基于AMQP的开源消息代理软件 - 图7

4.2 Connections

此面板中展示所有连接到RabbitMQ的客户端链接。只展示基于5672端口的链接。

4.3 Channels

此面板中展示各链接中的具体信道。标记方式为'链接(编号)',如:192.168.91.1:12345(1)。

4.4 Exchanges

此面板中展示RabbitMQ中已有的交换器,并注明交换器名称、类型等基本信息。

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在RabbitMQ中支持四种交换器类型。

1. Direct Exchange:直连交换器(默认)。通过路由键明确指定存储消息的一个队列。

2. Fanout Exchange:扇形交换器。把消息发送给所有绑定的队列。

3. Topic Exchange:主题交换器。按照路由规则,把消息发送给多个队列。

4. Headers Exchange:首部交换器。比Direct多了一些头部消息,平时使用较少。

其中只有direct交换器有默认交换器(AMQP default),当使用direct交换器时,如果没有明确指定名称,使用AMQP default交换器,也可明确指定名称。但是其他类型交换器没有默认的,都需要指定名称。

RabbitMQ:基于AMQP的开源消息代理软件 - 图8

也可以点击任何一个交换器,进入详细信息展示面板。

RabbitMQ:基于AMQP的开源消息代理软件 - 图9

4.4.1 基于管理平台创建交换器
交换器名称要求:字母、数字、'-'、'_'、'.'组成。建议使用字母+'.'+数字组成交换器名称。

RabbitMQ:基于AMQP的开源消息代理软件 - 图10

4.4.2 基于交换器绑定队列
进入交换器详情展示面板。并绑定队列。

路由键定义要求:字母、数字、'-'、'_'、'.'组成。特殊符号包括'*'和'#'。建议使用字母+'.'+数字组成

'*' : 代表一个单词。不包含'.'的单词

'#' : 代表任意字符串。

RabbitMQ:基于AMQP的开源消息代理软件 - 图11

4.5 Queues

此面板展示RabbitMQ中的队列信息。

RabbitMQ:基于AMQP的开源消息代理软件 - 图12

点击任何队列,可进入详情展示面板。

RabbitMQ:基于AMQP的开源消息代理软件 - 图13

4.5.1 基于管理平台创建队列
队列名称要求:字母、数字、'-'、'_'、'.'组成。建议使用字母+'.'+数字组成队列名称。

在RabbitMQ中,队列中的消息默认都是保存到内存中的。这样处理更加快捷。为避免服务器关停导致丢失未处理的消息,RabbitMQ提供了队列持久化能力。

其中Durability代表队列是否可持久化。Durable代表可持久,即RabbitMQ服务器正常关闭时,未处理的消息会持久化到磁盘中,下次启动的时候自动加载恢复。

RabbitMQ:基于AMQP的开源消息代理软件 - 图14

4.5.2 基于队列绑定本身到某交换器
进入队列详情版面,并绑定本身到指定的交换器。

RabbitMQ:基于AMQP的开源消息代理软件 - 图15

4.6 Admin

此面板展示当前RabbitMQ中的用户,并提供各种服务器管理功能。

RabbitMQ:基于AMQP的开源消息代理软件 - 图16

五、Spring AMQP框架简介

Spring AMQP是Spring的顶级项目。是基于AMQP协议的消息传递解决方案。此框架提供顶级抽象模板AmqpTemplate接口,用于抽象消息传递标准。提供基于容器的监听处理。暂时Spring AMQP只提供了基于RabbitMQ处理消息传递的解决方案。其具体接口是RabbitOperations、实现是RabbitTemplate。

其官方解释如下:

RabbitMQ:基于AMQP的开源消息代理软件 - 图17

六、使用Spring AMQP访问RabbitMQ

使用Spring AMQP实现消息传递的实质,就是如何发送消息到指定的交换器,如何从指定的队列中消费消息。下面基于四种不同类型的交换器实现消息传递,重点关注交换器特性。

所有演示代码基于一个完整项目环境。包括发送消息的Publisher发布者工程与消费消息的Consumer消费者工程。

1. Direct交换器

direct交换器是RabbitMQ默认交换器类型。客户端向direct类型交换器发送消息时,direct会根据路由键把消息放入到指定的队列中。

基于管理平台创建交换器及队列,并绑定。交换器名称为:direct.first.ex,队列名称为queue.first,路由键为:routing.key.1。 后续不再使用管理平台创建任何交换器和队列。都基于注解实现所有流程。

1.1 新建项目spring_amqp_test

创建顶级项目spring_amqp_test。

1.1.1 编辑POM依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bjsxt</groupId>
    <artifactId>spring_amqp_test</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
    </parent>
</project>

1.2 新建子项目amqp_publisher

新建子模型(module)amqp_publisher。

1.2.1 编辑POM依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>s118_amqp</artifactId>
        <groupId>spring_amqp_test</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>amqp_publisher</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

1.2.2 编辑application.yml配置文件
# 配置RabbitMQ相关信息
# 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
# guest用户只能本地访问RabbitMQ。
spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

1.2.3 编写启动类型AmqpPublisherApp

新建启动类型: com.bjsxt.AmqpPublisherApp

@SpringBootApplication
public class AmqpPublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(AmqpPublisherApp.class, args);
    }
}

1.2.4 编写测试类型TestPublisher

新建测试类型:com.bjsxt.test.TestPublisher

/**
 * 测试Spring AMQP框架中发送消息的方式。
 */
@SpringBootTest
public class TestPublisher {
    /**
     * 注入客户端对象。
     * 类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
     * 建议使用接口: 优先级是 RabbitOperations > AmqpTemplate
     */
    @Autowired
    private RabbitOperations rabbitOperations;

    /**
     * 测试发送消息
     * 消息内容是字符串。
     * 注意:
     * Spring AMQP可以发送的消息类型必须是Message类型。
     * Spring AMQP可以帮助程序员自动封装消息类型Message对象。
     * 只要提供消息具体内容(消息体)即可实现默认封装。
     * Spring AMQP可以自动转换封装的消息体类型是Object。只要类型可序列化即可。
     */
    @Test
    public void testSendStringMessage(){
        String messageContent = "第一个字符串消息";
        String exchangeName = "direct.first.ex";
        String routingKey = "routing.key.1";

        // 发送消息的时候,只要指定要发送到的具体交换器名称,路由键,和消息内容即可。
        rabbitOperations.convertAndSend(exchangeName, routingKey, messageContent);

        System.out.println("消息发送完毕");
    }
}

1.2.5 查看结果

在RabbitMQ的Web管理界面中查看队列里面是否已经有了一个消息。

RabbitMQ:基于AMQP的开源消息代理软件 - 图18

1.2.6 基于Configuration配置创建交换器、队列及完成绑定
如果不希望通过可视化创建队列及绑定队列到交换器上,也可以通过配置类进行创建。在项目中新建一个配置类com.bjsxt.config.RabbitMQConfig。

重点强调:发送消息时创建队列、创建交换器、绑定交换器。
@Configuration
public class RabbitMQConfig {
    // 发送消息时如果不存在这个队列,会自动创建这个队列。
    // 注意:是发送消息时,而不是启动项目时。
    // 相当于:可视化操作时创建一个队列
    // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
    @Bean
    public Queue queue(){
        return new Queue("queue.second");
    }

    // 如果没有这个交换器,在发送消息创建这个交换器
    // 配置类中方法名就是这个类型的实例名。相当于<bean id="" class="">的id属性,返回值相当于class
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct.first.ex");
    }

    // 配置类中方法参数,会由Spring 容器自动注入
    @Bean
    public Binding directBingding(DirectExchange directExchange,Queue queue){
        // with(“自定义路由键名称”)
        return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
        // withQueueName() 表示队列名就是路由键名称
        // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    }
}

1.3 新建子项目 amqp_consumer

新建子模型(module)amqp_consumer

1.3.1 编辑POM依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_amqp_test</artifactId>
        <groupId>com.bjsxt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>amqp_consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
</project>

1.3.2 编辑application.yml配置文件
spring:
  rabbitmq:
    host: 192.168.91.128
    port: 5672
    username: bjsxt
    password: bjsxt

1.3.3 编辑消息消费者

1.3.3.1 消费者简单监听队列

新建类型: com.bjsxt.consumer.StringMessageConsumer

/**
 * 字符串类型消息体处理消费者
 * Spring AMQP和Spring Boot配合的启动器,可以自动注册监听。
 * 要求,当前类型的bean对象,必须被spring容器管理。
 */
@Component
public class StringMessageConsumer {
    /**
     * 定义处理消息的消费方法。
     * 方法定义要求:
     *  修饰符: public
     *  返回值: 异步消息必须是void
     *  方法名: 自定义
     *  参数表: 一个参数,类型可以是Message或者具体的消息体类型(Object)。
     *   message是Spring AMQP中消息的唯一类型。代表完整消息,有头和体组成。
     *   如果对消息头没有任何处理要求,则直接定义消息体具体类型即可。
     *  方法实现: 根据具体要求,定义即可。
     * 注意:
     *  方法可以抛出任意类型的异常。只要抛出异常,则代表消费错误,RabbitMQ不删除队列中的消息。
     * 方法必须使用注解描述。注解是RabbitListener。
     *  代表当前方法是一个RabbitMQ的消息监听器。
     *
     * 注解RabbitListener
     *  可选属性:
     *   queues - 当前方法监听的队列名称都是什么。可以有多个队列。属性类型是String[]
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
    }
}

1.3.3.2 消费者基于注解动态监测监听器、队列及绑定规则
    /**
     * 常规开发中,都会先定义消息的消费者。后定义消息的发布者。
     * 典型的观察者设计模式。先有监听,后有事件。
     *
     * 注解RabbitListener
     *  可选属性: 常用属性
     *   bindings - 定义绑定规则。属性类型是: QueueBinding[]
     * 注解QueueBinding,描述具体的绑定规则。就是交换器和队列的绑定规则。
     *  必要属性:
     *   value - 监听的队列,属性类型是Queue
     *   exchange - 队列绑定的交换器,属性是Exchange
     *  可选属性:
     *   key - 绑定的路由键都是什么,类型是String[]
     * 注解Queue,描述一个具体的队列,如果队列在RabbitMQ中存在,直接使用并监听;如果不存在,
     *  创建队列并监听。
     *  可选属性:
     *   name - 队列名称,String类型
     *   autoDelete - 是否是自动删除的队列,String类型。可选值: "true" | "false"
     *
     * 注解Exchange,描述一个具体的交换器,如果交换器存在,直接使用;如果不存在,则创建,
     *  并基于key绑定队列
     *  可选属性:
     *   name - 交换器名称,String类型
     *   autoDelete - 是否是自动删除的交换器,String类型。可选值: "false" | "true"。默认false
     *   type - 交换器的类型,String类型。默认是direct。可选: direct,fanout,topic,headers
     *    可以使用枚举类型中的常量赋值,具体是ExchangeTypes.XXX
     * @param messageBody
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.second", autoDelete = "false", durable = "true"),
                    exchange = @Exchange(name = "direct.second.ex",
                            autoDelete = "false", type = ExchangeTypes.DIRECT),
                    key = {"routing.key.second.1", "routing.key.second.2"}
            )
    })
    public void onMessage(String messageBody){
        System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    }

1.3.4 编写启动类型

新建com.bjsxt.AmqpConsumerApp

/**
 * Spring AMQP框架,启动后,如果有监听器,项目不会自动关闭。
 *  启动后,自动开启新的子线程,长期维护监听器对象的生命周期,直到关闭JVM进程为止。
 */
@SpringBootApplication
public class AmqpConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(AmqpConsumerApp.class, args);
    }
}

1.3.5 观察消费结果

RabbitMQ:基于AMQP的开源消息代理软件 - 图19

1.4 消费者集群效果

1.4.1 修改消费者

修改类型: com.bjsxt.consumer.StringMessageConsumer

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"queue.first"})
    public void onMessage1(String messageBody){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
    }

    @RabbitListener(queues = {"queue.first"})
    public void onMessage2(String messageBody){
        System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    }


    @RabbitListener(queues = {"queue.first"})
    public void onMessage3(String messageBody){
        System.out.println("第三个消息消费者监听,处理消息:" + messageBody);
    }
}

1.4.2 修改发送消息测试代码

测试类型:com.bjsxt.test.TestPublisher 中新增测试方法

    /**
     * 测试集群消费者处理方式
     */
    @Test
    public void testConsumerCluster(){
        for(int i = 0; i < 10; i++){
            rabbitOperations.convertAndSend("direct.first.ex",
                    "routing.key.1",
                    "消息-"+i);
        }
    }

1.4.3 观察结果
集群消费方式采用公平调度,即每个消费者轮流消费队列中的消息。且消费者集群的消费调度方式与交换器类型无关。

RabbitMQ:基于AMQP的开源消息代理软件 - 图20

2. Fanout交换器

扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换器上的队列。且忽略路由键,也就是,交换器和队列没有绑定路由键;发送消息时,可使用任何路由键(包括null),因为fanout会忽略路由键,把消息投递到所有绑定的队列中。

2.1 编写amqp_consumer消费者工程

2.1.1 编写消费者

新建类型:com.bjsxt.consumer.StringMessageFanoutConsumer

@Component
public class StringMessageFanoutConsumer {
    /**
     * 处理扇形交换器的消费者方法,和direct没有任何区别。
     * 只是不需要考虑绑定过程中的路由键。
     * 无论是否提供路由键,RabbitMQ都会忽略。
     * @param messageBody
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.fanout.1", autoDelete = "false"),
                    exchange = @Exchange(name = "fanout.first", type = "fanout")
            )
    })
    public void onMessage1(String messageBody){
        System.out.println("onMessage1 - 消费扇形交换器消息:" + messageBody);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.fanout.2", autoDelete = "false"),
                    exchange = @Exchange(name = "fanout.first", type = "fanout"),
                    key = {"routing.key.fanout.2"}
            )
    })
    public void onMessage2(String messageBody){
        System.out.println("onMessage2 - 消费扇形交换器消息:" + messageBody);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.fanout.3", autoDelete = "false"),
                    exchange = @Exchange(name = "fanout.first", type = "fanout"),
                    key = {"routing.key.fanout.3"}
            )
    })
    public void onMessage3(String messageBody){
        System.out.println("onMessage3 - 消费扇形交换器消息:" + messageBody);
    }
}

2.2 编写amqp_publisher发布者工程

2.2.1 修改测试类型

增加测试方法:

    @Test
    public void testSendMessage2Fanout(){
        rabbitOperations.convertAndSend(
                "fanout.first",
                "routing.key.fanout.2",
                "路由键是routing.key.fanout.2"
        );
        rabbitOperations.convertAndSend(
                "fanout.first",
                "routing.key.fanout.3",
                "路由键是routing.key.fanout.3"
        );
        rabbitOperations.convertAndSend(
                "fanout.first",
                null,
                "路由键是null"
        );
        rabbitOperations.convertAndSend(
                "fanout.first",
                "random",
                "路由键是random"
        );
    }

2.3 观察结果

RabbitMQ:基于AMQP的开源消息代理软件 - 图21

3. Topic交换器

主题交换器。此交换器是最常用的,功能最丰富,应用最灵活的交换器。其绑定队列使用的路由键基于规则匹配,可以使用定制化处理。路由键可包括特殊字符实现通配。特殊字符包括: '*' 和 '#'。

'*' : 代表一个单词。多个单词使用'.'分割。

'#' : 代表0~n个字符,即任意字符串。

3.1 编写amqp_consumer消费者工程

3.1.1 编写消费者

新建类型:com.bjsxt.consumer.StringMessageTopicConsumer

/**
 * 主题交换器。
 * 路由键是匹配规则,可以是具体的路由键,部分匹配的路由键,或者通配路由键。
 * 具体路由键,如: a   b   routing.key  等。相当于direct
 * 部分匹配, 如: 小学.*   初中.*   高中.*  *.同学   *.老师 等。注意,多个单词之间使用'.'分隔。
 * 通配, 只有 #。代表包含'.'的所有字符串。相当于fanout
 */
@Component
public class StringMessageTopicConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.1", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"小学.同学"}
            )
    })
    public void onMessage1(String message){
        System.out.println("小学.同学 - " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.2", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"小学.老师"}
            )
    })
    public void onMessage2(String message){
        System.out.println("小学.老师 - " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.3", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"小学.*"}
            )
    })
    public void onMessage3(String message){
        System.out.println("小学.* - " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.4", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"大学.老师"}
            )
    })
    public void onMessage4(String message){
        System.out.println("大学.老师 - " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.5", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"*.老师"}
            )
    })
    public void onMessage5(String message){
        System.out.println("*.老师 - " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.topic.6", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.first", type = "topic"),
                    key = {"#"}
            )
    })
    public void onMessage6(String message){
        System.out.println("# - " + message);
    }
}

3.2 编写amqp_publisher发布者工程

3.2.1 修改测试类型

增加测试方法:

    @Test
    public void testSendMessage2Topic(){
        rabbitOperations.convertAndSend(
                "topic.first",
                "小学.同学",
                "我胡汉三又回来了,谁请我吃饭?"
        );
        rabbitOperations.convertAndSend(
                "topic.first",
                "小学.老师",
                "上班后,没发现有游泳池同时进水和防水的问题。"
        );
        rabbitOperations.convertAndSend(
                "topic.first",
                "小学.*",
                "曾经的朋友们,我要结婚了,都准备好红包!!!"
        );
        rabbitOperations.convertAndSend(
                "topic.first",
                "大学.老师",
                "当初骗我说,不上课就找不到工作,结果上课还是没找到。"
        );
        rabbitOperations.convertAndSend(
                "topic.first",
                "*.老师",
                "教师节快乐"
        );
        rabbitOperations.convertAndSend(
                "topic.first",
                "#",
                "换手机了,新号码是13588888888,够硬吧。"
        );
    }

3.3 观察结果

RabbitMQ:基于AMQP的开源消息代理软件 - 图22

4. 传递自定义类型消息

在RabbitMQ中,对消息体的数据类型没有任何约束。使用Spring AMQP框架实现消息传递时,只要求消息体数据类型必须可序列化,也就是必须实现接口java.io.Serializable。除此之外没有任何其他要求。而消费者消费消息时,可以通过编码反向序列化消息体中的字节数组(因为Java的Serializable就是把对象序列化成字节数组)转成Java对象,也可以基于Spring AMQP自动类型处理机制直接处理。

4.1 定义消息类型工程 amqp_pojo

创建子模块: amqp_message_pojo

4.1.1 编辑POM依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring_amqp_test</artifactId>
        <groupId>com.bjsxt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>amqp_message_pojo</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

4.1.2 定义消息类型

新建类型:com.bjsxt.message.MyMessage

/**
 * 自定义消息体类型。
 * Spring AMQP可以传输任何可序列化的Java对象。
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MyMessage implements Serializable {
    private Long id;
    private String name;
    private String gender;
}

4.2 编写amqp_consumer消费者工程

4.2.1 编辑POM依赖

增加新依赖:

        <dependency>
            <groupId>com.bjsxt</groupId>
            <artifactId>amqp_message_pojo</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

4.2.2 编写消费者

新建消费者类型:com.bjsxt.consumer.MyMessageConsumer

/**
 * 处理自定义类型消息体
 */
@Component
public class MyMessageConsumer {
    /**
     * Spring AMQP可以自动实现消息体类型转换。
     * 使用的方式是强制类型转换。只要包装传输的消息体数据类型和方法参数类型匹配即可。
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "pojo.queue1", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
                    key = {"routing.key.1"}
            )
    })
    public void onMessage1(MyMessage myMessage){
        System.out.println("处理自定义消息:" + myMessage);
    }

    /**
     * 可以通过统一消息类型Message处理消息内容
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "pojo.queue2", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
                    key = {"routing.key.2"}
            )
    })
    public void onMessage2(Message message) throws Exception{
        // 获取消息体,消息体是字节数组。根据具体类型进行处理。
        byte[] body = message.getBody();
        ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(body);
        ObjectInputStream inputStream =
                new ObjectInputStream(byteArrayInputStream);
        Object obj = inputStream.readObject();
        if(obj.getClass() == MyMessage.class){
            MyMessage myMessage = (MyMessage) obj;
            System.out.println(myMessage);
        }
        System.out.println("消息体中的对象类型是:" + obj.getClass().getName());
    }
}

4.3 编写amqp_publisher发布者工程

4.3.1 编辑POM依赖

增加新依赖:

        <dependency>
            <groupId>com.bjsxt</groupId>
            <artifactId>amqp_message_pojo</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

4.3.2 修改测试类型

新增测试方法

    @Test
    public void testSendObject(){
        MyMessage message = new MyMessage(1L, "张三毛", "男");
        rabbitOperations.convertAndSend(
                "topic.pojo.ex",
                "routing.key",
                message
        );
    }

七、ACK确认机制

默认情况下如果一个 消息(Message)被消费者(Consumer)正确消费,则会从 队列(Queue) 中移除;如果未被Consumer正确消费(如消费者代码发生异常),Message仍旧保留在Queue中,直到被正确消费为止。

那么RabbitMQ是如何确定消息正确消费的?这是基于ACK机制(消息确认机制)实现的。其本质就是,消费者在消费消息成功后,需要通知RabbitMQ已正确消费,RabbitMQ接收到确认信息后,则会从队列中移除已消费的消息。

基于上述的思想,那么发布者(Publisher)发送消息(Message)时,是否能确定发送是否成功呢?显然是一定可以的。这也是基于ACK机制实现的。只不过,这次是RabbitMQ发送确认信息给Publisher。反馈的确认信息有2次,分别是Exchange是否可到达(交换器是否存在),队列是否可路由(路由键是否有队列与之绑定)。

接下来分别实验发送消息和消费消息的ACK机制特征。

1. 发送消息的确认

1.1 当发送消息不能正确到达交换器时

当Publisher发送消息时,需要先连接到Broker(RabbitMQ实例),再进入到virtual-host(虚拟主机),最后到达Exchange(交换器)。如果无法到达交换器,RabbitMQ会触发RabbitTemplate中的回调机制,通知Publisher消息发送的失败原因。

具体实现如下:

1.1.1 修改application.yml配置
修改amqp_publisher工程中的配置文件,具体如下:
spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
    publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。

1.1.2 增加类型
新建类型:com.bjsxt.handle.PublisherHandler。此类型实现ConfirmCallback接口,提供不能到达交换器时的回调处理逻辑。并把当前对象设置到RabbitTemplate对象中。
@Component
public class PublisherHandler implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法执行结束后立刻执行此方法。即初始化逻辑。
     */
    @PostConstruct
    public void init(){
        // 设置RabbitTemplate中的回调逻辑
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 当交换器不能到达时,具体的处理方案。
     * @param correlationData 消息唯一标记
     * @param ack 是否确认
     * @param cause 不能到达交换器(即ack为false)的具体原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标记 : " + correlationData);
        System.out.println("是否确认到达交换器 : " + ack);
        System.out.println("不能到达交换器的原因 : " + cause);
    }
}

1.1.3 修改测试类型
新增测试方法:
    @Test
    public void testSendAck(){
        rabbitOperations.convertAndSend(
                "不存在的交换器",
                "瞎写的路由键",
                "永远发送不到队列中的消息");
    }

1.1.4 观察结果

RabbitMQ:基于AMQP的开源消息代理软件 - 图23

1.2 当消息不能正确路由到队列时

当Publisher发送消息可以正常到达交换器,但是路由键没有对应的绑定队列时,会触发另外一个回调处理机制,具体如下:

1.2.1 修改application.yml配置
修改amqp_publisher工程中的配置文件,具体如下:
spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
    publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
    publisher-returns: true # 开启路由失败确认机制。默认值:false

1.2.2 修改PublisherHandler
修改类型:com.bjsxt.handle.PublisherHandler。增加新接口,提供路由失败回调处理逻辑。
@Component
public class PublisherHandler implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法执行结束后立刻执行此方法。即初始化逻辑。
     */
    @PostConstruct
    public void init(){
        // 设置RabbitTemplate中的回调逻辑
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 消息路由失败回调逻辑
     * @param returned 路由失败的消息
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("交换器 : " + returned.getExchange());
        System.out.println("路由键 : " + returned.getRoutingKey());
        System.out.println("路由失败编码 : " + returned.getReplyCode());
        System.out.println("路由失败描述 : " + returned.getReplyText());
        System.out.println("消息 : " + returned.getMessage());
    }

    /**
     * 当交换器不能到达时,具体的处理方案。
     * @param correlationData 消息唯一标记
     * @param ack 是否确认
     * @param cause 不能到达交换器(即ack为false)的具体原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("消息唯一标记 : " + correlationData);
        System.out.println("是否确认到达交换器 : " + ack);
        System.out.println("不能到达交换器的原因 : " + cause);
    }
}

1.2.3 修改测试类型
修改测试方法:使交换器可到达
    @Test
    public void testSendAck(){
        rabbitOperations.convertAndSend(
                "direct.first.ex",
                "瞎写的路由键",
                "永远发送不到队列中的消息");
    }

1.2.4 观察结果

RabbitMQ:基于AMQP的开源消息代理软件 - 图24

2. 消费消息确认

在Spring AMQP中,消费者(Consumer)默认的ACK机制是自动确认,即消费代码正常执行结束,立刻确认消息已消费;消费代码发送异常,相当于消息未消费。如果希望关闭自动ACK机制,可使用两种处理方案实现。

2.1 重试消费

可以在消费者中基于配置开启重试机制,并设置重试消费次数。当消费消息发生错误,导致未确认(NACK)时,消费者尝试重复消费消息;当重复消费次数到达设置阈值后,强制确认(ACK),RabbitMQ会移除队列中的消息。

2.1.1 修改application.yml配置
修改amqp_consumer工程中配置
spring:
  rabbitmq:
    host: 192.168.91.128
    port: 5672
    username: bjsxt
    password: bjsxt
    listener:
      simple:
        retry:
          enabled: true # 开启重试机制
          max-attempts: 1  # 重试消费1次

2.1.1 修改消费者

修改类型: com.bjsxt.consumer.StringMessageConsumer

@Component
public class StringMessageConsumer {
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        int i = 1/0; // 模拟消费失败
    }
}

2.2 手工确认ACK

修改amqp_consumer工程中的application.yml配置文件
spring:
  rabbitmq:
    host: 192.168.91.128
    port: 5672
    username: bjsxt
    password: bjsxt
    listener:
      simple:
        acknowledge-mode: manual # 手工确认。 默认AUTO,自动确认

2.2.1 ACK确认(正常消费)

修改消费方法逻辑:

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工ACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 确认消息
             *  参数1 - 消息的唯一标识
             *  参数2 - 是否批量提交。为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,
             *         则可以一次性确认 deliveryTag 小于等于传入值的所有消息
             */
            channel.basicAck(deliveryTag, false);
            System.out.println("消息已确认");
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

2.2.2 NACK确认(错误消费)

修改消费方法逻辑:会发现NACK确认的消息重复消费的效果。

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工NACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 参数1 - 消息的唯一标识
             * 参数2 - 是否批量提交
             * 参数3 - 是否重新发出消息。false则废弃此消息。
             */
            channel.basicNack(deliveryTag, false, true);

            System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

2.2.3 Reject确认(丢弃)

修改消费方法逻辑:消息无论是否正常消费,都会丢弃。相当于另类的ACK确认。

@Component
public class StringMessageConsumer {
    /**
     * 消费方法。实现手工ACK确认
     * @param messageBody 消息内容
     * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
     *                RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
     * @param deliveryTag  RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
     *                     是一个递增的正整数,delivery tag 的范围仅限于 Channel
     */
    @RabbitListener(queues = {"queue.first"})
    public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
        try {
            /*
             * 参数1 - 消息的唯一标识
             * 参数3 - 是否重新发出消息。false则废弃此消息。
             */
            channel.basicReject(deliveryTag, false);

            System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

八、同步消息

之前的Publisher在发出消息后,都是立刻返回的,是无法接收任何的消息消费反馈的。如果需要消息的消费反馈结果,则需要使用同步消息逻辑来实现。如秒杀、抢红包等功能时都适用。具体实现如下:

1. 编辑消息消费者

此逻辑中需要Consumer返回ACK值(监听方法返回值),所以在使用这个功能时,消费方法逻辑必须有返回结果。
/**
 * 同步消息消费者。如:抢红包。
 */
@Component
public class SyncMessageConsumer {
    /**
     * 同步消息消费方法。和异步消息消费方法的唯一区别就是有返回值。类型不限。
     * @param message
     * @return
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "queue.sync", autoDelete = "false"),
                    exchange = @Exchange(name = "topic.sync", type = "topic"),
                    key = {"routing.key"}
            )
    })
    public String onMessage(String message){
        System.out.println("处理同步消息:" + message);
        return "消息已处理";
    }
}

2. 编辑发送消息测试方法

    /**
     * 转换,处理,发送消息,并等待接收消费者反馈。方法返回值就是消费者端返回的确认消息,即消费方法返回结果
     * Object convertSendAndReceive(String exchange, String routingKey, Object messageBody)
     * 如果消费者超时未返回,代码自动结束,并向下继续运行。
     * 消费者反馈结果使用null填充。
     */
    @Test
    public void testSyncMessage(){
        String message = "测试同步消息";
        Object result = rabbitOperations.convertSendAndReceive(
                "topic.sync",
                "routing.key",
                message
        );
        if(result == null){
            System.out.println("超时未返回");
        }else {
            System.out.println(result.getClass().getName());
            System.out.println(result);
        }
    }
如果超过时间没有收到consumer返回值,不在继续阻塞主线程,向下执行,返回值为null。如果需要设置超时时间,可以通过配置文件实现,具体如下:(修改publisher端配置)
spring:
  rabbitmq:
    host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
    port: 5672 # RabbitMQ服务器的端口。
    username: bjsxt # RabbitMQ的访问用户名。默认guest。
    password: bjsxt # RabbitMQ的访问密码。默认guest
    virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
    template:
      reply-timeout: 100000 # 配置同步消息超时时长,单位毫秒

九、常见面试题

1. 幂等性

幂等性常见两种情况:接口幂等性和消息幂等性。

1.1 接口幂等性

接口幂等性是指用户的同一个操作发起的一次请求或多次请求的结果是一样的,不会因为多次点击而产生不同的结果。

举例说明:用户在提交表单时(注册、电商中下单等都属于提交表单)第一次点击请求已经发送给服务器,但是由于网络问题,服务器的响应没有正常返回。用户以为没有执行成功,会再次点击提交按钮,但是实际上服务器已经处理了消息。第二次点击就会在服务器产生第二条数据。这就是没有保证接口的幂等性。

在对数据库操作中,查询和删除是不需要考虑接口幂等性问题的。只要数据库数据不变每次查询的结果都是一样的,而多次删除同一条数据影响也不大(但是需要考虑代码删除后返回值),新增时多次执行就新增多条数据,尤其是在主键自增的情况下。而修改时如果是修改成固定值是没有接口幂等性问题的,但是如果对某个列修改的时候是增量修改就需要考虑了。

1.1.1 解决方案-逻辑判断
此方案通过代码逻辑进行判断是否为重复提交,所以局限性比较大。只有少量需求满足这种情况。

1.1.2 解决方案-令牌
在显示需要提交表单的页面之前生成Token(UUID),把Token的值放入到表单内部,服务器中也存储一份token。用户在第一次提交表单的时候,服务器会判断是否有同值Token,如果有执行操作后吧Token在服务器删除。用户在同一个页面再次点击表单后,服务返回Token值已经不存在了,则不会执行对应操作。

1.2 消息幂等性

在MQ中可能出现消息幂等性的情况。

1.2.1 重复发送
Publisher给MQ发送消息的时候,MQ在给Publisher返回ACK时由于网络中断等问题,没有成功返回。Publisher会认为消息没有发送成功,在网络恢复后会重新发送消息。

1.2.2 解决重复发送问题
MQ内部会给每个消息生成一个唯一ID。当消息接收到后会判断此ID。

1.2.3 重复消费
Consumer接收到消息后,在给MQ返回ACK时由于网络问题,MQ没有成功接收ACK,MQ会认为此消息没有正确消费。在网络重连后会把消息重新发送给此消费者,或重新广播给其他所有消费者。

1.2.4 解决重复消费问题
在Consumer中可以通过消息的唯一ID进行判断是否已经消费过(借助Redis等工具每次消费都要记录已经消费过),也可以在每条消息中自定义唯一标识,判断是否已经消费过。

2. 消息持久化

RabbitMQ中队列保存的消息,默认是保存在内存中的。如果重启服务应该丢失消息。RabbitMQ有消息持久能力。当RabbitMQ空闲的时候,关闭服务的时候,都会把内存中未消费的消息保存到硬盘。重启后,恢复到内存。

想要实现消息持久化需要具备以下特性:

交换器是持久的,在项目中建立的交换器的Durability=Durable,表示持久交换器。

RabbitMQ:基于AMQP的开源消息代理软件 - 图25

队列是持久的.创建队列,默认也是持久的。

RabbitMQ:基于AMQP的开源消息代理软件 - 图26

消息是持久的。在使用Spring AMQP时,调用的convertAndSend方法时MessageProperties中的deliveryMode的默认取值为MessageDeliveryMode.PERSISTENT,表示消息的delivery_mode=2,即持久消息。

3. 死信队列

3.1 什么是死信队列

当某消息在一个队列中变成死信(Dead Message)之后,把这个消息重新发送(Publish)到另外一个交换器(DLX 死信交换器 dead-letter-exchange),并基于对应的路由投递到队列(DLQ 死信队列 dead-letter-queue)中。

3.2 消息变成死信的几种情况

1. 消息被拒绝(basicNack 或 basicReject)时,且未重新排队(requeue = false)。参考ACK确认机制。
2. 消息过期。即消息头中的TTL过期。
3. 队列达到最大长度时。

3.3 死信队列能做什么

一般死信队列用于处理需要延迟消费的消息,如:电商系统中用于关闭超时未支付的订单、铁路售票系统中恢复超时未支付的车票等。

3.4 实现过程

3.4.1 创建DLX(死信交换器)
DLX就是一个普通的Topic交换器。

RabbitMQ:基于AMQP的开源消息代理软件 - 图27

3.4.2 创建DLQ(信息队列)并绑定到DLX(死信交换器)上
DLQ就是一个普通的队列。

RabbitMQ:基于AMQP的开源消息代理软件 - 图28

绑定到DLX。

RabbitMQ:基于AMQP的开源消息代理软件 - 图29

结果如下:

RabbitMQ:基于AMQP的开源消息代理软件 - 图30

3.4.3 创建任意交换器

RabbitMQ:基于AMQP的开源消息代理软件 - 图31

3.4.4 创建任意i队列并绑定到交换器( 上一节创建的 test.ex.topic 交换器 )
创建任意队列,绑定到上一节创建的交换器。注意:必须设置队列参数 x-dead-letter-exchange,此参数用于绑定死信处理逻辑,即信息成为死信后,投递到哪个死信交换器。

RabbitMQ:基于AMQP的开源消息代理软件 - 图32

RabbitMQ:基于AMQP的开源消息代理软件 - 图33

3.4.5 创建死信队列消费者

在amqp_consumer工程中创建类型:com.bjsxt.consumer.DLXMessageConsumer

@Component
public class DLXMessageConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(name = "my.dlx.queue", autoDelete = "false"),
                    exchange = @Exchange(name = "my.dlx", type = "topic"),
                    key = {"#"}
            )
    })
    public void onMessage(String messageBody, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
        System.out.println("处理死信队列中的消息:" + messageBody);

        try {
            channel.basicAck(deliveryTag, true);
        } catch (IOException e){
            e.printStackTrace();
        }
    }
}

3.4.6 编辑发送消息测试方法

增加测试方法:

    @Test
    public void testDLX(){
        String messageBody = "投递到test.queue队列中的消息,等待超时后投递到DLX中再处理";
        MessageProperties messageProperties =
                new MessageProperties();
        // 设置消息持久化
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // 设置消息体字符集
        messageProperties.setContentEncoding("UTF-8");
        // 设置消息超时时间,单位毫秒,参数类型是字符串
        messageProperties.setExpiration("30000");
        // 基于字符串消息体内容和消息参数,创建要传递的消息对象。
        Message message = new Message(
                        messageBody.getBytes(),
                        messageProperties
                );
        // 发送消息到队列,等待超时后消息成为死信,并转投到死信队列后,被相应消费者处理。
        rabbitOperations.send(
                "test.ex.topic",
                "routing.key",
                message
        );
    }

3.4.7 观察结果
消息发送到普通队列 test.queue

RabbitMQ:基于AMQP的开源消息代理软件 - 图34

消息具体内容如下:

RabbitMQ:基于AMQP的开源消息代理软件 - 图35

超时后消息投递到死信队列中:

RabbitMQ:基于AMQP的开源消息代理软件 - 图36

启动死信队列消费者消费死信:

RabbitMQ:基于AMQP的开源消息代理软件 - 图37

消费结束后,死信队列状态如下:

RabbitMQ:基于AMQP的开源消息代理软件 - 图38