使用一个queue和两个set(dirty、process)解决并发问题
dirty是代表还需要处理的数据
process是代表正在处理的数据
Add操作:当一个item要加入到workqueue时
- 如果dirty中已经有了,则return
- 写入dirty
- 如果process中有,则return
- 写入queue中
Get操作:当一个item要从workqueue取出时
- 从queue中取出
- 从dirty中删除
- 加入process
Done操作:
- 从process删除
- 如果dirty中有,则加回到queue中
总结:
正常情况下元素会只会在processing和dirty存在一份,同时存在就说明该元素在被处理的同时又被添加了一次,那么先前的那次可以理解为脏的,后续添加的要再被处理。
通过上述的机制,能保证并发的问题
DelayingQueue
AddAfter
计算出 readyAt 时间,将该时间与 item 一并存入 waitingForAddCh
,这个 ch 的大小为 1000,也就是说未达到 1000 时,AddAfter 是不会被阻塞的
waitingLoop
当然为了实现 AddAfter 这个功能,免不了 queue 需要做一些额外的维护事情,最重要的就是 queue 初始化时,开始用协程执行 waitingLoop
方法
这个方法是实现 delaying_queue
功能的核心逻辑
首先会判断这个 item 是否可以加入 queue 了,如果时候还没到,那么将该 item 加入以 readyAt 为排序关键的优先队列中。若时候到了,则加入 queue。处理完第一个 item 之后,会将 waitingForAddCh
中剩余的 item 均按照相同的逻辑处理之
如果加入的 item 已经存在,并且新加入 item 的 readyAt 时间比已经存在的 item 的时间晚,那么不好意思哈,这个 item 会被直接丢弃。只有新加入的 item 的 readyAt 时间比已存在的 item 时间要早,才会更新已存在的 item 的 readyAt 时间,并调整 item 在优先队列中的位置
RateLimitingQueue: DelayingQueue + RateLimiting
通过RateLimiting计算readyAt的时间
所以再说一遍浓缩用法
- 添加 item 调用 Add 方法
- 获取 item 调用 Get 方法
- 处理 item 之后调用 Done 方法
- 不增加 item 重试次数调用 Forget 方法
再说一遍 rate limit queue 重点,切莫踩坑
- Add 是异步方法
- Add 有去重功能
- 先经过 DelayQueue 去重处理,对于新加入的 item,在其优先队列中依然有相同的 item 时,如果新加入 item 的 readyAt time 较原 item 的 readyAt 时间靠后的话,新加入的 item 会被丢弃
- 再经过 Queue 去重处理,如果 queue 中有相同 item 则直接被丢弃。若 queue 中没有相同 item,但是 item 处于被处理中,即未被调用 Done 时,会将 item 标记为 dirty,待 item 被调用 Done 时,重新加入 queue
- 处理 item 结束之后,无论如何调用 Done,标识该 item 已被处理结束
- 若不需要增加 item 的重试次数,则结束之后调用 Forget 方法,清除该 item 的重试次数统计
- 如果需要调用 Forget,则先调用 Forget 再调用 Done,确保再次 Add 的时候不受限流影响