https://baijiahao.baidu.com/s?id=1714409376163850130 作者:文心紫竹

1. 背景

在日常开发中,延时任务是一个无法避免的话题。为了达到延时这一目的,在不同场景下会有不同的解决方案,对各个方案优缺点的认知程度决定了架构决策的有效性。
本文章,以电商订单超时未支付为业务场景,推导多种解决方案,并对每个方案的优缺点进行分析,所涉及的方案包括:

  1. 数据库轮询方案。
  2. 单机内存解决方案。
  3. 分布式延时队列方案。

最后,为了提升研发效率,我们将使用声明式编程思想,对分布式延时队列方案进行封装,有效的分离 业务技术

1.1 业务场景

业务场景非常简单,就是大家最熟悉的电商订单,相信很多细心的小伙伴都发现,我们在电商平台下单后,如果超过一定的时间还未支付,系统自动将订单设置为超时自动取消,从而释放绑定的资源。
核心流程如下:

  1. 在电商平台下单,生成待支付订单;
  2. 在规定的时间内没有完成支付,系统将自动取消订单,订单状态变成“超时取消”;
  3. 在规定的时间内完成支付,订单将变成“已支付”

订单状态机如下:
延时任务从入门到精通 - 图1

1.2 基础组件简介

整个 Demo 采用 DDD 的设计思路,为了便于理解,先介绍所涉及的基础组件:

1.2.1. OrderInfo

订单聚合根,提供构建和取消等业务方法。 具体的代码如下:

  1. @Data
  2. @Entity
  3. @Table(name = "order_info")
  4. public class OrderInfo {
  5. @Id
  6. @GeneratedValue(strategy = GenerationType.IDENTITY)
  7. private Long id;
  8. @Column(name = "status")
  9. @Enumerated(EnumType.STRING)
  10. private OrderInfoStatus orderStatus;
  11. @Column(name = "create_time")
  12. private Date createTime = new Date();
  13. /**
  14. * 取消订单
  15. */
  16. public void cancel() {
  17. setOrderStatus(OrderInfoStatus.CANCELLED);
  18. }
  19. /**
  20. * 创建订单
  21. * @param createDate
  22. * @return
  23. */
  24. public static OrderInfo create(Date createDate){
  25. OrderInfo orderInfo = new OrderInfo();
  26. orderInfo.setCreateTime(createDate);
  27. orderInfo.setOrderStatus(OrderInfoStatus.CREATED);
  28. return orderInfo;
  29. }
  30. }

1.2.2 OrderInfoRepository

基于 Spring Data Jpa 实现,主要用于数据库访问,代码如下:

  1. public interface OrderInfoRepository extends JpaRepository<OrderInfo, Long> {
  2. List<OrderInfo> getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus created, Date overtime);
  3. }

Spring Data 会根据 方法签名 或 @Query 注解生成代理对象,无需我们写任何代码,便能实现基本的数据库访问。

1.2.3. OrderInfoService

应用服务层,面向 User Case,主要完成业务流程编排,核对代码如下:

  1. @Service
  2. @Slf4j
  3. public class OrderInfoService {
  4. @Autowired
  5. private ApplicationEventPublisher eventPublisher;
  6. @Autowired
  7. private OrderInfoRepository orderInfoRepository;
  8. /**
  9. * 生单接口 <br />
  10. * 1. 创建订单,保存至数据库
  11. * 2. 发布领域事件,触发后续处理
  12. * @param createDate
  13. */
  14. @Transactional(readOnly = false)
  15. public void create(Date createDate){
  16. OrderInfo orderInfo = OrderInfo.create(createDate);
  17. this.orderInfoRepository.save(orderInfo);
  18. eventPublisher.publishEvent(new OrderInfoCreateEvent(orderInfo));
  19. }
  20. /**
  21. * 取消订单
  22. * @param orderId
  23. */
  24. @Transactional(readOnly = false)
  25. public void cancel(Long orderId){
  26. Optional<OrderInfo> orderInfoOpt = this.orderInfoRepository.findById(orderId);
  27. if (orderInfoOpt.isPresent()){
  28. OrderInfo orderInfo = orderInfoOpt.get();
  29. orderInfo.cancel();
  30. this.orderInfoRepository.save(orderInfo);
  31. log.info("success to cancel order {}", orderId);
  32. }else {
  33. log.info("failed to find order {}", orderId);
  34. }
  35. }
  36. /**
  37. * 查找超时未支付的订单
  38. * @return
  39. */
  40. @Transactional(readOnly = true)
  41. public List<OrderInfo> findOvertimeNotPaidOrders(Date deadLine){
  42. return this.orderInfoRepository.getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus.CREATED, deadLine);
  43. }
  44. }

