死信的概念

在消息队列中,死信,顾名思义就是无法被消费的消息。
一般来说,producer将消息投递到broker或者queue里面了,consumerqueue中取出消息进行消费,但某些时候因为特定的一些原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,那么就变成了死信,进入死信队列中。

应用场景:

  • 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入到死信队列中。
  • 用户在商城中下单后,在指定时间内未支付后订单自动失效。

死信的来源

  • 消息TTL过期
  • 队列达到了最大长度(队列满了,无法再添加数据到MQ中)
  • 消息被拒绝处理,并且requeue为false

    死信实战

    消息TTL过期

    image.png
    image.png
    image.png

    队列达到最大长度

    生产者代码中去除消息的TTL属性

    1. public class Producer {
    2. public static final String NORMAL_EXCHANGE = "normal_exchange";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = MQUtil.getChannel();
    5. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    6. // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    7. for (int i = 1; i <= 10; i++) {
    8. String msg = "Hello RabbitMQ" + i;
    9. channel.basicPublish(NORMAL_EXCHANGE, "normal", null, msg.getBytes());
    10. System.out.println("生产者发送消息:" + msg);
    11. Thread.sleep(2000);
    12. }
    13. }
    14. }

    消费者中声明队列时添加如下参数:
    map.put("x-max-length", 6);

    1. /**
    2. * Created By Intellij IDEA
    3. *
    4. * @author ssssheep
    5. * @package com.ctgu.sheep.test10
    6. * @datetime 2022/9/17 星期六
    7. */
    8. public class Consumer1 {
    9. public static final String NORMAL_EXCHANGE = "normal_exchange";
    10. public static final String DEAD_EXCHANGE = "dead_exchange";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel = MQUtil.getChannel();
    13. channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    14. channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    15. String deadQueue = "dead_queue";
    16. channel.queueDeclare(deadQueue, false, false, false, null);
    17. channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead");
    18. // 给正常队列绑定死信队列的信息
    19. HashMap<String, Object> map = new HashMap<>();
    20. map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    21. map.put("x-dead-letter-routing-key", "dead");
    22. map.put("x-max-length", 6);
    23. String normalQueue = "normal_queue";
    24. channel.queueDeclare(normalQueue, false, false, false, map);
    25. channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal");
    26. channel.basicConsume(normalQueue, true, (consumerTag, message) -> {
    27. System.out.println("正常队列接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
    28. }, consumerTag -> {
    29. });
    30. }
    31. }

    消息被拒