概念

延时队列,队列内部是有序的,最重要的特性就是它的延时属性,延时队列中的元素是希望到达了指定时间之前或之后取出进行处理,简单来说,延时队列就是用来存放需要在指定时间被处理元素的队列。

使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登录,则进行短信提醒
  • 用户发起退款之后,如果三天内没有得到处理则通知相关的运营人员
  • 预定会议之后,需要在预定的时间点前十分钟通知各个与会人员参与会议

    这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

下面以卖票时创建订单为例,来进行流程图的介绍:
image.png

RabbitMQ中的TTL

TTL用来表示一条消息或该队列中的所有消息的最大存活时间,单位一般是毫秒。
设置TTL的方式有两种

  • 设置队列的TTL
  • 设置消息的TTL

如果消息在TTL时间内,没有被消费,那么就会成为死信。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值会生效。

设置TTL

image.png

SpringBoot进行整合

使用idea创建Springboot的空项目,并添加如下所示的依赖:

添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>cn.hutool</groupId>
  4. <artifactId>hutool-all</artifactId>
  5. <version>5.8.6</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>mysql</groupId>
  9. <artifactId>mysql-connector-java</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter-data-jpa</artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-test</artifactId>
  22. <scope>test</scope>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-amqp</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>com.alibaba</groupId>
  34. <artifactId>fastjson</artifactId>
  35. <version>1.2.78</version>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.projectlombok</groupId>
  39. <artifactId>lombok</artifactId>
  40. </dependency>
  41. <dependency>
  42. <groupId>io.springfox</groupId>
  43. <artifactId>springfox-swagger2</artifactId>
  44. <version>2.9.2</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>io.springfox</groupId>
  48. <artifactId>springfox-swagger-ui</artifactId>
  49. <version>2.9.2</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.springframework.amqp</groupId>
  53. <artifactId>spring-rabbit-test</artifactId>
  54. <scope>test</scope>
  55. </dependency>
  56. </dependencies>

配置文件

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. mvc:
  8. pathmatch:
  9. matching-strategy: ant_path_matcher
  10. server:
  11. port: 6790

由于SpringBoot版本2.6之后进行了相关的修改,所以需要在配置文件中对mvc相关的配置进行修改 添加:spring.mvc.pathmatch.matching-strategy=ant_path_matcher

Swagger配置类

  1. package com.ctgu.sheep.mq.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import springfox.documentation.builders.ApiInfoBuilder;
  5. import springfox.documentation.service.ApiInfo;
  6. import springfox.documentation.spi.DocumentationType;
  7. import springfox.documentation.spring.web.plugins.Docket;
  8. import springfox.documentation.swagger2.annotations.EnableSwagger2;
  9. /**
  10. * Created By Intellij IDEA
  11. *
  12. * @author ssssheep
  13. * @package com.ctgu.sheep.mq.config
  14. * @datetime 2022/9/18 星期日
  15. */
  16. @Configuration
  17. @EnableSwagger2
  18. public class Swagger2Config {
  19. @Bean
  20. public Docket webApiConfig() {
  21. return new Docket(DocumentationType.SWAGGER_2)
  22. .groupName("webApi")
  23. .apiInfo(webApiInfo())
  24. .select()
  25. .build();
  26. }
  27. private ApiInfo webApiInfo() {
  28. return new ApiInfoBuilder()
  29. .title("SpringBoot RabbitMQ")
  30. .description("SpringBoot RabbitMQ")
  31. .version("1.0")
  32. .build();
  33. }
  34. }

TTL测试

假设我们要实现功能如下所示:
image.png
交换机XkeyXAXB的消息分别路由到队列QAQB中。其中,队列QAQB的过期时间分别是10秒和40秒。
然后两个队列中的死信消息都会由死信交换机路由到死信队列QD中,再由消费者C进行处理