1.2.4. OrderController

对外暴露的 Web 接口,提供接口创建订单,主要用于测试,代码如下:

  1. @RestController
  2. @RequestMapping("order")
  3. public class OrderController {
  4. @Autowired
  5. private OrderInfoService orderInfoService;
  6. /**
  7. * 生成新的订单,主要用于测试
  8. */
  9. @PostMapping("insertTestData")
  10. public void createTestOrder(){
  11. Date date = DateUtils.addMinutes(new Date(), -30);
  12. date = DateUtils.addSeconds(date, 10);
  13. this.orderInfoService.create(date);
  14. }
  15. }

所依赖的组件介绍完了,让我们进入第一个方案。

2. 数据库轮询方案

这是最简单的方案,每个订单都保存了创建时间,只需要写个定时任务,从数据库中查询出已经过期但是尚未支付的订单,依次执行订单取消即可。

2.1. 方案实现

核心流程如下:
延时任务从入门到精通 - 图2

  1. 用户创建订单,将订单信息保存到数据库;
  2. 设定一个定时任务,每一秒触发一次检查任务;
  3. 任务按下面步骤执行
  • 先从数据库中查找 超时未支付 的订单;
  • 依次执行定的 Cancel 操作;
  • 将变更保存到数据库;

核心代码如下:

  1. @Service
  2. @Slf4j
  3. public class DatabasePollStrategy {
  4. @Autowired
  5. private OrderInfoService orderInfoService;
  6. /**
  7. * 每隔 1S 运行一次 <br/>
  8. * 1. 从 DB 中查询过期未支付订单(状态为 CREATED,创建时间小于 deadLintDate)
  9. * 2. 依次执行 取消订单 操作
  10. */
  11. @Scheduled(fixedDelay = 1 * 1000)
  12. public void poll(){
  13. Date now = new Date();
  14. Date overtime = DateUtils.addMinutes(now, -30);
  15. List<OrderInfo> overtimeNotPaidOrders = orderInfoService.findOvertimeNotPaidOrders(overtime);
  16. log.info("load overtime Not paid orders {}", overtimeNotPaidOrders);
  17. overtimeNotPaidOrders.forEach(orderInfo -> this.orderInfoService.cancel(orderInfo.getId()));
  18. }
  19. }

2.2. 方案小结

  1. 优点:简单
  • 开发简单。系统复杂性低,特别是在 Spring Schedule 帮助下;
  • 测试简单。没有外部依赖,逻辑集中,方便快速定位问题;
  • 上线简单。没有繁琐的配置,复杂的申请流程;
  • 缺点:
  • 数据库负担重。不停的轮询,会加重数据库的负载;
  • 时效性不足。任务最高延时为轮询时间,不适合时效要求高的场景(在订单场景已经足够);
  • 存在大量无效轮询。在没有过期订单的情况下,出现大量的无效扫描;
  • 没有消峰能力。短时间出现大量过期订单,会造成任务集中执行,出现明显的业务高峰;

总之,该方案非常适合业务量级小,业务迭代快的项目。

3. 单机内存解决方案

对于延时任务,JDK 为我们准备了大量工具,使用这些工具可以解决我们的问题。

3.1 DelayQueue

DelayQueue 是一种特殊的阻塞队列,可以为每个任务指定延时时间,只有在延时时间到达后,才能获取任务。
整体结构如下:
延时任务从入门到精通 - 图3
核心流程如下:

  1. 用户下单完成后,向延时队列提交一个任务;
  2. 时间达到后,后台工作线程从队列中读取任务;
  3. 工作线程调用 CancelOrder 方法 对过期未支付的订单执行取消操作;

