死信的概念
在消息队列中,死信,顾名思义就是无法被消费的消息。
一般来说,producer
将消息投递到broker
或者queue
里面了,consumer
从queue
中取出消息进行消费,但某些时候因为特定的一些原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,那么就变成了死信,进入死信队列中。
应用场景:
- 为了保证订单业务的消息数据不丢失,需要使用到
RabbitMQ
的死信队列机制,当消息消费发生异常的时候,将消息投入到死信队列中。- 用户在商城中下单后,在指定时间内未支付后订单自动失效。
死信的来源
- 消息TTL过期
- 队列达到了最大长度(队列满了,无法再添加数据到MQ中)
-
死信实战
消息TTL过期
队列达到最大长度
生产者代码中去除消息的
TTL
属性public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
String msg = "Hello RabbitMQ" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", null, msg.getBytes());
System.out.println("生产者发送消息:" + msg);
Thread.sleep(2000);
}
}
}
消费者中声明队列时添加如下参数:
map.put("x-max-length", 6);
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.test10
* @datetime 2022/9/17 星期六
*/
public class Consumer1 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead_queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead");
// 给正常队列绑定死信队列的信息
HashMap<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "dead");
map.put("x-max-length", 6);
String normalQueue = "normal_queue";
channel.queueDeclare(normalQueue, false, false, false, map);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal");
channel.basicConsume(normalQueue, true, (consumerTag, message) -> {
System.out.println("正常队列接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> {
});
}
}
消息被拒