本文所使用到的文件在这里附件.zip

第一章:入门

1.1 MQ 的相关概念

1.1.1 什么是 MQ ?

  • MQ(Message Queue),从字面意思上看,本质是个队列,遵循 FIFO (先入先出)原则,只不过队列中存放的内容是 Message 而已。
  • MQ 是一种跨进程的通信机制,用于上下游传递消息。
  • 在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ ,不需要依赖其他服务。

1.1.2 为什么要使用 MQ ?

  • ① 流量削峰:如果订单系统每秒最多能处理一万次的订单,这个处理能力应付正常时段的下单是绰绰有余的,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果每秒有两万次的下单操作,系统是处理不了的,那么只能限制订单超过一万后的用户不能下单,这样很不人性化的(我可能等待,怎么能直接告诉我下不了单呢?)。此时如果使用 MQ 之后,那么就可以取消这个限制了,将一秒内下的订单分散成一段时间来处理,这时可能有些用户在下单十几秒之后才能收到下单成功的操作,但是也有比不能下单的体验要好。

1.PNG

  • ② 应用解耦:以电商系统为例,应用中有订单系统、库存系统、物流系统、支付系统。用户在创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出现故障,都会造成下单操作异常。当使用了 MQ 之后,系统间调用的问题就会减少很多,如:物流系统因为发生故障,需要几分钟来修复,在这几分钟之内,物流系统要处理的消息会被缓存在 MQ 之后,不会影响到用户的正常下单操作的。当物流系统恢复之后,继续处理订单信息即可。在整个操作中间,下单的用户是感受不到物流系统出现故障,提高的系统的可用性。

2.PNG

  • ③ 异步处理:有些服务间调用是异步的,例如:A 调用 B ,B 需要花费很长时间去执行,但是 A 需要知道 B 什么时候可以执行完毕,以前有两种处理方式:1)A 每过一段时间就去轮询调用 B 提供的查询 API 。2) A 提供一个 callback 的 API ,B 执行完毕会后调用这个 API 告诉 A 执行完毕。这两种方式都不是很优雅,使用 MQ 可以方便的解决这个问题。A 调用 B 服务后,只需要监督 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ ,MQ 会将此消息转发给 A 服务,这样 A 服务既不用循环调用 B 的查询 API ,也不用提供 callback 的 API 。同样,B 也不需要做这些操作,A 服务还能及时得到异步处理成功的消息。

3.PNG

1.1.3 MQ 的分类

  • ① ActiveMQ:
    • 优点:单机吞吐量高万级,时效性 ms 级,可用性高,基于主从架构实现高可用,消息可靠性有较低的概率会丢失数据。
    • 缺点:官方社区对 ActiveMQ 5.x 的维护越来越少,高吞吐量场景较少使用。
  • ② Kafka:大数据领域的杀手锏。
  • ③ RocketMQ:阿里巴巴开源的消息中间件,参考了 Kakfa 。
  • ④ RabbitMQ:
    • 2007 年发布,是一个在 AMQP(高级消息队列)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
    • 优点:由于 Erlang 语言的高并发特性,性能较好,吞吐量到万级。MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,如:Python、Ruby、Java 等,文档齐全,开源提供的管理界面非常棒,社区活跃度高,更新频率高。
    • 缺点:商业版需要收费,学习成本高。

1.1.4 MQ 的选择

  • ① Kaka:Kafka 的主要特点是基于 pull 的模式来处理消息,追求吞吐量,一开始的目的就是用于日志收集和传输,适合产生了大量数据的互联网服务的数据收集业务。大型公司建议选用,如果有日志收集功能,首选 Kafka 。
  • ② RocketMQ:天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入的时候,后端可能无法及时处理的情况。RocketMQ 在稳定性上可能更值得信赖,因为这些场景在阿里的双 11 经历了多次考验。
  • ③ RabbitMQ:结合 Erlang 语言本身的并发优势,性能好,时效性 us 级,社区活跃度高,管理界面用起来十分方便,中小型公司优先选择功能完备的 RabbitMQ 。

1.2 RabbitMQ

1.2.1 概述

  • RabbitMQ 是一个消息中间件:它接收并转发消息。
  • 我们可以将 RabbitMQ 当做是一个快递站点,当我们要发送一个包裹的时候,我们可以将包裹送到快递站(目前,也支持上门取件),快递员最终会把快递送到收件人哪里,按照这种逻辑 RabbitMQ 是一个快递站。
  • RabbitMQ 和快递站的主要区别在于:RabbitMQ 不处理快件而是接收、存储和转发消息数据。

1.2.2 四大核心概念

  • ① 生产者:产生数据发送消息的程序。
  • ② 交换机:交换机是 RabbitMQ 中非常重要的一个组件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者将消息丢失,这是由交换机的类型决定的。
  • ③ 队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但是它们只能存储在队列中。队列仅仅受到主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列中接收数据。
  • ④ 消费者:消费者就是等待接收消息的程序。

注意:

  • ① 生产者、消费者以及消息中间件很多时候并不在同一台机器上。
  • ② 同一个应用程序既可以是生产者也可以是消费者。

4.PNG

1.2.3 RabbitMQ 的核心部分(工作模式)

5.png

1.2.4 RabbitMQ 的工作原理

