HelloWorld

我们先编写一个入门案例,简单了解以下 MQ 的使用。发送单个消息的生产者( Producer )和接收消息并打印出来的消费者( Consumer )。

再下图中,Producer 即为生产者,Consumer 即为消费者,中间的框是一个队列-RabbitMQ 代 表使用者保留的消息缓冲区。

核心功能 - 图1

导入依赖

  1. <!--rabbitmq 依赖客户端-->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.14.2</version>
  6. </dependency>
  7. <!--操作文件流的一个依赖-->
  8. <dependency>
  9. <groupId>commons-io</groupId>
  10. <artifactId>commons-io</artifactId>
  11. <version>2.11.0</version>
  12. </dependency>

生产者代码

  1. public class Producer {
  2. public final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. // 创建一个连接工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. // 工厂的IP,RabbitMQ所在服务器的地址
  7. factory.setHost("192.168.20.132");
  8. // 端口号,默认也是5672
  9. factory.setPort(5672);
  10. // 用户名
  11. factory.setUsername("admin");
  12. // 密码
  13. factory.setPassword("123456");
  14. // 创建连接
  15. Connection connection = factory.newConnection();
  16. // 获取信道
  17. Channel channel = connection.createChannel();
  18. // 声明队列,如果队列不存在则生成一个队列
  19. channel.queueDeclare(QUEUE_NAME // 队列名称
  20. , false // 队列里的消息是否需要持久化到磁盘,默认消息存储在内存
  21. , false // 队列是否仅供一个消费者进行消费,是否需要消息共享,true表示可以多个消费者消费
  22. , false // 是否自动删除,最后一个消费者断开连接以后,该队语句会自动删除,true表示自动删除
  23. , null); // 其他参数,以后再细说
  24. // 需要发送的消息
  25. String message = "hello world2";
  26. // 发送一个消息
  27. channel.basicPublish("" // 发送到哪个交换机(空串表示默认交换机)
  28. , QUEUE_NAME // 路由的key是哪个
  29. , null // 其他的参数消息
  30. , message.getBytes(StandardCharsets.UTF_8)); // 发送消息的消息体
  31. System.out.println("消息发送完毕!");
  32. }
  33. }

消费者代码

  1. public class Consumer {
  2. public final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. // 创建一个连接工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. // 工厂的IP,RabbitMQ所在服务器的地址
  7. factory.setHost("192.168.20.132");
  8. // 端口号,默认也是5672
  9. factory.setPort(5672);
  10. // 用户名
  11. factory.setUsername("admin");
  12. // 密码
  13. factory.setPassword("123456");
  14. // 创建连接
  15. Connection connection = factory.newConnection();
  16. // 获取信道
  17. Channel channel = connection.createChannel();
  18. // 推送消息如何进行消费的接口回调
  19. DeliverCallback deliverCallback = (consumeTag, delivery) -> {
  20. String msg = new String(delivery.getBody());
  21. System.out.println(msg);
  22. };
  23. // 取消消费的一个回调接口
  24. CancelCallback cancelCallback = (consumeTag) -> {
  25. System.out.println("消息消费被中断");
  26. };
  27. // 消费消息
  28. channel.basicConsume(QUEUE_NAME // 消费哪个队列
  29. , true // 消费成功之后是否要自动应答
  30. , deliverCallback // 消费成功的回调
  31. , cancelCallback); // 消费失败的回调
  32. }
  33. }

测试

我们先启动生产者的代码,程序执行完成之后,来到 RabbitMQ 的 Web 界面,查看指定的 Queue,可以发现队列中有一条准备好的消息,总共一条消息。

核心功能 - 图2

再启动消费者的代码,程序执行完之后,应该会收到生产者发送的消息。

  1. hello world

抽取工具类

