类与接口

继承关系图示

sif.svg

Controller

  // Controller is the low-level driver behind an informer: it pumps objects
  // from a Config's ListerWatcher into the Config's Queue and processes what
  // it pops from that Queue.
  type Controller interface {
      // Run does two things. One is to construct and run a Reflector
      // to pump objects/notifications from the Config's ListerWatcher
      // to the Config's Queue and possibly invoke the occasional Resync
      // on that Queue. The other is to repeatedly Pop from the Queue
      // and process with the Config's ProcessFunc. Both of these
      // continue until `stopCh` is closed.
      Run(stopCh <-chan struct{})
      // HasSynced delegates to the Config's Queue
      HasSynced() bool
      // LastSyncResourceVersion delegates to the Reflector when there
      // is one, otherwise returns the empty string
      LastSyncResourceVersion() string
  }

SharedInformer

// SharedInformer is the interface of an informer whose local store and event
// stream are shared by any number of user-registered event handlers.
type SharedInformer interface {

    // AddEventHandler registers a user-defined handler. In sharedIndexInformer
    // the handler is added with the informer's default resync period.
    AddEventHandler(handler ResourceEventHandler)

    // AddEventHandlerWithResyncPeriod registers a user-defined handler together
    // with the resync period requested for that handler.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

    // GetStore returns the informer's Store. In sharedIndexInformer this is the
    // indexer (the local cache), not the DeltaFIFO.
    GetStore() Store

    // GetController returns a Controller for this informer. sharedIndexInformer
    // returns a dummyController wrapping the informer rather than its internal
    // controller field.
    GetController() Controller

    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})

    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection.  This is unrelated to "resync".
    HasSynced() bool

    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    // In other words: the ResourceVersion most recently processed for the
    // object type this SharedInformer handles.
    LastSyncResourceVersion() string
}

SharedIndexInformer

// SharedIndexInformer is a SharedInformer that additionally exposes its
// Indexer and allows indexers to be registered.
type SharedIndexInformer interface {
    // Embeds the SharedInformer interface.
    SharedInformer
    // AddIndexers add indexers to the informer before it starts.
    AddIndexers(indexers Indexers) error
    // GetIndexer returns the informer's Indexer; sharedIndexInformer returns
    // the same indexer that GetStore returns.
    GetIndexer() Indexer
}

sharedIndexInformer

// sharedIndexInformer is the SharedIndexInformer implementation described in
// this file. It ties together a local cache (indexer), a controller that feeds
// it, and a sharedProcessor that fans notifications out to listeners.
type sharedIndexInformer struct {
    // Local cache. It is an Indexer and is handed to the DeltaFIFO as
    // KnownObjects when the FIFO is constructed in Run.
    indexer    Indexer
    // Created in Run via New(cfg); nil until Run has been called, which is why
    // HasSynced and LastSyncResourceVersion nil-check it.
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector MutationDetector

    listerWatcher ListerWatcher

    // Example object of the type this informer receives; passed to the
    // controller Config as ObjectType.
    objectType runtime.Object

    // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
    // shouldResync to check if any of our listeners need a resync.
    resyncCheckPeriod time.Duration

    // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
    // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
    // value).
    defaultEventHandlerResyncPeriod time.Duration
    // clock allows for testability
    clock clock.Clock

    // started is set to true in Run once the controller has been created;
    // stopped is set to true when Run returns. Both are guarded by startedLock.
    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas gives a way to stop all event distribution so that a late event handler
    // can safely join the shared informer.
    // HandleDeltas must hold this lock while it distributes objects to the
    // listeners, so whenever another goroutine holds it, distribution pauses
    // until HandleDeltas can acquire the lock again.
    blockDeltas sync.Mutex
}

主要方法

NewSharedInformer&NewSharedIndexInformer

初始化SharedIndexInformer,构造SharedIndexInformer所必须的前置对象

// 构造一个SharedInformer,实际是调用NewSharedIndexInformer返回SharedIndexInformer
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer,实际是返回一个 {
    return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

// NewSharedIndexInformer builds a SharedIndexInformer and every object it
// depends on: a sharedProcessor, an Indexer used as the local cache, and a
// cache mutation detector named after the example object's type. Both
// resyncCheckPeriod and defaultEventHandlerResyncPeriod start out equal to
// defaultEventHandlerResyncPeriod.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    wallClock := &clock.RealClock{}

    // Dependencies are created in the same order as the fields they fill.
    listeners := &sharedProcessor{clock: wallClock}
    localCache := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
    mutationDetector := NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject))

    return &sharedIndexInformer{
        processor:                       listeners,
        indexer:                         localCache,
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           mutationDetector,
        clock:                           wallClock,
    }
}