6.png

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker 。
  • Virtual Host:出于多租户和安全因素的设计,将 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 的概念。当多个不同的用户使用同一个 RabbitMQ Server 提供服务的时候,可以划分多个 vhost ,每个用户在自己的 vhost 创建 exchange 、queue 等。
  • Connection:生产者、消费者和 MQ 之间的 TCP 连接。
  • Channel:如果每一次访问 RabbitMQ 都需要建立一次 Connection ,在消息量大的时候建立 TCP Connection 的开销将会十分巨大,效率也比较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP 的 method 包含了 channel 的 id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。换言之,Channel 作为轻量级的 Connection 极大的减少了操作系统建立 TCP Connection 的开销。
  • Exchange:message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key ,分发消息到对应的 queue 中。
  • Queue:消息最终存放的地方,以便消费者进行消费。
  • Binding:Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key ,Binding 信息被保存在 Exchange 的查询表中,用于 message 的分发依据。

1.2.5 RabbitMQ 的安装

注意:

  • 安装是基于 CentOS 7 ,并关闭防火墙。
  • 官网

1.2.5.1 安装 Erlang

  • 下载安装包:
  1. wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.1-1.el7.x86_64.rpm/download.rpm

7.gif

  • 安装:
  1. rpm -ivh erlang-22.3.1-1.el7.x86_64.rpm

8.gif

1.2.5.2 安装 RabbitMQ

  • 下载安装包:
  1. wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.8/rabbitmq-server-3.8.8-1.el7.noarch.rpm

9.gif

  • 安装依赖:
  1. yum -y install socat

11.gif

  • 安装:
  1. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

12.gif

  • 开启管理界面:
  1. rabbitmq-plugins enable rabbitmq_management

13.gif

  • 配置远程可以使用 guest 登录 RabbitMQ :
  1. vim /etc/rabbitmq/rabbitmq.conf
  1. loopback_users = none

14.gif

  • 启动 RabbitMQ :
  1. systemctl start rabbitmq-server
  1. systemctl enable rabbitmq-server

15.gif

  • 访问:

访问地址:http://192.168.65.137:15672/ 。 账号:guest 。 密码:guest 。

16.PNG

1.2.5.3 添加用户、设置角色和赋予权限

  • 添加用户:
  1. rabbitmqctl add_user admin 123456
  • 设置角色:
  1. rabbitmqctl set_user_tags admin administrator
  • 赋予权限:
  1. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  • 查看当前系统中的用户:
  1. rabbitmqctl list_users

第二章:核心部分

2.1 Hello World(简单模式)

2.1.1 概述

  • 我们将使用 Java 编写两个程序,发送单个消息的生产者和接收消息并打印出来的消费者。
  • 在下图中,P 是生产者,C 是消费者,中间的框是一个队列(代表使用者保留的消息缓冲区)。

17.PNG

2.1.2 开发环境

  • JDK 1.8 。
  • IDEA 2021 + 。
  • Maven 3.8 。

  • Maven 的依赖:

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <maven.compiler.source>1.8</maven.compiler.source>
  4. <maven.compiler.target>1.8</maven.compiler.target>
  5. </properties>
  6. <dependencies>
  7. <!-- amqp -->
  8. <dependency>
  9. <groupId>com.rabbitmq</groupId>
  10. <artifactId>amqp-client</artifactId>
  11. <version>5.14.2</version>
  12. </dependency>
  13. <!-- hutool 工具 -->
  14. <dependency>
  15. <groupId>cn.hutool</groupId>
  16. <artifactId>hutool-all</artifactId>
  17. <version>5.8.1</version>
  18. </dependency>
  19. </dependencies>
  20. <build>
  21. <plugins>
  22. <plugin>
  23. <groupId>org.apache.maven.plugins</groupId>
  24. <artifactId>maven-compiler-plugin</artifactId>
  25. <version>3.8.1</version>
  26. <configuration>
  27. <source>8</source>
  28. <target>8</target>
  29. </configuration>
  30. </plugin>
  31. </plugins>
  32. </build>

2.1.3 生产者的代码

  • 示例:
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 生产者
  11. *
  12. * @author 许大仙
  13. * @version 1.0
  14. * @since 2022-05-19 14:57:05
  15. */
  16. public class Producer {
  17. /**
  18. * 队列的名称
  19. */
  20. public static final String QUEUE_NAME = "hello";
  21. public static void main(String[] args) throws IOException, TimeoutException {
  22. // 创建连接工厂
  23. ConnectionFactory connectionFactory = new ConnectionFactory();
  24. // 设置连接 RabbitMQ 的信息
  25. connectionFactory.setHost("192.168.65.137");
  26. connectionFactory.setVirtualHost("/");
  27. connectionFactory.setPort(5672);
  28. connectionFactory.setUsername("guest");
  29. connectionFactory.setPassword("guest");
  30. // 创建连接
  31. Connection connection = connectionFactory.newConnection();
  32. // 创建信道
  33. Channel channel = connection.createChannel();
  34. // 声明创建队列
  35. /**
  36. * 第一个参数:队列的名称
  37. * 第二个参数:是否持久化【存储在磁盘上】,默认为 false ,表示存储在内存中。
  38. * 第三个参数:
  39. * 当 exclusive = true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
  40. * 当 exclusive = false 则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;
  41. * 第四个参数:是否自动删除。如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。
  42. * 第五个参数:其他参数
  43. */
  44. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  45. // 发送消息
  46. String msg = "你好啊";
  47. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
  48. System.out.println("消息发送完毕");
  49. // 关闭信道
  50. channel.close();
  51. // 关闭连接
  52. connection.close();
  53. // 关闭连接工厂
  54. connectionFactory.clone();
  55. }
  56. }

18.PNG

