kubernetes源码中,很多地方都用到了wait包,下面对此包做简单介绍和示例,加深理解。
JitterUntil函数
等待管道结束之前周期的做一些事情,wait是个很好的选择。直接看代码怎么实现
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer
var sawTimeout bool
for {
select {
case <-stopCh:
return
default:
}
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}
if !sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
func() {
defer runtime.HandleCrash()
f()
}()
if sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
// NOTE: b/c there is no priority selection in golang
// it is possible for this to race, meaning we could
// trigger t.C and stopCh, and t.C select falls through.
// In order to mitigate we re-check stopCh at the beginning
// of every loop to prevent extra executions of f().
select {
case <-stopCh:
return
case <-t.C:
sawTimeout = true
}
}
}
sliding确定系统周期调用的时候,计时的起点,如果是TRUE的话,就是f函数执行完成后计时周期。
Until函数
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, true, stopCh)
}
until函数传入ture,也就是在f函数结束后间隔period时间后,继续执行f函数,直到stopCh接收到信号,或被关闭。
sliding为true意味着周期计时器在f完成之后开始计时。
示例1
下面看看使用
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"log"
"time"
)
func main() {
ch := make(chan struct{})
go func() {
log.Println("sleep 1s")
time.Sleep(1 * time.Second)
close(ch)
}()
wait.Until(func() {
time.Sleep(100 * time.Millisecond)
log.Println("test")
}, 100 * time.Millisecond, ch)
log.Println("main exit")
}
先睡眠1s,在这1s内,执行周期是0.1秒,然后函数执行耗时0.1秒,这样在1s内只能执行6次。输出结果如下:
2020/08/11 14:48:52 sleep 1s
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:52 test
2020/08/11 14:48:53 test
2020/08/11 14:48:53 test
2020/08/11 14:48:53 main exit
示例2
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"log"
"strconv"
"time"
)
func main() {
podKillCh := make(chan *student, 50)
go func() {
i := 0
for {
time.Sleep(10 * time.Second)
podKillCh <- &CusPod{
ID: i,
Name: strconv.Itoa(i),
}
i++
}
}()
wait.Until(func() {
for stu := range podKillCh {
log.Printf("%+v\n", stu)
}
}, 1*time.Second, wait.NeverStop)
log.Println("main exit")
}
type CusPod struct {
ID int
Name string
}
会一直输出:
2020/08/11 09:44:10 &{ID:0 Name:0}
2020/08/11 09:44:20 &{ID:1 Name:1}
2020/08/11 09:44:30 &{ID:2 Name:2}
2020/08/11 09:44:40 &{ID:3 Name:3}
2020/08/11 09:44:50 &{ID:4 Name:4}
2020/08/11 09:45:00 &{ID:5 Name:5}
2020/08/11 09:45:10 &{ID:6 Name:6}
2020/08/11 09:45:20 &{ID:7 Name:7}
2020/08/11 09:45:30 &{ID:8 Name:8}
2020/08/11 09:45:40 &{ID:9 Name:9}
2020/08/11 09:45:50 &{ID:10 Name:10}
在func执行期间(遍历podKillCh,空channel会阻塞),不会重新计时再次执行func。
NonSlidingUntil函数
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, false, stopCh)
}
给JitterUntil传入false。NonSlidingUntil函数传入false,sliding为false意味着,周期的计时器在f函数启动前开始计时,直到stopCh接收到信号,或被关闭。
示例1
下面看看使用。
package main
import (
"k8s.io/apimachinery/pkg/util/wait"
"log"
"time"
)
func main() {
ch := make(chan struct{})
go func() {
log.Println("sleep 1s")
time.Sleep(1 * time.Second)
close(ch)
}()
wait.NonSlidingUntil(func() {
time.Sleep(100 * time.Millisecond)
log.Println("test")
}, 100 * time.Millisecond, ch)
log.Println("main exit")
}
输出结果如下:
2020/08/11 14:50:49 sleep 1s
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:49 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 test
2020/08/11 14:50:50 main exit
1s内执行了11次,排除了函数自己执行的时间。
总结
Until是函数运行开始前开始计时,时间周期是会将函数执行时间算进去;NonSlidingUntil是函数运行结束后开始计时,计时周期不会将函数执行时间算进去。也就是在给定时间内,NonSlidingUntil会执行更多次函数。(如果传入的函数内有死循环,则二者都只执行一次)