AddEventHandler&AddEventHandlerWithResyncPeriod

增加一个用户自定义的EventHandler实现类,即增加一个使用该EventHandler作为处理方法的listener;若sharedIndexInformer已经启动,还会把本地缓存indexer中已有的对象同步给该listener

// AddEventHandler registers handler using the informer's default resync
// period; all the work is done by AddEventHandlerWithResyncPeriod.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
    defaultPeriod := s.defaultEventHandlerResyncPeriod
    s.AddEventHandlerWithResyncPeriod(handler, defaultPeriod)
}

// minimumResyncPeriod is the smallest positive resync period a handler may
// request; smaller positive values are raised to it.
const minimumResyncPeriod = 1 * time.Second

// AddEventHandlerWithResyncPeriod registers handler as a new listener with the
// requested resyncPeriod.
//
// If the informer has already stopped, the handler is not added; this is only
// logged, no error is reported to the caller. A positive resyncPeriod is
// clamped to at least minimumResyncPeriod and reconciled with the informer's
// resyncCheckPeriod. If the informer is already running, the new listener is
// first sent an add notification for every object currently in the local
// cache so that it catches up with the existing state.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    // A stopped informer accepts no new handlers: log and return.
    if s.stopped {
        klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
        return
    }

    // Work out the effective resync period for this handler.
    if resyncPeriod > 0 {
        // Durations are logged with %v so they print as e.g. "1s" rather than
        // as a raw nanosecond count.
        if resyncPeriod < minimumResyncPeriod {
            klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
            resyncPeriod = minimumResyncPeriod
        }

        // The requested period is shorter than the informer's resyncCheckPeriod:
        //  1. if the informer has already started, raise the handler's period
        //     to resyncCheckPeriod;
        //  2. otherwise lower resyncCheckPeriod to the requested period and let
        //     the processor recompute the resync periods of all listeners.
        if resyncPeriod < s.resyncCheckPeriod {
            if s.started {
                klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
                resyncPeriod = s.resyncCheckPeriod
            } else {
                // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
                // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
                // accordingly
                s.resyncCheckPeriod = resyncPeriod
                s.processor.resyncCheckPeriodChanged(resyncPeriod)
            }
        }
    }

    // Create the listener that wraps handler; handler processes the
    // notifications delivered to this listener.
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    // Not started yet: simply register the listener with the processor.
    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // Already started. To join safely:
    //  1. take blockDeltas so HandleDeltas cannot distribute notifications;
    //  2. register the listener with the processor;
    //  3. replay every object in the local cache to the new listener as an
    //     addNotification (the listener starts empty and must catch up);
    //  4. release the lock (deferred).
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        // Bring the new listener in sync with the local cache.
        listener.add(addNotification{newObj: item})
    }
}

Run&HandleDeltas

Run

  • 构造运行sharedIndexInformer所需的数据结构与前置对象,并将HandleDeltas设置为Config.Process,即用以处理从DeltaFIFO中pop出的obj(具体细节参照Config&Controller)
  • 启动cacheMutationDetector,sharedProcessor,controller,开始从api-server中获取数据
// Run builds the DeltaFIFO, the controller Config and the controller, starts
// the cache mutation detector and the shared processor, and then runs the
// controller until stopCh is closed.
//
// The deferred calls run in reverse order of registration when the controller
// returns: first the informer is marked stopped, then processorStopCh is
// closed, then Run waits for the processor goroutines, and finally
// HandleCrash runs.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    // Build the DeltaFIFO that queues incoming objects; the local cache
    // s.indexer is supplied as its KnownObjects.
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })

    // Build the Config the controller needs: fifo is the Queue and
    // s.HandleDeltas is the Process function applied to popped items.
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    // Scoped in a closure so startedLock is released before the long-running
    // part of Run begins.
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        // Create the controller from cfg and store it on the informer.
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock

        // Mark the informer as started.
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop

    // Start the cache mutation detector.
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)

    // Start the shared processor. NOTE(review): per the original notes this
    // starts every registered listener, which then moves notifications from
    // addCh to nextCh — processor.run is not shown here, confirm there.
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()

    // Run the controller: it runs a Reflector that feeds the DeltaFIFO from
    // the ListerWatcher, and pops from the FIFO to process each item with
    // HandleDeltas. Blocks until stopCh is closed.
    s.controller.Run(stopCh)
}

HandleDeltas

