第一章:死信队列

1.1 死信队列的概念

  • 死信队列,英文缩写是:DLX(Dead Letter Exchange),其实应该称为死信交换机更为合适。
  • 当消息成为死信后,可以被重新发送到另一个交换机,这个交换机就是死信交换机。

1.png

  • 实际上,死信队列就是普通的交换机,只不过我们人为的给其赋予了特殊的含义:当消息成为死信后,会重新发送到 DLX(死信交换机)。
  • 默认情况下,当消息成为死信(过期、队列满了、消息 TTL 过期)的时候,RabbitMQ 会将这些消息进行清理,但是当配置了死信队列之后,RabbitMQ 会将死信发送到 DLX (死信交换机)中,这样就可以避免消息丢失。
  • 死信队列的应用场景:
    • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中。
    • 用户在商城下单成功并进行支付活动,如果在指定的时候没有支付,将会将订单自动失效。
    • ……

1.2 消息成为死信的三种情况

  • ① 消息 TTL 过期。
  • ② 队列达到最大程度(队列满了,无法再添加数据到 MQ 中)。
  • ③ 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false。

1.3 死信队列实战

1.3.1 架构图

2.png

注意:在演示消息 TTL 过期或队列达到最大长度的时候,只需要让消费者 C1 启动然后关闭即可,因为没有消费者消费普通队列的消息,就会将消息变为死信。

1.3.2 消息 TTL 过期

  • 生产者的代码:
  1. package com.github;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-25 09:30:23
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws Exception {
  13. Channel channel = RabbitmqUtils.getChannel();
  14. channel.exchangeDeclare(Consumer1.NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  15. for (int i = 0; i < 100; i++) {
  16. String msg = "消息" + i;
  17. // 为什么要此处设置 TTL ,是因为生产者设置 TTL 更加灵活
  18. String expiration = String.valueOf(10 * 1000);
  19. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration(expiration).build();
  20. channel.basicPublish(Consumer1.NORMAL_EXCHANGE, Consumer1.NORMAL_ROUTING_KEY, basicProperties, msg.getBytes(StandardCharsets.UTF_8));
  21. }
  22. System.out.println("发送消息完毕");
  23. }
  24. }
  • 消费者 1 的代码:
  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Map;
  8. /**
  9. * 消费者1
  10. *
  11. * @author 许大仙
  12. * @version 1.0
  13. * @since 2022-05-25 09:02:24
  14. */
  15. public class Consumer1 {
  16. /**
  17. * 普通交换机
  18. */
  19. public static final String NORMAL_EXCHANGE = "normal_exchange";
  20. /**
  21. * 普通队列
  22. */
  23. public static final String NORMAL_QUEUE = "normal_queue";
  24. /**
  25. * 普通交换机和普通队列之间的routing key
  26. */
  27. public static final String NORMAL_ROUTING_KEY = "normal";
  28. /**
  29. * 死信交换机
  30. */
  31. public static final String DEAD_EXCHANGE = "dead_exchange";
  32. /**
  33. * 死信队列
  34. */
  35. public static final String DEAD_QUEUE = "dead_queue";
  36. /**
  37. * 死信交换机和死信队列之间的routing key
  38. */
  39. public static final String DEAD_ROUTING_KEY = "dead";
  40. public static void main(String[] args) throws Exception {
  41. Channel channel = RabbitmqUtils.getChannel();
  42. /* 声明普通交换机 direct 模式 */
  43. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  44. /* 声明普通队列 */
  45. // 为什么要设置 arguments , 是因为当消息成为死信之后,这里的普通队列就相当于生产者,而生产者需要知道交换机的名称和 routing key
  46. // 至于为什么这么设置 x-message-ttl x-dead-letter-exchange x-dead-letter-routing-key,后面有解释。
  47. Map<String, Object> arguments = MapUtil.newHashMap();
  48. // 设置 TTL 时间,单位是 ms ,一般是由生产者来设置,不然就会写死。
  49. // arguments.put("x-message-ttl", 10 * 1000);
  50. // 设置死信交换机
  51. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  52. // 设置死信交换机的routing key
  53. arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
  54. channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
  55. /* 绑定普通交换机和普通队列 */
  56. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
  57. /* 声明死信交换机 direct 模式 */
  58. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  59. /* 声明死信队列 */
  60. channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
  61. /* 绑定死信交换机和死信队列 */
  62. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
  63. // 消费消息
  64. DeliverCallback deliverCallback = (consumerTag, message) -> {
  65. System.out.println("消费者消费的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
  66. };
  67. channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (consumerTag) -> {});
  68. }
  69. }

启动之后,就关闭该消费者,模拟其接收不到消息。

  • 解释为什么在普通队列的 arguments 设置 x-message-ttlx-message-ttlx-dead-letter-routing-key,它们是从哪里来的?