2.1.4 消费者的代码

  • 示例:
  1. package com.aurorxa;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * 消费者
  8. *
  9. * @author 许大仙
  10. * @version 1.0
  11. * @since 2022-05-19 14:57:34
  12. */
  13. public class Consumer {
  14. /**
  15. * 队列的名称
  16. */
  17. public static final String QUEUE_NAME = "hello";
  18. public static void main(String[] args) throws IOException, TimeoutException {
  19. // 创建连接工厂
  20. ConnectionFactory connectionFactory = new ConnectionFactory();
  21. // 设置连接 RabbitMQ 的信息
  22. connectionFactory.setHost("192.168.65.137");
  23. connectionFactory.setVirtualHost("/");
  24. connectionFactory.setPort(5672);
  25. connectionFactory.setUsername("guest");
  26. connectionFactory.setPassword("guest");
  27. // 创建连接
  28. Connection connection = connectionFactory.newConnection();
  29. // 创建信道
  30. Channel channel = connection.createChannel();
  31. // 声明消费者成功消费的回调
  32. DeliverCallback deliverCallback = (consumerTag, message) -> {
  33. System.out.println("consumerTag = " + consumerTag);
  34. System.out.println("message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  35. };
  36. // 声明消费者取消消费的回调
  37. CancelCallback cancelCallback = (consumerTag) -> {
  38. System.out.println("consumerTag = " + consumerTag);
  39. };
  40. // 第一个参数:队列的名称
  41. // 第二个参数:是否自动确认
  42. // 第三个参数:消费者成功消费的回调
  43. // 第四个参数:消费者取消消费的回调
  44. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  45. // 关闭信道
  46. channel.close();
  47. // 关闭连接
  48. connection.close();
  49. // 关闭连接工厂
  50. connectionFactory.clone();
  51. }
  52. }

2.2 Work queues(工作队列模式)

2.2.1 概述

  • 工作队列(又称:任务队列)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
  • 工作队列也称为公平性队列模式:轮询分发,默认情况下,RabbitMQ 会按照顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。

注意:消息只能被消费一次。

19.png

2.2.2 轮询分发消息

  • 抽取获取信道的工具类:
  1. package com.aurorxa;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. /**
  6. * @author 许大仙
  7. * @version 1.0
  8. * @since 2022-05-20 14:54:02
  9. */
  10. public class RabbitmqUtils {
  11. /**
  12. * 获取 Channel 通道
  13. *
  14. * @return
  15. * @throws Exception
  16. */
  17. public static Channel getChannel() throws Exception {
  18. ConnectionFactory connectionFactory = new ConnectionFactory();
  19. connectionFactory.setVirtualHost("/");
  20. connectionFactory.setUsername("guest");
  21. connectionFactory.setPassword("guest");
  22. connectionFactory.setPort(5672);
  23. connectionFactory.setHost("127.0.0.1");
  24. Connection connection = connectionFactory.newConnection();
  25. return connection.createChannel();
  26. }
  27. }
  • 生产者的代码:
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import java.nio.charset.StandardCharsets;
  5. /**
  6. * 生产者
  7. *
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-20 15:52:07
  11. */
  12. public class Producer {
  13. /**
  14. * 队列的名称
  15. */
  16. public static final String QUEUE_NAME = "hello";
  17. public static void main(String[] args) throws Exception {
  18. Channel channel = RabbitmqUtils.getChannel();
  19. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  20. for (int i = 0; i < 10; i++) {
  21. String msg = "你好啊 " + i;
  22. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
  23. }
  24. System.out.println("消息发送完毕");
  25. }
  26. }
  • 消费者的代码:
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * 消费者1
  9. *
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-20 15:53:45
  13. */
  14. public class Consumer1 {
  15. /**
  16. * 队列的名称
  17. */
  18. public static final String QUEUE_NAME = "hello";
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitmqUtils.getChannel();
  21. channel.queueDeclare(QUEUE_NAME, true, false,false, MapUtil.newHashMap());
  22. // 声明消费者成功消费的回调
  23. DeliverCallback deliverCallback = (consumerTag, message) -> {
  24. System.out.println("consumerTag = " + consumerTag);
  25. System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  26. };
  27. // 声明消费者取消消费的回调
  28. CancelCallback cancelCallback = (consumerTag) -> {
  29. System.out.println("consumerTag = " + consumerTag);
  30. };
  31. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  32. }
  33. }
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * 消费者2
  9. *
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-20 15:53:45
  13. */
  14. public class Consumer2 {
  15. /**
  16. * 队列的名称
  17. */
  18. public static final String QUEUE_NAME = "hello";
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitmqUtils.getChannel();
  21. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  22. // 声明消费者成功消费的回调
  23. DeliverCallback deliverCallback = (consumerTag, message) -> {
  24. System.out.println("consumerTag = " + consumerTag);
  25. System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  26. };
  27. // 声明消费者取消消费的回调
  28. CancelCallback cancelCallback = (consumerTag) -> {
  29. System.out.println("consumerTag = " + consumerTag);
  30. };
  31. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  32. }
  33. }
  • 结果:

20.gif

2.2.3 消息应答

2.2.3.1 概述

  • 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个耗时较长的任务并且只完成了一部分就挂掉了,那么会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉,那么我们将会丢失正在处理的消息以及后续发送给该消费者的消息,因为这个消费者无法接收到。
  • 为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 已经处理好了,RabbitMQ 就可以将该消息删除了

2.2.3.2 自动应答(不靠谱)

  • 消息发送后理解被认为已经传递成功,这种模式需要在 高吞吐量和数据传输安全方面做权衡 ,因为这种模式如果消息在消费者接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就会丢失。
  • 另一方面,在这种模式下,消费者那么可以传递过载的消息,没有对传递的消息进行限制,这样会使得消费者那边由于接收太多还没来得及处理的消息,会导致消息的积压,最终使得内存耗尽,从而导致消费者进程或线程被操作系统杀死。
  • 自动应答机制仅适用于消费者可以高效的以某种速率能够处理这些消息的情况下使用,换言之,实际开发中都是手动应答。