核心代码如下:

  1. @Slf4j
  2. @Service
  3. public class DelayQueueStrategy implements SmartLifecycle {
  4. private final DelayQueue<DelayTask> delayTasks = new DelayQueue<>();
  5. private final Thread thread = new OrderCancelWorker();
  6. private boolean running;
  7. @Autowired
  8. private OrderInfoService orderInfoService;
  9. @TransactionalEventListener
  10. public void onOrderCreated(OrderInfoCreateEvent event){
  11. // 将 订单号 放入延时队列
  12. this.delayTasks.offer(new DelayTask(event.getOrderInfo().getId(), 10));
  13. log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId());
  14. }
  15. /**
  16. * 启动后台线程,消费延时队列中的任务
  17. */
  18. @Override
  19. public void start() {
  20. if (this.running){
  21. return;
  22. }
  23. this.thread.start();
  24. this.running = true;
  25. }
  26. /**
  27. * 停止后台线程
  28. */
  29. @Override
  30. public void stop() {
  31. if (!this.running){
  32. return;
  33. }
  34. this.thread.interrupt();
  35. this.running = false;
  36. }
  37. @Override
  38. public boolean isRunning() {
  39. return this.running;
  40. }
  41. @Override
  42. public boolean isAutoStartup() {
  43. return true;
  44. }
  45. /**
  46. * 延时任务
  47. */
  48. @Value
  49. private static class DelayTask implements Delayed{
  50. private final Long orderId;
  51. private final Date runAt;
  52. private DelayTask(Long orderId, int delayTime) {
  53. this.orderId = orderId;
  54. this.runAt = DateUtils.addSeconds(new Date(), delayTime);
  55. }
  56. /**
  57. * 获取剩余时间
  58. * @param timeUnit
  59. * @return
  60. */
  61. @Override
  62. public long getDelay(TimeUnit timeUnit) {
  63. return timeUnit.convert(getRunAt().getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  64. }
  65. @Override
  66. public int compareTo(Delayed delayed) {
  67. if (delayed == this) {
  68. return 0;
  69. } else {
  70. long d = this.getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
  71. return d == 0L ? 0 : (d < 0L ? -1 : 1);
  72. }
  73. }
  74. }
  75. /**
  76. * 后台线程,消费延时队列中的消息
  77. */
  78. private class OrderCancelWorker extends Thread {
  79. @Override
  80. public void run() {
  81. // 根据中断状态,确定是否退出
  82. while (!Thread.currentThread().isInterrupted()){
  83. DelayTask task = null;
  84. try {
  85. // 从队列中获取任务
  86. task = delayTasks.take();
  87. } catch (InterruptedException e) {
  88. e.printStackTrace();
  89. }
  90. // 取消订单
  91. if (task != null){
  92. orderInfoService.cancel(task.getOrderId());
  93. log.info("Success to Run Delay Task, Cancel Order {}", task.getOrderId());
  94. }
  95. }
  96. }
  97. }
  98. }

这个方案,思路非常简单,但是有一定的复杂性,需要对工作线程的生命周期进行手工维护。相对来说,JDK 已经为我们的这种场景进行了封装,也就是基于 DelayQueue 的 ScheduledExecutorService。

3.2 ScheduledExecutorService

ScheduledExecutorService 是基于 DelayQueue 构建的定时调度组件,相对之前的 Timer 有非常大的优势。
整体架构如下:
延时任务从入门到精通 - 图4
核心流程如下:

  1. 用户下单完成后,向 ScheduledExecutorService 注册一个定时任务;
  2. 时间达到后,ScheduledExecutorService 将启动任务;
  3. 线程池线程调用 CancelOrder 方法 对过期未支付的订单执行取消操作;

核心代码如下:

  1. @Slf4j
  2. @Service
  3. public class ScheduleExecutorStrategy {
  4. @Autowired
  5. private OrderInfoService orderInfoService;
  6. private ScheduledExecutorService scheduledExecutorService;
  7. public ScheduleExecutorStrategy(){
  8. BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
  9. .namingPattern("Schedule-Cancel-Thread-%d")
  10. .daemon(true)
  11. .build();
  12. this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, basicThreadFactory);
  13. }
  14. @TransactionalEventListener
  15. public void onOrderCreated(OrderInfoCreateEvent event){
  16. // 添加定时任务
  17. this.scheduledExecutorService.schedule(new CancelTask(event.getOrderInfo().getId()), 5, TimeUnit.SECONDS);
  18. log.info("Success to add cancel task for order {}", event.getOrderInfo().getId());
  19. }
  20. private class CancelTask implements Runnable{
  21. private final Long orderId;
  22. private CancelTask(Long orderId) {
  23. this.orderId = orderId;
  24. }
  25. @Override
  26. public void run() {
  27. // 执行订单取消操作
  28. orderInfoService.cancel(this.orderId);
  29. log.info("Success to cancel task for order {}", this.orderId);
  30. }
  31. }
  32. }