上面的代码中,可以发现是有好一些重复的内容,所以我们可以把重复的内容抽取出来,封装成一个工具类,便于我们的开发:

  1. public class RabbitMqUtils {
  2. private final static ConnectionFactory factory;
  3. private final static Connection connection;
  4. // 绑定配置文件
  5. static {
  6. factory = new ConnectionFactory();
  7. Properties rabbitMqProperties = new Properties();
  8. try {
  9. rabbitMqProperties.load(new FileInputStream(".\\src\\main\\resources\\rabbitMq.properties"));
  10. factory.setHost(rabbitMqProperties.getProperty("rabbit-mq.host"));
  11. factory.setPort(Integer.parseInt(rabbitMqProperties.getProperty("rabbit-mq.port")));
  12. factory.setUsername(rabbitMqProperties.getProperty("rabbit-mq.username"));
  13. factory.setPassword(rabbitMqProperties.getProperty("rabbit-mq.password"));
  14. connection = factory.newConnection();
  15. } catch (Exception e) {
  16. throw new RuntimeException(e);
  17. }
  18. }
  19. /**
  20. * 获取一个信道
  21. */
  22. public static Channel getChannel() {
  23. try {
  24. if (connection != null) {
  25. return connection.createChannel();
  26. }
  27. return null;
  28. } catch (Exception e) {
  29. throw new RuntimeException(e);
  30. }
  31. }
  32. /**
  33. * 关闭指定信道
  34. */
  35. public static void close(Channel channel) {
  36. try {
  37. if (channel != null && channel.isOpen()) {
  38. channel.close();
  39. }
  40. } catch (Exception e) {
  41. throw new RuntimeException(e);
  42. }
  43. }
  44. /**
  45. * 关闭指定信道和连接
  46. */
  47. public static void closeAll(Channel channel) {
  48. try {
  49. if (channel != null && channel.isOpen()) {
  50. channel.close();
  51. }
  52. if (connection != null && connection.isOpen()) {
  53. connection.close();
  54. }
  55. } catch (Exception e) {
  56. throw new RuntimeException(e);
  57. }
  58. }
  59. }

使用这个工具类,我们只需要在资源目录下创建对应配置文件,就可以通过配置文件读取配置信息了,通过 getChannel() 方法可以获取信道。

  1. rabbit-mq.host=192.168.20.132
  2. rabbit-mq.port=5672
  3. rabbit-mq.username=admin
  4. rabbit-mq.password=123456

WorkQueues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

轮询分发消息

某些情况下,一个工作队列可能有多个消费者,由于一条消息只能被一个消费者消费,所以在默认情况下,RabbitMQ 会轮流将消息发给这多个消费者,这就是轮询分发消息机制,在这种机制下,能尽可能的保证每个消费者获取的消息量相同。

生产者代码

  1. public class Producer {
  2. public final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  10. // 从控制台接收消息
  11. Scanner scanner = new Scanner(System.in);
  12. while (scanner.hasNext()) {
  13. String msg = scanner.next();
  14. channel.basicPublish("", QUEUE_NAME, false,null, msg.getBytes(StandardCharsets.UTF_8));
  15. System.out.println("发送消息:" + msg + " -> 成功");
  16. }
  17. }
  18. }

消费者代码

消费者的代码和之前是一致的,不过我们可以使用上面的工具类来简化代码:

  1. public class Worker {
  2. public final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. // 推送消息如何进行消费的接口回调
  10. DeliverCallback deliverCallback = (consumeTag, delivery) -> {
  11. String msg = new String(delivery.getBody());
  12. System.out.println("接收到的消息" + msg);
  13. };
  14. // 取消消费的一个回调接口
  15. CancelCallback cancelCallback = (consumeTag) -> {
  16. System.out.println("消息消费被中断");
  17. };
  18. channel.basicConsume(QUEUE_NAME
  19. , true
  20. , deliverCallback
  21. , cancelCallback);
  22. }
  23. }

不过在这个案例中,我们需要开启多个消费者程序,我们在 idea 中需要设置允许应用程序可以有多个实例,然后就可以开启多个 main() 方法了。

核心功能 - 图3

当然,我们也可以使用多线程的方式来模拟两个消费者的情况,这里我们为了简单就开启多个 main() 方法模拟。

测试

然后我们开启两个消费者程序测试,然后使用生产者程序发送四条信息测试:

核心功能 - 图4

第一个消费者:

核心功能 - 图5

第二个消费者:

核心功能 - 图6

可以看到,确实是轮询机制,Worker1 获取到 one 之后,two 就是 Worker2 获取了,其他消息也是这样。

消息应答

概念

一个消息只能被一个消费者消费,但是各个消费者完成一个任务可能需要一段时间,如果某个消费者处理一个长任务时,突然挂掉了!但是 RabbitMQ 认为已经给消费分发了消息,所以将此消息标记为删除,在这种情况下,我们就会丢失正在处理的消息。