3.gif

  • 消费者 2 的代码:
  1. package com.github;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import java.nio.charset.StandardCharsets;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2022-05-25 09:27:41
  10. */
  11. public class Consumer2 {
  12. public static void main(String[] args) throws Exception {
  13. Channel channel = RabbitmqUtils.getChannel();
  14. /* 其实:下面的声明和绑定也可以不写,因为我们在 Consumer1 中已经配置过了 */
  15. /* 声明死信交换机 direct 模式 */
  16. channel.exchangeDeclare(Consumer1.DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  17. /* 声明死信队列 */
  18. channel.queueDeclare(Consumer1.DEAD_QUEUE, true, false, false, null);
  19. /* 绑定死信交换机和死信队列 */
  20. channel.queueBind(Consumer1.DEAD_QUEUE, Consumer1.DEAD_EXCHANGE, Consumer1.DEAD_ROUTING_KEY);
  21. // 消费消息
  22. DeliverCallback deliverCallback = (consumerTag, message) -> {
  23. System.out.println("消费者2消费的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
  24. };
  25. channel.basicConsume(Consumer1.DEAD_QUEUE, true, deliverCallback, (consumerTag) -> {});
  26. }
  27. }
  • 结果显示:
    • ① 生产者没有发送消息。

4.png

  • ② 生产者发送了消息,正常队列中有 100 数据没有消费,时间过去 10 秒,正常队列中的消息由于没有消费,消息就进入了死信队列中 ,然后让消费者 2 进行消费。

5.gif

1.3.3 队列达到最大长度

  • 修改消费者 1 的代码:
  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Map;
  8. /**
  9. * 消费者1
  10. *
  11. * @author 许大仙
  12. * @version 1.0
  13. * @since 2022-05-25 09:02:24
  14. */
  15. public class Consumer1 {
  16. /**
  17. * 普通交换机
  18. */
  19. public static final String NORMAL_EXCHANGE = "normal_exchange";
  20. /**
  21. * 普通队列
  22. */
  23. public static final String NORMAL_QUEUE = "normal_queue";
  24. /**
  25. * 普通交换机和普通队列之间的routing key
  26. */
  27. public static final String NORMAL_ROUTING_KEY = "normal";
  28. /**
  29. * 死信交换机
  30. */
  31. public static final String DEAD_EXCHANGE = "dead_exchange";
  32. /**
  33. * 死信队列
  34. */
  35. public static final String DEAD_QUEUE = "dead_queue";
  36. /**
  37. * 死信交换机和死信队列之间的routing key
  38. */
  39. public static final String DEAD_ROUTING_KEY = "dead";
  40. public static void main(String[] args) throws Exception {
  41. Channel channel = RabbitmqUtils.getChannel();
  42. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  43. Map<String, Object> arguments = MapUtil.newHashMap();
  44. // 设置队列的最大长度
  45. arguments.put("x-max-length", 10);
  46. // 设置死信交换机
  47. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  48. // 设置死信交换机的routing key
  49. arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
  50. channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
  51. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
  52. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  53. channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
  54. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
  55. DeliverCallback deliverCallback = (consumerTag, message) -> {
  56. System.out.println("消费者消费的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
  57. };
  58. channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (consumerTag) -> {});
  59. }
  60. }

注意:需要将原先的队列删除,因为参数修改了。

  • 结果显示:

6.gif

1.3.4 消息被拒

  • 修改消费者 1 的代码,让其拒收消息:
  1. package com.github;
  2. import cn.hutool.core.map.MapUtil;
  3. import com.rabbitmq.client.BuiltinExchangeType;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Map;
  8. /**
  9. * 消费者1
  10. *
  11. * @author 许大仙
  12. * @version 1.0
  13. * @since 2022-05-25 09:02:24
  14. */
  15. public class Consumer1 {
  16. /**
  17. * 普通交换机
  18. */
  19. public static final String NORMAL_EXCHANGE = "normal_exchange";
  20. /**
  21. * 普通队列
  22. */
  23. public static final String NORMAL_QUEUE = "normal_queue";
  24. /**
  25. * 普通交换机和普通队列之间的routing key
  26. */
  27. public static final String NORMAL_ROUTING_KEY = "normal";
  28. /**
  29. * 死信交换机
  30. */
  31. public static final String DEAD_EXCHANGE = "dead_exchange";
  32. /**
  33. * 死信队列
  34. */
  35. public static final String DEAD_QUEUE = "dead_queue";
  36. /**
  37. * 死信交换机和死信队列之间的routing key
  38. */
  39. public static final String DEAD_ROUTING_KEY = "dead";
  40. public static void main(String[] args) throws Exception {
  41. Channel channel = RabbitmqUtils.getChannel();
  42. /* 声明普通交换机 direct 模式 */
  43. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  44. /* 声明普通队列 */
  45. // 为什么要设置 arguments , 是因为当消息成为死信之后,这里的普通队列就相当于生产者,而生产者需要知道交换机的名称和 routing key
  46. // 至于为什么这么设置 x-message-ttl x-dead-letter-exchange x-dead-letter-routing-key,后面有解释。
  47. Map<String, Object> arguments = MapUtil.newHashMap();
  48. // 设置 TTL 时间,单位是 ms ,一般是由生产者来设置,不然就会写死。
  49. // arguments.put("x-message-ttl", 10 * 1000);
  50. // 设置死信交换机
  51. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  52. // 设置死信交换机的routing key
  53. arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
  54. channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
  55. /* 绑定普通交换机和普通队列 */
  56. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
  57. /* 声明死信交换机 direct 模式 */
  58. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  59. /* 声明死信队列 */
  60. channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
  61. /* 绑定死信交换机和死信队列 */
  62. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
  63. // 消费消息
  64. DeliverCallback deliverCallback = (consumerTag, message) -> {
  65. String msg = new String(message.getBody(), StandardCharsets.UTF_8);
  66. if (msg.contains("1")) { // 拒收消息
  67. channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
  68. } else if (msg.contains("2")) {
  69. channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);
  70. }
  71. };
  72. channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag) -> {});
  73. }
  74. }

注意:在这个示例中,消费者1 也是要启动的,不然怎么拒收,而且需要将原先的队列删除,避免影响结果。

  • 结果显示:

7.gif

第二章: 延迟队列

2.1 延迟队列的概念

  • 延迟队列,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定的时间到了以后或之前取出和处理,换言之,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

2.2 延迟队列的应用场景

  • 延迟队列的应用场景:
    • ① 订单在 10 分钟之内没有付款就自动取消。
    • ② 新创建的店铺,如果在 10 天之内都没有上传过商品,则自动发送消息提醒。
    • ③ 用户注册成功后,如果三天没有登录,则发送短信进行提醒。
    • ④ 用户发起退款,如果三天之内没有得到处理,则通知相关运营人员。
    • ⑤ 预定会议后,需要在预定的时间点前 10 分钟通知各个与会人员参加会议。
    • ……
  • 上面的场景都有一个特点,需要在某个事件发生之前或之后的某个时间点去完成某件事情。例如:订单在 10 分钟之内没有付款就自动取消 ,看起来非常像定时任务,我们只需要一致轮询数据,1 秒查询一次,取出订单数据,然后进行处理。但是,如果数据量很大,很难在 1 秒之内就能遍历出所有的数据;况且,轮询对数据库的压力非常大,无法满足要求。

