死信队列 DLX + TTL 方式实现延迟队列

  1. 队列,意味着内部的元素是有序的,元素的出队和入队是有方向性的,元素从一端进入,从另一端取出
  2. 延时,这是最重要的特性,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望等待特定时间后,消费者才能拿到这个消息进行消费。

    TTL

    TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用。
    也就是说我们可以利用这个机制,让消息过期后,变成死信,就又交给我们的死信交换机来处理了~
    延时队列可以解决很多特定场景下,带时间属性的任务需求,如:订单创建半小时内未支付进行取消订单。。。

有两种方式设置 TTL 值,

第一种是在创建队列的时候设置队列的 “x-message-ttl” 属性

  1. @Bean
  2. public Queue delayQueue() {
  3. String queueName = "delay_queue";
  4. Map<String, Object> args = new HashMap<>(1);
  5. args.put("x-message-ttl", "6000");
  6. return new Queue(queueName, true, false, false, args);
  7. }

另一种方式是针对每条消息设置 TTL

  1. rabbitTemplate.convertAndSend(exchange, routingKey, (message) -> {
  2. message.getMessageProperties().setExpiration("6000");
  3. return message;
  4. });

区别:

  • 设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃
  • 给消息设置 TTL 属性,消息过期也不一定会马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果队列存在消息积压问题,那么已过期的消息可能还会存活较长些时间

死信队列 + 消息TTL = 延迟队列

一.设置队列 TTL 属性来实现延迟队列

消息大致流向
死信队列 DLX   TTL 方式实现延迟队列 - 图1
延迟队列 (delay_queu_A) 设置 TTL 能让信息在延迟多久后成为死信,成为死信后的消息都会被投递到死信队列中,这样只需要消费者一直消费死信队列(dlx_queue_A) 里就好了,因为里面的消息都是希望被处理的延迟后的消息。

声明交换机、队列以及他们的关系:

  1. // 配置延迟队列
  2. @Bean
  3. public TopicExchange delayExchange() {
  4. String exchangeName = "delay_exchange";
  5. return new TopicExchange(exchangeName);
  6. }
  7. @Bean
  8. public Queue delayQueueA() {
  9. String queueName = "delay_queue_A";
  10. // 设置死信发送至 dlx_exchange 交换机,设置路由键为 bind.dlx.A
  11. String dlxExchangeName = "dlx_exchange";
  12. String bindDlxRoutingKeyA = "bind.dlx.A";
  13. Map<String, Object> args = new HashMap<>(3);
  14. // 设置队列的延迟属性,6秒
  15. args.put("x-message-ttl", 6000);
  16. args.put("x-dead-letter-exchange", dlxExchangeName);
  17. args.put("x-dead-letter-routing-key", bindDlxRoutingKeyA);
  18. return new Queue(queueName, true, false, false, args);
  19. }
  20. @Bean
  21. public Binding bindingDelayExchange() {
  22. String routingKey = "bind.delay.A";
  23. return BindingBuilder.bind(delayQueueA()).to(delayExchange()).with(routingKey);
  24. }
  25. // 配置死信队列
  26. @Bean
  27. public TopicExchange dlxExchange() {
  28. String exchangeName = "dlx_exchange";
  29. return new TopicExchange(exchangeName);
  30. }
  31. @Bean
  32. public Queue dlxQueueA() {
  33. String queueName = "dlx_queue_A";
  34. return new Queue(queueName);
  35. }
  36. @Bean
  37. public Binding bindingDlxExchange() {
  38. String routingKey = "#.A";
  39. return BindingBuilder.bind(dlxQueueA()).to(dlxExchange()).with(routingKey);
  40. }

yml

  1. spring:
  2. rabbitmq:
  3. host: 192.168.159.129
  4. port: 5672
  5. username: admin
  6. password: admin
  7. # 虚拟host 可以不设置,使用 server 默认 host
  8. virtual-host:
  9. listener:
  10. simple:
  11. default-requeue-rejected:
  12. acknowledge-mode: manual

消费者

  1. @RabbitListener(queues = "dlx_queue_A")
  2. public void receiver2(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
  3. System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "死信队列 - dlx_queue_A 收到消息:" + msg);
  4. channel.basicAck(deliveryTag, false);
  5. }

生产者

  1. @Test
  2. public void demo_10_Producer() {
  3. String exchange = "delay_exchange_A";
  4. String routingKey = "delay.routing.key.A";
  5. String msg = "发送给延迟队列 delay_queue_A 的消息";
  6. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg);
  7. rabbitTemplate.convertAndSend(exchange, routingKey, msg);
  8. }

输出

  1. 当前时间:2020-09-27 21:10:20 开始发送消息:发送给延迟队列 delay_queue_A 的消息
  2. 当前时间:2020-09-27 21:10:26 死信队列 - dlx_queue_A 收到消息:发送给延迟队列 delay_queue_A 的消息

缺陷

如果这样使用的话,每增加一个新的时间需求,就要新增一个队列。如需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果此时消息的过期时间不确定或者消息过期时间维度过多,在消费端我们就要去监听多个消息队列,岂不是要增加无数个队列才能满足需求??

二.设置消息的 TTL 属性来实现延迟队列

设计成一个通用的延时队列,我们可以给不同的消息设置不同的 TTL 过期时间,以达到动态设置延迟时间。