为了保证消息在发送过程中不会丢失,所以 RabbitMQ 引入了消息应答机制,消息应答就是:消费者在接收到消息并处理完该消息后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息标记为删除了。

自动应答

在前面我们一直用的是自动应答机制,但是我们没有解释。

在自动应答模式下,消息在发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

其实不管怎么说,我们一般都不推荐使用自动应答的方式,因为这种方式不够安全,我们应该选择更加安全的手动应答方式。

消息应答的方法

如果是手动应答,很快能想到的就是,我们应该在消息处理完毕以后,调用某个方法告诉 RabbitMQ 消息已经处理完了。

  • channel.basicAck(deliveryTag, multiple) :用于肯定确认,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。
  • channel.basicNack(deliveryTag, multiple, requeue) :用于否定确认。
  • channel.basicReject(deliveryTag, requeue) :用于否定确认,相比 basicNack 少了一个形参,不处理该消息了,直接拒接,可以选择将其丢弃。

方法形参的含义如下:

deliveryTag 形参

这个形参需要传入一个 long 类型的实参,需要传入队列的 key ,我们通常使用 delivery.getEnvelope().getDeliveryTag() 获取这个 key ,具体方式见下小节代码。

multiple 形参

channel.basicAck() 方法中,第二个参数位置可以是个名为 multiple 的布尔类型的属性,当传参为 truefalse 时的作用如下:

  • true :开启批量应答,比如 RabbitMQ 可能推送多个消息给消费者,如果开启批量应答,那么当处理完某个消息后,会将这一批消息全部应答。
  • false :关闭批量应答,出现如上的情况,只会应答当前处理完的消息,而其他消息则不进行应答。

核心功能 - 图7

很明显,如果开启了批量应答,也可能会产生消息丢失的风险,所以一般这个参数设置为 false

requeue 形参

指定被拒绝的消息是否重新入对,重新入队的概念见下方。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。

如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。

核心功能 - 图8

改进消费者代码

