使用一个queue和两个set(dirty、process)解决并发问题

dirty是代表还需要处理的数据
process是代表正在处理的数据
image.png

Add操作:当一个item要加入到workqueue时

  1. 如果dirty中已经有了,则return
  2. 写入dirty
  3. 如果process中有,则return
  4. 写入queue中

Get操作:当一个item要从workqueue取出时

  1. 从queue中取出
  2. 从dirty中删除
  3. 加入process

Done操作:

  1. 从process删除
  2. 如果dirty中有,则加回到queue中

总结:

正常情况下元素会只会在processing和dirty存在一份,同时存在就说明该元素在被处理的同时又被添加了一次,那么先前的那次可以理解为脏的,后续添加的要再被处理。

通过上述的机制,能保证并发的问题


DelayingQueue

AddAfter

计算出 readyAt 时间,将该时间与 item 一并存入 waitingForAddCh,这个 ch 的大小为 1000,也就是说未达到 1000 时,AddAfter 是不会被阻塞的

waitingLoop

当然为了实现 AddAfter 这个功能,免不了 queue 需要做一些额外的维护事情,最重要的就是 queue 初始化时,开始用协程执行 waitingLoop 方法
这个方法是实现 delaying_queue 功能的核心逻辑

首先会判断这个 item 是否可以加入 queue 了,如果时候还没到,那么将该 item 加入以 readyAt 为排序关键的优先队列中。若时候到了,则加入 queue。处理完第一个 item 之后,会将 waitingForAddCh 中剩余的 item 均按照相同的逻辑处理之

如果加入的 item 已经存在,并且新加入 item 的 readyAt 时间比已经存在的 item 的时间晚,那么不好意思哈,这个 item 会被直接丢弃。只有新加入的 item 的 readyAt 时间比已存在的 item 时间要早,才会更新已存在的 item 的 readyAt 时间,并调整 item 在优先队列中的位置


RateLimitingQueue: DelayingQueue + RateLimiting

通过RateLimiting计算readyAt的时间

所以再说一遍浓缩用法

  • 添加 item 调用 Add 方法
  • 获取 item 调用 Get 方法
  • 处理 item 之后调用 Done 方法
  • 不增加 item 重试次数调用 Forget 方法

再说一遍 rate limit queue 重点,切莫踩坑

  • Add 是异步方法
  • Add 有去重功能
    • 先经过 DelayQueue 去重处理,对于新加入的 item,在其优先队列中依然有相同的 item 时,如果新加入 item 的 readyAt time 较原 item 的 readyAt 时间靠后的话,新加入的 item 会被丢弃
    • 再经过 Queue 去重处理,如果 queue 中有相同 item 则直接被丢弃。若 queue 中没有相同 item,但是 item 处于被处理中,即未被调用 Done 时,会将 item 标记为 dirty,待 item 被调用 Done 时,重新加入 queue
  • 处理 item 结束之后,无论如何调用 Done,标识该 item 已被处理结束
  • 若不需要增加 item 的重试次数,则结束之后调用 Forget 方法,清除该 item 的重试次数统计
  • 如果需要调用 Forget,则先调用 Forget 再调用 Done,确保再次 Add 的时候不受限流影响