处理从DeltaFIFO中pop出的Deltas,根据类型放入processor.listeners或syncingListeners中,并调用listener.add方法放入addCh中

// HandleDeltas applies a batch of Deltas popped from the DeltaFIFO, oldest
// first. For each delta it updates the local cache (indexer) and hands the
// matching add/update/delete notification to the processor for distribution.
// It holds blockDeltas for the whole batch. The first error returned by the
// indexer aborts processing and is returned to the caller.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, delta := range obj.(Deltas) {
        current := delta.Object

        switch delta.Type {
        case Deleted:
            // Drop the object from the local cache, then notify listeners.
            if err := s.indexer.Delete(current); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: current}, false)

        case Sync, Replaced, Added, Updated:
            // Register the object with the cache mutation detector.
            s.cacheMutationDetector.AddObject(current)

            // Look the object up in the local cache. A lookup error is
            // treated the same as "not present": the object is added.
            previous, found, getErr := s.indexer.Get(current)
            if getErr != nil || !found {
                if err := s.indexer.Add(current); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: current}, false)
                continue
            }

            // Already cached: replace the cached copy with the new one.
            if err := s.indexer.Update(current); err != nil {
                return err
            }

            // A Sync delta is always a resync. A Replaced delta counts as one
            // only when both accessors resolve and the resource versions match.
            resync := delta.Type == Sync
            if delta.Type == Replaced {
                if newMeta, newErr := meta.Accessor(current); newErr == nil {
                    if oldMeta, oldErr := meta.Accessor(previous); oldErr == nil {
                        resync = newMeta.GetResourceVersion() == oldMeta.GetResourceVersion()
                    }
                }
            }

            // resync is passed through to distribute as its isSync argument.
            s.processor.distribute(updateNotification{oldObj: previous, newObj: current}, resync)
        }
    }
    return nil
}

GetController

获取controller,实际返回的是一个包装了当前sharedIndexInformer的dummyController

// GetController returns a dummyController that wraps this informer; the
// informer's internal controller field is not exposed.
func (s *sharedIndexInformer) GetController() Controller {
    wrapper := &dummyController{informer: s}
    return wrapper
}

HasSynced

判断是否已完成同步(即本地缓存是否已至少完成过一次完整的LIST)

// HasSynced reports whether the informer has synced. It returns false while
// the controller has not been created yet (Run has not been called);
// otherwise it returns whatever the controller's HasSynced reports.
func (s *sharedIndexInformer) HasSynced() bool {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    // Short-circuit keeps the nil controller from being dereferenced.
    return s.controller != nil && s.controller.HasSynced()
}

LastSyncResourceVersion

获取最后一次同步的资源版本

// LastSyncResourceVersion returns the resource version reported by the
// controller, or the empty string while the controller has not been created
// yet (Run has not been called).
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    if ctrl := s.controller; ctrl != nil {
        // Delegates to the controller.
        return ctrl.LastSyncResourceVersion()
    }
    return ""
}

GetStore

获取store,返回当前sharedIndexInformer.indexer

// GetStore returns the informer's local cache (its indexer) as a Store.
func (s *sharedIndexInformer) GetStore() Store {
    var localCache Store = s.indexer
    return localCache
}

GetIndexer

获取indexer,返回当前sharedIndexInformer.indexer

// GetIndexer returns the informer's local cache, the same indexer that
// GetStore returns.
func (s *sharedIndexInformer) GetIndexer() Indexer {
    localCache := s.indexer
    return localCache
}

AddIndexers

在sharedIndexInformer没有启动的情况下,向indexer中批量添加IndexFunc

// AddIndexers registers additional indexers on the informer's local cache.
// It is only allowed before the informer has started; once started, an error
// is returned and nothing is added.
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    if !s.started {
        // Not running yet: hand the indexers to the local cache.
        return s.indexer.AddIndexers(indexers)
    }
    return fmt.Errorf("informer has already started")
}

总流程图示

  1. listWatch负责从api-server获取数据
  2. Reflector调用run方法,通过listWatch从api-server中获取数据放入DeltaFIFO中
  3. DeltaFIFO以Indexer作为本地缓存
  4. 调用controller.processLoop,即循环从DeltaFIFO中pop出obj并交给HandleDeltas方法处理
  5. HandleDeltas方法中调用distribute方法把obj转化成notification存入sharedProcessor中的processorListener中(调用processorListener.add)
  6. 在processorListener用pop方法把notification从addCh中存入nextCh中
  7. 最后调用processorListener.run方法,调用自定义handler处理消息

总流程.svg

参考

墙裂推荐!!!
https://www.jianshu.com/p/12d2912d5ac3