对于原来生产者的代码我们可以不进行改变,但是我们应该把消费者的代码改成手动应答的:

  1. public static void main(String[] args) throws IOException {
  2. Channel channel = RabbitMqUtils.getChannel();
  3. if (channel == null) {
  4. System.out.println("无法获取信道");
  5. return;
  6. }
  7. // 推送消息如何进行消费的接口回调
  8. DeliverCallback deliverCallback = (consumeTag, delivery) -> {
  9. String msg = new String(delivery.getBody());
  10. try {
  11. TimeUnit.SECONDS.sleep(20);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println("接收到的消息" + msg);
  16. // 手动应答
  17. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  18. };
  19. // 取消消费的一个回调接口
  20. CancelCallback cancelCallback = (consumeTag) -> {
  21. System.out.println("消息消费被中断");
  22. };
  23. // 采用手动应答
  24. boolean autoAck = false;
  25. channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
  26. }

我们可以开启两个程序,两个程序分别让它们获取到消息后休眠一段时间,然后我们突然将其中一个程序的宕掉,检查另一个程序是否能接收到这个消息,如果正常的话应该是可以的。

这就是因为突然宕掉的程序没有进行应答,那么 RabbitMQ 检测到有个连接没有应答就宕掉了,就会将消费掉的消息重新入队,由其他消费者消费。

持久化

概念

默认情况下 RabbitMQ 退出或由于某种情况崩溃时,它会忽视队列和消息进行退出,因为队列和消息是保存在内存中的,除非你告知它不要这么做。确保消息不会丢失需要做两件事:队列和消息都要标记为持久化。

队列持久化

之前我们创建队列都是非持久化的,如果要实现队列持久化,需要在声明队列的时候把第二个形参 durable 设置为 true 表示开启持久化。

  1. // 声明队列
  2. channel.queueDeclare(QUEUE_NAME // 队列名称
  3. , true //! 队列里的消息是否需要持久化到磁盘,默认消息存储在内存
  4. , false // 队列是否仅供一个消费者进行消费,是否需要消息共享
  5. , false // 是否自动删除,最后一个消费者断开连接以后,该队语句会自动删除
  6. , null); // 其他参数,以后再细说

但是如果在声明此队列之前,队列已经存在,并且原本是非持久化的队列,那么就会报如下错误:

  1. ShutdownSignalException: ... received 'true' but current is 'false' ...

解决方法就是先将这个队列删掉,删除队列可以进行 Web 管理界面,点击如 Queues 详情页,选择指定队列进行删除。

核心功能 - 图9

删除之后再声明应该就不会报错了,正常持久化的队列在 Features 位置应该存在 D 字母。

核心功能 - 图10

这个时候即使重启 RabbitMQ 队列也依然存在。

消息持久化

上面只对队列进行了持久化,但是对于消息,我们也是需要持久化,不然重启后只是一个空队列。

我们只需要在发布消息的时候,在 basicPublish() 方法的第三个参数中,设置 MessageProperties.PERSISTENT_TEXT_PLAIN 即可。

  1. channel.basicPublish("" // 发送到哪个交换机(空串表示默认交换机)
  2. , QUEUE_NAME // 路由的key是哪个
  3. , MessageProperties.PERSISTENT_TEXT_PLAIN // 其他的参数消息
  4. , message.getBytes(StandardCharsets.UTF_8)); // 发送消息的消息体

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边发布确认章节。

不公平分发

前面说过,RabbitMQ 默认发送消息是轮询分发原则,但是在某些场景下这种策略并不好:

比方说我们有两个消费者,其中消费者1的能力较强,只需要1s就可以完成任务,而消费者2能力较弱,需要10s才能完成任务。如果采用轮询分发原则,那么假设我们有4条消息,两个消费者各分配到两条消息,消费者1只要2s就执行完了,但是消费者2需要20s,这就会造成消费者1很大部分时间处于闲置状态,消费者2却一直在干活。

所以轮询分发在有些时候并不好,但是 RabbitMQ 并不知道这种情况,它依然在很公平的进行分发。

开启不公平分发很简单,只需要在消费者的信道中通过 channel.basicQos(1) 设置。

  1. // prefetch = 0 表示轮询分发
  2. // prefetch = 1 表示不公平分发
  3. // prefetch > 1 表示预取值(见下小节)
  4. int prefetch = 1;
  5. channel.basicQos(prefetch);

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者。

当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加 新的 worker 或者改变其他存储任务的策略。

预取值

RabbitMQ 中消息的发送是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。

因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

这个时候就可以通过使用 channel.basicQos(prefetch) 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。

不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境 中。对于大多数应用来说,稍微高一点的值将是最佳的。

发布确认

前面在持久化章节,说过队列持久化+消息持久化后还是有消息丢失的风险,所以为了保证消息不丢失,就有了发布确认功能。

发布确认简单来说就是当生产者向 RabbitMQ 发布消息后,生产者会继续等待,当 RabbitMQ 在硬盘上将消息持久化完成后,就会发送一个消息返回给生产者,从而确认此消息已经被发布。

完整的流程应该是下面这样的

  • 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。
  • 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认

发布确认默认是没有开启的,如果需要开启在获取到信道后使用 channel.confirmSelect() 即可开启。

  1. Channel channel = RabbitMqUtils.getChannel();
  2. channel.confirmSelect();

单个发布确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。

  • waitForConfirms() 方法用于消息的发布确认。
  • waitForConfirmsOrDie(long) 方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

  1. public static void publishSingleMessage(Channel channel) throws Exception {
  2. // 声明队列
  3. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  4. // 开启发布确认功能
  5. channel.confirmSelect();
  6. // 开始计时
  7. long start = System.currentTimeMillis();
  8. for (int i = 0; i < MESSAGE_COUNT; i++) {
  9. String msg = i + "";
  10. channel.basicPublish(""
  11. , QUEUE_NAME
  12. , MessageProperties.PERSISTENT_TEXT_PLAIN
  13. , msg.getBytes(StandardCharsets.UTF_8));
  14. // 单个消息发布确认
  15. // 如果服务端返回false,或者超时还未返回,生产者也可以重发消息
  16. boolean flag = channel.waitForConfirms();
  17. System.out.println(flag ? "消息发布成功!" : "消息发布失败!");
  18. }
  19. long stop = System.currentTimeMillis();
  20. System.out.println("发布" + MESSAGE_COUNT + "个消息并单独发布确认耗时:" + (stop - start) + "ms");
  21. }

假设循环中 MESSAGE_COUNT 的长度为 1000,我们可以统计一下发布一千条消息的时间,我测试的时间是 755ms 。但是如果我们把发布确认去掉,只需要 35ms 即可执行完成,可以发现,单个发布确认的效率是很差的。

多个发布确认

单个消息发布确认非常慢,所以为了优化,我们可以先发布一批消息之后再将这些消息一起确认,这样就可以极大的提高吞吐量。

当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

当然这种方案仍是同步的,也一样阻塞消息的发布。

  1. public static void publishBatchMessage(Channel channel) throws Exception {
  2. // 声明队列
  3. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  4. // 开启发布确认功能
  5. channel.confirmSelect();
  6. // 批量执行的大小
  7. int batchSize = 100;
  8. // 开始计时
  9. long start = System.currentTimeMillis();
  10. for (int i = 1; i <= MESSAGE_COUNT; i++) {
  11. String msg = i + "";
  12. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  13. // 批量消息发布确认
  14. if (i % batchSize == 0) {
  15. boolean flag = channel.waitForConfirms();
  16. System.out.println(flag ? batchSize + "条消息发布成功!" : batchSize + "条消息发布失败!");
  17. }
  18. }
  19. long stop = System.currentTimeMillis();
  20. System.out.println("发布" + MESSAGE_COUNT + "个消息并多个发布确认耗时:" + (stop - start) + "ms");
  21. }

通过这种方式优化后,发布 1000 条消息的耗时只需要 132ms ,相比单个消息发布已经快很多了。当然,多个消息发布确认的效率还要取决于你批量执行的大小,当你批量执行的间隔数值越大,发布确认的频次也就越少,相应的执行时间也越短。所以我们需要选择一个合适的大小进行批量发布。

异步发布确认

上面两种方式的发布确认都有或多或少的缺点,所以就有了异步发布确认,异步发布确认就是发布者尽管发布就好了,消息的确认由异步的监听器来回调处理,能极大的提高效率。

异步发布确认也是性价比最高的发布确认方式,无论是可靠性还是效率都没得说,下面是一部发布确认的流程图:

核心功能 - 图11

  1. public static void publishAsyncMessage(Channel channel) throws Exception {
  2. // 声明队列
  3. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  4. // 开启发布确认功能
  5. channel.confirmSelect();
  6. // 消息确认回调函数
  7. ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  8. System.out.println("确认的消息:" + deliveryTag);
  9. };
  10. // 消息确认失败的回调函数
  11. ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  12. System.out.println("未确认的消息:" + deliveryTag);
  13. };
  14. // 添加消息监听器
  15. channel.addConfirmListener(ackCallback, nackCallback);
  16. // 开始计时
  17. long start = System.currentTimeMillis();
  18. for (int i = 0; i < MESSAGE_COUNT; i++) {
  19. String msg = i + "";
  20. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  21. }
  22. long stop = System.currentTimeMillis();
  23. System.out.println("发布" + MESSAGE_COUNT + "个消息并异步发布确认耗时:" + (stop - start) + "ms");
  24. }