8.png

2.3 RabbitMQ 中的 TTL(Time To Live)

2.3.1 概述

  • TTL 是 RabbitMQ 中一个消息或队列的属性,表明一条消息或该队列中的所有消息的最大存活时间。
  • 单位是毫秒,换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,就会变成 死信 。如果同时配置了队列的 TTL 和消息的 TTL ,那么较小的那个值将会被使用。

2.3.2 消息设置 TTL

  • 示例:
  1. String expiration = String.valueOf(10 * 1000);
  2. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration(expiration).build();
  3. channel.basicPublish(Consumer1.NORMAL_EXCHANGE, Consumer1.NORMAL_ROUTING_KEY, basicProperties, msg.getBytes(StandardCharsets.UTF_8));

2.3.3 队列设置 TTL

  • 示例:
  1. Map<String, Object> arguments = MapUtil.newHashMap();
  2. // 设置 TTL 时间,单位是 ms ,一般是由生产者来设置,不然就会写死。
  3. arguments.put("x-message-ttl", 10 * 1000);
  4. // 设置死信交换机
  5. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  6. // 设置死信交换机的routing key
  7. arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
  8. channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);

2.3.4 区别

  • 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢失(如果配置了死信队列,会转发到死信队列中)。
  • 如果设置了消息的 TTL 属性,消息即使过期,也不一定会马上被丢失,因为消息是否过期是在即将投递到消费者之前判断的,如果当前队列有严重的消息积压情况,则已过期的消息也许会存活较长时间。
  • 如果不设置 TTL ,表示消息永不过期。
  • 如果设置 TTL 为 0 ,则表示除非此时可以直接投递给消费者,否则该消息将会丢弃。

2.4 整合 SpringBoot

2.4.1 前提

  • JDK:1.8 。
  • IDEA:2021+ 。
  • Maven:3.6+。

2.4.2 环境准备

  • Maven 依赖:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.5.3</version>
  9. <relativePath/>
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>rabbitmq-springboot</artifactId>
  13. <version>1.0</version>
  14. <name>rabbitmq-springboot</name>
  15. <description>rabbitmq-springboot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-devtools</artifactId>
  31. <scope>runtime</scope>
  32. <optional>true</optional>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.projectlombok</groupId>
  36. <artifactId>lombok</artifactId>
  37. <optional>true</optional>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.springframework.boot</groupId>
  41. <artifactId>spring-boot-starter-test</artifactId>
  42. <scope>test</scope>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.springframework.amqp</groupId>
  46. <artifactId>spring-rabbit-test</artifactId>
  47. <scope>test</scope>
  48. </dependency>
  49. </dependencies>
  50. <build>
  51. <plugins>
  52. <plugin>
  53. <groupId>org.springframework.boot</groupId>
  54. <artifactId>spring-boot-maven-plugin</artifactId>
  55. <configuration>
  56. <excludes>
  57. <exclude>
  58. <groupId>org.projectlombok</groupId>
  59. <artifactId>lombok</artifactId>
  60. </exclude>
  61. </excludes>
  62. </configuration>
  63. </plugin>
  64. </plugins>
  65. </build>
  66. </project>
  • application.yml:
  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. virtual-host: /

2.5 队列的 TTL

2.5.1 架构图

9.png

2.5.2 配置类代码

  • RabbitmqConfig.java
  1. package com.github.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 配置类,用来声明交换机和队列,并配置之间的关系
  7. *
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-25 13:53:15
  11. */
  12. @Configuration
  13. public class RabbitmqConfig {
  14. /**
  15. * 普通交换机 X
  16. */
  17. public static final String EXCHANGE_X = "X";
  18. /**
  19. * 普通队列 QA
  20. */
  21. public static final String QUEUE_A = "QA";
  22. /**
  23. * 普通 routing key
  24. */
  25. public static final String ROUTING_KEY_XA = "XA";
  26. /**
  27. * 普通队列 QB
  28. */
  29. public static final String QUEUE_B = "QB";
  30. /**
  31. * 普通 routing key
  32. */
  33. public static final String ROUTING_KEY_XB = "XB";
  34. /**
  35. * 死信交换机 Y
  36. */
  37. public static final String DEAD_EXCHANGE_Y = "Y";
  38. /**
  39. * 死信队列 QD
  40. */
  41. public static final String DEAD_QUEUE_D = "QD";
  42. /**
  43. * 死信 routing key
  44. */
  45. public static final String DEAD_ROUTING_KEY_YD = "YD";
  46. /**
  47. * 声明交换机
  48. */
  49. @Bean
  50. public DirectExchange xExchange() {
  51. return new DirectExchange(EXCHANGE_X);
  52. }
  53. /**
  54. * 声明交换机
  55. */
  56. @Bean
  57. public DirectExchange yExchange() {
  58. return new DirectExchange(DEAD_EXCHANGE_Y);
  59. }
  60. /**
  61. * 声明队列
  62. */
  63. @Bean
  64. public Queue aQueue() {
  65. return QueueBuilder.durable(QUEUE_A)
  66. // 声明当前队列绑定的死信交换机
  67. .deadLetterExchange(DEAD_EXCHANGE_Y)
  68. // 声明当前队列绑定的死信队列
  69. .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
  70. // 设置 TTL 时间
  71. .ttl(10 * 1000)
  72. .build();
  73. }
  74. /**
  75. * 声明队列
  76. */
  77. @Bean
  78. public Queue bQueue() {
  79. return QueueBuilder.durable(QUEUE_B)
  80. // 声明当前队列绑定的死信交换机
  81. .deadLetterExchange(DEAD_EXCHANGE_Y)
  82. // 声明当前队列绑定的死信队列
  83. .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
  84. // 设置 TTL 时间
  85. .ttl(40 * 1000)
  86. .build();
  87. }
  88. /**
  89. * 声明队列
  90. */
  91. @Bean
  92. public Queue dQueue() {
  93. return QueueBuilder.durable(DEAD_QUEUE_D).build();
  94. }
  95. /**
  96. * 绑定关系
  97. */
  98. @Bean
  99. public Binding xaBinding() {
  100. return BindingBuilder.bind(aQueue()).to(xExchange()).with(ROUTING_KEY_XA);
  101. }
  102. /**
  103. * 绑定关系
  104. */
  105. @Bean
  106. public Binding xbBinding() {
  107. return BindingBuilder.bind(bQueue()).to(xExchange()).with(ROUTING_KEY_XB);
  108. }
  109. /**
  110. * 绑定关系
  111. */
  112. @Bean
  113. public Binding ydBinding() {
  114. return BindingBuilder.bind(dQueue()).to(yExchange()).with(DEAD_ROUTING_KEY_YD);
  115. }
  116. }