声明交换机、队列以及它们的关系:

  1. // 配置延迟队列
  2. @Bean
  3. public TopicExchange delayExchange() {
  4. String exchangeName = "delay_exchange";
  5. return new TopicExchange(exchangeName);
  6. }
  7. @Bean
  8. public Queue delayQueueB() {
  9. String queueName = "delay_queue_B";
  10. // 设置死信发送至 dlx_exchange 交换机,设置路由键为 bind_dlx_B
  11. String dlxExchangeName = "dlx_exchange";
  12. String bindDlxRoutingKeyB = "bind.dlx.B";
  13. Map<String, Object> args = new HashMap<>(2);
  14. args.put("x-dead-letter-exchange", dlxExchangeName);
  15. args.put("x-dead-letter-routing-key", bindDlxRoutingKeyB);
  16. return new Queue(queueName, true, false, false, args);
  17. }
  18. @Bean
  19. public Binding bindingDelayExchange() {
  20. String routingKey = "bind.delay.B";
  21. return BindingBuilder.bind(delayQueueB()).to(delayExchange()).with(routingKey);
  22. }
  23. // 配置死信队列
  24. @Bean
  25. public TopicExchange dlxExchange() {
  26. String exchangeName = "dlx_exchange";
  27. return new TopicExchange(exchangeName);
  28. }
  29. @Bean
  30. public Queue dlxQueueB() {
  31. String queueName = "dlx_queue_B";
  32. return new Queue(queueName);
  33. }
  34. @Bean
  35. public Binding bindingDlxExchange() {
  36. String routingKey = "#.B";
  37. return BindingBuilder.bind(dlxQueueB()).to(dlxExchange()).with(routingKey);
  38. }

消费者

  1. @RabbitListener(queues = "dlx_queue_B")
  2. public void receiverB(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
  3. System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 死信队列 - dlx_queue_B 收到消息:" + msg);
  4. channel.basicAck(deliveryTag, false);
  5. }

生产者

  1. @Test
  2. public void producer_B() throws Exception {
  3. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  4. String exchange = "delay_exchange";
  5. String routingKey = "bind.delay.B";
  6. String msg = "我是第一条消息";
  7. // 延迟时间
  8. String delayTime = "6000";
  9. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
  10. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime));
  11. Thread.sleep(30000L);
  12. }

MyMessagePostProcessor

  1. /**
  2. * 因为要给消息设置 TTL,这里创建了一个 MessagePostProcessor 的实例来设置过期时间
  3. */
  4. class MyMessagePostProcessor implements MessagePostProcessor {
  5. // 延迟时间 毫秒
  6. private String delayTime;
  7. MyMessagePostProcessor(String delayTime) {
  8. this.delayTime = delayTime;
  9. }
  10. @Override
  11. public Message postProcessMessage(Message message) throws AmqpException {
  12. // 设置延迟时间
  13. message.getMessageProperties().setExpiration(delayTime);
  14. return message;
  15. }
  16. }

这里的 convertAndSend 使用的第四参数为 MessagePostProcessor,我这里采用构造函数的方式来动态设置消息的过期时间。

另外一种实现方式

  1. package com.matrix.queue.rabbitmq.service.comm;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.AllArgsConstructor;
  4. import org.springframework.amqp.AmqpException;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessagePostProcessor;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.stereotype.Service;
  9. /**
  10. * @className: CommSendService
  11. * @package: com.matrix.rabbitmq.service
  12. * @description: 消息队列通用发送类
  13. * @datetime: 2021/11/30 18:50
  14. * @author: swordmeng8@163.com
  15. */
  16. @Service
  17. @AllArgsConstructor
  18. public class CommSendService {
  19. /** 使用RabbitTemplate,这提供了接收/发送等等方法 */
  20. private final RabbitTemplate rabbitTemplate;
  21. public void commSend(Object obj, long times,String exchange,String exchangeRoutingKey){
  22. MessagePostProcessor processor = new MessagePostProcessor(){
  23. @Override
  24. public Message postProcessMessage(Message message) throws AmqpException {
  25. message.getMessageProperties().setExpiration(String.valueOf(times));
  26. return message;
  27. }
  28. };
  29. rabbitTemplate.convertAndSend(
  30. exchange,
  31. exchangeRoutingKey,
  32. JSON.toJSONString(obj), processor);
  33. }
  34. }

效果

  1. 当前时间:2020-09-27 21:20:20 开始发送消息:我是第一条消息 延迟的时间为:6000
  2. 当前时间:2020-09-27 21:20:26 死信队列 - dlx_queue_B 收到消息:我是第一条消息

缺陷

但是上面我们也提到了消息过期也不一定会马上丢弃。消息到了过期时间可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

例子:

生产者

  1. @Test
  2. public void producer_B() throws Exception {
  3. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  4. String exchange = "delay_exchange";
  5. String routingKey = "bind.delay.B";
  6. String msg = "我是第一条消息";
  7. // 延迟时间
  8. String delayTime = "6000";
  9. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
  10. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime));
  11. msg = "我是第二条消息";
  12. // 修改延迟时间
  13. delayTime = "3000";
  14. System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime);
  15. rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime));
  16. Thread.sleep(30000L);
  17. }

效果

  1. 当前时间:2020-09-27 21:23:20 开始发送消息:我是第一条消息 延迟的时间为:6000
  2. 当前时间:2020-09-27 21:23:20 开始发送消息:我是第二条消息 延迟的时间为:3000
  3. 当前时间:2020-09-27 21:23:26 死信队列 - dlx_queue_B 收到消息:我是第一条消息
  4. 当前时间:2020-09-27 21:23:26 死信队列 - dlx_queue_B 收到消息:我是第二条消息

可以看到但延迟最久的第一条信息消费后,紧跟其后的已经过期了的第二条消息也接着消费了

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失
死信队列 DLX + TTL 的方式来实现延迟队列,这也是一种通用的做法。
不管哪种方式都有各自的优缺点,根据业务情况来考虑。如果要实现在消息粒度上添加TTL,并使其在设置的TTL时间及时死亡,可以使用 RabbitMQ 的 rabbitmq_delayed_message_exchange插件的方式实现。