通过这样优化之后,最后的发布1000条消息的耗时大概只要 40ms ,这个耗时和没使用发布确认之前的效率差不多,同时我们还保证了发布消息的可靠性,所以推荐使用这种方式。

如何处理异步未确认消息

对于异步未确认的消息,最好的方式就是把未确认的消息全部先放在一个基于内存的能被发布线程访问的队列中,可以使用如 ConcurrentSkipListMap 来实现。

  1. public static void publishAsyncMessage(Channel channel) throws Exception {
  2. // 声明队列
  3. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  4. // 开启发布确认功能
  5. channel.confirmSelect();
  6. // 创建线程安全有序的一个哈希表,适用于高并发的情况,存储所有未确认的消息
  7. ConcurrentSkipListMap<Long, String> unConfirmMap = new ConcurrentSkipListMap<>();
  8. // 消息确认回调函数
  9. ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
  10. // 判断是否为批量确认消息
  11. if (multiple) {
  12. // 如果是,获取小于等于当前序列号的的未确认消息
  13. ConcurrentNavigableMap<Long, String> confirmed = unConfirmMap.headMap(deliveryTag + 1);
  14. // 清除该部分未确认消息,表示这部分消息已确认
  15. confirmed.clear();
  16. } else {
  17. // 如果不是,删除当前的消息即可
  18. unConfirmMap.remove(deliveryTag);
  19. }
  20. };
  21. // 消息确认失败的回调函数
  22. ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
  23. System.out.println("未确认的消息:" + deliveryTag);
  24. };
  25. // 添加消息监听器
  26. channel.addConfirmListener(ackCallback, nackCallback);
  27. // 开始计时
  28. long start = System.currentTimeMillis();
  29. for (int i = 0; i < MESSAGE_COUNT; i++) {
  30. String msg = i + "";
  31. // 添加未确认的消息
  32. unConfirmMap.put(channel.getNextPublishSeqNo(), msg);
  33. // 发布消息
  34. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  35. }
  36. long stop = System.currentTimeMillis();
  37. System.out.println("发布" + MESSAGE_COUNT + "个消息并异步发布确认耗时:" + (stop - start) + "ms");
  38. }

