Config
controller类的参数类,controller类的New方法使用一个Config类作为参数,Config中Queue,ListerWatcher,ObjectType,FullResyncPeriod用于在controller的Run方法中作为构建Reflector的参数
类
// Config contains all the settings for one of these low-level controllers.
// It is the parameter object of a controller: Queue, ListerWatcher,
// ObjectType and FullResyncPeriod are handed to NewReflector inside
// controller.Run, ShouldResync is copied onto the resulting Reflector, and
// Process / RetryOnError drive controller.processLoop.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
// This is the main data structure holding the pending items.
Queue
// Something that can list and watch your objects.
// Used to obtain resource changes from the API server.
ListerWatcher
// Something that can process a popped Deltas.
// It is invoked on each Deltas object popped from Queue.
Process ProcessFunc
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
// In other words, it names the resource type the controller watches.
ObjectType runtime.Object
// FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration
// ShouldResync is periodically used by the reflector to determine
// whether to Resync the Queue. If ShouldResync is `nil` or
// returns true, it means the reflector should proceed with the
// resync.
// Note that this is a predicate, not an interval; the interval is
// FullResyncPeriod.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
// When false, an object whose processing failed is not put back
// into Queue.
RetryOnError bool
}
Controller
类与接口
Controller接口
// Controller is the interface of a low-level controller driven by a Config.
type Controller interface {
// Run does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
// In short:
// 1. build a Reflector that feeds resource events from the API
// server into the Config's Queue, with periodic resyncs;
// 2. pop items from that Queue and hand them to the ProcessFunc.
Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue
// (it returns that Queue's HasSynced result).
HasSynced() bool
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
// (the value comes from the Reflector's LastSyncResourceVersion).
LastSyncResourceVersion() string
}
controller类
// controller is the implementation whose methods (Run, HasSynced,
// LastSyncResourceVersion) match the Controller interface.
type controller struct {
// config holds the settings this controller was created with.
config Config
// reflector is nil until Run builds and stores it.
reflector *Reflector
// reflectorMutex guards reads and writes of the reflector field.
reflectorMutex sync.RWMutex
// clock is handed to the Reflector created in Run.
clock clock.Clock
}
主要方法
Run
概述
- 构造一个Reflector,执行reflector.run方法用以从api-server中获取数据并放入config.Queue中
- 通过wait.Until运行processLoop(processLoop自身循环执行;若其返回,则间隔1秒后再次调用,直到stopCh关闭),用以把数据取出处理
// Run wires a Reflector to the configured queue and then drains that queue
// on the calling goroutine until stopCh is closed.
//
// It builds the Reflector from the Config's ListerWatcher, ObjectType, Queue
// and FullResyncPeriod, publishes it on the controller under reflectorMutex,
// runs it in the background, and finally drives processLoop.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Close the queue as soon as stopCh is closed.
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()

	// Assemble the Reflector from the controller's configuration.
	cfg := &c.config
	refl := NewReflector(cfg.ListerWatcher, cfg.ObjectType, cfg.Queue, cfg.FullResyncPeriod)
	refl.ShouldResync = cfg.ShouldResync
	refl.clock = c.clock

	// Publish the Reflector so LastSyncResourceVersion can see it.
	c.reflectorMutex.Lock()
	c.reflector = refl
	c.reflectorMutex.Unlock()

	// The deferred Wait keeps Run from returning before the Reflector
	// goroutine started below has finished.
	var group wait.Group
	defer group.Wait()
	group.StartWithChannel(stopCh, refl.Run)

	// Drive processLoop; if it returns, wait.Until calls it again after
	// one second, until stopCh is closed.
	wait.Until(c.processLoop, time.Second, stopCh)
}
processLoop
循环用config.Process处理store中pop出的Delta
// processLoop pops items from the configured queue forever, handing each one
// to the Config's Process function via Pop. It returns only when Pop reports
// ErrFIFOClosed. On any other error the popped item is re-added with
// AddIfNotPresent when RetryOnError is set; otherwise the error is dropped.
func (c *controller) processLoop() {
	for {
		item, popErr := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case popErr == nil:
			// Processed successfully; move on to the next item.
			continue
		case popErr == ErrFIFOClosed:
			// The queue was closed: stop looping.
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(item)
		}
	}
}
HasSynced
调用DeltaFIFO的HasSynced()
// HasSynced reports whatever the configured queue's HasSynced returns.
func (c *controller) HasSynced() bool {
	queue := c.config.Queue
	return queue.HasSynced()
}
LastSyncResourceVersion
调用reflector.LastSyncResourceVersion()
// LastSyncResourceVersion returns the Reflector's LastSyncResourceVersion,
// or the empty string when no Reflector has been set yet. The reflector
// field is read under the read lock of reflectorMutex.
func (c *controller) LastSyncResourceVersion() string {
	c.reflectorMutex.RLock()
	defer c.reflectorMutex.RUnlock()

	if refl := c.reflector; refl != nil {
		return refl.LastSyncResourceVersion()
	}
	return ""
}
流程图
参考文章
强烈推荐!!!
https://www.jianshu.com/p/5cd7f7666797
https://www.jianshu.com/p/12d2912d5ac3