2.5.3 生产者代码

  • ProducerController.java
  1. package com.github.web;
  2. import com.github.config.RabbitmqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.time.LocalDateTime;
  10. /**
  11. * 生产者
  12. *
  13. * @author 许大仙
  14. * @version 1.0
  15. * @since 2022-05-25 14:14:52
  16. */
  17. @Slf4j
  18. @RestController
  19. public class ProducerController {
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. @GetMapping("/send/{msg}")
  23. public String msg(@PathVariable("msg") String msg) {
  24. log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", LocalDateTime.now(), msg);
  25. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XA, "消息来自 ttl 为 10S 的队列: " + msg);
  26. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XB, "消息来自 ttl 为 40s 的队列: " + msg);
  27. return "发送消息成功";
  28. }
  29. }

2.5.4 消费者代码

  • RabbitmqListener.java
  1. package com.github.listener;
  2. import com.github.config.RabbitmqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.nio.charset.StandardCharsets;
  8. import java.time.LocalDateTime;
  9. /**
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-25 14:20:14
  13. */
  14. @Slf4j
  15. @Component
  16. public class RabbitmqListener {
  17. @RabbitListener(queues = RabbitmqConfig.DEAD_QUEUE_D)
  18. public void receive(Message message) {
  19. log.info("当前时间:{},收到死信队列信息:{}", LocalDateTime.now(), new String(message.getBody(), StandardCharsets.UTF_8));
  20. }
  21. }

2.5.5 结果

  • 发送请求:
  1. curl 'http://127.0.0.1:8080/send/哈哈' -X GET
  • IDEA 控制台结果如下:

10.png

  • 虽然,结果出来了,但是此时有些问题,如果现在我需要 5 min、10 min……,那么我岂不是每增加一个时间需求,就需要增加一个队列,如果是预定会议提前通知的场景,难道要增加无数个队列来满足要求?就是在消费者那边设置消息的 TTL 时间不就可以了。

2.6 延迟队列优化

2.6.1 架构图

  • 新增一个队列 QC ,不设置 TTL 时间:

11.png

2.6.2 配置类代码

  • RabbitmqConfig.java
  1. package com.github.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 配置类,用来声明交换机和队列,并配置之间的关系
  7. *
  8. * @author 许大仙
  9. * @version 1.0
  10. * @since 2022-05-25 13:53:15
  11. */
  12. @Configuration
  13. public class RabbitmqConfig {
  14. /**
  15. * 普通交换机 X
  16. */
  17. public static final String EXCHANGE_X = "X";
  18. /**
  19. * 普通队列 QA
  20. */
  21. public static final String QUEUE_A = "QA";
  22. /**
  23. * 普通 routing key
  24. */
  25. public static final String ROUTING_KEY_XA = "XA";
  26. /**
  27. * 普通队列 QB
  28. */
  29. public static final String QUEUE_B = "QB";
  30. /**
  31. * 普通 routing key
  32. */
  33. public static final String ROUTING_KEY_XB = "XB";
  34. /**
  35. * 普通队列 QC
  36. */
  37. public static final String QUEUE_C = "QC";
  38. /**
  39. * 普通 routing key
  40. */
  41. public static final String ROUTING_KEY_XC = "XC";
  42. /**
  43. * 死信交换机 Y
  44. */
  45. public static final String DEAD_EXCHANGE_Y = "Y";
  46. /**
  47. * 死信队列 QD
  48. */
  49. public static final String DEAD_QUEUE_D = "QD";
  50. /**
  51. * 死信 routing key
  52. */
  53. public static final String DEAD_ROUTING_KEY_YD = "YD";
  54. /**
  55. * 声明交换机
  56. */
  57. @Bean
  58. public DirectExchange xExchange() {
  59. return new DirectExchange(EXCHANGE_X);
  60. }
  61. /**
  62. * 声明交换机
  63. */
  64. @Bean
  65. public DirectExchange yExchange() {
  66. return new DirectExchange(DEAD_EXCHANGE_Y);
  67. }
  68. /**
  69. * 声明队列
  70. */
  71. @Bean
  72. public Queue aQueue() {
  73. return QueueBuilder.durable(QUEUE_A)
  74. // 声明当前队列绑定的死信交换机
  75. .deadLetterExchange(DEAD_EXCHANGE_Y)
  76. // 声明当前队列绑定的死信队列
  77. .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
  78. // 设置 TTL 时间
  79. .ttl(10 * 1000)
  80. .build();
  81. }
  82. /**
  83. * 声明队列
  84. */
  85. @Bean
  86. public Queue bQueue() {
  87. return QueueBuilder.durable(QUEUE_B)
  88. // 声明当前队列绑定的死信交换机
  89. .deadLetterExchange(DEAD_EXCHANGE_Y)
  90. // 声明当前队列绑定的死信队列
  91. .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
  92. // 设置 TTL 时间
  93. .ttl(40 * 1000)
  94. .build();
  95. }
  96. /**
  97. * 声明队列
  98. */
  99. @Bean
  100. public Queue cQueue() {
  101. return QueueBuilder.durable(QUEUE_C)
  102. // 声明当前队列绑定的死信交换机
  103. .deadLetterExchange(DEAD_EXCHANGE_Y)
  104. // 声明当前队列绑定的死信队列
  105. .deadLetterRoutingKey(DEAD_ROUTING_KEY_YD)
  106. .build();
  107. }
  108. /**
  109. * 声明队列
  110. */
  111. @Bean
  112. public Queue dQueue() {
  113. return QueueBuilder.durable(DEAD_QUEUE_D).build();
  114. }
  115. /**
  116. * 绑定关系
  117. */
  118. @Bean
  119. public Binding xaBinding() {
  120. return BindingBuilder.bind(aQueue()).to(xExchange()).with(ROUTING_KEY_XA);
  121. }
  122. /**
  123. * 绑定关系
  124. */
  125. @Bean
  126. public Binding xbBinding() {
  127. return BindingBuilder.bind(bQueue()).to(xExchange()).with(ROUTING_KEY_XB);
  128. }
  129. /**
  130. * 绑定关系
  131. */
  132. @Bean
  133. public Binding xcBinding() {
  134. return BindingBuilder.bind(cQueue()).to(xExchange()).with(ROUTING_KEY_XC);
  135. }
  136. /**
  137. * 绑定关系
  138. */
  139. @Bean
  140. public Binding ydBinding() {
  141. return BindingBuilder.bind(dQueue()).to(yExchange()).with(DEAD_ROUTING_KEY_YD);
  142. }
  143. }