三种方式对比

单独发布消息:同步等待确认,简单,但吞吐量非常有限。

批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。

异步处理消息:最佳性能和资源使用,在出现错误的情况下可以很好地控制,推荐使用。

效率方面:异步发布 > 批量发布 > 单独发布

交换机

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情,我们将消息传达给多个消费者。这种模式称为 ”发布/订阅”。

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:

  • 第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者。
  • 其中一个消费者接收到消息后把日志存储在磁盘, 另外一个消费者接收到消息后把消息打印在屏幕上。
  • 事实上第一个程序发出的日志消息将广播给所有消费者。

Exchanges

概念

RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

核心功能 - 图12

Exchanges 的类型

总共有以下类型:

  • 直接(direct),也被称为路由类型。
  • 主题(topic)
  • 标题(headers),也被称为头类型。
  • 扇出(fanout),也被称为发布订阅类型。

无名 Exchanges

在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

  1. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));

第一个参数是交换机的名称。空字符串表示默认交换机或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey) 绑定 key 指定的,如果它存在的话。默认 exchange 隐式绑定到每个队列,路由键等于队列名。不可能显式地绑定或取消与默认交换的绑定。也不能被删除。

临时队列

之前的章节我们使用的是具有特定名称的队列,队列的名称我们来说至关重要我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除

  1. String queueName = channel.queueDeclare().getQueue();

生成的临时队列如下:

核心功能 - 图13

绑定( bindings )

什么是 bindings 呢?bindings 其实是 exchangequeue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定。

核心功能 - 图14

Fanout

介绍

前面在介绍交换机的类型的时候,就简单说到了 fanout 类型,这种类型也被称为发布订阅类型,能够讲接收到的所有消息广播到它知道的所有队列中,并且忽视 routingKey

系统中有如下横线的交换机就是默认的 fanout 类型的交换机:

核心功能 - 图15

在这个类型中,我们不需要特别关系具体的队列,因为我们采用的是广播方式,所以我们一般采用临时队列的方式。

消费者

  1. public class ReceiveLogs {
  2. public final static String EXCHANGE_NAME = "logs_exchange";
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. // 声明交换机,指定fanout类型
  10. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  11. // 为了简单,生成一个临时队列测试
  12. String queueName = channel.queueDeclare().getQueue();
  13. // 把该临时队列绑定到我们的交换机中,第三个参数是routingKey,我们可以为空字符串
  14. // 广播模式下routingKey不管是什么,都不会影响广播
  15. channel.queueBind(queueName, EXCHANGE_NAME, "");
  16. System.out.println("log1 等待接收到消息,把接收到的消息打印在屏幕上...");
  17. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  18. String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
  19. System.out.println("接收到的消息:" + msg);
  20. // 消息应答
  21. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  22. };
  23. channel.basicConsume(queueName, false, deliverCallback, (consumerTag, delivery) -> {});
  24. }
  25. }

生产者

  1. public class EmitLog {
  2. public final static String EXCHANGE_NAME = "logs_exchange";
  3. public static void main(String[] args) throws IOException {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. // 声明交换机,指定fanout类型
  10. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  11. Scanner scanner = new Scanner(System.in);
  12. while (scanner.hasNext()) {
  13. String msg = scanner.next();
  14. // 向交换机发布消息,广播模式下路由的key是无所谓的,所以我们可以为空串
  15. channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
  16. System.out.println("生产者发出消息:" + msg);
  17. }
  18. }
  19. }

测试