相对 DelayQueue 方案,ScheduledExecutorService 代码量少了很多,避免了繁琐的细节。

3.3 小结

优点:

  1. 避免了对DB的轮询,降低 DB 的压力;
  2. 整体方案简单,使用 JDK 组件完成,没有额外依赖;

缺点:

  1. 任务容易丢失。任务存储于内存中,服务重启或机器宕机,会造成内存任务丢失;
  2. 单机策略,缺少集群能力。

为了解决 单机内存方案 的问题,我们需要引入分布式方案。
在单机内存方案中,除了 延时队列 实现外,还有一种 “时间轮” 方案,能够大幅降低内存消耗,有兴趣的伙伴可以研究一下。

4. 分布式延时队列方案

内存队列自身存在很多限制,在实际工作中,我们一般会引入分布式解决方案。

4.1 基于 Redis 延时队列

Redis 是最常用的基础设施,作为一个数据结构服务器,在丰富的数据结构帮助下,可以封装成多种高级结构,延时队列便是其中一种。
为了避免重复发明轮子,我们直接使用 Redisson 中的 延时队列。
整体架构与 DelayQueue 基本一致,只是将 内存延时队列 升级为 分布式延时队列,在此就不在论述。
首先,在 pom 中引入 Redisson 相关依赖

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson-spring-boot-starter</artifactId>
  4. <version>3.16.2</version>
  5. </dependency>

然后,在 application 配置文件中增加 redis 相关配置

  1. spring.redis.host=127.0.0.1
  2. spring.redis.port=6379
  3. spring.redis.database=0

最后,就可以注入核心组件 RedissonClient 了

  1. @Autowiredprivate
  2. RedissonClient redissonClient;

流程整合后的代码如下:

  1. @Slf4j
  2. @Service
  3. public class RDelayQueueStrategy implements SmartLifecycle {
  4. private boolean running;
  5. private Thread thread = new OrderCancelWorker();
  6. private RBlockingQueue<Long> cancelOrderQueue;
  7. private RDelayedQueue<Long> delayedQueue;
  8. @Autowired
  9. private OrderInfoService orderInfoService;
  10. @Autowired
  11. private RedissonClient redissonClient;
  12. /**
  13. * 创建 Redis 队列
  14. */
  15. @PostConstruct
  16. public void init(){
  17. this.cancelOrderQueue = redissonClient.getBlockingQueue("DelayQueueForCancelOrder");
  18. this.delayedQueue = redissonClient.getDelayedQueue(cancelOrderQueue);
  19. }
  20. @TransactionalEventListener
  21. public void onOrderCreated(OrderInfoCreateEvent event){
  22. this.delayedQueue.offer(event.getOrderInfo().getId(), 5L, TimeUnit.SECONDS);
  23. log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId());
  24. }
  25. /**
  26. * 启动后台线程
  27. */
  28. @Override
  29. public void start() {
  30. if (this.running){
  31. return;
  32. }
  33. thread.start();
  34. this.running = true;
  35. }
  36. /**
  37. * 停止后台线程
  38. */
  39. @Override
  40. public void stop() {
  41. if (!this.running){
  42. return;
  43. }
  44. thread.interrupt();
  45. this.running = false;
  46. }
  47. @Override
  48. public boolean isRunning() {
  49. return this.running;
  50. }
  51. @Override
  52. public boolean isAutoStartup() {
  53. return true;
  54. }
  55. private class OrderCancelWorker extends Thread {
  56. @Override
  57. public void run() {
  58. // 根据中断状态,确定是否退出
  59. while (!Thread.currentThread().isInterrupted()){
  60. Long orderId = null;
  61. try {
  62. // 从队列中获取 订单号
  63. orderId = cancelOrderQueue.take();
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. // 取消订单
  68. if (orderId != null){
  69. orderInfoService.cancel(orderId);
  70. log.info("Success to Run Delay Task, Cancel Order {}", orderId);
  71. }
  72. }
  73. }
  74. }
  75. }

这个方案非常简单,应用于大多数业务场景。但是,Redis 本身是遵循 AP 而非 CP 模型,在集群切换时会出现消息丢失的情况,所以对于一致性要求高的场景,建议使用 RocketMQ 方案。

