kubernetes源码中,很多地方都用到了wait包,下面对此包做简单介绍和示例,加深理解。

JitterUntil函数

等待管道结束之前周期的做一些事情,wait是个很好的选择。直接看代码怎么实现

  1. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
  2. var t *time.Timer
  3. var sawTimeout bool
  4. for {
  5. select {
  6. case <-stopCh:
  7. return
  8. default:
  9. }
  10. jitteredPeriod := period
  11. if jitterFactor > 0.0 {
  12. jitteredPeriod = Jitter(period, jitterFactor)
  13. }
  14. if !sliding {
  15. t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
  16. }
  17. func() {
  18. defer runtime.HandleCrash()
  19. f()
  20. }()
  21. if sliding {
  22. t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
  23. }
  24. // NOTE: b/c there is no priority selection in golang
  25. // it is possible for this to race, meaning we could
  26. // trigger t.C and stopCh, and t.C select falls through.
  27. // In order to mitigate we re-check stopCh at the beginning
  28. // of every loop to prevent extra executions of f().
  29. select {
  30. case <-stopCh:
  31. return
  32. case <-t.C:
  33. sawTimeout = true
  34. }
  35. }
  36. }

sliding确定系统周期调用的时候,计时的起点,如果是TRUE的话,就是f函数执行完成后计时周期。

Until函数

func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
    JitterUntil(f, period, 0.0, true, stopCh)
}

until函数传入ture,也就是在f函数结束后间隔period时间后,继续执行f函数,直到stopCh接收到信号,或被关闭。

sliding为true意味着周期计时器在f完成之后开始计时。

示例1

下面看看使用

package main

import (
    "k8s.io/apimachinery/pkg/util/wait"
    "log"
    "time"
)

func main() {
    ch := make(chan struct{})
    go func() {
        log.Println("sleep 1s")
        time.Sleep(1 * time.Second)
        close(ch)
    }()
    wait.Until(func() {
        time.Sleep(100 * time.Millisecond)
        log.Println("test")
    }, 100 * time.Millisecond, ch)
    log.Println("main exit")
}

先睡眠1s,在这1s内,执行周期是0.1秒,然后函数执行耗时0.1秒,这样在1s内只能执行6次。输出结果如下:

2020/08/11 14:48:52 sleep 1s
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:53 test
2020/08/11 14:48:53 test
2020/08/11 14:48:53 main exit

示例2

package main

import (
    "k8s.io/apimachinery/pkg/util/wait"
    "log"
    "strconv"
    "time"
)

func main() {
    podKillCh := make(chan *student, 50)

    go func() {
        i := 0
        for {
            time.Sleep(10 * time.Second)
            podKillCh <- &CusPod{
                ID: i,
                Name: strconv.Itoa(i),
            }
            i++
        }
    }()

    wait.Until(func() {
        for stu := range podKillCh {
            log.Printf("%+v\n", stu)
        }
    }, 1*time.Second, wait.NeverStop)


    log.Println("main exit")
}

type CusPod struct {
    ID int
    Name string
}

会一直输出:

2020/08/11 09:44:10 &{ID:0 Name:0}
2020/08/11 09:44:20 &{ID:1 Name:1}
2020/08/11 09:44:30 &{ID:2 Name:2}
2020/08/11 09:44:40 &{ID:3 Name:3}
2020/08/11 09:44:50 &{ID:4 Name:4}
2020/08/11 09:45:00 &{ID:5 Name:5}
2020/08/11 09:45:10 &{ID:6 Name:6}
2020/08/11 09:45:20 &{ID:7 Name:7}
2020/08/11 09:45:30 &{ID:8 Name:8}
2020/08/11 09:45:40 &{ID:9 Name:9}
2020/08/11 09:45:50 &{ID:10 Name:10}

在func执行期间(遍历podKillCh,空channel会阻塞),不会重新计时再次执行func。

NonSlidingUntil函数

func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
    JitterUntil(f, period, 0.0, false, stopCh)
}

给JitterUntil传入false。NonSlidingUntil函数传入false,sliding为false意味着,周期的计时器在f函数启动前开始计时,直到stopCh接收到信号,或被关闭。

示例1

下面看看使用。

package main

import (
    "k8s.io/apimachinery/pkg/util/wait"
    "log"
    "time"
)

func main() {
    ch := make(chan struct{})
    go func() {
        log.Println("sleep 1s")
        time.Sleep(1 * time.Second)
        close(ch)
    }()
    wait.NonSlidingUntil(func() {
        time.Sleep(100 * time.Millisecond)
        log.Println("test")
    }, 100 * time.Millisecond, ch)
    log.Println("main exit")
}

输出结果如下:

2020/08/11 14:50:49 sleep 1s
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 main exit

1s内执行了11次,排除了函数自己执行的时间。

总结

Until是函数运行开始前开始计时,时间周期是会将函数执行时间算进去;NonSlidingUntil是函数运行结束后开始计时,计时周期不会将函数执行时间算进去。也就是在给定时间内,NonSlidingUntil会执行更多次函数。(如果传入的函数内有死循环,则二者都只执行一次)

参考

kubernetes源码阅读之工具函数wait使用