WorkQueue称为工作队列,Kubernetes的WorkQueue队列与普通FIFO(先进先出,First-In,First-Out)队列相比,实现略显复杂,它的主要功能在于标记和去重,并支持如下特性。
● 有序:按照添加顺序处理元素(item)。
● 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
● 并发性:多生产者和多消费者。
● 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
● 通知机制:ShutDown方法通过信号量通知队列不再接收新的元素,并通知metric goroutine退出。
● 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
● 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
● Metric:支持metric监控指标,可用于Prometheus监控。
WorkQueue支持3种队列,并提供了3种接口,不同队列实现可应对不同的使用场景,分别介绍如下。
● Interface:FIFO队列接口,先进先出队列,并支持去重机制。
● DelayingInterface:延迟队列接口,基于Interface接口封装,延迟一段时间后再将元素存入队列。
● RateLimitingInterface:限速队列接口,基于DelayingInterface接口封装,支持元素存入队列时进行速率限制。
Interface
Interface是FIFO队列,是最基础的队列,限速和延迟队列都是基于它来实现的。其提供一下方法(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go)
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
其中:
- Add():向队列中添加元素
- Len():统计队列的长度
- Get():从队列中取第一个元素
- Done():标记已被处理的元素
- ShutDown():关闭队列
- ShuttingDown():查询队列是否关闭
FIFO的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\queue.go):
// Type is a work queue (see the package comment).
type Type struct {
queue []t
dirty set
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{}
type set map[t]empty
其中最主要的字段是queue、dirty、processing。它们的作用分别是:
- queue:存放数据的队列
- dirty:除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次
- processing:标记一个元素是否正在被处理
从上面可以看到queue是slice结构,可以存放任何数据。
其主要流程如下:
(1)使用Add()方法将元素添加进队列,源码如下:
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
首先判断这个队列是否存在,其次判断元素是否在dirty中,如果在就不添加了,如果不在就先加入dirty中,然后再判断在processing中是否有相同元素在进行处理,如果有就不添加进queue,如果没有再添加进queue。
(2)使用Get()方法从队列中取第一个元素,源码如下:
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
首先判断队列中是否有元素并且队列是存在的,其次从queue中取第一个元素并更新queue,然后将取出来的元素插入processing中,最后再从dirty中删除这个元素。
(3)使用Done()方法移除已经被处理的元素,源码如下:
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
如果该元素被处理完了,直接从processing中删除,不过这里还有一个动作,就是我们第一步Add中提到了,如果一个新加入的元素是正在processing中处理的元素会被暂放到dirty中而不直接放如queue,那么在这里如果这个元素被处理完了,并且dirty中仍有这个元素,则将这个元素加入到queue中。
(4)使用ShutDown()方法关闭队列,源码如下:
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}
关闭很简单,就是将这个队列标记一下。
DelayingInterface
DelayingInterface就是延迟队列,主要基于Interface进行了封装,在其基础上新增了延迟处理方法AddAfter(),定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go):
type DelayingInterface interface {
Interface
AddAfter(item interface{}, duration time.Duration)
}
AddAfter包含了两个参数,一个是待加入的元素,一个时间,这个时间就是延迟时间。其方法实现的代码如下:
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
首先判断队列是否存在,然后判断duration时间是否小于等于0,如果是则直接加入队列,如果不是这将其发送给waitingForAddCh。
其数据结构定义如下:
type delayingType struct {
Interface
clock clock.Clock
stopCh chan struct{}
stopOnce sync.Once
heartbeat clock.Ticker
waitingForAddCh chan *waitFor
metrics retryMetrics
}
其中最主要的字段是waitingForAddCh,其默认值为1000,初始化代码如下:
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
这个默认值1000代表的是只有当插入的元素大于1000的时候才阻塞。waitingForAddCh字段中的数据通过goroutine运行的waitingLoop函数持久运行,在初始化的时候可以看到其运行它的程序go ret.waitingLoop()
。waitingLoop()的代码如下:
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// continue the loop, which will add ready items
case <-nextReadyAt:
// continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
使用一个for循环来处理延迟数据,从队列中取出数据以及时间参数,当元素的延迟时间不大于当前时间时,说明还需要延迟将元素插入FIFO队列的时间,此时将该元素放入优先队列(waitForPriorityQueue)中。当元素的延迟时间大于当前时间时,则将该元素插入FIFO队列中。另外,还会遍历优先队列(waitForPriorityQueue)中的元素,按照上述逻辑验证时间。
RateLimitingInterface
RateLimitingInterface是限速队列,基于延迟队列和FIFO队列接口封装,在原有功能上增加了AddRateLimited、Forget、NumRequeues方法。限速队列的重点不在于RateLimitingInterface接口,而在于它提供的4种限速算法接口(RateLimiter)。其原理是,限速队列利用延迟队列的特性,延迟某个元素的插入时间,达到限速目的。
RateLimitingInterface的方法接口如下(源码:staging\src\k8s.io\client-go\util\workqueue\rate_limiting_queue.go):
type RateLimitingInterface interface {
DelayingInterface
AddRateLimited(item interface{})
Forget(item interface{})
NumRequeues(item interface{}) int
}
数据结构定义如下:
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
rateLimitingType就定义了两个,一个是继承了延迟队列接口,另一个是RateLimiter具体的限速算法。
AddRateLimited、Forget、NumRequeues方法的实现代码如下:
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
AddRateLimited是通过DelayingInterface.AddAfter来实现的,NumRequeues和Forget都是调用了rateLimiter的方法。
RateLimiter接口的定义如下:
type RateLimiter interface {
When(item interface{}) time.Duration
Forget(item interface{})
NumRequeues(item interface{}) int
}
其中:
- When:获取元素的等待时间
- Forget:释放指定元素
- NumRequeues:获取指定元素的排队数
注意:这里有一个非常重要的概念——限速周期,一个限速周期是指从执行AddRateLimited方法到执行完Forget方法之间的时间。如果该元素被Forget方法处理完,则清空排队数。
RateLimiter中有四种限速算法,分别是:
- 令牌桶算法(BucketRateLimiter)。
- 排队指数算法(ItemExponentialFailureRateLimiter)。
- 计数器算法(ItemFastSlowRateLimiter)。
- 混合模式(MaxOfRateLimiter),将多种限速算法混合使用。
令牌桶算法(BucketRateLimiter)
令牌桶算法内部实现了一个存放token(令牌)的“桶”,初始时“桶”是空的,token会以固定速率往“桶”里填充,直到将其填满为止,多余的token会被丢弃。每个元素都会从令牌桶得到一个token,只有得到token的元素才允许通过(accept),而没有得到token的元素处于等待状态。令牌桶算法通过控制发放token来达到限速目的。
在初始化的时候会给令牌桶设置一个默认值,代码如下:
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
通过rate.NewLimiter(rate.Limit(10), 100)
来实例化令牌桶的,实例化的时候传递了两个参数,NewLimiter的代码如下:
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
其中:
- r表示qps。每秒往桶里放token的数量
- b表示令牌桶的大小,也就是总共可以存放多少token
在工作中会通过r.Limiter.Reserve().Delay()
来返回指定元素的等待时间。假设在一个限速周期内插入了1000个元素,通过r.Limiter.Reserve().Delay函数返回指定元素应该等待的时间,那么前b(即100)个元素会被立刻处理,而后面元素的延迟时间分别为item100/100ms、item101/200ms、item102/300ms、item103/400ms,以此类推。
排队指数算法(ItemExponentialFailureRateLimiter)
排队指数算法是将相同元素得排队数,相同元素越多,排队数越大,相应的速率也会增长,但是不会超过maxDelay。
ItemExponentialFailureRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
其中:
- failures:用于统计排队数
- baseDelay:最初的限速单位
- maxDelay:最大的限速单位
限速队列利用延迟队列的特性,延迟多个相同元素的插入时间,达到限速目的。
在同一限速周期内,如果不存在相同元素,那么所有元素的延迟时间为baseDelay;而在同一限速周期内,如果存在相同元素,那么相同元素的延迟时间呈指数级增长,最长延迟时间不超过maxDelay。
核心实现代码块如下:
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
每当通过AddAfter添加一个元素,排队数failures通过r.failures[item] = r.failures[item] + 1
进行加1操作,然后计算延迟时间,不得超出maxDelay。
我们假定baseDelay是1time.Millisecond,maxDelay是1000time.Second。假设在一个限速周期内通过AddRateLimited方法插入10个相同元素,那么第1个元素会通过延迟队列的AddAfter方法插入并设置延迟时间为1ms(即baseDelay),第2个相同元素的延迟时间为2ms,第3个相同元素的延迟时间为4ms,第4个相同元素的延迟时间为8ms,第5个相同元素的延迟时间为16ms……第10个相同元素的延迟时间为512ms,最长延迟时间不超过1000s(即maxDelay)。
计数器算法(ItemFastSlowRateLimiter)
计数器算法限制一段时间内允许通过的元素数量,例如在1分钟内只允许通过100个元素,每插入一个元素,计数器自增1,当计数器数到100的阈值且还在限速周期内时,则不允许元素再通过。但WorkQueue在此基础上扩展了fast和slow速率。也就是超出了指定的元素数量不是丢弃,而是指定slow速率。
ItemFastSlowRateLimiter的数据结构定义如下(源码:staging\src\k8s.io\client-go\util\workqueue\default_rate_limiters.go):
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
其中:
- failures:用于统计排队数,每新增一个,则加1
- maxFastAttempts:控制速率是fast的数量,超出该数速率则变为slow
- fastDelay:fast的速率
- slowDelay:slow的速率
核心代码如下:
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
就是新进一个元素,排队数加1,然后排队数和maxFastAttempts进行对比,如果小于则返回fastDelay,如果大于则返回slowDelay。
假设fastDelay是5time.Millisecond,slowDelay是10time.Second,maxFastAttempts是3。在一个限速周期内通过AddRateLimited方法插入4个相同的元素,那么前3个元素使用fastDelay定义的fast速率,当触发maxFastAttempts字段时,第4个元素使用slowDelay定义的slow速率。
混合模式(MaxOfRateLimiter)
混合模式是将多种限速算法混合使用,即多种限速算法同时生效。例如,同时使用排队指数算法和令牌桶算法。比如默认就同时时候排队算法和令牌桶算法,如下:
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
其数据结构定义如下:
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
是一个RateLimiter的slice类型,可以存放多种算法。然后在获取元素延迟时间的时候也是循环这个slice进行获取,如下:
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}