4.2 基于 RocketMQ 延时队列

RocketMQ 是 阿里开源的分布式消息中间件,其整体设计从 Kafka 借鉴了大量思想,但针对业务场景增加了部分扩展,其中延时队列便是其中最为重要的一部分。
整体架构设计如下:
延时任务从入门到精通 - 图5
核心流程如下:

  1. 用户下单完成后,向 RocketMQ 提交一个消息;
  2. 时间达到后,消费线程从工作队列中获取消息;
  3. 消费线程解析消息后调用 CancelOrder 方法 对过期未支付的订单执行取消操作;

首先,需要增加 RocketMQ 相关依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.1</version>
  5. </dependency>

然后,在 application 添加相关配置

  1. rocketmq.name-server=http://127.0.0.1:9876
  2. rocketmq.producer.group=delay-task-demo

最后,我们就可以使用 RocketMQTemplate 发送消息

  1. @Autowiredprivate
  2. RocketMQTemplate rocketMQTemplate;

注:RocketMQ 并不支持任意的时间,而是提供了几个固定的延时时间,一般情况下可以满足我们的业务需求,如果现有固定延时无法满足需求,可以通过多次投递的方式进行解决。比如,RocketMQ 最大支持 2H 延时,而业务需要延时 24H,只需在消息体中增加期望执行时间,获取消息后,如果尚未达到期望执行时间,将消息重新发送回延时队列;如果达到期望执行时间,则执行对于的任务。
发送延时消息:

  1. @Service
  2. @Slf4j
  3. public class RocketMQBasedDelayStrategy {
  4. private static final String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  5. @Autowired
  6. private RocketMQTemplate rocketMQTemplate;
  7. @TransactionalEventListener
  8. public void onOrderCreated(OrderInfoCreateEvent event){
  9. // 将数据 发送至 RocketMQ 的延时队列
  10. Message<String> message = MessageBuilder
  11. .withPayload(String.valueOf(event.getOrderInfo().getId()))
  12. .build();
  13. this.rocketMQTemplate.syncSend("delay-task-topic", message, 200, 2);
  14. log.info("success to sent Delay Task to RocketMQ for Cancel Order {}", event.getOrderInfo().getId());
  15. }
  16. }

构建 Consumer 消费消息

  1. @Service
  2. @Slf4j
  3. @RocketMQMessageListener(topic = "delay-task-topic", consumerGroup = "delay-task-consumer-group")
  4. public class RocketMQBasedDelayTaskConsumer implements RocketMQListener<MessageExt> {
  5. @Autowired
  6. private OrderInfoService orderInfoService;
  7. /**
  8. * 接收消息回调,执行取消订单操作
  9. * @param message
  10. */
  11. @Override
  12. public void onMessage(MessageExt message) {
  13. byte[] body = message.getBody();
  14. String idAsStr = new String(body);
  15. orderInfoService.cancel(Long.valueOf(idAsStr));
  16. }
  17. }

4.3 小结

一般互联网公司都会使用 RocketMQ 方案来解决延时问题。
优点,主要来自于分布式服务特性:

  1. 高性能。作为削峰填谷的利器,发送端、服务器、消费端都提供较高性能;
  2. 高可用。Redis、RocketMQ 都提供了丰富的部署模式,是高可用的基础;
  3. 可扩展。Redis、RocketMQ 集群具有良好的扩展能力;