2.2.3.3 手动应答的方法

  • 确认应答:一旦调用该方法,就是通知 RabbitMQ 该消息已经处理成功,可以将其丢弃了
  1. void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • 否定应答:
  1. void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  2. throws IOException;
  • 拒绝应答:表明不处理该消息直接拒绝,可以将其丢弃
  1. void basicReject(long deliveryTag, boolean requeue) throws IOException;

2.2.3.4 multiple 的解释

  • 手动应答的好处是可以批量应答以减少网络拥堵。
  1. // 第一个参数:消息的标记
  2. // 第二个参数:不批量
  3. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  • 如果 multiple 为 true ,则代表批量应答 channel 上未应答的消息(可能会丢失数据)。

21.png

  • 如果 multiple 为 false ,则代表只会应答 channel 上正在处理完毕的消息(推荐使用)。

22.png

2.2.3.5 消息应答重新入队

  • 如果消费者由于有些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消费者未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将其重新排队。
  • 如果此时其他消费者可以处理,它将很快将其重新分发到另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

23.png

2.2.3.6 手动应答

  • 生产者的代码:
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.Scanner;
  6. /**
  7. * 生产者
  8. *
  9. * @author 许大仙
  10. * @version 1.0
  11. * @since 2022-05-20 15:52:07
  12. */
  13. public class Producer {
  14. /**
  15. * 队列的名称
  16. */
  17. public static final String QUEUE_NAME = "hello";
  18. public static void main(String[] args) throws Exception {
  19. Channel channel = RabbitmqUtils.getChannel();
  20. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  21. Scanner scanner = new Scanner(System.in);
  22. while (scanner.hasNext()) {
  23. String msg = scanner.next();
  24. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
  25. }
  26. System.out.println("消息发送完毕");
  27. }
  28. }
  • 消费者的代码:
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * 消费者1
  9. *
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-20 15:53:45
  13. */
  14. public class Consumer1 {
  15. /**
  16. * 队列的名称
  17. */
  18. public static final String QUEUE_NAME = "hello";
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitmqUtils.getChannel();
  21. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  22. DeliverCallback deliverCallback = (consumerTag, message) -> {
  23. try {
  24. // 睡眠 1 s
  25. Thread.sleep(1 * 1000);
  26. } catch (InterruptedException e) {
  27. throw new RuntimeException(e);
  28. }
  29. System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  30. // 第一个参数:消息的标记
  31. // 第二个参数:不批量
  32. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  33. };
  34. CancelCallback cancelCallback = (consumerTag) -> {
  35. System.out.println("consumerTag = " + consumerTag);
  36. };
  37. // 设置为手动应答
  38. boolean autoAck = false;
  39. channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
  40. }
  41. }
  1. package com.aurorxa;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * 消费者2
  9. *
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-20 15:53:45
  13. */
  14. public class Consumer2 {
  15. /**
  16. * 队列的名称
  17. */
  18. public static final String QUEUE_NAME = "hello";
  19. public static void main(String[] args) throws Exception {
  20. Channel channel = RabbitmqUtils.getChannel();
  21. channel.queueDeclare(QUEUE_NAME, true, false, false, MapUtil.newHashMap());
  22. DeliverCallback deliverCallback = (consumerTag, message) -> {
  23. try {
  24. // 睡眠 10 s
  25. Thread.sleep(10 * 1000);
  26. } catch (InterruptedException e) {
  27. throw new RuntimeException(e);
  28. }
  29. System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  30. // 手动应答
  31. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  32. };
  33. CancelCallback cancelCallback = (consumerTag) -> {
  34. System.out.println("consumerTag = " + consumerTag);
  35. };
  36. // 设置为手动应答
  37. boolean autoAck = false;
  38. channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
  39. }
  40. }
  • 结果:

24.gif

2.2.4 RabbitMQ 持久化

2.2.4.1 概述

  • 上面的示例中,我们已经看到了如何处理任务不丢失,但是如何保障当 RabbitMQ 服务停掉以后,生产者发送过来的消息不丢失?默认情况下,RabbitMQ 退出或由于某种原因崩溃的时候,它会忽视队列和消息,除非我们明确的告诉 RabbitMQ 不要这么做。
  • 确保消息不会丢失需要做两件事情:将队列和消息都标记为持久化

2.2.4.2 队列如何实现持久化

  • 如果需要队列实现持久化,需要在声明队列的时候将 durable 参数设置为 true ,表示队列持久化。
  1. // 队列持久化
  2. boolean durable = true;
  3. channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());

注意:如果之前声明的队列不是持久化的,需要将原先的队列删除,再重新创建一个持久化的队列,否则将会报错。

  • 下面为控制台中持久化和非持久化队列的 UI 显示:

25.png

  • 这样,即使重启 RabbitMQ ,队列也依然存在。

2.2.4.3 消息如何实现持久化

  • 要想让消息实现持久化,需要在消息生产者修改代码,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 属性。
  1. // MessageProperties.PERSISTENT_TEXT_PLAIN
  2. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  • 将消息标记为持久化并不能完全保证不会丢失消息。尽管生产者告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完毕,消息还在缓存中的一个间隔点,此时并没有真正的将消息写入磁盘,持久性保证并不强,如果需要更强有力的持久化策略,参考后面的发布确认模式。

