继承关系图

sp.svg

ResourceEventHandler

用户继承此接口,用以自定义对应方法的逻辑

  1. type ResourceEventHandler interface {
  2. OnAdd(obj interface{})
  3. OnUpdate(oldObj, newObj interface{})
  4. OnDelete(obj interface{})
  5. }

processorListener

类与构造方法

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    // 自定义处理方法的Handler
    handler ResourceEventHandler

    // 无限的环形数据结构,用以保存所有未分发的通知
    pendingNotifications buffer.RingGrowing

    // 从sharedInformer中同步全部数据的时间间隔
    requestedResyncPeriod time.Duration

    resyncPeriod time.Duration
    // 下一次同步的时间
    nextResync time.Time
    // resyncLock guards access to resyncPeriod and nextResync
    resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                make(chan interface{}),
        addCh:                 make(chan interface{}),
        handler:               handler,
        pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        requestedResyncPeriod: requestedResyncPeriod,
        resyncPeriod:          resyncPeriod,
    }

    ret.determineNextResync(now)

    return ret
}

主要方法

add

向addChan中放入通知

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

pop

概述

  • pop方法主要是通过pendingNotifications在addCh和nextCh之间传递notification,通过pendingNotifications实现消息传递的异步缓冲
  • addCh和nextCh都是无缓冲channel
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        // 若nextCh可以插入值,即nextCh不为nil
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            // 从pendingNotifications取出一个值,赋值给notification
            // 若成功表示pendingNotifications中还有剩余的消息未处理,取出赋值给赋值给notification,用作下次放入nextCh中
            // 若失败则说明pendingNotifications中的缓存值都已取出,则nextCh赋值为nil
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        // 从addCh中取出一个obj赋值给notificationToAdd
        case notificationToAdd, ok := <-p.addCh:
            // 若从addCh中取值失败,则说明addCh关闭或没有值,直接推出pop方法
            if !ok {
                return
            }
              // notification为nil说明第一次取值,则需要初始化notification和nextCh
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // notification不为nil,说明已经有值在等待处放入nextChan中了,直接把从addCh中取出的值放入pendingNotifications中缓存
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

run

消费pop方法放入nextCh中notification,根据不同类型调用handler的不同方法

func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    // 每秒调用一次
    wait.Until(func() {
        // 循环从nextCh中取出notification,根据类型调用handler对应方法
        for next := range p.nextCh中取出 {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}

shouldResync

根据p.resyncPeriod判断当前时间是否需要重新同步数据

func (p *processorListener) shouldResync(now time.Time) bool {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()

    // 若resyncPeriod为0,则说明不用同步
    if p.resyncPeriod == 0 {
        return false
    }
    // 大于或等于nextResync则表示需要同步,即指定时间已经大于或等于下一次同步时间
    return now.After(p.nextResync) || now.Equal(p.nextResync)
}

determineNextResync

根据resyncPeriod计算nextResync,即下一次同步时间

func (p *processorListener) determineNextResync(now time.Time) {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()

    p.nextResync = now.Add(p.resyncPeriod)
}

setResyncPeriod

设置resyncPeriod,即同步时间间隔

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()

    p.resyncPeriod = resyncPeriod
}

流程图示

  • add方法作为生产者,把notification放入addCh中
  • pop方法作为addCh的消费者,把addCh中的数据放入pendingNotifications中缓冲(主要是为了异步,防止notification太多,处理速度跟不上新增速度),再从pendingNotifications取出放入nextCh中
  • run方法作为nextCh的消费者,每秒从nextCh中取出一次数据,调用handler进行处理

pl.svg

SharedProcessor

类与构造方法

// 这里sharedProcessor就是管理着所有的processorListener, 简单一点理解就是当拿到一个数据, 然后可以分发给所有的listeners.
type sharedProcessor struct {
    // 判断listeners有没有启动
    listenersStarted bool
    listenersLock    sync.RWMutex
    // 所有的processorListener
    listeners        []*processorListener
    // 所有的需要sync的processorListener 动态变化
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

主要方法

addListener&&addListenerLocked

// 向listeners和syncingListeners中添加传入的processorListener,并根据listenersStarte判断是否调用processorListener的pop和run方法
func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

// 向listeners和syncingListeners中添加传入的processorListener
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

distribute

// 根据传入的sync判断,循环调用syncingListeners或listeners中的processorListener.run方法,向每个processorListener中传入obj
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

run

// 启动所有listeners中的processorListener
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()

    // 等待stopCh
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    //循环关闭所有 listeners中的processorListener.addCh
    for _, listener := range p.listeners中的 {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }

    // Wait for all .pop() and .run() to stop
    // 等待所有的pop(),run()执行完毕
    p.wg.Wait() 
}

shouldResync

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
// 根据listeners中processorListener.shouldResync判断当前processorListener是否需要同步,需要则把当前processorListener放入syncingListeners中
func (p *sharedProcessor) shouldResync() bool {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.syncingListeners = []*processorListener{}

    resyncNeeded := false
    now := p.clock.Now()
    // 循环listeners
    for _, listener := range p.listeners {
        // need to loop through all the listeners to see if they need to resync so we can prepare any
        // listeners that are going to be resyncing.
        // 调用shouldResync判断当前processorListener是否需要同步
        if listener.shouldResync判断当前(now) {
            resyncNeeded = true
            // 放入syncingListeners中
            p.syncingListeners = append(p.syncingListeners, listener)
            // 计算当前processorListener的下一次同步时间
            listener.determineNextResync(now)
        }
    }
    // 如果所有的listeners都没有到resync时间, 那该sharedProcessor对象的shouldResync会返回false. 否则会返回true.
    return resyncNeeded
}

resyncCheckPeriodChanged

// resyncCheckPeriodChanged的作用是根据resyncCheckPeriod会重新生成一下每个listener自己的resyncPeriod.
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    for _, listener := range p.listeners {
        resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
        listener.setResyncPeriod(resyncPeriod)
    }
}

// 1. 如果自己要求的requestedResyncPeriod为0或被要求的resyncCheckPeriod其中一个是0, 则返回0.
// 2. 则返回两个其中最大的一个.
func determineResyncPeriod(desired, check time.Duration) time.Duration {
    if desired == 0 {
        return desired
    }
    if check == 0 {
        klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
        return 0
    }
    if desired < check {
        klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
        return check
    }
    return desired
}