缺点:

  1. 需要中间支持。首先,需要基础设施的支持,Redis、RocketMQ 都会增加运维成本;
  2. 需要学习新的 API。需要掌握新的 API,增加学习成本,使用不当还可能出现问题;

    5. 声明式编程

    架构设计中有一个非常重要的原则:有效分离技术和业务,避免两者的相互影响。

    5.1 声明式编程

    声明式编程(英语:Declarative programming)是一种编程范式,与命令式编程相对立。它描述目标的性质,让计算机明白目标,而非流程。声明式编程不用告诉计算机问题领域,从而避免随之而来的副作用。而命令式编程则需要用算法来明确的指出每一步该怎么做。
    每引入一个中间件,研发人员都需要学习一套新的API,如何有效降低接入成本是一个巨大的挑战,而最常用的重要手段之一就是:声明式编程。
    简单来说,就是将能力抽象化,使其能够通过配置的方式灵活的应用于需要的场景。
    首先,让我们先看下最终的效果:

    1. @Service
    2. @Slf4j
    3. public class RocketMQBasedDelayService {
    4. @Autowired
    5. private OrderInfoService orderInfoService;
    6. /**
    7. * 通过 RocketMQBasedDelay 指定方法为延时方法,该 注解做两件事:<br />
    8. * 1. 基于 AOP 技术,拦截对 cancelOrder 的调用,将参数转为为 Message, 并发送到 RocketMQ 的延时队列
    9. * 2. 针对 cancelOrder 方法,创建 DefaultMQPushConsumer 并订阅相关消息,进行消息处理
    10. * @param orderId
    11. */
    12. @RocketMQBasedDelay(topic = "delay-task-topic-ann",
    13. delayLevel = 2, consumerGroup = "CancelOrderGroup")
    14. public void cancelOrder(Long orderId){
    15. if (orderId == null){
    16. log.info("param is invalidate");
    17. return;
    18. }
    19. this.orderInfoService.cancel(orderId);
    20. log.info("success to cancel Order for {}", orderId);
    21. }
    22. }

相比于普通方法,增加 @RocketMQBasedDelay 便可以赋予方法延时能力,这便是“声明式编程”的威力

  1. 首先在方法上添加 @RocketMQBasedDelay 注解,配置延时队列名称,延时时间,消费者信息;
  2. 当方法被调用时,并不会直接执行,而是将请求转发给 RocketMQ 的延时队列,然后直接返回;
  3. 当到达消息延时时间时,Consumer 从 延时队列中获取消息,并调用 cancelOrder 方法来处理业务流程。

使用这种方式,大大减少了接入成本,降低了出错的概率。

5.2 核心设计

核心设计如下:
延时任务从入门到精通 - 图6
在启动时,增加了两个扩展点:

  1. 扫描 @RocketMQBasedDelay 注解方法,为方法增加 SendMessageInterceptor 拦截器;
  2. 扫描 @RocketMQBasedDelay 注解方法,生成 RocketMQConsumerContainer 托管对象,并完成 DefaultMQPushConsumer 的配置和启动;

具体的执行流程如下:

  1. 当方法被调用时,调用被 SendMessageInterceptor 拦截,从而改变原有执行规则,新的流程如下:
  • 从 @RocketMQBasedDelay 获取相关的配置参数;
  • 对请求参数进行序列化处理;
  • 使用 RocketMQTemplate 发送延时消息;
  • 直接返回,中断原有方法调用;
  • 当延时时间到达时,RocketMQConsumerContainer 中的 DefaultMQPushConsumer 会获取到消息进行业务处理:
  • 反序列化调用参数;
  • 调用业务方法;
  • 返回消费状态;

    5.3 核心实现

    核心组件,主要分为两类:
  1. 工作组件。
  • SendMessageInterceptor。拦截请求,将请求转发至 RocketMQ 的延时队列;
  • RocketMQConsumerContainer。对 DefaultMQPushConsumer 的封装,主要完成 Consumer 的配置,注册监听器,消息到达后触发任务的执行;
  • 配置组件。
  • RocketMQConsumerContainerRegistry。对 Spring 容器中的 Bean 进行扫描,将@RocketMQBasedDelay注解的方法封装成 RocketMQConsumerContainer,并注册到 Spring 容器中;
  • RocketMQBasedDelayConfiguration。向 Spring 容器注册 AOP 拦截器 和 RocketMQConsumerContainerRegistry;

RocketMQBasedDelay 注解如下:

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. public @interface RocketMQBasedDelay {
  4. /**
  5. * RocketMQ topic
  6. * @return
  7. */
  8. String topic();
  9. /**
  10. * 延时级别
  11. * @return
  12. */
  13. int delayLevel();
  14. /**
  15. * 消费者组信息
  16. * @return
  17. */
  18. String consumerGroup();
  19. }

