死信队列

  1. producer将消息投递到broker或者直接到queue里,consumerqueue取出消息进行消费,但由于某些时候
  2. 由于特定的原因导致queue中的某些消息无法被消费,这样的消息没有后续处理就变成了死信,死信交换机是
  3. 直接交换机。为了保证业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发送异常时,
  4. 将消息投递到死信队列中。
  5. 死信队列消息来源:
  6. 1.消息TTL过期
  7. 2.队列达到最大长度
  8. 3.消息被否定应答nack

Provider

  1. //普通交换机
  2. private static final String NORMAL_EXCHANGE = "normal-exchange";
  3. public static void main(String[] args) {
  4. //获取连接
  5. Connection connection = RabbitMqUtils.getConnection();
  6. Channel channel = null;
  7. try {
  8. //获取信道
  9. channel = connection.createChannel();
  10. //发送死信消息 设置TTL时间(毫秒)
  11. AMQP.BasicProperties properties =
  12. new AMQP.BasicProperties()
  13. .builder().expiration("10000").build();
  14. for (int i = 0; i < 10; i++) {
  15. channel.basicPublish(NORMAL_EXCHANGE, "key1", properties,
  16. ("msg" + i).getBytes());
  17. }
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. } finally {
  21. RabbitMqUtils.close(connection, channel);
  22. }
  23. }

ConsumerA

  1. //普通交换机
  2. private static final String NORMAL_EXCHANGE = "normal-exchange";
  3. //死信交换机
  4. private static final String DEAD_EXCHANGE = "dead-exchange";
  5. //普通队列
  6. private static final String NORMAL_QUEUE = "normal-queue";
  7. //死信队列
  8. private static final String DEAD_QUEUE = "dead-queue";
  9. public static void main(String[] args) {
  10. //获取信道
  11. Channel channel = RabbitMqUtils.getChannel();
  12. try {
  13. //声明普通交换机和死信交换机
  14. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
  15. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
  16. //声明普通队列
  17. Map<String, Object> arguments = new HashMap<>();
  18. //1.设置队列长度[超出最大长度的消息投递到死信队列]
  19. //arguments.put("x-max-length", 6);
  20. //2.指定过期之后死信交换机
  21. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
  22. //3.设置死信RoutingKey
  23. arguments.put("x-dead-letter-routing-key", "key2");
  24. channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
  25. //声明死信队列
  26. channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
  27. //绑定交换机和队列
  28. channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "key1");
  29. channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "key2");
  30. //消费消息
  31. DeliverCallback deliverCallback = (consumerTag, message) -> {
  32. //拒绝msg5消息nack
  33. if ("msg5".equals(new String(message.getBody()))) {
  34. System.err.println(new String(message.getBody()) + "被拒绝了");
  35. //获取消息的标签进行拒绝,不重新放回普通队列中
  36. channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
  37. } else {
  38. System.out.println(new String(message.getBody()));
  39. //手动确认应答
  40. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  41. }
  42. };
  43. CancelCallback cancelCallback = message -> {
  44. };
  45. System.out.println("ConsumerA准备就绪...");
  46. channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. }
  50. }

