概念
延时队列,队列内部是有序的,最重要的特性就是它的延时属性,延时队列中的元素是希望到达了指定时间之前或之后取出进行处理,简单来说,延时队列就是用来存放需要在指定时间被处理元素的队列。
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三天内没有登录,则进行短信提醒
- 用户发起退款之后,如果三天内没有得到处理则通知相关的运营人员
- 预定会议之后,需要在预定的时间点前十分钟通知各个与会人员参与会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎 使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
RabbitMQ中的TTL
TTL用来表示一条消息或该队列中的所有消息的最大存活时间,单位一般是毫秒。
设置TTL的方式有两种
- 设置队列的TTL
- 设置消息的TTL
如果消息在TTL时间内,没有被消费,那么就会成为死信。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值会生效。
设置TTL
SpringBoot进行整合
使用idea
创建Springboot
的空项目,并添加如下所示的依赖:
添加依赖
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
mvc:
pathmatch:
matching-strategy: ant_path_matcher
server:
port: 6790
由于SpringBoot版本2.6之后进行了相关的修改,所以需要在配置文件中对
mvc
相关的配置进行修改 添加:spring.mvc.pathmatch.matching-strategy=ant_path_matcher
Swagger配置类
package com.ctgu.sheep.mq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.config
* @datetime 2022/9/18 星期日
*/
@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("SpringBoot RabbitMQ")
.description("SpringBoot RabbitMQ")
.version("1.0")
.build();
}
}
TTL测试
假设我们要实现功能如下所示:
交换机X对key
为XA
和XB
的消息分别路由到队列QA
和QB
中。其中,队列QA
和QB
的过期时间分别是10秒和40秒。
然后两个队列中的死信消息都会由死信交换机路由到死信队列QD
中,再由消费者C
进行处理
配置文件代码
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.config
* @datetime 2022/9/18 星期日
*/
@Configuration
public class TtlQueueConfig {
public static final String X_CHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String QUEUE_C = "QC";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_CHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA() {
HashMap<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", "YD");
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
@Bean
public Binding queueABinding(@Qualifier("xExchange") DirectExchange xExchange,
@Qualifier("queueA") Queue queueA) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("queueB")
public Queue queueB() {
HashMap<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", "YD");
args.put("x-message-ttl", 20000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
@Bean
public Binding queueBBinding(@Qualifier("xExchange") DirectExchange xExchange,
@Qualifier("queueB") Queue queueB) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_QUEUE);
}
@Bean
public Binding deadLetterBinding(@Qualifier("yExchange") DirectExchange yExchange,
@Qualifier("queueD") Queue queueD) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
消息生产者代码
package com.ctgu.sheep.mq.controller;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.controller
* @datetime 2022/9/18 星期日
*/
@RequestMapping("/ttl")
@Slf4j
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("/sendMsg")
public void sendMsg(@RequestParam String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
死信消费者代码
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.mq
* @datetime 2022/9/18 星期日
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String s = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}", new Date(), s);
}
}
效果
发起一个请求,进行测试:
两条信息分别在10s和40s时分别变成了死信消息,然后被消费掉
但如果按照现在的写法,我们想要修改过期的时间,就要重新编写代码,增加一个新的队列。
这显然是不合理的,因此我们需要对现在的延时队列进行优化——将过期时间设置在消息上
延时队列优化
如上图所示,新建队列QC
,并与交换机X和死信交换机Y进行绑定
队列QC
不配置过期时间,而是将过期时间绑定到消息本体上
代码
效果
如上图所示,虽然确实实现了消息的延时,但是由于消息队列默认只会检测第一条消息是否过期,因此当前后两条消息的过期时间差距较大时,后面的消息并不会优先执行,导致后者的时效性降低。比如上图中,
hello2
这条消息本应该是在13:52
秒过期,但是最后的过期时间为14:06
插件实现延迟队列
上述两种方式都没有很好的实现延迟队列的效果,因此我们需要使用插件来解决此问题
Releases · rabbitmq/rabbitmq-delayed-message-exchange
将下载下来的插件放入plugins
文件夹后,使用如下指令来加载延时队列插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
实现
package com.ctgu.sheep.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.config
* @datetime 2022/9/18 星期日
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
效果
先过期的先消费,符合我们的预期效果