该注解可以放置在方法之上,并在 运行时 生效。
SendMessageInterceptor 核心代码如下:

  1. /**
  2. * 拦截方法调用,并将请求封装成 Message 发送至 RocketMQ 的 Topic
  3. */
  4. @Slf4j
  5. public class SendMessageInterceptor implements MethodInterceptor {
  6. @Autowired
  7. private RocketMQTemplate rocketMQTemplate;
  8. @Override
  9. public Object invoke(MethodInvocation methodInvocation) throws Throwable {
  10. Method method = methodInvocation.getMethod();
  11. // 1. 获取 方法上的注解信息
  12. RocketMQBasedDelay rocketMQBasedDelay = method.getAnnotation(RocketMQBasedDelay.class);
  13. // 2. 将请求参数 转换为 MQ
  14. Object[] arguments = methodInvocation.getArguments();
  15. String argData = serialize(arguments);
  16. Message<String> message = MessageBuilder
  17. .withPayload(argData)
  18. .build();
  19. // 3. 发送 MQ
  20. this.rocketMQTemplate.syncSend(rocketMQBasedDelay.topic(), message , 200, rocketMQBasedDelay.delayLevel());
  21. log.info("success to sent Delay Task to RocketMQ for {}", Arrays.toString(arguments));
  22. return null;
  23. }
  24. private String serialize(Object[] arguments) {
  25. Map<String, String> result = Maps.newHashMapWithExpectedSize(arguments.length);
  26. for (int i = 0; i < arguments.length; i++){
  27. result.put(String.valueOf(i), SerializeUtil.serialize(arguments[i]));
  28. }
  29. return SerializeUtil.serialize(result);
  30. }
  31. }

RocketMQConsumerContainer 源码如下:

  1. /**
  2. * Consumer 容器,用于对 DefaultMQPushConsumer 的封装
  3. */
  4. @Data
  5. @Slf4j
  6. public class RocketMQConsumerContainer implements InitializingBean, SmartLifecycle {
  7. private DefaultMQPushConsumer consumer;
  8. private boolean running;
  9. private String consumerGroup;
  10. private String nameServerAddress;
  11. private String topic;
  12. private Object bean;
  13. private Method method;
  14. @Override
  15. public boolean isAutoStartup() {
  16. return true;
  17. }
  18. @Override
  19. public void start() {
  20. if (this.running){
  21. return;
  22. }
  23. try {
  24. this.consumer.start();
  25. } catch (MQClientException e) {
  26. e.printStackTrace();
  27. }
  28. this.running = true;
  29. }
  30. @Override
  31. public void stop() {
  32. this.running = false;
  33. this.consumer.shutdown();
  34. }
  35. @Override
  36. public boolean isRunning() {
  37. return running;
  38. }
  39. @Override
  40. public void afterPropertiesSet() throws Exception {
  41. // 构建 DefaultMQPushConsumer
  42. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
  43. consumer.setConsumerGroup(this.consumerGroup);
  44. consumer.setNamesrvAddr(this.nameServerAddress);
  45. // 订阅 topic
  46. consumer.subscribe(topic, "*");
  47. // 增加拦截器
  48. consumer.setMessageListener(new DefaultMessageListenerOrderly());
  49. this.consumer = consumer;
  50. }
  51. private class DefaultMessageListenerOrderly implements MessageListenerOrderly {
  52. @Override
  53. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  54. for (MessageExt messageExt : msgs) {
  55. log.debug("received msg: {}", messageExt);
  56. try {
  57. long now = System.currentTimeMillis();
  58. // 从 Message 中反序列化数据,获得方法调用参数
  59. byte[] body = messageExt.getBody();
  60. String bodyAsStr = new String(body);
  61. Map deserialize = SerializeUtil.deserialize(bodyAsStr, Map.class);
  62. Object[] params = new Object[method.getParameterCount()];
  63. for (int i = 0; i< method.getParameterCount(); i++){
  64. String o = (String)deserialize.get(String.valueOf(i));
  65. if (o == null){
  66. params[i] = null;
  67. }else {
  68. params[i] = SerializeUtil.deserialize(o, method.getParameterTypes()[i]);
  69. }
  70. }
  71. // 执行业务方法
  72. method.invoke(bean, params);
  73. long costTime = System.currentTimeMillis() - now;
  74. log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  75. } catch (Exception e) {
  76. log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
  77. context.setSuspendCurrentQueueTimeMillis(1000);
  78. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  79. }
  80. }
  81. return ConsumeOrderlyStatus.SUCCESS;
  82. }
  83. }
  84. }