我们先开启两个消费者的线程( idea 中开启多个应用实例),这两个消费者会创建两个临时的队列,并等待从队列中接收消息。然后我们再开启生产者程序,我们发送四个消息,两个消费者程序应该都可以接收到。

生产者:

核心功能 - 图16

消费者1:

核心功能 - 图17

消费者2:

核心功能 - 图18

Direct

在上一节中,使用 fanout 类型构建了一个简单的日志记录系统,可以向全部接收者广播日志消息。在本 节我们将向其中添加一些特别的功能:比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解,队列只对它绑定的交换机的消息感兴趣。绑定用参数 routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey") ,绑定之后的 意义由其交换类型决定。

介绍

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。

核心功能 - 图19

在上面这张图中,我们可以看到 X 交换即绑定了两个队列,绑定的类型是 direct。Q1队列只一个 routingKey 进行绑定,而 Q2队列有两个 routingKey 绑定。在这种情况下,绑定 orange 的消息会发送到 Q1 队列上,而绑定 blackgreen 的消息都会发送到 Q2 队列上。

多重绑定

核心功能 - 图20

多重绑定是指在 X 交换机上,在 Q1 队列和 Q2 队列上绑定相同的 routingKey ,如图上的 black 。这是向 black 发布的消息在 Q1 和 Q2 队列中都会收到,那么 C1 和 C2 两个消费者都能接收到,在这种情况下,就表现的和 fanout 有些类似了。

消费者

下面我们创建两个消费者:

  • ReceiveLogsDirect01 :接收全部日志消息,并在控制台输入。
  • ReceiveLogsDirect02 :只接收 error 级别的日志消息,在控制台输出并持久化到日志文件中。
  1. public class ReceiveLogsDirect01 {
  2. public final static String EXCHANGE_NAME = "direct_exchange";
  3. public final static String QUEUE_NAME = "console";
  4. public static void main(String[] args) throws Exception {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. if (channel == null) {
  7. System.out.println("无法获取信道");
  8. return;
  9. }
  10. // 声明一个交换机,并且将交换机类型声明为直接(direct)类型
  11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 绑定队列
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  18. // 接收消息的回调
  19. DeliverCallback deliverCallback = (consumerTag, message) -> {
  20. String routingKey = message.getEnvelope().getRoutingKey();
  21. String msg = new String(message.getBody());
  22. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg);
  23. // 手动应答
  24. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  25. };
  26. // 接收消息
  27. channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag, message) -> {
  28. });
  29. }
  30. }
  1. public class ReceiveLogsDirect02 {
  2. public final static String EXCHANGE_NAME = "direct_exchange";
  3. public final static String QUEUE_NAME = "disk";
  4. public static void main(String[] args) throws Exception {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. if (channel == null) {
  7. System.out.println("无法获取信道");
  8. return;
  9. }
  10. // 声明一个交换机,并且将交换机类型声明为直接(direct)类型
  11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 绑定队列
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
  16. // 接收消息的回调
  17. DeliverCallback deliverCallback = (consumerTag, message) -> {
  18. String routingKey = message.getEnvelope().getRoutingKey();
  19. String msg = new String(message.getBody());
  20. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg + " 将消息存储到日志文件");
  21. // 手动应答
  22. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  23. };
  24. // 接收消息
  25. channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag, message) -> {
  26. });
  27. }
  28. }

