Config

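Config is the parameter type for controller: the controller's New function takes a Config as its argument. Inside the controller's Run method, the Queue, ListerWatcher, ObjectType, and FullResyncPeriod fields of Config are passed as arguments when the Reflector is constructed.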

// Config contains all the settings for one of these low-level controllers.
type Config struct {
    // The queue for your objects - has to be a DeltaFIFO due to
    // assumptions in the implementation. Your Process() function
    // should accept the output of this Queue's Pop() method.
    // Note: the main data structure that stores the objects, usually a DeltaFIFO.
    Queue

    // Something that can list and watch your objects.
    // Note: used to fetch resource changes from the api-server.
    ListerWatcher

    // Something that can process a popped Deltas.
    // Note: handles the Deltas objects popped from the Queue.
    Process ProcessFunc

    // ObjectType is an example object of the type this controller is
    // expected to handle. Only the type needs to be right, except
    // that when that is `unstructured.Unstructured` the object's
    // `"apiVersion"` and `"kind"` must also be right.
    // Note: the resource type this controller watches.
    ObjectType runtime.Object

    // FullResyncPeriod is the period at which ShouldResync is considered.
    // Note: the interval at which ShouldResync is called.
    FullResyncPeriod time.Duration

    // ShouldResync is periodically used by the reflector to determine
    // whether to Resync the Queue. If ShouldResync is `nil` or
    // returns true, it means the reflector should proceed with the
    // resync.
    // Note: the callback the reflector invokes once per FullResyncPeriod
    // to decide whether to resync the Queue.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    // the object completely if desired. Pass the object in
    // question to this interface as a parameter. This is probably moot
    // now that this functionality appears at a higher level.
    // Note: flag that controls whether obj is put back into the Queue after
    // Process() returns an error; true re-enqueues it, false drops it.
    RetryOnError bool
}
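The ShouldResync comment above describes a "nil or returns true" rule. A minimal sketch of that rule follows; the helper name proceedWithResync is hypothetical and only illustrates the check, it is not a function in client-go.

```go
package main

import "fmt"

// ShouldResyncFunc mirrors the shape of the callback stored in Config.ShouldResync.
type ShouldResyncFunc func() bool

// proceedWithResync is a hypothetical helper that applies the documented rule:
// resync when the callback is nil or when it returns true.
func proceedWithResync(f ShouldResyncFunc) bool {
	return f == nil || f()
}

func main() {
	never := ShouldResyncFunc(func() bool { return false })
	always := ShouldResyncFunc(func() bool { return true })

	fmt.Println(proceedWithResync(nil))    // no callback configured
	fmt.Println(proceedWithResync(never))  // callback vetoes the resync
	fmt.Println(proceedWithResync(always)) // callback allows the resync
}
```

Leaving ShouldResync unset therefore means a resync is attempted on every FullResyncPeriod tick.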

Controller

Types and interfaces

The Controller interface

type Controller interface {
    // Run does two things.  One is to construct and run a Reflector
    // to pump objects/notifications from the Config's ListerWatcher
    // to the Config's Queue and possibly invoke the occasional Resync
    // on that Queue.  The other is to repeatedly Pop from the Queue
    // and process with the Config's ProcessFunc.  Both of these
    // continue until `stopCh` is closed.
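    // Note: Run does two main things:
    // 1. constructs a Reflector that fetches resource events from the
    //    api-server, puts them into the store (the Config's Queue), and
    //    resyncs periodically;
    // 2. pops objects from the store and handles them with ProcessFunc.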
    Run(stopCh <-chan struct{})

    // HasSynced delegates to the Config's Queue
    // Note: uses the HasSynced of Config.Queue.
    HasSynced() bool

    // LastSyncResourceVersion delegates to the Reflector when there
    // is one, otherwise returns the empty string
    // Note: uses the Reflector's LastSyncResourceVersion.
    LastSyncResourceVersion() string
}

The controller struct

type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

Main methods

Run

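Overview

  • Constructs a Reflector and runs reflector.Run, which fetches data from the api-server and puts it into config.Queue.
  • Runs processLoop through wait.Until to pop that data and process it. processLoop itself loops until the Queue is closed; wait.Until calls it again one second after it returns, until stopCh is closed.
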
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    // Close the Queue once stopCh is closed
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    // Construct the Reflector from the config fields
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    // Start a goroutine that executes Reflector.Run
    wg.StartWithChannel(stopCh, r.Run)

    // Run processLoop; wait.Until calls it again one second after each
    // return, until stopCh is closed
    wait.Until(c.processLoop, time.Second, stopCh)
}

processLoop

Loops, popping Deltas from the store and handling each one with config.Process.

func (c *controller) processLoop() {
    for {
        // Pop a Delta from the Queue and handle it with c.config.Process
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            // If RetryOnError is set, put the object back into the Queue
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

HasSynced

Delegates to the DeltaFIFO's HasSynced().

func (c *controller) HasSynced() bool {
    return c.config.Queue.HasSynced()
}

LastSyncResourceVersion

Delegates to reflector.LastSyncResourceVersion(), or returns the empty string if the Reflector has not been created yet.

func (c *controller) LastSyncResourceVersion() string {
    c.reflectorMutex.RLock()
    defer c.reflectorMutex.RUnlock()
    if c.reflector == nil {
        return ""
    }
    return c.reflector.LastSyncResourceVersion()
}
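The locking here exists because Run assigns c.reflector from one goroutine while other goroutines may call LastSyncResourceVersion at any time. A minimal sketch of that pattern, with the hypothetical types source and holder standing in for Reflector and controller:

```go
package main

import (
	"fmt"
	"sync"
)

// source is a hypothetical stand-in for Reflector.
type source struct {
	version string
}

// holder is a hypothetical stand-in for controller: the pointer is written
// once under the write lock and read under the read lock.
type holder struct {
	mu  sync.RWMutex
	src *source
}

// set publishes the source, as Run does with the Reflector.
func (h *holder) set(s *source) {
	h.mu.Lock()
	h.src = s
	h.mu.Unlock()
}

// lastVersion returns "" until a source has been published.
func (h *holder) lastVersion() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if h.src == nil {
		return ""
	}
	return h.src.version
}

func main() {
	h := &holder{}
	fmt.Printf("%q\n", h.lastVersion()) // before set: empty string
	h.set(&source{version: "12345"})
	fmt.Printf("%q\n", h.lastVersion()) // after set: the stored version
}
```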

Flow diagram

11177530-18242997b08c51bf.png

References

Highly recommended:
https://www.jianshu.com/p/5cd7f7666797
https://www.jianshu.com/p/12d2912d5ac3