RocketMQConsumerContainerRegistry 源码如下:

  1. /**
  2. * 基于 BeanPostProcessor#postProcessAfterInitialization 对每个 bean 进行处理
  3. * 扫描 bean 中被 @RocketMQBasedDelay 注解的方法,并将方法封装成 RocketMQConsumerContainer,
  4. * 以启动 DefaultMQPushConsumer
  5. */
  6. public class RocketMQConsumerContainerRegistry implements BeanPostProcessor {
  7. private final AtomicInteger id = new AtomicInteger(1);
  8. @Autowired
  9. private GenericApplicationContext applicationContext;
  10. @Value("${rocketmq.name-server}")
  11. private String nameServerAddress;
  12. /**
  13. * 对每个 bean 依次进行处理
  14. * @param bean
  15. * @param beanName
  16. * @return
  17. * @throws BeansException
  18. */
  19. @Override
  20. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  21. // 1. 获取 @RocketMQBasedDelay 注解方法
  22. Class targetCls = AopUtils.getTargetClass(bean);
  23. List<Method> methodsListWithAnnotation = MethodUtils.getMethodsListWithAnnotation(targetCls, RocketMQBasedDelay.class);
  24. // 2. 为每个 @RocketMQBasedDelay 注解方法 注册 RocketMQConsumerContainer
  25. for(Method method : methodsListWithAnnotation){
  26. String containerBeanName = targetCls.getName() + "#" + method.getName() + id.getAndIncrement();
  27. RocketMQBasedDelay annotation = method.getAnnotation(RocketMQBasedDelay.class);
  28. applicationContext.registerBean(containerBeanName, RocketMQConsumerContainer.class, () -> createContainer(bean, method, annotation));
  29. }
  30. return bean;
  31. }
  32. /**
  33. * 构建 RocketMQConsumerContainer
  34. * @param proxy
  35. * @param method
  36. * @param annotation
  37. * @return
  38. */
  39. private RocketMQConsumerContainer createContainer(Object proxy, Method method, RocketMQBasedDelay annotation) {
  40. Object bean = AopProxyUtils.getSingletonTarget(proxy);
  41. RocketMQConsumerContainer container = new RocketMQConsumerContainer();
  42. container.setBean(bean);
  43. container.setMethod(method);
  44. container.setConsumerGroup(annotation.consumerGroup());
  45. container.setNameServerAddress(nameServerAddress);
  46. container.setTopic(annotation.topic());
  47. return container;
  48. }
  49. }

RocketMQBasedDelayConfiguration 源码如下:

  1. @Configuration
  2. public class RocketMQBasedDelayConfiguration {
  3. /**
  4. * 声明 RocketMQConsumerContainerRegistry,扫描 RocketMQBasedDelay 方法,
  5. * 创建 DefaultMQPushConsumer 并完成注册
  6. * @return
  7. */
  8. @Bean
  9. public RocketMQConsumerContainerRegistry rocketMQConsumerContainerRegistry(){
  10. return new RocketMQConsumerContainerRegistry();
  11. }
  12. /**
  13. * 声明 AOP 拦截器
  14. * 在调用 @RocketMQBasedDelay 注解方法时,自动拦截,将请求发送至 RocketMQ
  15. * @return
  16. */
  17. @Bean
  18. public SendMessageInterceptor messageSendInterceptor(){
  19. return new SendMessageInterceptor();
  20. }
  21. /**
  22. * 对 @RocketMQBasedDelay 标注方法进行拦截
  23. * @param sendMessageInterceptor
  24. * @return
  25. */
  26. @Bean
  27. public PointcutAdvisor pointcutAdvisor(@Autowired SendMessageInterceptor sendMessageInterceptor){
  28. return new DefaultPointcutAdvisor(new AnnotationMatchingPointcut(null, RocketMQBasedDelay.class), sendMessageInterceptor);
  29. }
  30. }

5.4 小结

声明式编程,在设计时会有比较明显的门槛,但这种代价换来的是 使用上的便利性。这种一次性投入,多次创造价值的做法,非常推荐应用,大大提升研发效率、降低错误出现概率。

6. 小结

本文,以自动对超时未支付订单执行取消操作为业务场景,先后介绍了

  1. DB 轮询方案;
  2. 基于延时队列和ScheduleExecutorService的单机内存方案;
  3. 基于 Redis 和 RocketMQ 的分布式延时队列方案;

并详细阐述了各个方案优缺点,希望各位伙伴能在实际开发中根据业务场景选择最优解决方案。
最后,对“声明式编程”进行了简单介绍,通过技术手段降低接入成本。