2.2.5 不公平分发

  • 我们知道,RabbitMQ 默认分发消息采取的是轮询分发,但是在某种场景下这种策略并不是很好,如:两个消费者在处理任务,其中有一个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理任务的速度却很慢,这个时候我们还是采用轮询分发的方式,就会使得消费者 2 在很大一部分时间处于空闲状态,而处理慢的那个消费者却一直在干活;换言之,忙的忙死,闲的闲死。此时,在这种情况下的分配方式很不合理,但是 RabbitMQ 并不知道这种情况,它会依然采取轮询方式分发消息。
  • 为了避免上面的情况,我们可以设置参数 channel.basicQos(1);
  1. // 消费者设置不公平分发
  2. int prefetchCount = 1;
  3. channel.basicQos(prefetchCount);
  • 下面为控制台中不公平分发的 UI 显示:

26.png

  • 此参数的含义是:如果这个任务我(消费者)还没有处理完或者我还没有应答,那么就别分配给我,我目前只能处理一个任务,RabbitMQ 就会将该任务分配给没有那么忙的闲的消费者。在这种情况下,如果所有的消费者都没有完成手上的任务,队列还会不停的添加新的任务,队列可能会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变存储任务的策略。

2.2.6 预取值

  • 消息的发送本质上就是异步发送的,所以在任何时候,channel 上肯定不止一个消息,另外消费者的手动确认本质上也是异步的,因此这里就存在一个 未确认的消息缓冲区 ,因此希望开发人员能 限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息的问题
  • 这个时候就可以通过 channel.basicQos(prefetchCount); 方法设置 预取计数 值来完成,该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数据,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。例如:在通道上有未确认的消息 1、2、3、4,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ACK ;假设 tag = 4 这个消息刚刚被确认 ACK ,RabbitMQ 将会感知这个情况并会向此通道中再发送一条消息。
  • 消息应答和 Qos 预取值对用户吞吐量有重大影响。通常情况下,增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速度是最佳的,但是,在这种情况下已传递但是尚未处理的消息的数量也会增加,从而增加了消费者的 RAM消耗,所以井盖消息使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载预取值也不同,不过 100 ~ 300 范围内的值通常可以提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的,当然这将会导致吞吐量变得很低,特别是消费者连接延迟很严重的情况下。
  1. int prefetchCount = 300;
  2. channel.basicQos(prefetchCount);

2.3 Publisher Confirms(发布确认模式)

2.3.1 概述

  • 生产者将信道设置为 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上发布的消息都将被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID ),这就使得生产者知道消息已经正确的到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 会传递给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

27.png

  • confirm 模式最大的好处在于异步,一旦发布一条消息,生产者应用程序就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2.3.2 开启发布确认的方法

  • 默认情况下,发布确认是关闭的。如果要开启需要调用方法 confirmSelect ,所以当我们想要使用发布确认的时候,都需要在 channel 上调用该方法。
  1. Channel channel = RabbitmqUtils.getChannel();
  2. // 开启发布确认模式
  3. channel.confirmSelect();

2.3.3 单个发布确认

  • 这是一个简单的发布确认方式,它是一种 同步发布确认 的方式,也就是发布一条消息之后只有它被确认,后续的消息才能继续发布。
  • 这种发布确认的方式有一个最大的缺点:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布。

  • 示例:

  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.MessageProperties;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * 生产者
  8. *
  9. * @author 许大仙
  10. * @version 1.0
  11. * @since 2022-05-20 15:52:07
  12. */
  13. public class Producer {
  14. /**
  15. * 队列的名称
  16. */
  17. public static final String QUEUE_NAME = "hello";
  18. /**
  19. * 发送消息的个数
  20. */
  21. public static final int MESSAGE_COUNT = 1000;
  22. public static void main(String[] args) throws Exception {
  23. // 单个发布确认 耗时:341
  24. singleReleaseConfirmed();
  25. }
  26. /**
  27. * 单个发布确认
  28. *
  29. * @throws Exception
  30. */
  31. public static void singleReleaseConfirmed() throws Exception {
  32. Channel channel = RabbitmqUtils.getChannel();
  33. // 开启发布确认模式
  34. channel.confirmSelect();
  35. // 队列持久化
  36. boolean durable = true;
  37. channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());
  38. long startTime = System.currentTimeMillis();
  39. // 批量发送消息,单个发布确认
  40. for (int i = 0; i < MESSAGE_COUNT; i++) {
  41. String msg = String.valueOf(i);
  42. // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
  43. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  44. boolean b = channel.waitForConfirms();
  45. if (b) {
  46. System.out.println("消息发送成功");
  47. }
  48. }
  49. long endTime = System.currentTimeMillis();
  50. // 耗时:341
  51. System.out.println("耗时:" + (endTime - startTime));
  52. System.out.println("消息发送完毕");
  53. }
  54. }

