1.什么是死信队列与延时队列?

  • 死信队列(DLX,dead-letter-exchange),利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
  • 延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
    简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

2.延时队列的实现

关于RabbitMQ的安装这里不细说了,百度一下或者看我之前的RabbitMQ的安装博客即可,提醒一个点就是ErLang版本要与RabbitMQ的版本对应,别的应该没有问题。

我们的延时队列选择使用RabbitMQ官方提供的插件:rabbitmq_delayed_message_exchange

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

然后我们在Linux中找一下RabbitMQ的插件位置
1.png
我的RabbitMQ是用rpm安装的,插件位置对应在/usr/lib/rabbitmq中
2.png
将下载好的插件上传到该文件夹,然后使用命令rabbitmq-plugins list查看插件目录:
3.png
然后我们加载该插件
4.png
好了,接下来我们上代码,我们用一个消费者一个生产者即可

先填写配置文件:

  1. spring.rabbitmq.host=192.168.60.10
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=123456

再配置交换机与队列:

  1. package com.zym.config;
  2. import org.springframework.amqp.core.CustomExchange;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. @Configuration
  9. public class DelayedRabbitMQConfig {
  10. public static final String DELAYED_QUEUE_NAME = "delayQueue.queue.demo";
  11. public static final String DELAYED_EXCHANGE_NAME = "delayQueue.exchange.demo";
  12. public static final String DELAYED_ROUTING_KEY = "delayQueue.routingkey.demo";
  13. @Bean
  14. public Queue immediateQueue() {
  15. return new Queue(DELAYED_QUEUE_NAME);
  16. }
  17. @Bean
  18. public CustomExchange customExchange() {
  19. Map<String, Object> args = new HashMap<>();
  20. args.put("x-delayed-type", "direct");
  21. return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
  22. }
  23. }

生产者:

  1. package com.zym.controller;
  2. import com.zym.config.DelayedRabbitMQConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.PostMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RequestParam;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.util.Date;
  11. @RestController
  12. @RequestMapping("/demo")
  13. @Slf4j
  14. public class TestController {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. @PostMapping("testRabbitMQ")
  18. public void testRabbitMQ(@RequestParam String msg, @RequestParam Integer delayTime) {
  19. log.info("当前时间:{},收到请求,msg:{},delayTime:{}", new Date(), msg, delayTime);
  20. rabbitTemplate.convertAndSend(DelayedRabbitMQConfig.DELAYED_EXCHANGE_NAME, DelayedRabbitMQConfig.DELAYED_ROUTING_KEY, msg, a -> {
  21. a.getMessageProperties().setDelay(delayTime);
  22. return a;
  23. });
  24. }
  25. }

消费者:

  1. package com.zym.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. import java.util.Date;
  9. @Slf4j
  10. @Component
  11. public class DeadLetterQueueConsumer {
  12. // //死信队列名称(与普通队列的差别)
  13. // @RabbitListener(queues = "delayQueue.deadLetter.demo")
  14. //停止使用死信队列来完成延时队列功能,RabbitMQ安装【延时插件】后,使用【延时队列名称】
  15. @RabbitListener(queues = "delayQueue.queue.demo")
  16. public void testConsumer(Message message, Channel channel) throws IOException {
  17. //获取消息
  18. String msg = new String(message.getBody());
  19. //手动确认已经接受到了消息,RabbitMQ不再Re_Queue(处理消费端报错后,Re_Queue造成的死循环)
  20. //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  21. log.info("当前时间:{},延时队列收到消息:{}", new Date().toString(), msg);
  22. }
  23. }

然后使用postman发请求:
5.png
然后查看控制台:
6.png
这样一个简单的延时队列就实现了

3.死信队列

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

死信队列应该都了解的差不多了,就是为了避免消息丢失重发的一种补偿机制

如何配置死信队列呢?其实很简单,大概可以分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

那么我们再来看代码如何实现

配置文件:

  1. spring.rabbitmq.host=192.168.60.10
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=123456
  5. spring.rabbitmq.listener.type=simple
  6. #manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息
  7. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  8. #这里一定要配置为false,不然无法消费的数据不会进入死信队列的
  9. spring.rabbitmq.listener.simple.default-requeue-rejected=false