2.6.3 生产者代码

  • ProducerController.java
  1. package com.github.web;
  2. import com.github.config.RabbitmqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.MessagePostProcessor;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.time.LocalDateTime;
  11. /**
  12. * 生产者
  13. *
  14. * @author 许大仙
  15. * @version 1.0
  16. * @since 2022-05-25 14:14:52
  17. */
  18. @Slf4j
  19. @RestController
  20. public class ProducerController {
  21. @Autowired
  22. private RabbitTemplate rabbitTemplate;
  23. @GetMapping("/send/{msg}")
  24. public String msg(@PathVariable("msg") String msg) {
  25. log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", LocalDateTime.now(), msg);
  26. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XA, "消息来自 ttl 为 10S 的队列: " + msg);
  27. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XB, "消息来自 ttl 为 40s 的队列: " + msg);
  28. return "发送消息成功";
  29. }
  30. @GetMapping("/send/{msg}/{ttl}")
  31. public String msg(@PathVariable("msg") String msg, @PathVariable("ttl") Integer ttl) {
  32. log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列:{}", LocalDateTime.now(), ttl, msg);
  33. MessagePostProcessor messagePostProcessor = (message) -> {
  34. message.getMessageProperties().setExpiration(String.valueOf(ttl));
  35. return message;
  36. };
  37. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_X, RabbitmqConfig.ROUTING_KEY_XC, msg, messagePostProcessor);
  38. return "发送消息成功";
  39. }
  40. }

2.6.4 结果

  • 发送请求:
  1. curl 'http://127.0.0.1:8080/send/哈哈/20000' -X GET
  1. curl 'http://127.0.0.1:8080/send/哈哈/1000' -X GET
  • IDEA 控制台结果如下:

12.png

  • 虽然好像能实现效果,但是我们知道,在消息属性上设置 TTL 的方式,消息可能并不会 按时死亡因为 RabbitMQ 只会检查第一个消息是否过期,如果过期就丢到死信队列中,如果第一个消息的延迟时长很长,而第二个消息的延迟时长很短,第二个消息并不会优先得到执行。

2.7 RabbitMQ 插件实现延迟队列(推荐使用)

2.7.1 安装延时队列插件

  • 官网,下载 rabbitmq_delayed_message_exchange 插件,并解压到 RabbitMQ 的插件目录。
  • 进入 RabbitMQ 的插件目录:
  1. cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
  • 下载插件:
  1. wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
  • 启用插件:
  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 重启 RabbitMQ 服务:
  1. systemctl restart rabbitmq-server
  • 添加延迟队列插件之后:

13.png

2.7.2 架构图

14.png

2.7.3 配置类

  • RabbitmqConfig.java
  1. package com.github.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * 配置类,用来声明交换机和队列,并配置之间的关系
  9. *
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-25 13:53:15
  13. */
  14. @Configuration
  15. public class RabbitmqConfig {
  16. /**
  17. * 普通交换机
  18. */
  19. public static final String EXCHANGE = "delayed.exchange";
  20. /**
  21. * routingkey
  22. */
  23. public static final String ROUTING_KEY = "delayed.routingkey";
  24. /**
  25. * 普通队列
  26. */
  27. public static final String QUEUE = "delayed.queue";
  28. @Bean
  29. public CustomExchange exchange() {
  30. Map<String, Object> args = new HashMap<>();
  31. args.put("x-delayed-type", "direct");
  32. return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, args);
  33. }
  34. /**
  35. * 声明队列
  36. */
  37. @Bean
  38. public Queue queue() {
  39. return QueueBuilder.durable(QUEUE).build();
  40. }
  41. /**
  42. * 绑定关系
  43. */
  44. @Bean
  45. public Binding binding() {
  46. return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
  47. }
  48. }

