WorkQueue称为工作队列,Kubernetes的WorkQueue队列与普通FIFO(先进先出,First-In,First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。
● 有序:按照添加顺序处理元素(item)。
● 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
● 并发性:多生产者和多消费者。
● 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
● 通知机制:ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出。
● 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
● 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
● Metric:支持metric监控指标,可用于Prometheus监控。

WorkQueue支持3种队列,并提供了3种接口,不同队列实现可应对不同的使用场景,分别介绍如下。
● Interface:FIFO队列接口,先进先出队列,并支持去重机制。
● DelayingInterface:延迟队列接口,基于Interface接口封装,延迟一段时间后再将元素存入队列。
● RateLimitingInterface:限速队列接口,基于DelayingInterface接口封装,支持元素存入队列时进行速率限制。

Interface

Interface是FIFO队列,是最基础的队列,限速和延迟队列都是基于它来实现的。其提供一下方法(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go)

  1. type Interface interface {
  2. Add(item interface{})
  3. Len() int
  4. Get() (item interface{}, shutdown bool)
  5. Done(item interface{})
  6. ShutDown()
  7. ShuttingDown() bool
  8. }

其中:

  • Add():向队列中添加元素
  • Len():统计队列的长度
  • Get():从队列中取第一个元素
  • Done():标记已被处理的元素
  • ShutDown():关闭队列
  • ShuttingDown():查询队列是否关闭

FIFO的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go):

  1. // Type is a work queue (see the package comment).
  2. type Type struct {
  3. queue []t
  4. dirty set
  5. processing set
  6. cond *sync.Cond
  7. shuttingDown bool
  8. metrics queueMetrics
  9. unfinishedWorkUpdatePeriod time.Duration
  10. clock clock.Clock
  11. }
  12. type empty struct{}
  13. type t interface{}
  14. type set map[t]empty

其中最主要的字段是queue、dirty、processing。它们的作用分别是:

  • queue:存放数据的队列
  • dirty:除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次
  • processing:标记一个元素是否正在被处理

从上面可以看到queue是slice结构,可以存放任何数据。
其主要流程如下:
(1)使用Add()方法将元素添加进队列,源码如下:

  1. func (q *Type) Add(item interface{}) {
  2. q.cond.L.Lock()
  3. defer q.cond.L.Unlock()
  4. if q.shuttingDown {
  5. return
  6. }
  7. if q.dirty.has(item) {
  8. return
  9. }
  10. q.metrics.add(item)
  11. q.dirty.insert(item)
  12. if q.processing.has(item) {
  13. return
  14. }
  15. q.queue = append(q.queue, item)
  16. q.cond.Signal()
  17. }

首先判断这个队列是否存在,其次判断元素是否在dirty中,如果在就不添加了,如果不在就先加入dirty中,然后再判断在processing中是否有相同元素在进行处理,如果有就不添加进queue,如果没有再添加进queue。

(2)使用Get()方法从队列中取第一个元素,源码如下:

  1. func (q *Type) Get() (item interface{}, shutdown bool) {
  2. q.cond.L.Lock()
  3. defer q.cond.L.Unlock()
  4. for len(q.queue) == 0 && !q.shuttingDown {
  5. q.cond.Wait()
  6. }
  7. if len(q.queue) == 0 {
  8. // We must be shutting down.
  9. return nil, true
  10. }
  11. item, q.queue = q.queue[0], q.queue[1:]
  12. q.metrics.get(item)
  13. q.processing.insert(item)
  14. q.dirty.delete(item)
  15. return item, false
  16. }

首先判断队列中是否有元素并且队列是存在的,其次从queue中取第一个元素并更新queue,然后将取出来的元素插入processing中,最后再从dirty中删除这个元素。

(3)使用Done()方法移除已经被处理的元素,源码如下:

  1. func (q *Type) Done(item interface{}) {
  2. q.cond.L.Lock()
  3. defer q.cond.L.Unlock()
  4. q.metrics.done(item)
  5. q.processing.delete(item)
  6. if q.dirty.has(item) {
  7. q.queue = append(q.queue, item)
  8. q.cond.Signal()
  9. }
  10. }

如果该元素被处理完了,直接从processing中删除,不过这里还有一个动作,就是我们第一步Add中提到了,如果一个新加入的元素是正在processing中处理的元素会被暂放到dirty中而不直接放如queue,那么在这里如果这个元素被处理完了,并且dirty中仍有这个元素,则将这个元素加入到queue中。