生产者

  1. public class DirectLogs {
  2. public final static String EXCHANGE_NAME = "direct_exchange";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  10. // 消息map,key为routingKey,value为发布的消息
  11. Map<String, String> messages = new HashMap<>();
  12. messages.put("info", "普通日志消息");
  13. messages.put("warn", "出现警告消息");
  14. messages.put("error", "出现严重错误!!!!!");
  15. // 遍历map集合发布消息
  16. messages.forEach((routingKey, msg) -> {
  17. try {
  18. // 发布消息
  19. channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
  20. System.out.println("向 routingKey[" + routingKey + "] 发布消息 -> " + msg);
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. RabbitMqUtils.closeAll(channel);
  26. }
  27. }

测试

生产者:

核心功能 - 图21

消费者1:

核心功能 - 图22

消费者2:

核心功能 - 图23

在这种情况下,如果开启多个相同的消费者,默认还是会进行轮询消费,和之前的效果一致。

Topic

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是 使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性,比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型。

介绍

topic 交换机可以实现 routingKey 的模糊匹配,但是又有一定的规则,不能随意写,topic 交换机的 routingKey 必须是一串单词列表,单词于单词之间以 . 分隔,单词可以是任意单词,但是最好见名知意,如 logs.info.base ,当然,这个单词列表最多不能超过 255 字节。

怎么实现模糊匹配的功能呢,我们可以通过下面两个替换符实现:

  • # :可以代替 0 个或多个单词组。
  • . :可以代替 1 个单词。

匹配案例

核心功能 - 图24

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的:

  • quick.orange.rabbit :被队列 Q1Q2 接收到。
  • lazy.orange.elephant :被队列 Q1Q2 接收到。
  • quick.orange.fox :被队列 Q1 接收到。
  • lazy.brown.fox :被队列 Q2 接收到。
  • lazy.pink.rabbit :虽然满足两个绑定但只被队列 Q2 接收一次。
  • quick.brown.fox :不匹配任何绑定不会被任何队列接收到会被丢弃。
  • quick.orange.male.rabbit :是四个单词不匹配任何绑定会被丢弃。
  • lazy.orange.male.rabbit :是四个单词但匹配 Q2。

当队列绑定关系是下列这种情况时需要引起注意:

  • 当一个队列绑定键是 # ,那么这个队列将接收所有数据,就有点像 fanout 了 。
  • 如果队列绑定键当中没有 #* 出现,那么该队列绑定类型就是 direct 了。

消费者

我们创建两个消费者,就按照上面匹配案例中的消费者进行代码演示。

  1. public class ReceiveLogsTopic01 {
  2. public final static String EXCHANGE_NAME = "topic_logs";
  3. public final static String QUEUE_NAME = "Q1";
  4. public static void main(String[] args) throws Exception {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. if (channel == null) {
  7. System.out.println("无法获取信道");
  8. return;
  9. }
  10. // 声明交换机
  11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 绑定交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
  16. // 接收消息回调
  17. DeliverCallback deliverCallback = (consumerTag, message) -> {
  18. String routingKey = message.getEnvelope().getRoutingKey();
  19. String msg = new String(message.getBody());
  20. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg);
  21. // 手动应答
  22. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  23. };
  24. // 接收消息
  25. channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag, message)->{});
  26. }
  27. }
  1. public class ReceiveLogsTopic02 {
  2. public final static String EXCHANGE_NAME = "topic_logs";
  3. public final static String QUEUE_NAME = "Q2";
  4. public static void main(String[] args) throws Exception {
  5. Channel channel = RabbitMqUtils.getChannel();
  6. if (channel == null) {
  7. System.out.println("无法获取信道");
  8. return;
  9. }
  10. // 声明交换机
  11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  12. // 声明队列
  13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  14. // 绑定交换机
  15. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
  17. // 接收消息回调
  18. DeliverCallback deliverCallback = (consumerTag, message) -> {
  19. String routingKey = message.getEnvelope().getRoutingKey();
  20. String msg = new String(message.getBody());
  21. System.out.println("来自 routingKey[" + routingKey + "] 的消息 -> " + msg);
  22. // 手动应答
  23. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  24. };
  25. // 接收消息
  26. channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag, message)->{});
  27. }
  28. }

生产者

  1. public class EmitLogTopic {
  2. public final static String EXCHANGE_NAME = "topic_logs";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. if (channel == null) {
  6. System.out.println("无法获取信道");
  7. return;
  8. }
  9. // 声明队列
  10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  11. // 创建消息集合
  12. Map<String, String> messages = new HashMap<>();
  13. messages.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
  14. messages.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
  15. messages.put("quick.orange.fox", "被队列 Q1 接收到");
  16. messages.put("lazy.brown.fox", "被队列 Q2 接收到");
  17. messages.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
  18. messages.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
  19. messages.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
  20. messages.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
  21. // 发送消息
  22. messages.forEach((routingKey, msg)-> {
  23. try {
  24. channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
  25. System.out.println("向 routingKey[" + routingKey + "] 发布消息 -> " + msg);
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. });
  30. RabbitMqUtils.closeAll(channel);
  31. }
  32. }

测试

生产者:

核心功能 - 图25

消费者1:

核心功能 - 图26

消费者2:

核心功能 - 图27


本章完。