设计
之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列 现在就尝试实现一下!
上文:SpringBoot 整合:RabbitMQ配置延时队列和消息重试
业务流程
首先我们分析下这个流程
- 用户提交任务。首先将任务推送至延迟队列中。
- 延迟队列接收到任务后,首先将任务推送至job pool中,然后计算其执行时间。
- 然后生成延迟任务(仅仅包含任务id)放入某个桶中
- 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
- 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
- 如果合法则计算时间,如果时间合法:根据topic将任务放入对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容
- 消费端轮询对应topic的ready queue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
- 完成消费后,发送finish消息,服务端根据job id删除对应信息。
对象
我们现在可以了解到中间存在的几个组件
- 延迟队列,为Redis延迟队列。实现消息传递
- Job pool 任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为job
- Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入
- Timer 时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个Bucket
- Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。
其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。
任务状态
- 首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
- 根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
- 文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。
实现
任务及相关对象
目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delay job)
任务对象@Data@AllArgsConstructor@NoArgsConstructorpublic class Job implements Serializable {/*** 延迟任务的唯一标识,用于检索任务*/@JsonSerialize(using = ToStringSerializer.class)private Long id;/*** 任务类型(具体业务类型)*/private String topic;/*** 任务的延迟时间*/private long delayTime;/*** 任务的执行超时时间*/private long ttrTime;/*** 任务具体的消息内容,用于处理具体业务逻辑用*/private String message;/*** 重试次数*/private int retryCount;/*** 任务状态*/private JobStatus status;}
任务引用对象
@Data@AllArgsConstructorpublic class DelayJob implements Serializable {/*** 延迟任务的唯一标识*/private long jodId;/*** 任务的执行时间*/private long delayDate;/*** 任务类型(具体业务类型)*/private String topic;public DelayJob(Job job) {this.jodId = job.getId();this.delayDate = System.currentTimeMillis() + job.getDelayTime();this.topic = job.getTopic();}public DelayJob(Object value, Double score) {this.jodId = Long.parseLong(String.valueOf(value));this.delayDate = System.currentTimeMillis() + score.longValue();}}
容器
目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器
job任务池,为普通的K/V结构,提供基础的操作@Component@Slf4jpublic class JobPool {@Autowiredprivate RedisTemplate redisTemplate;private String NAME = "job.pool";private BoundHashOperations getPool () {BoundHashOperations ops = redisTemplate.boundHashOps(NAME);return ops;}/*** 添加任务* @param job*/public void addJob (Job job) {log.info("任务池添加任务:{}", JSON.toJSONString(job));getPool().put(job.getId(),job);return ;}/*** 获得任务* @param jobId* @return*/public Job getJob(Long jobId) {Object o = getPool().get(jobId);if (o instanceof Job) {return (Job) o;}return null;}/*** 移除任务* @param jobId*/public void removeDelayJob (Long jobId) {log.info("任务池移除任务:{}",jobId);// 移除任务getPool().delete(jobId);}}
延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作
@Slf4j@Componentpublic class DelayBucket {@Autowiredprivate RedisTemplate redisTemplate;private static AtomicInteger index = new AtomicInteger(0);@Value("${thread.size}")private int bucketsSize;private List <String> bucketNames = new ArrayList <>();@Beanpublic List <String> createBuckets() {for (int i = 0; i < bucketsSize; i++) {bucketNames.add("bucket" + i);}return bucketNames;}/*** 获得桶的名称* @return*/private String getThisBucketName() {int thisIndex = index.addAndGet(1);int i1 = thisIndex % bucketsSize;return bucketNames.get(i1);}/*** 获得桶集合* @param bucketName* @return*/private BoundZSetOperations getBucket(String bucketName) {return redisTemplate.boundZSetOps(bucketName);}/*** 放入延时任务* @param job*/public void addDelayJob(DelayJob job) {log.info("添加延迟任务:{}", JSON.toJSONString(job));String thisBucketName = getThisBucketName();BoundZSetOperations bucket = getBucket(thisBucketName);bucket.add(job,job.getDelayDate());}/*** 获得最新的延期任务* @return*/public DelayJob getFirstDelayTime(Integer index) {String name = bucketNames.get(index);BoundZSetOperations bucket = getBucket(name);Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);if (CollectionUtils.isEmpty(set)) {return null;}ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];Object value = typedTuple.getValue();if (value instanceof DelayJob) {return (DelayJob) value;}return null;}/*** 移除延时任务* @param index* @param delayJob*/public void removeDelayTime(Integer index,DelayJob delayJob) {String name = bucketNames.get(index);BoundZSetOperations bucket = getBucket(name);bucket.remove(delayJob);}}
待完成任务,内部使用topic进行细分,每个topic对应一个list集合 ``` @Component @Slf4j public class ReadyQueue { @Autowired private RedisTemplate redisTemplate; private String NAME = “process.queue”; private String getKey(String topic) {
return NAME + topic;
} /**
- 获得队列
- @param topic
- @return / private BoundListOperations getQueue (String topic) { BoundListOperations ops = redisTemplate.boundListOps(getKey(topic)); return ops; } /*
- 设置任务
- @param delayJob / public void pushJob(DelayJob delayJob) { log.info(“执行队列添加任务:{}”,delayJob); BoundListOperations listOperations = getQueue(delayJob.getTopic()); listOperations.leftPush(delayJob); } /*
- 移除并获得任务
- @param topic
- @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
} return null; }log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));return (DelayJob) o;
}
<a name="qzEtn"></a>### 轮询处理设置了线程池为每个bucket设置一个轮询操作
@Component
public class DelayTimer implements ApplicationListener
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
<a name="PWriX"></a>
### 测试请求
/**
- 测试用请求
@author daify **/ @RestController @RequestMapping(“delay”) public class DelayController {
@Autowired private JobService jobService; /**
- 添加
- @param request
- @return / @RequestMapping(value = “add”,method = RequestMethod.POST) public String addDefJob(Job request) { DelayJob delayJob = jobService.addDefJob(request); return JSON.toJSONString(delayJob); } /*
- 获取
- @return / @RequestMapping(value = “pop”,method = RequestMethod.GET) public String getProcessJob(String topic) { Job process = jobService.getProcessJob(topic); return JSON.toJSONString(process); } /*
- 完成一个执行的任务
- @param jobId
- @return */ @RequestMapping(value = “finish”,method = RequestMethod.DELETE) public String finishJob(Long jobId) { jobService.finishJob(jobId); return “success”; } @RequestMapping(value = “delete”,method = RequestMethod.DELETE) public String deleteJob(Long jobId) { jobService.deleteJob(jobId); return “success”; }
}
<a name="yKWB2"></a>
## 测试
<a name="s47A1"></a>
## 添加延迟任务
通过postman请求:localhost:8000/delay/add<br /><br />此时这条延时任务被添加进了线程池中
2019-08-12 21:21:36.589 INFO 21444 —- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:3,”message”:”tag:testid:3”,”retryCount”:0,”status”:”DELAY”,”topic”:”test”,”ttrTime”:10000} 2019-08-12 21:21:36.609 INFO 21444 —- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{“delayDate”:1565616106609,”jodId”:3,”topic”:”test”}
根据设置10秒钟之后任务会被添加至ReadyQueue中
2019-08-12 21:21:46.744 INFO 21444 —- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
<a name="FE5W8"></a>
### 获得任务
这时候我们请求localhost:8000/delay/pop<br />这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中
2019-08-09 19:36:02.342 INFO 58456 —- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{“delayDate”:1565321728704,”jodId”:1,”topic”:”测试”} 2019-08-09 19:36:02.364 INFO 58456 —- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:1,”message”:”延迟10秒,超时30秒”,”retryCount”:0,”status”:”RESERVED”,”topic”:”测试”,”ttrTime”:30000} 2019-08-09 19:36:02.384 INFO 58456 —- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{“delayDate”:1565321792364,”jodId”:1,”topic”:”测试”}
按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中
2019-08-12 21:21:48.239 INFO 21444 —- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{“delayDate”:1565616106609,”jodId”:3,”topic”:”test”} 2019-08-12 21:21:48.261 INFO 21444 —- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{“delayTime”:10000,”id”:3,”message”:”tag:testid:3”,”retryCount”:0,”status”:”RESERVED”,”topic”:”test”,”ttrTime”:10000}
<a name="WeKyT"></a>
### 任务的删除/消费
现在我们请求:localhost:8000/delay/delete<br /><br />此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。
2019-08-12 21:21:54.880 INFO 21444 —- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3 2019-08-12 21:21:59.104 INFO 21444 —- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{“delayDate”:1565616118261,”jodId”:3,”topic”:”test”} ``` 本篇文章涉及的源码下载地址:
推荐好文
强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!
