设计

之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列 现在就尝试实现一下!
上文:SpringBoot 整合:RabbitMQ配置延时队列和消息重试

业务流程

首先我们分析下这个流程

  1. 用户提交任务。首先将任务推送至延迟队列中。
  2. 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
  3. 然后生成延迟任务(仅仅包含任务id)放入某个桶中
  4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
  5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
  6. 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
  7. 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
  8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。

Redis延时队列的简单实现 - 图1

对象

我们现在可以了解到中间存在的几个组件

  • 延迟队列,为Redis延迟队列。实现消息传递
  • Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
  • Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
  • Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
  • Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。

其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。

任务状态

  • ready:可执行状态,
  • delay:不可执行状态,等待时钟周期。
  • reserved:已被消费者读取,但没有完成消费。
  • deleted:已被消费完成或者已被删除。

    对外提供的接口

    Redis延时队列的简单实现 - 图2

    额外的内容

  1. 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
  2. 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
  3. 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
  4. 文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。

    实现

    现在我们根据设计内容完成设计。这一块设计我们分四步完成

    任务及相关对象

    目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)
    任务对象

    1. @Data
    2. @AllArgsConstructor
    3. @NoArgsConstructor
    4. public class Job implements Serializable {
    5. /**
    6. * 延迟任务的唯一标识,用于检索任务
    7. */
    8. @JsonSerialize(using = ToStringSerializer.class)
    9. private Long id;
    10. /**
    11. * 任务类型(具体业务类型)
    12. */
    13. private String topic;
    14. /**
    15. * 任务的延迟时间
    16. */
    17. private long delayTime;
    18. /**
    19. * 任务的执行超时时间
    20. */
    21. private long ttrTime;
    22. /**
    23. * 任务具体的消息内容,用于处理具体业务逻辑用
    24. */
    25. private String message;
    26. /**
    27. * 重试次数
    28. */
    29. private int retryCount;
    30. /**
    31. * 任务状态
    32. */
    33. private JobStatus status;
    34. }

    任务引用对象

    1. @Data
    2. @AllArgsConstructor
    3. public class DelayJob implements Serializable {
    4. /**
    5. * 延迟任务的唯一标识
    6. */
    7. private long jodId;
    8. /**
    9. * 任务的执行时间
    10. */
    11. private long delayDate;
    12. /**
    13. * 任务类型(具体业务类型)
    14. */
    15. private String topic;
    16. public DelayJob(Job job) {
    17. this.jodId = job.getId();
    18. this.delayDate = System.currentTimeMillis() + job.getDelayTime();
    19. this.topic = job.getTopic();
    20. }
    21. public DelayJob(Object value, Double score) {
    22. this.jodId = Long.parseLong(String.valueOf(value));
    23. this.delayDate = System.currentTimeMillis() + score.longValue();
    24. }
    25. }

    容器

    目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器
    job任务池,为普通的K/V结构,提供基础的操作

    1. @Component
    2. @Slf4j
    3. public class JobPool {
    4. @Autowired
    5. private RedisTemplate redisTemplate;
    6. private String NAME = "job.pool";
    7. private BoundHashOperations getPool () {
    8. BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
    9. return ops;
    10. }
    11. /**
    12. * 添加任务
    13. * @param job
    14. */
    15. public void addJob (Job job) {
    16. log.info("任务池添加任务:{}", JSON.toJSONString(job));
    17. getPool().put(job.getId(),job);
    18. return ;
    19. }
    20. /**
    21. * 获得任务
    22. * @param jobId
    23. * @return
    24. */
    25. public Job getJob(Long jobId) {
    26. Object o = getPool().get(jobId);
    27. if (o instanceof Job) {
    28. return (Job) o;
    29. }
    30. return null;
    31. }
    32. /**
    33. * 移除任务
    34. * @param jobId
    35. */
    36. public void removeDelayJob (Long jobId) {
    37. log.info("任务池移除任务:{}",jobId);
    38. // 移除任务
    39. getPool().delete(jobId);
    40. }
    41. }

    延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

    1. @Slf4j
    2. @Component
    3. public class DelayBucket {
    4. @Autowired
    5. private RedisTemplate redisTemplate;
    6. private static AtomicInteger index = new AtomicInteger(0);
    7. @Value("${thread.size}")
    8. private int bucketsSize;
    9. private List <String> bucketNames = new ArrayList <>();
    10. @Bean
    11. public List <String> createBuckets() {
    12. for (int i = 0; i < bucketsSize; i++) {
    13. bucketNames.add("bucket" + i);
    14. }
    15. return bucketNames;
    16. }
    17. /**
    18. * 获得桶的名称
    19. * @return
    20. */
    21. private String getThisBucketName() {
    22. int thisIndex = index.addAndGet(1);
    23. int i1 = thisIndex % bucketsSize;
    24. return bucketNames.get(i1);
    25. }
    26. /**
    27. * 获得桶集合
    28. * @param bucketName
    29. * @return
    30. */
    31. private BoundZSetOperations getBucket(String bucketName) {
    32. return redisTemplate.boundZSetOps(bucketName);
    33. }
    34. /**
    35. * 放入延时任务
    36. * @param job
    37. */
    38. public void addDelayJob(DelayJob job) {
    39. log.info("添加延迟任务:{}", JSON.toJSONString(job));
    40. String thisBucketName = getThisBucketName();
    41. BoundZSetOperations bucket = getBucket(thisBucketName);
    42. bucket.add(job,job.getDelayDate());
    43. }
    44. /**
    45. * 获得最新的延期任务
    46. * @return
    47. */
    48. public DelayJob getFirstDelayTime(Integer index) {
    49. String name = bucketNames.get(index);
    50. BoundZSetOperations bucket = getBucket(name);
    51. Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
    52. if (CollectionUtils.isEmpty(set)) {
    53. return null;
    54. }
    55. ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
    56. Object value = typedTuple.getValue();
    57. if (value instanceof DelayJob) {
    58. return (DelayJob) value;
    59. }
    60. return null;
    61. }
    62. /**
    63. * 移除延时任务
    64. * @param index
    65. * @param delayJob
    66. */
    67. public void removeDelayTime(Integer index,DelayJob delayJob) {
    68. String name = bucketNames.get(index);
    69. BoundZSetOperations bucket = getBucket(name);
    70. bucket.remove(delayJob);
    71. }
    72. }

    待完成任务,内部使用topic进行细分,每个topic对应一个list集合 ``` @Component @Slf4j public class ReadyQueue { @Autowired private RedisTemplate redisTemplate; private String NAME = “process.queue”; private String getKey(String topic) {

    1. return NAME + topic;

    } /**

    • 获得队列
    • @param topic
    • @return / private BoundListOperations getQueue (String topic) { BoundListOperations ops = redisTemplate.boundListOps(getKey(topic)); return ops; } /*
    • 设置任务
    • @param delayJob / public void pushJob(DelayJob delayJob) { log.info(“执行队列添加任务:{}”,delayJob); BoundListOperations listOperations = getQueue(delayJob.getTopic()); listOperations.leftPush(delayJob); } /*
    • 移除并获得任务
    • @param topic
    • @return */ public DelayJob popJob(String topic) { BoundListOperations listOperations = getQueue(topic); Object o = listOperations.leftPop(); if (o instanceof DelayJob) {
      1. log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
      2. return (DelayJob) o;
      } return null; }

}

  1. <a name="qzEtn"></a>
  2. ### 轮询处理
  3. 设置了线程池为每个bucket设置一个轮询操作

@Component public class DelayTimer implements ApplicationListener { @Autowired private DelayBucket delayBucket; @Autowired private JobPool jobPool; @Autowired private ReadyQueue readyQueue;

@Value("${thread.size}")
private int length;

@Override 
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
    ExecutorService executorService = new ThreadPoolExecutor(
            length, 
            length,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue <Runnable>());
    for (int i = 0; i < length; i++) {
        executorService.execute(
                new DelayJobHandler(
                        delayBucket,
                        jobPool,
                        readyQueue,
                        i));
    }

}

}

<a name="PWriX"></a>
### 测试请求

/**

  • 测试用请求
  • @author daify **/ @RestController @RequestMapping(“delay”) public class DelayController {

    @Autowired private JobService jobService; /**

    • 添加
    • @param request
    • @return / @RequestMapping(value = “add”,method = RequestMethod.POST) public String addDefJob(Job request) { DelayJob delayJob = jobService.addDefJob(request); return JSON.toJSONString(delayJob); } /*
    • 获取
    • @return / @RequestMapping(value = “pop”,method = RequestMethod.GET) public String getProcessJob(String topic) { Job process = jobService.getProcessJob(topic); return JSON.toJSONString(process); } /*
    • 完成一个执行的任务
    • @param jobId
    • @return */ @RequestMapping(value = “finish”,method = RequestMethod.DELETE) public String finishJob(Long jobId) { jobService.finishJob(jobId); return “success”; } @RequestMapping(value = “delete”,method = RequestMethod.DELETE) public String deleteJob(Long jobId) { jobService.deleteJob(jobId); return “success”; }

}

<a name="yKWB2"></a>
## 测试
<a name="s47A1"></a>
## 添加延迟任务
通过postman请求:localhost:8000/delay/add<br />![](https://cdn.nlark.com/yuque/0/2021/png/2687219/1617936615080-f475065f-3ad8-41ed-9df0-f2195171d960.png#align=left&display=inline&height=588&margin=%5Bobject%20Object%5D&originHeight=588&originWidth=1080&size=0&status=done&style=none&width=1080)<br />此时这条延时任务被添加进了线程池中

2019-08-12 21:21:36.589 INFO 21444 —- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:3,”message”:”tag:testid:3”,”retryCount”:0,”status”:”DELAY”,”topic”:”test”,”ttrTime”:10000} 2019-08-12 21:21:36.609 INFO 21444 —- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{“delayDate”:1565616106609,”jodId”:3,”topic”:”test”}

根据设置10秒钟之后任务会被添加至ReadyQueue中

2019-08-12 21:21:46.744 INFO 21444 —- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

<a name="FE5W8"></a>
### 获得任务
这时候我们请求localhost:8000/delay/pop<br />这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

2019-08-09 19:36:02.342 INFO 58456 —- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{“delayDate”:1565321728704,”jodId”:1,”topic”:”测试”} 2019-08-09 19:36:02.364 INFO 58456 —- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:1,”message”:”延迟10秒,超时30秒”,”retryCount”:0,”status”:”RESERVED”,”topic”:”测试”,”ttrTime”:30000} 2019-08-09 19:36:02.384 INFO 58456 —- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{“delayDate”:1565321792364,”jodId”:1,”topic”:”测试”}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

2019-08-12 21:21:48.239 INFO 21444 —- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{“delayDate”:1565616106609,”jodId”:3,”topic”:”test”} 2019-08-12 21:21:48.261 INFO 21444 —- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:3,”message”:”tag:testid:3”,”retryCount”:0,”status”:”RESERVED”,”topic”:”test”,”ttrTime”:10000}

<a name="WeKyT"></a>
### 任务的删除/消费
现在我们请求:localhost:8000/delay/delete<br />![](https://cdn.nlark.com/yuque/0/2021/png/2687219/1617936614979-325a37e5-cb8d-4ae1-a38d-ce9446fb04ba.png#align=left&display=inline&height=635&margin=%5Bobject%20Object%5D&originHeight=635&originWidth=1080&size=0&status=done&style=none&width=1080)<br />此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

2019-08-12 21:21:54.880 INFO 21444 —- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3 2019-08-12 21:21:59.104 INFO 21444 —- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{“delayDate”:1565616118261,”jodId”:3,”topic”:”test”} ``` 本篇文章涉及的源码下载地址:

https://gitee.com/daifyutils/springboot-samples

推荐好文

强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!Redis延时队列的简单实现 - 图3