2.7.4 生产者代码

  • ProducerController.java
  1. package com.github.web;
  2. import com.github.config.RabbitmqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.MessagePostProcessor;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.time.LocalDateTime;
  11. /**
  12. * 生产者
  13. *
  14. * @author 许大仙
  15. * @version 1.0
  16. * @since 2022-05-25 14:14:52
  17. */
  18. @Slf4j
  19. @RestController
  20. public class ProducerController {
  21. @Autowired
  22. private RabbitTemplate rabbitTemplate;
  23. @GetMapping("/send/{msg}/{ttl}")
  24. public String msg(@PathVariable("msg") String msg, @PathVariable("ttl") Integer ttl) {
  25. log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列:{}", LocalDateTime.now(), ttl, msg);
  26. MessagePostProcessor messagePostProcessor = (message) -> {
  27. // 注意,这里不再是 setExpiration ,而是 setDelay
  28. message.getMessageProperties().setDelay(ttl);
  29. return message;
  30. };
  31. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE, RabbitmqConfig.ROUTING_KEY, msg, messagePostProcessor);
  32. return "发送消息成功";
  33. }
  34. }

2.7.5 消费者代码

  • RabbitmqListener.java
  1. package com.github.listener;
  2. import com.github.config.RabbitmqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.nio.charset.StandardCharsets;
  8. import java.time.LocalDateTime;
  9. /**
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-25 14:20:14
  13. */
  14. @Slf4j
  15. @Component
  16. public class RabbitmqListener {
  17. @RabbitListener(queues = RabbitmqConfig.QUEUE)
  18. public void receive(Message message) {
  19. log.info("当前时间:{},收到死信队列信息:{}", LocalDateTime.now(), new String(message.getBody(), StandardCharsets.UTF_8));
  20. }
  21. }

2.7.6 结果

  • 发送请求:
  1. curl 'http://127.0.0.1:8080/send/消息1/20000' -X GET
  1. curl 'http://127.0.0.1:8080/send/消息2/2000' -X GET
  • IDEA 控制台结果显示:

15.png

2.8 总结

  • 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
  • 当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,具体问题具体分析。

第三章 发布确认高级

3.1 概述

  • 在生产环境中,由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者发送的消息会投递失败,导致消息丢失,需要手动处理和恢复。
  • 那么如何才能保证 RabbitMQ 的消息可靠投递?特别在比较极端的情况下,RabbitMQ 集群不可用的时候,无法投递的消息如何处理?

16.png

3.2 交换机确认回调

3.2.1 架构图

17.png

3.2.2 配置文件

  • application.yaml
  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. virtual-host: /
  8. publisher-confirm-type: correlated # 消息发送交换机触发回调方法

注意:publisher-confirm-type 有如下三个取值

  • NONE:发布确认模式将禁止。
  • correlated:消息发送交换机触发回调方法,不管交换机是否收到消息(推荐)。
  • simple:① 效果和 correlated 一样,② 发布消息称后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 MQ 节点返回结果,根据返回结果来判断下一步的逻辑,如果 waitForConfirmsOrDie 方法返回 false 会关闭 channel ,那么下面就无法继续发送消息到 broker 。

3.2.3 配置类

  • ConfirmConfig.java
  1. package com.github.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @author 许大仙
  7. * @version 1.0
  8. * @since 2022-05-27 10:49:11
  9. */
  10. @Configuration
  11. public class ConfirmConfig {
  12. /**
  13. * 交换机名称
  14. */
  15. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  16. /**
  17. * 队列的名称
  18. */
  19. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  20. /**
  21. * routing_key
  22. */
  23. public static final String CONFIRM_ROUTING_KEY = "confirm";
  24. /**
  25. * 配置交换机
  26. */
  27. @Bean
  28. public DirectExchange confirmExchange() {
  29. return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  30. }
  31. /**
  32. * 配置队列
  33. */
  34. @Bean
  35. public Queue confirmQueue() {
  36. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  37. }
  38. /**
  39. * 配置绑定关系
  40. */
  41. @Bean
  42. public Binding confirmBinding() {
  43. return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
  44. }
  45. }

3.2.4 生产者

  • Producer.java
  1. package com.github.web;
  2. import com.github.config.ConfirmConfig;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.Value;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.rabbit.connection.CorrelationData;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.nio.charset.StandardCharsets;
  13. /**
  14. * @author 许大仙
  15. * @version 1.0
  16. * @since 2022-05-27 13:37:39
  17. */
  18. @Slf4j
  19. @Value
  20. @RestController
  21. @RequiredArgsConstructor
  22. @RequestMapping(value = "/confirm")
  23. public class ConfirmController {
  24. RabbitTemplate rabbitTemplate;
  25. @GetMapping(value = "/sendMsg/{msg}")
  26. public String sendMsg(@PathVariable String msg) {
  27. log.info("发送的消息是:{}", msg);
  28. String id = "1";
  29. CorrelationData correlationData = new CorrelationData(id);
  30. // 发送消息
  31. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), correlationData);
  32. // 注意:消息 2 发送的交换机不存在,可以观察回调是否监听到
  33. id = "2";
  34. correlationData = new CorrelationData(id);
  35. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME + "2", ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), correlationData);
  36. return "发送消息";
  37. }
  38. }

3.2.5 消费者

  • RabbitmqListener.java
  1. package com.github.listener;
  2. import com.github.config.ConfirmConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.nio.charset.StandardCharsets;
  8. /**
  9. * @author 许大仙
  10. * @version 1.0
  11. * @since 2022-05-25 14:20:14
  12. */
  13. @Slf4j
  14. @Component
  15. public class RabbitmqListener {
  16. @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
  17. public void receive(Message message) {
  18. String msg = new String(message.getBody(), StandardCharsets.UTF_8);
  19. log.info("接收到的消息是:{}", msg);
  20. }
  21. }