(4)使用ShutDown()方法关闭队列,源码如下:

  1. func (q *Type) ShutDown() {
  2. q.cond.L.Lock()
  3. defer q.cond.L.Unlock()
  4. q.shuttingDown = true
  5. q.cond.Broadcast()
  6. }

关闭很简单,就是将这个队列标记一下。

DelayingInterface

DelayingInterface就是延迟队列,主要基于Interface进行了封装,在其基础上新增了延迟处理方法AddAfter(),定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go):

  1. type DelayingInterface interface {
  2. Interface
  3. AddAfter(item interface{}, duration time.Duration)
  4. }

AddAfter包含了两个参数,一个是待加入的元素,一个时间,这个时间就是延迟时间。其方法实现的代码如下:

  1. func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
  2. // don't add if we're already shutting down
  3. if q.ShuttingDown() {
  4. return
  5. }
  6. q.metrics.retry()
  7. // immediately add things with no delay
  8. if duration <= 0 {
  9. q.Add(item)
  10. return
  11. }
  12. select {
  13. case <-q.stopCh:
  14. // unblock if ShutDown() is called
  15. case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
  16. }
  17. }

首先判断队列是否存在,然后判断duration时间是否小于等于0,如果是则直接加入队列,如果不是这将其发送给waitingForAddCh。

其数据结构定义如下:

  1. type delayingType struct {
  2. Interface
  3. clock clock.Clock
  4. stopCh chan struct{}
  5. stopOnce sync.Once
  6. heartbeat clock.Ticker
  7. waitingForAddCh chan *waitFor
  8. metrics retryMetrics
  9. }

其中最主要的字段是waitingForAddCh,其默认值为1000,初始化代码如下:

  1. func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
  2. ret := &delayingType{
  3. Interface: q,
  4. clock: clock,
  5. heartbeat: clock.NewTicker(maxWait),
  6. stopCh: make(chan struct{}),
  7. waitingForAddCh: make(chan *waitFor, 1000),
  8. metrics: newRetryMetrics(name),
  9. }
  10. go ret.waitingLoop()
  11. return ret
  12. }

这个默认值1000代表的是只有当插入的元素大于1000的时候才阻塞。waitingForAddCh字段中的数据通过goroutine运行的waitingLoop函数持久运行,在初始化的时候可以看到其运行它的程序go ret.waitingLoop()。waitingLoop()的代码如下:

  1. func (q *delayingType) waitingLoop() {
  2. defer utilruntime.HandleCrash()
  3. // Make a placeholder channel to use when there are no items in our list
  4. never := make(<-chan time.Time)
  5. // Make a timer that expires when the item at the head of the waiting queue is ready
  6. var nextReadyAtTimer clock.Timer
  7. waitingForQueue := &waitForPriorityQueue{}
  8. heap.Init(waitingForQueue)
  9. waitingEntryByData := map[t]*waitFor{}
  10. for {
  11. if q.Interface.ShuttingDown() {
  12. return
  13. }
  14. now := q.clock.Now()
  15. // Add ready entries
  16. for waitingForQueue.Len() > 0 {
  17. entry := waitingForQueue.Peek().(*waitFor)
  18. if entry.readyAt.After(now) {
  19. break
  20. }
  21. entry = heap.Pop(waitingForQueue).(*waitFor)
  22. q.Add(entry.data)
  23. delete(waitingEntryByData, entry.data)
  24. }
  25. // Set up a wait for the first item's readyAt (if one exists)
  26. nextReadyAt := never
  27. if waitingForQueue.Len() > 0 {
  28. if nextReadyAtTimer != nil {
  29. nextReadyAtTimer.Stop()
  30. }
  31. entry := waitingForQueue.Peek().(*waitFor)
  32. nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
  33. nextReadyAt = nextReadyAtTimer.C()
  34. }
  35. select {
  36. case <-q.stopCh:
  37. return
  38. case <-q.heartbeat.C():
  39. // continue the loop, which will add ready items
  40. case <-nextReadyAt:
  41. // continue the loop, which will add ready items
  42. case waitEntry := <-q.waitingForAddCh:
  43. if waitEntry.readyAt.After(q.clock.Now()) {
  44. insert(waitingForQueue, waitingEntryByData, waitEntry)
  45. } else {
  46. q.Add(waitEntry.data)
  47. }
  48. drained := false
  49. for !drained {
  50. select {
  51. case waitEntry := <-q.waitingForAddCh:
  52. if waitEntry.readyAt.After(q.clock.Now()) {
  53. insert(waitingForQueue, waitingEntryByData, waitEntry)
  54. } else {
  55. q.Add(waitEntry.data)
  56. }
  57. default:
  58. drained = true
  59. }
  60. }
  61. }
  62. }
  63. }

