死信队列
producer将消息投递到broker或者直接到queue里,consumer从queue取出消息进行消费,但由于某些时候
由于特定的原因导致queue中的某些消息无法被消费,这样的消息没有后续处理就变成了死信,死信交换机是
直接交换机。为了保证业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发送异常时,
将消息投递到死信队列中。
死信队列消息来源:
1.消息TTL过期
2.队列达到最大长度
3.消息被否定应答nack
Provider
//普通交换机
private static final String NORMAL_EXCHANGE = "normal-exchange";
public static void main(String[] args) {
//获取连接
Connection connection = RabbitMqUtils.getConnection();
Channel channel = null;
try {
//获取信道
channel = connection.createChannel();
//发送死信消息 设置TTL时间(毫秒)
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
channel.basicPublish(NORMAL_EXCHANGE, "key1", properties,
("msg" + i).getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
RabbitMqUtils.close(connection, channel);
}
}
ConsumerA
//普通交换机
private static final String NORMAL_EXCHANGE = "normal-exchange";
//死信交换机
private static final String DEAD_EXCHANGE = "dead-exchange";
//普通队列
private static final String NORMAL_QUEUE = "normal-queue";
//死信队列
private static final String DEAD_QUEUE = "dead-queue";
public static void main(String[] args) {
//获取信道
Channel channel = RabbitMqUtils.getChannel();
try {
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String, Object> arguments = new HashMap<>();
//1.设置队列长度[超出最大长度的消息投递到死信队列]
//arguments.put("x-max-length", 6);
//2.指定过期之后死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//3.设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "key2");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "key1");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "key2");
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
//拒绝msg5消息nack
if ("msg5".equals(new String(message.getBody()))) {
System.err.println(new String(message.getBody()) + "被拒绝了");
//获取消息的标签进行拒绝,不重新放回普通队列中
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println(new String(message.getBody()));
//手动确认应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
CancelCallback cancelCallback = message -> {
};
System.out.println("ConsumerA准备就绪...");
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}
ConsumerB
//死信队列
private static final String DEAD_QUEUE = "dead-queue";
public static void main(String[] args) {
//获取信道
Channel channel = RabbitMqUtils.getChannel();
try {
//消费消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
CancelCallback cancelCallback = message -> {
};
System.out.println("ConsumerA准备就绪...");
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
} catch (IOException e) {
e.printStackTrace();
}
}
Spring Boot for RabbitMQ
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 应用名称
spring.application.name=spring-boot-rabbitmq
# rabbitmq配置
spring.rabbitmq.host=47.172.193.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
TTL死信队列
队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到
了以后或之前取出和进行处理,延迟队列就是用来存放需要在指定时间被处理的元素队列。
#应用场景
未支付订单在10分钟后自动取消
TTLQueueConfig
@Configuration
public class TTLQueueConfig {
//普通交换机名称
private static final String X_EXCHANGE = "X";
//死信交换机名称
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列名称
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
private static final String QUEUE_C = "QC";
//死信队列名称
private static final String DEAD_LETTER_QUEUE_D = "QD";
//声明普通交换机
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
//声明死信交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明普通队列
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//设置过期时间TTL ms
arguments.put("x-message-ttl", 10000);
return QueueBuilder
.durable(QUEUE_A)
.withArguments(arguments)
.build();
}
//声明普通队列
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
//设置过期时间TTL ms
arguments.put("x-message-ttl", 40000);
return QueueBuilder
.durable(QUEUE_B)
.withArguments(arguments)
.build();
}
//非延迟队列
@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>(2);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder
.durable(QUEUE_C)
.withArguments(arguments)
.build();
}
//声明死信队列
@Bean("queueD")
public Queue queueD() {
return QueueBuilder
.durable(DEAD_LETTER_QUEUE_D)
.build();
}
//绑定
@Bean
public Binding queueABindX() {
return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
}
@Bean
public Binding queueBBindX() {
return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
}
@Bean
public Binding queueCBindX() {
return BindingBuilder.bind(queueC()).to(xExchange()).with("XC");
}
@Bean
public Binding queueDBindY() {
return BindingBuilder.bind(queueD()).to(yExchange()).with("YD");
}
}
Provider
@GetMapping("/send")
public void send() {
log.info("当前时间:{},发送一条消息", new Date().toString());
rabbitTemplate.convertAndSend("X", "XA", "msg-10");
log.info("当前时间:{},发送一条消息", new Date().toString());
rabbitTemplate.convertAndSend("X", "XB", "msg-40");
}
//在消息生产端设置消息过期时间可以自由控制消息过期时间
@GetMapping("/sendForTime/{time}")
public void sendForTime(@PathVariable("time") String time) {
log.info("当前时间:{},发送一条消息", new Date().toString());
rabbitTemplate.convertAndSend("X", "XC",
"msg-" + Integer.parseInt(time) / 1000, msg -> {
msg.getMessageProperties().setExpiration(time);
return msg;
});
}
Consumer
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message) {
log.info("当前时间{},收到死信队列消息为{}", new Date().toString(),
new String(message.getBody()));
}
}
基于延迟交换机
自定义延迟队列问题
在生产端设置消息过期时间很灵活,但如果第一个消息耗时TTL是20秒,第二个消息TTL是2秒,
那么第二个消息会在第一个消息进入死信队列后才能进入,会造成消息阻塞问题。
延迟交换机安装
su
mv rabbitmq_delayed_message_exchange-3.8.0.ez \
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#安装完毕后交换机类型多了一个x-delayed-message
DelayedQueueConfig
@Configuration
public class DelayedQueueConfig {
//延迟交换机名称
private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//队列
private static final String DELAYED_QUEUE_NAME = "delayed.queue";
//routingKey
private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
//声明延迟交换机 - 自定义交换机
@Bean("customExchange")
public CustomExchange customExchange() {
/*
* 1.交换机名称
* 2.交换机类型
* 3.是否需要持久化
* 4.是否需要自动删除
* 5.参数
*/
Map<String, Object> arguments = new HashMap<>(1);
arguments.put("x-delayed-type", "direct");//延迟类型
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
false, false, arguments);
}
//声明队列
@Bean("queue")
public Queue queue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange() {
return BindingBuilder.bind(queue()).to(customExchange()).with(DELAYED_ROUTING_KEY).noargs();
}
}
Provider
//基于延迟消息交换机的TTL队列
@GetMapping("/plugins/{time}")
public void plugins(@PathVariable("time") String time) {
log.info("当前时间{},发送一条消息", new Date().toString());
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey",
"msg-" + Integer.parseInt(time) / 1000, msg -> {
msg.getMessageProperties().setDelay(Integer.parseInt(time));
return msg;
});
}
Consumer
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message) {
log.info("当前时间{},收到死信队列消息为{}", new Date().toString(),
new String(message.getBody()));
}
}