3.2.6 交换机的确认回调

  • ConfirmCallback.java
  1. package com.github.callback;
  2. import lombok.Value;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. import java.util.Optional;
  9. /**
  10. * @author 许大仙
  11. * @version 1.0
  12. * @since 2022-05-27 13:51:05
  13. */
  14. @Slf4j
  15. @Value
  16. @Component
  17. public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
  18. RabbitTemplate rabbitTemplate;
  19. /**
  20. * 需要将当前类的对象注入到 RabbitTemplate 中,因为 RabbitTemplate.ConfirmCallback 是一个内部接口
  21. */
  22. @PostConstruct
  23. public void init() {
  24. rabbitTemplate.setConfirmCallback(this);
  25. }
  26. /**
  27. * 不管交换机是否收到消息都会触发这个回调方法
  28. *
  29. * @param correlationData 回调的相关数据
  30. * @param ack true 或 false
  31. * @param cause 失败的原因
  32. */
  33. @Override
  34. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  35. String id = Optional.ofNullable(correlationData).orElseGet(CorrelationData::new).getId();
  36. if (ack) {
  37. log.info("交换机已经接收到 id 为 {} 的消息", id);
  38. } else {
  39. log.info("交换机未接收到 id 为 {} 的消息,原因是:{}", id, cause);
  40. }
  41. }
  42. }

3.2.7 结果

  • 发送请求:
curl 'http://127.0.0.1:8080/confirm/sendMsg/哈哈' -X GET
  • 结果显示:

18.png

  • 事实证明,不管交换机是否能接收到消息,只要生产者发送了消息,就会触发交换机确认回调方法。

3.3 队列消息回退回调

3.3.1 概述

  • 在生产者开启了确认机制的情况下,交换机不管是否能接收到消息,都会触发确认回调函数,但是如果该消息不可路由(交换机没有找到对应的队列),那么消息就会被丢失,这种情况是不能容忍的。
  • 我们可以通过设置 mandatory= true 参数,表示当消息在传递过程中不可到达目的地的时候,就可以将消息返回给生产者。

3.3.2 架构图

19.png

3.3.3 配置文件

  • application.yaml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated # 消息发送到交换机会触发回调方法
    publisher-returns: true # 是否开启生产者消息回退功能,队列中的消息不可路由回退到生产者

3.3.4 配置类

  • ConfirmConfig.java
package com.github.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 10:49:11
 */
@Configuration
public class ConfirmConfig {

    /**
     * 交换机名称
     */
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    /**
     * 队列的名称
     */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    /**
     * routing_key
     */
    public static final String CONFIRM_ROUTING_KEY = "confirm";

    /**
     * 配置交换机
     */
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    /**
     * 配置队列
     */
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /**
     * 配置绑定关系
     */
    @Bean
    public Binding confirmBinding() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
    }

}

3.2.5 生产者

  • ConfirmController.java
package com.github.web;

import com.github.config.ConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 13:37:39
 */
@Slf4j
@Value
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/confirm")
public class ConfirmController {

    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendMsg/{msg}")
    public String sendMsg(@PathVariable String msg) {
        log.info("发送的消息是:{}", msg);
        String id = "1";
        CorrelationData correlationData = new CorrelationData(id);
        // 发送消息
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), correlationData);

        // 注意:消息2 的路由不可达
        id = "2";
        correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY + id, msg.getBytes(StandardCharsets.UTF_8), correlationData);

        return "发送消息";
    }

}

3.2.6 消费者

  • RabbitmqListener.java
package com.github.listener;

import com.github.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-25 14:20:14
 */
@Slf4j
@Component
public class RabbitmqListener {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receive(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("接收到的消息是:{}", msg);
    }

}

3.2.7 队列消息的回退回调

  • ReturnCallback.java
package com.github.callback;

import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 13:51:05
 */
@Value
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnsCallback {

    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(this);
        /*
        * 如果 mandatory = true ,表示交换机在无法将消息进行路由的时候,会将该消息返回给生产者。
        * 如果 mandatory = false ,表示发现消息无法进行路由,则直接丢失。
        * */
        rabbitTemplate.setMandatory(true);
    }

    /**
     * 在消息传递过程中不可达目的地的时候,将消息返回给生产者。
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息 {} 被交换机 {} 给退回了,退回的原因是:{} ,路由键是:{}", new String(returned.getMessage().getBody(), StandardCharsets.UTF_8), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey());
    }
}

3.2.8 结果

  • 发送请求:
curl 'http://127.0.0.1:8080/confirm/sendMsg/哈哈' -X GET
  • 结果显示:

20.png

3.4 备份交换机

3.4.1 概述

  • 有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递的时候发现并处理。但是有时候,我们并不清楚该如何处理这些无法路由的消息,难道只能打日志,手动处理这些无法投递的消息。
  • 通过日志来处理这些无法路由的消息是很不优雅的,特别是当生产者所在的服务器有多台机器的时候,手动复制日志会更加麻烦而且容易出错,而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回消息的逻辑。如果既想不丢失消息,又不想增加处理生产者消息的复杂性,该怎么做?
  • 在前提提到的死信队列的时候,可以为队列设置死信交换机来存储那些处理失败的消息,但是这些路由不可达消息是没有机会进入到队列的,因此也无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的解决这个问题。
  • 备份交换机可以理解为 RabbitMQ 中交换机的 "备胎" ,当我们为某一个交换机声明一个对应的备份交换机的时候,就是为其创建一个备胎,当交换机接收到一个路由不可达消息的时候,将会将这条消息发送给备份交换机,由备份交换机进行转发和处理,通常备份交换机的类型是 fanout ,这样就能将消息都投递到与其绑定的队列中,只要在备份交换机下绑定一个队列,这样原先交换机无法被路由的消息,就会进入到此队列中。当然,我们还可以建立一个报警队列,用独立消费者来进行检测和报警。

注意:生活中不推荐做备胎(舔狗)

22.png

3.4.2 修改配置类

  • ConfirmConfig.java
package com.github.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 10:49:11
 */
@Configuration
public class ConfirmConfig {