2.3.4 批量发布确认

  • 和单个发布确认相比,批量发布确认是先发布一批消息,然后一起确认,可以极大的提高吞吐量,但是这种方式的缺点是:当发生故障的时候会导致发布出现问题时,不知道是哪个消息出现问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
  • 批量发布确认也是同步的,一样会阻塞后续消息的发布。

  • 示例:

  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.MessageProperties;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * 生产者
  8. *
  9. * @author 许大仙
  10. * @version 1.0
  11. * @since 2022-05-20 15:52:07
  12. */
  13. public class Producer {
  14. /**
  15. * 队列的名称
  16. */
  17. public static final String QUEUE_NAME = "hello";
  18. /**
  19. * 发送消息的个数
  20. */
  21. public static final int MESSAGE_COUNT = 1000;
  22. public static void main(String[] args) throws Exception {
  23. // 批量发布确认 耗时:39
  24. batchReleaseConfirmed();
  25. }
  26. /**
  27. * 批量发布确认
  28. *
  29. * @throws Exception
  30. */
  31. public static void batchReleaseConfirmed() throws Exception {
  32. Channel channel = RabbitmqUtils.getChannel();
  33. // 开启发布确认模式
  34. channel.confirmSelect();
  35. // 队列持久化
  36. boolean durable = true;
  37. channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());
  38. long startTime = System.currentTimeMillis();
  39. // 批量确认消息的大小
  40. int batchSize = 100;
  41. // 批量发送消息,批量发布确认
  42. for (int i = 0; i < MESSAGE_COUNT; i++) {
  43. String msg = String.valueOf(i);
  44. // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
  45. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  46. if (i % batchSize == 0) {
  47. boolean b = channel.waitForConfirms();
  48. if (b) {
  49. System.out.println("消息发送成功");
  50. }
  51. }
  52. }
  53. long endTime = System.currentTimeMillis();
  54. // 耗时:341
  55. System.out.println("耗时:" + (endTime - startTime));
  56. System.out.println("消息发送完毕");
  57. }
  58. }

2.3.5 异步发布确认

  • 异步发布确认虽然编程逻辑比上面两个要复杂,但是性价比是最高的(无论是可靠性还是效率)。它是利用回调函数来达到消息可靠性传递的。

28.png

  • 示例:
  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConfirmCallback;
  5. import com.rabbitmq.client.MessageProperties;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.concurrent.ConcurrentNavigableMap;
  8. import java.util.concurrent.ConcurrentSkipListMap;
  9. /**
  10. * 生产者
  11. *
  12. * @author 许大仙
  13. * @version 1.0
  14. * @since 2022-05-20 15:52:07
  15. */
  16. public class Producer {
  17. /**
  18. * 队列的名称
  19. */
  20. public static final String QUEUE_NAME = "hello";
  21. /**
  22. * 发送消息的个数
  23. */
  24. public static final int MESSAGE_COUNT = 1000;
  25. public static void main(String[] args) throws Exception {
  26. // 异步发布确认 耗时:33
  27. asynchronousReleaseConfirmed();
  28. }
  29. /**
  30. * 异步发布确认
  31. */
  32. public static void asynchronousReleaseConfirmed() throws Exception {
  33. Channel channel = RabbitmqUtils.getChannel();
  34. // 开启发布确认模式
  35. channel.confirmSelect();
  36. // 队列持久化
  37. boolean durable = true;
  38. channel.queueDeclare(QUEUE_NAME, durable, false, false, MapUtil.newHashMap());
  39. /**
  40. * 线程安全有序的哈希表,适用于高并发的情况
  41. * ① 可以将序号和消息进行关联。
  42. * ② 可以批量删除条目。
  43. * ③ 支持高并发。
  44. */
  45. ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
  46. // 准备消息的监听器,用来监听那些消息成功了,那些消息失败了。
  47. // 消息确认成功的回调函数
  48. ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  49. System.out.println("确认的消息的 ID = " + deliveryTag);
  50. // 删除掉已经确认的消息,剩下的就是未确认的消息
  51. if (multiple) { // 如果是批量
  52. // 删除已经确认的消息
  53. ConcurrentNavigableMap<Long, String> headMap = map.headMap(deliveryTag);
  54. headMap.clear();
  55. } else {
  56. // 只清除当前序列号的消息
  57. map.remove(deliveryTag);
  58. }
  59. };
  60. // 消息确认失败的回调函数
  61. ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  62. System.out.println("未确认的消息的 ID = " + deliveryTag);
  63. // 输出未确认的消息
  64. String msg = map.get(deliveryTag);
  65. System.out.println("未确认的消息 = " + msg);
  66. };
  67. // 异步
  68. channel.addConfirmListener(ackCallback, nackCallback);
  69. long startTime = System.currentTimeMillis();
  70. // 批量发送消息
  71. for (int i = 0; i < MESSAGE_COUNT; i++) {
  72. String msg = String.valueOf(i);
  73. // MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
  74. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  75. // 记录所有要发送的消息
  76. // channel.getNextPublishSeqNo()获取下一个消息的序列号
  77. map.put(channel.getNextPublishSeqNo(), msg);
  78. }
  79. long endTime = System.currentTimeMillis();
  80. // 耗时:341
  81. System.out.println("耗时:" + (endTime - startTime));
  82. System.out.println("消息发送完毕");
  83. }
  84. }

2.4 交换机

2.4.1 交换机的概念

  • RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。一般而言,生产者甚至不知道这些消息传递到了那些队列中。相反,生产者只能将消息发送到交换机(exchange)
  • 交换机的工作内容非常简单,一方面它接收来自生产者的消息,另一方面将消息推送到队列中。交换机必须确切的知道如何处理收到的消息,是将这些消息放到特定的队列、放到许多队列中或直接丢失它们,这是由交换机的类型决定的。

29.png

2.4.2 交换机的类型

  • 交换机的类型分为:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)。

2.4.3 无名交换机

  • 在前面我们不清楚交换机,但是依然能够将消息发送给队列,那是因为我们使用的是默认的交换机,通过空字符串("")进行标识。
  1. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  • 第一个参数是交换机的名称。空字符串表示默认或无名交换机。

30.png

2.4.4 临时队列

  • 之前我们创建的队列都是指定名称的,我们需要指定消费者去消费那个队列的消息;但是,有时我们希望当我们连接 RabbitMQ 的时候,自动为我们创建一个 随机名称的队列 ,当我们断开消费者的连接时队列将被自动删除
  • 创建临时队列的方式如下:
  1. String queueName = channel.queueDeclare().getQueue();