ConsumerB

  1. //死信队列
  2. private static final String DEAD_QUEUE = "dead-queue";
  3. public static void main(String[] args) {
  4. //获取信道
  5. Channel channel = RabbitMqUtils.getChannel();
  6. try {
  7. //消费消息
  8. DeliverCallback deliverCallback = (consumerTag, message) -> {
  9. System.out.println(new String(message.getBody()));
  10. };
  11. CancelCallback cancelCallback = message -> {
  12. };
  13. System.out.println("ConsumerA准备就绪...");
  14. channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }

Spring Boot for RabbitMQ

  1. <dependency>
  2. <groupId>org.projectlombok</groupId>
  3. <artifactId>lombok</artifactId>
  4. <scope>provided</scope>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-web</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-amqp</artifactId>
  17. </dependency>
  1. # 应用名称
  2. spring.application.name=spring-boot-rabbitmq
  3. # rabbitmq配置
  4. spring.rabbitmq.host=47.172.193.131
  5. spring.rabbitmq.port=5672
  6. spring.rabbitmq.username=admin
  7. spring.rabbitmq.password=123

TTL死信队列

  1. 队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到
  2. 了以后或之前取出和进行处理,延迟队列就是用来存放需要在指定时间被处理的元素队列。
  3. #应用场景
  4. 未支付订单在10分钟后自动取消

TTLQueueConfig

  1. @Configuration
  2. public class TTLQueueConfig {
  3. //普通交换机名称
  4. private static final String X_EXCHANGE = "X";
  5. //死信交换机名称
  6. private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
  7. //普通队列名称
  8. private static final String QUEUE_A = "QA";
  9. private static final String QUEUE_B = "QB";
  10. private static final String QUEUE_C = "QC";
  11. //死信队列名称
  12. private static final String DEAD_LETTER_QUEUE_D = "QD";
  13. //声明普通交换机
  14. @Bean("xExchange")
  15. public DirectExchange xExchange() {
  16. return new DirectExchange(X_EXCHANGE);
  17. }
  18. //声明死信交换机
  19. @Bean("yExchange")
  20. public DirectExchange yExchange() {
  21. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
  22. }
  23. //声明普通队列
  24. @Bean("queueA")
  25. public Queue queueA() {
  26. Map<String, Object> arguments = new HashMap<>(3);
  27. //设置死信交换机
  28. arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  29. //设置死信RoutingKey
  30. arguments.put("x-dead-letter-routing-key", "YD");
  31. //设置过期时间TTL ms
  32. arguments.put("x-message-ttl", 10000);
  33. return QueueBuilder
  34. .durable(QUEUE_A)
  35. .withArguments(arguments)
  36. .build();
  37. }
  38. //声明普通队列
  39. @Bean("queueB")
  40. public Queue queueB() {
  41. Map<String, Object> arguments = new HashMap<>(3);
  42. //设置死信交换机
  43. arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  44. //设置死信RoutingKey
  45. arguments.put("x-dead-letter-routing-key", "YD");
  46. //设置过期时间TTL ms
  47. arguments.put("x-message-ttl", 40000);
  48. return QueueBuilder
  49. .durable(QUEUE_B)
  50. .withArguments(arguments)
  51. .build();
  52. }
  53. //非延迟队列
  54. @Bean("queueC")
  55. public Queue queueC() {
  56. Map<String, Object> arguments = new HashMap<>(2);
  57. //设置死信交换机
  58. arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  59. //设置死信RoutingKey
  60. arguments.put("x-dead-letter-routing-key", "YD");
  61. return QueueBuilder
  62. .durable(QUEUE_C)
  63. .withArguments(arguments)
  64. .build();
  65. }
  66. //声明死信队列
  67. @Bean("queueD")
  68. public Queue queueD() {
  69. return QueueBuilder
  70. .durable(DEAD_LETTER_QUEUE_D)
  71. .build();
  72. }
  73. //绑定
  74. @Bean
  75. public Binding queueABindX() {
  76. return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
  77. }
  78. @Bean
  79. public Binding queueBBindX() {
  80. return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
  81. }
  82. @Bean
  83. public Binding queueCBindX() {
  84. return BindingBuilder.bind(queueC()).to(xExchange()).with("XC");
  85. }
  86. @Bean
  87. public Binding queueDBindY() {
  88. return BindingBuilder.bind(queueD()).to(yExchange()).with("YD");
  89. }
  90. }

Provider

  1. @GetMapping("/send")
  2. public void send() {
  3. log.info("当前时间:{},发送一条消息", new Date().toString());
  4. rabbitTemplate.convertAndSend("X", "XA", "msg-10");
  5. log.info("当前时间:{},发送一条消息", new Date().toString());
  6. rabbitTemplate.convertAndSend("X", "XB", "msg-40");
  7. }
  8. //在消息生产端设置消息过期时间可以自由控制消息过期时间
  9. @GetMapping("/sendForTime/{time}")
  10. public void sendForTime(@PathVariable("time") String time) {
  11. log.info("当前时间:{},发送一条消息", new Date().toString());
  12. rabbitTemplate.convertAndSend("X", "XC",
  13. "msg-" + Integer.parseInt(time) / 1000, msg -> {
  14. msg.getMessageProperties().setExpiration(time);
  15. return msg;
  16. });
  17. }

Consumer

  1. @Slf4j
  2. @Component
  3. public class DeadLetterQueueConsumer {
  4. @RabbitListener(queues = "QD")
  5. public void receiveD(Message message) {
  6. log.info("当前时间{},收到死信队列消息为{}", new Date().toString(),
  7. new String(message.getBody()));
  8. }
  9. }

基于延迟交换机

  1. 自定义延迟队列问题
  2. 在生产端设置消息过期时间很灵活,但如果第一个消息耗时TTL20秒,第二个消息TTL2秒,
  3. 那么第二个消息会在第一个消息进入死信队列后才能进入,会造成消息阻塞问题。

延迟交换机安装

  1. su
  2. mv rabbitmq_delayed_message_exchange-3.8.0.ez \
  3. /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
  4. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  5. #安装完毕后交换机类型多了一个x-delayed-message

DelayedQueueConfig

  1. @Configuration
  2. public class DelayedQueueConfig {
  3. //延迟交换机名称
  4. private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  5. //队列
  6. private static final String DELAYED_QUEUE_NAME = "delayed.queue";
  7. //routingKey
  8. private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
  9. //声明延迟交换机 - 自定义交换机
  10. @Bean("customExchange")
  11. public CustomExchange customExchange() {
  12. /*
  13. * 1.交换机名称
  14. * 2.交换机类型
  15. * 3.是否需要持久化
  16. * 4.是否需要自动删除
  17. * 5.参数
  18. */
  19. Map<String, Object> arguments = new HashMap<>(1);
  20. arguments.put("x-delayed-type", "direct");//延迟类型
  21. return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
  22. false, false, arguments);
  23. }
  24. //声明队列
  25. @Bean("queue")
  26. public Queue queue() {
  27. return new Queue(DELAYED_QUEUE_NAME);
  28. }
  29. //绑定
  30. @Bean
  31. public Binding delayedQueueBindingDelayedExchange() {
  32. return BindingBuilder.bind(queue()).to(customExchange()).with(DELAYED_ROUTING_KEY).noargs();
  33. }
  34. }

Provider

  1. //基于延迟消息交换机的TTL队列
  2. @GetMapping("/plugins/{time}")
  3. public void plugins(@PathVariable("time") String time) {
  4. log.info("当前时间{},发送一条消息", new Date().toString());
  5. rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey",
  6. "msg-" + Integer.parseInt(time) / 1000, msg -> {
  7. msg.getMessageProperties().setDelay(Integer.parseInt(time));
  8. return msg;
  9. });
  10. }

Consumer

  1. @Slf4j
  2. @Component
  3. public class DeadLetterQueueConsumer {
  4. @RabbitListener(queues = "QD")
  5. public void receiveD(Message message) {
  6. log.info("当前时间{},收到死信队列消息为{}", new Date().toString(),
  7. new String(message.getBody()));
  8. }
  9. }