DLXRabbitMQConfig:

  1. package com.zym.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. @Configuration
  9. public class DLXRabbitMQConfig {
  10. public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
  11. public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
  12. public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
  13. public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
  14. public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
  15. public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
  16. public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
  17. public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
  18. // 声明业务Exchange
  19. @Bean("businessExchange")
  20. public FanoutExchange businessExchange(){
  21. return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
  22. }
  23. // 声明死信Exchange
  24. @Bean("deadLetterExchange")
  25. public DirectExchange deadLetterExchange(){
  26. return new DirectExchange(DEAD_LETTER_EXCHANGE);
  27. }
  28. // 声明业务队列A
  29. @Bean("businessQueueA")
  30. public Queue businessQueueA(){
  31. Map<String, Object> args = new HashMap<>(2);
  32. // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
  33. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
  34. // x-dead-letter-routing-key 这里声明当前队列的死信路由key
  35. args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
  36. return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
  37. }
  38. // 声明业务队列B
  39. @Bean("businessQueueB")
  40. public Queue businessQueueB(){
  41. Map<String, Object> args = new HashMap<>(2);
  42. // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
  43. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
  44. // x-dead-letter-routing-key 这里声明当前队列的死信路由key
  45. args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
  46. return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
  47. }
  48. // 声明死信队列A
  49. @Bean("deadLetterQueueA")
  50. public Queue deadLetterQueueA(){
  51. return new Queue(DEAD_LETTER_QUEUEA_NAME);
  52. }
  53. // 声明死信队列B
  54. @Bean("deadLetterQueueB")
  55. public Queue deadLetterQueueB(){
  56. return new Queue(DEAD_LETTER_QUEUEB_NAME);
  57. }
  58. // 声明业务队列A绑定关系
  59. @Bean
  60. public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
  61. @Qualifier("businessExchange") FanoutExchange exchange){
  62. return BindingBuilder.bind(queue).to(exchange);
  63. }
  64. // 声明业务队列B绑定关系
  65. @Bean
  66. public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
  67. @Qualifier("businessExchange") FanoutExchange exchange){
  68. return BindingBuilder.bind(queue).to(exchange);
  69. }
  70. // 声明死信队列A绑定关系
  71. @Bean
  72. public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
  73. @Qualifier("deadLetterExchange") DirectExchange exchange){
  74. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
  75. }
  76. // 声明死信队列B绑定关系
  77. @Bean
  78. public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
  79. @Qualifier("deadLetterExchange") DirectExchange exchange){
  80. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
  81. }
  82. }

业务消费者:

  1. package com.zym.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import com.zym.config.DLXRabbitMQConfig;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. @Slf4j
  10. @Component
  11. public class BusinessMessageReceiver {
  12. @RabbitListener(queues = DLXRabbitMQConfig.BUSINESS_QUEUEA_NAME)
  13. public void receiveA(Message message, Channel channel) throws IOException {
  14. String msg = new String(message.getBody());
  15. log.info("收到业务消息A:{}", msg);
  16. boolean ack = true;
  17. Exception exception = null;
  18. try {
  19. if (msg.contains("deadletter")){
  20. throw new RuntimeException("dead letter exception");
  21. }
  22. } catch (Exception e){
  23. ack = false;
  24. exception = e;
  25. }
  26. if (!ack){
  27. log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
  28. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  29. } else {
  30. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  31. }
  32. }
  33. @RabbitListener(queues = DLXRabbitMQConfig.BUSINESS_QUEUEB_NAME)
  34. public void receiveB(Message message, Channel channel) throws IOException {
  35. log.info("收到业务消息B:" + new String(message.getBody()));
  36. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  37. }
  38. }

死信消费者:

  1. package com.zym.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import com.zym.config.DLXRabbitMQConfig;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. @Component
  9. public class DeadLetterMessageReceiver {
  10. @RabbitListener(queues = DLXRabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
  11. public void receiveA(Message message, Channel channel) throws IOException {
  12. System.out.println("收到死信消息A:" + new String(message.getBody()));
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  14. }
  15. @RabbitListener(queues = DLXRabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
  16. public void receiveB(Message message, Channel channel) throws IOException {
  17. System.out.println("收到死信消息B:" + new String(message.getBody()));
  18. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  19. }
  20. }

生产者:

  1. package com.zym.controller;
  2. import com.zym.config.DLXRabbitMQConfig;
  3. import com.zym.config.DelayedRabbitMQConfig;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.*;
  8. import java.util.Date;
  9. @RestController
  10. @RequestMapping("/demo")
  11. @Slf4j
  12. public class TestController {
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. @GetMapping("sendmsg")
  16. public void sendMsg(String msg){
  17. rabbitTemplate.convertSendAndReceive(DLXRabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
  18. }
  19. }

我们在浏览器里分别输入:http://localhost:8080/demo/sendmsg?msg=msg与http://localhost:8080/demo/sendmsg?msg=deadletter 然后到后台查看日志:
7.png
这里也参考了很多网上的资料,在这里贴一下出处:

死信队列:https://www.cnblogs.com/mfrank/p/11184929.html

延时队列:https://blog.csdn.net/qq_40625058/article/details/105584732

我也把我自己写的上次到了Gitee,地址:https://gitee.com/zym213/rabbitmq-dlx-demo