    /**
     * 交换机名称
     */
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    /**
     * 备份交换机名称
     */
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";

    /**
     * 队列的名称
     */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    /**
     * 备份队列的名称
     */
    public static final String BACKUP_QUEUE_NAME = "backup.queue";

    /**
     * routing_key
     */
    public static final String CONFIRM_ROUTING_KEY = "confirm";

    /**
     * 配置交换机
     */
    @Bean
    public DirectExchange confirmExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return new DirectExchange(CONFIRM_EXCHANGE_NAME, true, false, arguments);
    }

    /**
     * 配置备份交换机
     */
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    /**
     * 配置队列
     */
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /**
     * 配置备份队列
     */
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    /**
     * 配置绑定关系
     */
    @Bean
    public Binding confirmBinding() {
        return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
    }

    /**
     * 配置绑定关系
     */
    @Bean
    public Binding backupBinding() {
        return BindingBuilder.bind(backupQueue()).to(backupExchange());
    }
}
  • 说明 alternate-exchange 参数,从哪里来?

23.gif

3.4.3 修改消费者

  • RabbitmqListener.java
package com.github.listener;

import com.github.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-25 14:20:14
 */
@Slf4j
@Component
public class RabbitmqListener {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receive(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("接收到的消息是:{}", msg);
    }

    @RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE_NAME)
    public void receiveBackup(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("备份队列接收到的消息是:{}", msg);
    }

}

3.4.4 结果

  • 发送请求:
curl 'http://127.0.0.1:8080/confirm/sendMsg/哈哈' -X GET
  • 结果显示:

24.png

注意:备份交换机的优先级要比 mandatory 参数高。

第四章 其他

4.1 幂等性

4.1.1 概述

  • 幂等性:用户对于同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。
  • 举例:
    • 支付场景:用户购买商品后进行支付,支付扣款成功,但是返回结果的时候出现网络异常,用户的钱已经扣除完毕,但是用户却不知道已经付款成功,用户会再次点击付款按钮,此时会进行第二次扣款,返回结果成功,用户查询余额的时候却发现多扣了钱,并且流水记录也变为两条。
    • 解决方案:在以前的单体系统中,我们会将数据操作的流程放入到事务中,一旦出现异常就立即回滚,但是在响应客户端的时候依然会出现网络中断或者异常等等。

4.1.2 消息重复消费

  • 消费者在消费 MQ 中的消息的时候,MQ 会将消息发送给消费者,消费者在给 MQ 返回 ack 的时候网络中断,导致 MQ 没有收到确认消息,那么 MQ 会将该条消息重新发送给其他消费者,或者当网络重连后再次发送给该消费者,但是实际上该消费者已经成功消费了该条消息,造成消费者消费了重复的消息。

4.1.3 解决思路

  • MQ 的消费者的幂等性的解决一般使用全局 ID (雪花算法等,当然消费者也可以利用 MQ 中的 消息 id),在每次消费前用该 id 判断该消息是否消费过。

4.1.4 消费端的幂等性保障

  • 在海量订单生成的业务高峰期,生产者可能会重复发送多条消息,这时候消费者就需要实现幂等性,这意味着我们的消息永远不能被消费多次,即使我们收到一样的消息。
  • 业务主流的幂等性解决方案:
    • ① 唯一 ID + 指纹码机制,利用数据库主键去重。
    • ② Redis 的 RedLock 分布式锁(推荐)。

4.2 优先级队列

4.2.1 概述

  • 在电商系统中,经常会出现 订单催付 的场景,比如:客户在淘宝进行下单,淘宝会及时将订单发送给客户,如果在用户设定的时候没有付款就给用户推送一条短信,但是对于淘宝来说,像苹果、小米等公司是大客户,所以需要优先处理,这个时候就需要使用到 RabbitMQ 的优先级队列了,给大客户的订单设置一个相对比较高的优先级,其他客户的订单就是默认优先级。

25.png

4.2.2 如何添加

  • 控制台如何添加:

26.gif

注意:x-max-priority 的范围是 0 ~ 255 ,不过一般设置为 10 即可,那么消息的范围就必须 <= 10 。

  • 队列中添加优先级:
/**
* 配置队列
*/
@Bean
public Queue confirmQueue() {
    return QueueBuilder.durable(CONFIRM_QUEUE_NAME)
        // 设置优先级
        .maxPriority(10)
        .build();
}
  • 消息中设置优先级:
package com.github.web;

import com.github.config.ConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

/**
 * @author 许大仙
 * @version 1.0
 * @since 2022-05-27 13:37:39
 */
@Slf4j
@Value
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/confirm")
public class ConfirmController {

    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/sendMsg/{msg}")
    public String sendMsg(@PathVariable String msg) {
        log.info("发送的消息是:{}", msg);

        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                MessagePostProcessor messagePostProcessor = message -> {
                    MessageProperties messageProperties = message.getMessageProperties();
                    // 设置消息的优先级
                    messageProperties.setPriority(5);
                    return message;
                };
                // 发送消息
                rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8), messagePostProcessor);
            }
        }
        // 发送消息
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg.getBytes(StandardCharsets.UTF_8));

        return "发送消息";
    }
}

注意:

  • ① 队列需要设置为优先级队列。
  • ② 消息需要设置消息的优先级。
  • ③ 生产者必须发送批量消息,消费者才有机会对消息进行排序。

4.3 惰性队列

4.3.1 概述

  • RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
  • 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

4.3.2 两种模式

  • 队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
  • 在队列声明的时候可以通过 x-queue-mode 参数来设置队列的模式,取值为 defaultlazy 。下面示例中演示了一个惰性队列的声明细节:
/**
* 配置队列
*/
@Bean
public Queue confirmQueue() {
    return QueueBuilder.durable(CONFIRM_QUEUE_NAME)
        // 设置惰性队列
        .lazy()
        .build();
}

4.3.3 内存开销对比

27.png

  • 在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB 。