使用一个for循环来处理延迟数据,从队列中取出数据以及时间参数,当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入FIFO队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入FIFO队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。

RateLimitingInterface

RateLimitingInterface是限速队列,基于延迟队列和FIFO队列接口封装,在原有功能上增加了AddRateLimited、Forget、NumRequeues方法。限速队列的重点不在于RateLimitingInterface接口,而在于它提供的4种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。

RateLimitingInterface的方法接口如下(源码:staging\src\k8s.io\client-go\util\workqueue\rate_limiting_queue.go):

  1. type RateLimitingInterface interface {
  2. DelayingInterface
  3. AddRateLimited(item interface{})
  4. Forget(item interface{})
  5. NumRequeues(item interface{}) int
  6. }

数据结构定义如下:

  1. type rateLimitingType struct {
  2. DelayingInterface
  3. rateLimiter RateLimiter
  4. }

rateLimitingType就定义了两个,一个是继承了延迟队列接口,另一个是RateLimiter具体的限速算法。

AddRateLimited、Forget、NumRequeues方法的实现代码如下:

  1. func (q *rateLimitingType) AddRateLimited(item interface{}) {
  2. q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
  3. }
  4. func (q *rateLimitingType) NumRequeues(item interface{}) int {
  5. return q.rateLimiter.NumRequeues(item)
  6. }
  7. func (q *rateLimitingType) Forget(item interface{}) {
  8. q.rateLimiter.Forget(item)
  9. }

AddRateLimited是通过DelayingInterface.AddAfter来实现的,NumRequeues和Forget都是调用了rateLimiter的方法。

RateLimiter接口的定义如下:

  1. type RateLimiter interface {
  2. When(item interface{}) time.Duration
  3. Forget(item interface{})
  4. NumRequeues(item interface{}) int
  5. }

其中:

  • When:获取元素的等待时间
  • Forget:释放指定元素
  • NumRequeues:获取指定元素的排队数

注意:这里有一个非常重要的概念——限速周期,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数。

RateLimiter中有四种限速算法,分别是:

  • 令牌桶算法(BucketRateLimiter)。
  • 排队指数算法(ItemExponentialFailureRateLimiter)。
  • 计数器算法(ItemFastSlowRateLimiter)。
  • 混合模式(MaxOfRateLimiter),将多种限速算法混合使用。

令牌桶算法(BucketRateLimiter)

令牌桶算法内部实现了一个存放token(令牌)的“桶”,初始时“桶”是空的,token会以固定速率往“桶”里填充,直到将其填满为止,多余的token会被丢弃。每个元素都会从令牌桶得到一个token,只有得到token的元素才允许通过(accept),而没有得到token的元素处于等待状态。令牌桶算法通过控制发放token来达到限速目的。

在初始化的时候会给令牌桶设置一个默认值,代码如下:

  1. func DefaultControllerRateLimiter() RateLimiter {
  2. return NewMaxOfRateLimiter(
  3. NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
  4. // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
  5. &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  6. )
  7. }

通过rate.NewLimiter(rate.Limit(10), 100)来实例化令牌桶的,实例化的时候传递了两个参数,NewLimiter的代码如下:

  1. func NewLimiter(r Limit, b int) *Limiter {
  2. return &Limiter{
  3. limit: r,
  4. burst: b,
  5. }
  6. }

其中:

  • r表示qps。每秒往桶里放token的数量
  • b表示令牌桶的大小,也就是总共可以存放多少token

在工作中会通过r.Limiter.Reserve().Delay()来返回指定元素的等待时间。假设在一个限速周期内插入了1000个元素,通过r.Limiter.Reserve().Delay函数返回指定元素应该等待的时间,那么前b(即100)个元素会被立刻处理,而后面元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。

排队指数算法(ItemExponentialFailureRateLimiter)

