1.什么是死信队列与延时队列?
- 死信队列(DLX,dead-letter-exchange),利用DLX,当消息在一个队列中变成死信
(dead message)
之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。 - 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
2.延时队列的实现
关于RabbitMQ的安装这里不细说了,百度一下或者看我之前的RabbitMQ的安装博客即可,提醒一个点就是ErLang版本要与RabbitMQ的版本对应,别的应该没有问题。
我们的延时队列选择使用RabbitMQ官方提供的插件:rabbitmq_delayed_message_exchange
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9
然后我们在Linux中找一下RabbitMQ的插件位置
我的RabbitMQ是用rpm安装的,插件位置对应在/usr/lib/rabbitmq中
将下载好的插件上传到该文件夹,然后使用命令rabbitmq-plugins list查看插件目录:
然后我们加载该插件
好了,接下来我们上代码,我们用一个消费者一个生产者即可
先填写配置文件:
spring.rabbitmq.host=192.168.60.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
再配置交换机与队列:
package com.zym.config;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedRabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delayQueue.queue.demo";
public static final String DELAYED_EXCHANGE_NAME = "delayQueue.exchange.demo";
public static final String DELAYED_ROUTING_KEY = "delayQueue.routingkey.demo";
@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
}
生产者:
package com.zym.controller;
import com.zym.config.DelayedRabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
@RequestMapping("/demo")
@Slf4j
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("testRabbitMQ")
public void testRabbitMQ(@RequestParam String msg, @RequestParam Integer delayTime) {
log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);
rabbitTemplate.convertAndSend(DelayedRabbitMQConfig.DELAYED_EXCHANGE_NAME, DelayedRabbitMQConfig.DELAYED_ROUTING_KEY, msg, a -> {
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
}
消费者:
package com.zym.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
// //死信队列名称(与普通队列的差别)
// @RabbitListener(queues = "delayQueue.deadLetter.demo")
//停止使用死信队列来完成延时队列功能,RabbitMQ安装【延时插件】后,使用【延时队列名称】
@RabbitListener(queues = "delayQueue.queue.demo")
public void testConsumer(Message message, Channel channel) throws IOException {
//获取消息
String msg = new String(message.getBody());
//手动确认已经接受到了消息,RabbitMQ不再Re_Queue(处理消费端报错后,Re_Queue造成的死循环)
//channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg);
}
}
然后使用postman发请求:
然后查看控制台:
这样一个简单的延时队列就实现了
3.死信队列
死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。 - 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
死信队列应该都了解的差不多了,就是为了避免消息丢失重发的一种补偿机制
如何配置死信队列呢?其实很简单,大概可以分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
那么我们再来看代码如何实现
配置文件:
spring.rabbitmq.host=192.168.60.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.listener.type=simple
#manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#这里一定要配置为false,不然无法消费的数据不会进入死信队列的
spring.rabbitmq.listener.simple.default-requeue-rejected=false
DLXRabbitMQConfig:
package com.zym.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DLXRabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明业务队列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明业务队列B绑定关系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
业务消费者:
package com.zym.consumer;
import com.rabbitmq.client.Channel;
import com.zym.config.DLXRabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = DLXRabbitMQConfig.BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息A:{}", msg);
boolean ack = true;
Exception exception = null;
try {
if (msg.contains("deadletter")){
throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
}
if (!ack){
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = DLXRabbitMQConfig.BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
log.info("收到业务消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
死信消费者:
package com.zym.consumer;
import com.rabbitmq.client.Channel;
import com.zym.config.DLXRabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DeadLetterMessageReceiver {
@RabbitListener(queues = DLXRabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DLXRabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
生产者:
package com.zym.controller;
import com.zym.config.DLXRabbitMQConfig;
import com.zym.config.DelayedRabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("/demo")
@Slf4j
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendmsg")
public void sendMsg(String msg){
rabbitTemplate.convertSendAndReceive(DLXRabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
}
}
我们在浏览器里分别输入:http://localhost:8080/demo/sendmsg?msg=msg与http://localhost:8080/demo/sendmsg?msg=deadletter 然后到后台查看日志:
这里也参考了很多网上的资料,在这里贴一下出处:
死信队列:https://www.cnblogs.com/mfrank/p/11184929.html
延时队列:https://blog.csdn.net/qq_40625058/article/details/105584732
我也把我自己写的上次到了Gitee,地址:https://gitee.com/zym213/rabbitmq-dlx-demo