31.png

2.4.5 绑定(binding)

  • 绑定就是交换机(exchange)和队列(queue)之间的桥梁,它告诉我们交换机和那个队列进行了绑定关系。

32.png

2.5 Fanout(广播类型)

2.5.1 概述

  • Fanout 这种类型非常简单,它将接收到所有消息并广播到它知道的所有队列中。系统中默认有 fanout 类型的交换机。

33.png

2.5.2 应用示例

  • Fanout 交换机和临时队列的绑定关系图:

34.png

  • 示例:
  • 消费者1:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-23 10:47:45
  11. */
  12. public class Consumer1 {
  13. /**
  14. * 交换机名称
  15. */
  16. public static final String EXCHANGE_NAME = "logs";
  17. public static void main(String[] args) throws Exception {
  18. Channel channel = RabbitmqUtils.getChannel();
  19. // 声明交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  21. // 声明一个临时队列
  22. String queueName = channel.queueDeclare().getQueue();
  23. // 绑定交换机和队列
  24. channel.queueBind(queueName, EXCHANGE_NAME, "");
  25. System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
  26. DeliverCallback deliverCallback = (consumerTag, message) -> {
  27. System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  28. };
  29. CancelCallback cancelCallback = (consumerTag) -> {
  30. System.out.println("consumerTag = " + consumerTag);
  31. };
  32. channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
  33. }
  34. }
  • 消费者2:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. /**
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-23 10:47:45
  11. */
  12. public class Consumer2 {
  13. /**
  14. * 交换机名称
  15. */
  16. public static final String EXCHANGE_NAME = "logs";
  17. public static void main(String[] args) throws Exception {
  18. Channel channel = RabbitmqUtils.getChannel();
  19. // 声明交换机
  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  21. // 声明一个临时队列
  22. String queueName = channel.queueDeclare().getQueue();
  23. // 绑定交换机和队列
  24. channel.queueBind(queueName, EXCHANGE_NAME, "");
  25. System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
  26. DeliverCallback deliverCallback = (consumerTag, message) -> {
  27. System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  28. };
  29. CancelCallback cancelCallback = (consumerTag) -> {
  30. System.out.println("consumerTag = " + consumerTag);
  31. };
  32. channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
  33. }
  34. }
  • 生产者:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import java.nio.charset.StandardCharsets;
  5. /**
  6. * @author 许大仙
  7. * @version 1.0
  8. * @since 2022-05-23 11:02:05
  9. */
  10. public class Producer {
  11. /**
  12. * 交换机名称
  13. */
  14. public static final String EXCHANGE_NAME = "logs";
  15. public static void main(String[] args) throws Exception {
  16. Channel channel = RabbitmqUtils.getChannel();
  17. // 声明交换机
  18. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  19. // 发送消息
  20. channel.basicPublish(EXCHANGE_NAME, "", null, "你好啊".getBytes(StandardCharsets.UTF_8));
  21. System.out.println("消息发送完毕");
  22. }
  23. }

35.gif

2.6 Direct(直接类型)

2.6.1 概述

  • 在 Fanout 类型的示例中,我们是将所有的消息广播给所有的消费者,但是有时我们希望这样,有的消费者消费 error 级别信息,有些消费者消费 warning 级别消息,有些消费者消费info 级别的日志信息,此时 Fanout 类型的交换机就不能满足这样的需求,就需要使用 direct 这种类型的交换机来实现这样的功能。

36.png

  • 在上图中,我们可以看到交换机绑定了两个队列,绑定类型是 direct 。队列 Q1 绑定键为 orange ,队列 Q2 绑定键有两个:black 和 green 。
  • 在这种绑定情况下,生产者发送消息到 exchange 上,绑定键为 orange 的消息会被发送到队列 Q1 上,绑定键为 black 或 green 的消息会被发送到队列 Q2 上。

2.6.2 应用示例

  • Direct 交换机和临时队列的绑定关系图:

37.png

  • 示例:
  • 消费者1:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-23 15:29:42
  10. */
  11. public class Consumer1 {
  12. /**
  13. * 交换机名称
  14. */
  15. public static final String EXCHANGE_NAME = "direct_logs";
  16. /**
  17. * 队列名称
  18. */
  19. public static final String QUEUE_NAME = "disk";
  20. public static void main(String[] args) throws Exception {
  21. Channel channel = RabbitmqUtils.getChannel();
  22. // 声明交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  24. // 声明队列
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  26. // 绑定交换机和队列
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  28. // 消费
  29. DeliverCallback deliverCallback = (consumerTag, message) -> {
  30. System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  31. System.out.println("消费者1 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
  32. System.out.println("消费者1 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
  33. };
  34. channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
  35. }
  36. }
  • 消费者 2 :
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-23 15:29:42
  10. */
  11. public class Consumer2 {
  12. /**
  13. * 交换机名称
  14. */
  15. public static final String EXCHANGE_NAME = "direct_logs";
  16. /**
  17. * 队列名称
  18. */
  19. public static final String QUEUE_NAME = "console";
  20. public static void main(String[] args) throws Exception {
  21. Channel channel = RabbitmqUtils.getChannel();
  22. // 声明交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  24. // 声明队列
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  26. // 绑定交换机和队列
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
  28. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
  29. // 消费
  30. DeliverCallback deliverCallback = (consumerTag, message) -> {
  31. System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  32. System.out.println("消费者2 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
  33. System.out.println("消费者2 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
  34. };
  35. channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
  36. }
  37. }
  • 生产者:
  1. package com.github;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-23 15:38:35
  11. */
  12. public class Producer {
  13. public static final String EXCHANGE_NAME = "direct_logs";
  14. public static void main(String[] args) throws Exception {
  15. Channel channel = RabbitmqUtils.getChannel();
  16. Map<String, String> map = new HashMap<>();
  17. map.put("info", "info 级别的日志信息");
  18. map.put("warning", "warning 级别的日志信息");
  19. map.put("error", "error 级别的日志信息");
  20. map.put("debug", "debug 级别的日志信息");
  21. map.forEach((k, v) -> {
  22. try {
  23. channel.basicPublish(EXCHANGE_NAME, k, null, v.getBytes(StandardCharsets.UTF_8));
  24. } catch (IOException e) {
  25. throw new RuntimeException(e);
  26. }
  27. });
  28. }
  29. }

2.7 Topic (主题类型)

  • Topic类型和 Direct 相比,都是可以根据 routing key 将消息路由到不同的队列,只不过 Exchange 类型为 Topic 可以让队列在绑定 routing key 的时候使用通配符。
  • routing key 一般都是有一个或多个单词组成,多个单词之间以 “.” 分割,例如:item.insert
  • 通配符的规则:
    • #:匹配一个或多个词。
    • *:只能匹配一个词。

38.PNG

  • 上图是一个队列的绑定关系图,下面是一些绑定关系示例: | routing key | 描述 | | —- | —- | | quick.orange.rabbit | Q1 和 Q2 能接收到 | | lazy.orange.elephant | Q1 和 Q2 能接收到 | | quick.orange.fox | Q1 能接收到 | | lazy.brown.fox | Q2 能接收到 | | lazy.pink.rabbit | Q2 能接收到 | | quick.brown.fox | 不匹配任何绑定,不会被任何队列接收到,会被丢弃 | | quick.orange.male.rabbit | 是四个单词,不匹配任何绑定,会被丢弃 | | lazy.orange.male.rabbit | 是四个单词,但匹配 Q2 |

注意:

  • 当一个队列的绑定键是 # ,表示这个队列将接收所有数据,效果如同 fanout 。
  • 当一个队列的绑定键中没有 #*,表示这个队列的绑定类型,效果如同 direct 。
  • 示例:
  • 消费者1:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-23 10:47:45
  10. */
  11. public class Consumer1 {
  12. /**
  13. * 交换机名称
  14. */
  15. public static final String EXCHANGE_NAME = "topic_logs";
  16. /**
  17. * 队列名称
  18. */
  19. public static final String QUEUE_NAME = "Q1";
  20. public static void main(String[] args) throws Exception {
  21. Channel channel = RabbitmqUtils.getChannel();
  22. // 声明交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  24. // 声明一个临时队列
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  26. // 绑定交换机和队列
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
  28. System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
  29. // 消费
  30. DeliverCallback deliverCallback = (consumerTag, message) -> {
  31. System.out.println("消费者1 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  32. System.out.println("消费者1 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
  33. System.out.println("消费者1 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
  34. };
  35. channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
  36. }
  37. }
  • 消费者2:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-23 10:47:45
  10. */
  11. public class Consumer2 {
  12. /**
  13. * 交换机名称
  14. */
  15. public static final String EXCHANGE_NAME = "topic_logs";
  16. /**
  17. * 队列名称
  18. */
  19. public static final String QUEUE_NAME = "Q2";
  20. public static void main(String[] args) throws Exception {
  21. Channel channel = RabbitmqUtils.getChannel();
  22. // 声明交换机
  23. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  24. // 声明一个临时队列
  25. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  26. // 绑定交换机和队列
  27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
  28. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
  29. System.out.println("等待接收消息,将接收到的消息打印在屏幕上。。。");
  30. // 消费
  31. DeliverCallback deliverCallback = (consumerTag, message) -> {
  32. System.out.println("消费者2 消费的 message = " + new String(message.getBody(), StandardCharsets.UTF_8));
  33. System.out.println("消费者2 消费的交换机的名称是 = " + message.getEnvelope().getExchange());
  34. System.out.println("消费者2 消费的 routing key 是 = " + message.getEnvelope().getRoutingKey());
  35. };
  36. channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
  37. }
  38. }
  • 生产者:
  1. package com.github;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-23 15:38:35
  11. */
  12. public class Producer {
  13. public static final String EXCHANGE_NAME = "topic_logs";
  14. public static void main(String[] args) throws Exception {
  15. Channel channel = RabbitmqUtils.getChannel();
  16. Map<String, String> map = new HashMap<>();
  17. map.put("quick.orange.rabbit", "Q1 和 Q2 能接收到");
  18. map.put("lazy.orange.elephant", "Q1 和 Q2 能接收到");
  19. map.put("quick.orange.fox", "Q1 能接收到");
  20. map.put("lazy.brown.fox", "Q2 能接收到");
  21. map.put("lazy.pink.rabbit", "Q2 能接收到");
  22. map.put("quick.brown.fox", "不匹配任何绑定,不会被任何队列接收到,会被丢弃");
  23. map.put("quick.orange.male.rabbit", "是四个单词,不匹配任何绑定,会被丢弃");
  24. map.put("lazy.orange.male.rabbit", "是四个单词,但匹配 Q2");
  25. map.forEach((k, v) -> {
  26. try {
  27. channel.basicPublish(EXCHANGE_NAME, k, null, v.getBytes(StandardCharsets.UTF_8));
  28. } catch (IOException e) {
  29. throw new RuntimeException(e);
  30. }
  31. });
  32. }
  33. }