排队指数算法是将相同元素得排队数,相同元素越多,排队数越大,相应的速率也会增长,但是不会超过maxDelay。

ItemExponentialFailureRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):

  1. type ItemExponentialFailureRateLimiter struct {
  2. failuresLock sync.Mutex
  3. failures map[interface{}]int
  4. baseDelay time.Duration
  5. maxDelay time.Duration
  6. }

其中:

  • failures:用于统计排队数
  • baseDelay:最初的限速单位
  • maxDelay:最大的限速单位

限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。

在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过maxDelay。

核心实现代码块如下:

  1. func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
  2. r.failuresLock.Lock()
  3. defer r.failuresLock.Unlock()
  4. exp := r.failures[item]
  5. r.failures[item] = r.failures[item] + 1
  6. // The backoff is capped such that 'calculated' value never overflows.
  7. backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
  8. if backoff > math.MaxInt64 {
  9. return r.maxDelay
  10. }
  11. calculated := time.Duration(backoff)
  12. if calculated > r.maxDelay {
  13. return r.maxDelay
  14. }
  15. return calculated
  16. }

每当通过AddAfter添加一个元素,排队数failures通过r.failures[item] = r.failures[item] + 1进行加1操作,然后计算延迟时间,不得超出maxDelay。

我们假定baseDelay是1time.Millisecond,maxDelay是1000time.Second。假设在一个限速周期内通过AddRateLimited方法插入10个相同元素,那么第1个元素会通过延迟队列的AddAfter方法插入并设置延迟时间为1ms(即baseDelay),第2个相同元素的延迟时间为2ms,第3个相同元素的延迟时间为4ms,第4个相同元素的延迟时间为8ms,第5个相同元素的延迟时间为16ms……第10个相同元素的延迟时间为512ms,最长延迟时间不超过1000s(即maxDelay)。

计数器算法(ItemFastSlowRateLimiter)

计数器算法限制一段时间内允许通过的元素数量,例如在1分钟内只允许通过100个元素,每插入一个元素,计数器自增1,当计数器数到100的阈值且还在限速周期内时,则不允许元素再通过。但WorkQueue在此基础上扩展了fast和slow速率。也就是超出了指定的元素数量不是丢弃,而是指定slow速率。

ItemFastSlowRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):

  1. type ItemFastSlowRateLimiter struct {
  2. failuresLock sync.Mutex
  3. failures map[interface{}]int
  4. maxFastAttempts int
  5. fastDelay time.Duration
  6. slowDelay time.Duration
  7. }

其中:

  • failures:用于统计排队数,每新增一个,则加1
  • maxFastAttempts:控制速率是fast的数量,超出该数速率则变为slow
  • fastDelay:fast的速率
  • slowDelay:slow的速率

核心代码如下:

  1. func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
  2. r.failuresLock.Lock()
  3. defer r.failuresLock.Unlock()
  4. r.failures[item] = r.failures[item] + 1
  5. if r.failures[item] <= r.maxFastAttempts {
  6. return r.fastDelay
  7. }
  8. return r.slowDelay
  9. }

就是新进一个元素,排队数加1,然后排队数和maxFastAttempts进行对比,如果小于则返回fastDelay,如果大于则返回slowDelay。

假设fastDelay是5time.Millisecond,slowDelay是10time.Second,maxFastAttempts是3。在一个限速周期内通过AddRateLimited方法插入4个相同的元素,那么前3个元素使用fastDelay定义的fast速率,当触发maxFastAttempts字段时,第4个元素使用slowDelay定义的slow速率。

混合模式(MaxOfRateLimiter)

混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法。比如默认就同时时候排队算法和令牌桶算法,如下:

  1. func DefaultControllerRateLimiter() RateLimiter {
  2. return NewMaxOfRateLimiter(
  3. NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
  4. // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
  5. &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  6. )
  7. }

其数据结构定义如下:

  1. type MaxOfRateLimiter struct {
  2. limiters []RateLimiter
  3. }

是一个RateLimiter的slice类型,可以存放多种算法。然后在获取元素延迟时间的时候也是循环这个slice进行获取,如下:

  1. func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
  2. ret := time.Duration(0)
  3. for _, limiter := range r.limiters {
  4. curr := limiter.When(item)
  5. if curr > ret {
  6. ret = curr
  7. }
  8. }
  9. return ret
  10. }