配置文件代码

  1. /**
  2. * Created By Intellij IDEA
  3. *
  4. * @author ssssheep
  5. * @package com.ctgu.sheep.mq.config
  6. * @datetime 2022/9/18 星期日
  7. */
  8. @Configuration
  9. public class TtlQueueConfig {
  10. public static final String X_CHANGE = "X";
  11. public static final String QUEUE_A = "QA";
  12. public static final String QUEUE_B = "QB";
  13. public static final String QUEUE_C = "QC";
  14. public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
  15. public static final String DEAD_QUEUE = "QD";
  16. @Bean("xExchange")
  17. public DirectExchange xExchange() {
  18. return new DirectExchange(X_CHANGE);
  19. }
  20. @Bean("yExchange")
  21. public DirectExchange yExchange() {
  22. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
  23. }
  24. @Bean("queueA")
  25. public Queue queueA() {
  26. HashMap<String, Object> args = new HashMap<>(3);
  27. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  28. args.put("x-dead-letter-routing-key", "YD");
  29. args.put("x-message-ttl", 10000);
  30. return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
  31. }
  32. @Bean
  33. public Binding queueABinding(@Qualifier("xExchange") DirectExchange xExchange,
  34. @Qualifier("queueA") Queue queueA) {
  35. return BindingBuilder.bind(queueA).to(xExchange).with("XA");
  36. }
  37. @Bean("queueB")
  38. public Queue queueB() {
  39. HashMap<String, Object> args = new HashMap<>(3);
  40. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  41. args.put("x-dead-letter-routing-key", "YD");
  42. args.put("x-message-ttl", 20000);
  43. return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
  44. }
  45. @Bean
  46. public Binding queueBBinding(@Qualifier("xExchange") DirectExchange xExchange,
  47. @Qualifier("queueB") Queue queueB) {
  48. return BindingBuilder.bind(queueB).to(xExchange).with("XB");
  49. }
  50. @Bean("queueD")
  51. public Queue queueD() {
  52. return new Queue(DEAD_QUEUE);
  53. }
  54. @Bean
  55. public Binding deadLetterBinding(@Qualifier("yExchange") DirectExchange yExchange,
  56. @Qualifier("queueD") Queue queueD) {
  57. return BindingBuilder.bind(queueD).to(yExchange).with("YD");
  58. }
  59. }

消息生产者代码

  1. package com.ctgu.sheep.mq.controller;
  2. import lombok.Getter;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RequestParam;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.util.Date;
  11. /**
  12. * Created By Intellij IDEA
  13. *
  14. * @author ssssheep
  15. * @package com.ctgu.sheep.mq.controller
  16. * @datetime 2022/9/18 星期日
  17. */
  18. @RequestMapping("/ttl")
  19. @Slf4j
  20. @RestController
  21. public class SendMsgController {
  22. @Autowired
  23. private RabbitTemplate rabbitTemplate;
  24. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  25. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  26. @GetMapping("/sendMsg")
  27. public void sendMsg(@RequestParam String message) {
  28. log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
  29. rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
  30. rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
  31. }
  32. }

死信消费者代码

  1. /**
  2. * Created By Intellij IDEA
  3. *
  4. * @author ssssheep
  5. * @package com.ctgu.sheep.mq.mq
  6. * @datetime 2022/9/18 星期日
  7. */
  8. @Slf4j
  9. @Component
  10. public class DeadLetterQueueConsumer {
  11. @RabbitListener(queues = "QD")
  12. public void receiveD(Message message, Channel channel) {
  13. String s = new String(message.getBody());
  14. log.info("当前时间:{},收到死信队列的消息:{}", new Date(), s);
  15. }
  16. }

效果

发起一个请求,进行测试:
image.png
两条信息分别在10s和40s时分别变成了死信消息,然后被消费掉
但如果按照现在的写法,我们想要修改过期的时间,就要重新编写代码,增加一个新的队列。
这显然是不合理的,因此我们需要对现在的延时队列进行优化——将过期时间设置在消息上

延时队列优化

image.png
如上图所示,新建队列QC,并与交换机X和死信交换机Y进行绑定
队列QC不配置过期时间,而是将过期时间绑定到消息本体上

代码

image.png
image.png

效果

image.png

如上图所示,虽然确实实现了消息的延时,但是由于消息队列默认只会检测第一条消息是否过期,因此当前后两条消息的过期时间差距较大时,后面的消息并不会优先执行,导致后者的时效性降低。比如上图中,hello2这条消息本应该是在13:52秒过期,但是最后的过期时间为14:06

插件实现延迟队列

上述两种方式都没有很好的实现延迟队列的效果,因此我们需要使用插件来解决此问题
Releases · rabbitmq/rabbitmq-delayed-message-exchange
将下载下来的插件放入plugins文件夹后,使用如下指令来加载延时队列插件

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

实现

image.png

  1. package com.ctgu.sheep.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. /**
  11. * Created By Intellij IDEA
  12. *
  13. * @author ssssheep
  14. * @package com.ctgu.sheep.mq.config
  15. * @datetime 2022/9/18 星期日
  16. */
  17. @Configuration
  18. public class DelayedQueueConfig {
  19. public static final String DELAYED_QUEUE_NAME = "delayed.queue";
  20. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  21. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  22. @Bean
  23. public Queue delayedQueue() {
  24. return new Queue(DELAYED_QUEUE_NAME);
  25. }
  26. @Bean
  27. public CustomExchange delayedExchange() {
  28. HashMap<String, Object> args = new HashMap<>();
  29. args.put("x-delayed-type", "direct");
  30. return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
  31. }
  32. @Bean
  33. public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
  34. @Qualifier("delayedExchange") CustomExchange exchange) {
  35. return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
  36. }
  37. }

image.png
监听延时队列发送的消息
image.png

效果

image.png
先过期的先消费,符合我们的预期效果