继承关系图
ResourceEventHandler
用户继承此接口,用以自定义对应方法的逻辑
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
processorListener
类与构造方法
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
// 自定义处理方法的Handler
handler ResourceEventHandler
// 无限的环形数据结构,用以保存所有未分发的通知
pendingNotifications buffer.RingGrowing
// 从sharedInformer中同步全部数据的时间间隔
requestedResyncPeriod time.Duration
resyncPeriod time.Duration
// 下一次同步的时间
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
主要方法
add
向addChan中放入通知
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
pop
概述
- pop方法主要是通过pendingNotifications在addCh和nextCh之间传递notification,通过pendingNotifications实现消息传递的异步缓冲
- addCh和nextCh都是无缓冲channel
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 若nextCh可以插入值,即nextCh不为nil
case nextCh <- notification:
// Notification dispatched
var ok bool
// 从pendingNotifications取出一个值,赋值给notification
// 若成功表示pendingNotifications中还有剩余的消息未处理,取出赋值给赋值给notification,用作下次放入nextCh中
// 若失败则说明pendingNotifications中的缓存值都已取出,则nextCh赋值为nil
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 从addCh中取出一个obj赋值给notificationToAdd
case notificationToAdd, ok := <-p.addCh:
// 若从addCh中取值失败,则说明addCh关闭或没有值,直接推出pop方法
if !ok {
return
}
// notification为nil说明第一次取值,则需要初始化notification和nextCh
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // notification不为nil,说明已经有值在等待处放入nextChan中了,直接把从addCh中取出的值放入pendingNotifications中缓存
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
run
消费pop方法放入nextCh中notification,根据不同类型调用handler的不同方法
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
// 每秒调用一次
wait.Until(func() {
// 循环从nextCh中取出notification,根据类型调用handler对应方法
for next := range p.nextCh中取出 {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
shouldResync
根据p.resyncPeriod判断当前时间是否需要重新同步数据
func (p *processorListener) shouldResync(now time.Time) bool {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
// 若resyncPeriod为0,则说明不用同步
if p.resyncPeriod == 0 {
return false
}
// 大于或等于nextResync则表示需要同步,即指定时间已经大于或等于下一次同步时间
return now.After(p.nextResync) || now.Equal(p.nextResync)
}
determineNextResync
根据resyncPeriod计算nextResync,即下一次同步时间
func (p *processorListener) determineNextResync(now time.Time) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
p.nextResync = now.Add(p.resyncPeriod)
}
setResyncPeriod
设置resyncPeriod,即同步时间间隔
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
p.resyncPeriod = resyncPeriod
}
流程图示
- add方法作为生产者,把notification放入addCh中
- pop方法作为addCh的消费者,把addCh中的数据放入pendingNotifications中缓冲(主要是为了异步,防止notification太多,处理速度跟不上新增速度),再从pendingNotifications取出放入nextCh中
- run方法作为nextCh的消费者,每秒从nextCh中取出一次数据,调用handler进行处理
SharedProcessor
类与构造方法
// 这里sharedProcessor就是管理着所有的processorListener, 简单一点理解就是当拿到一个数据, 然后可以分发给所有的listeners.
type sharedProcessor struct {
// 判断listeners有没有启动
listenersStarted bool
listenersLock sync.RWMutex
// 所有的processorListener
listeners []*processorListener
// 所有的需要sync的processorListener 动态变化
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
主要方法
addListener&&addListenerLocked
// 向listeners和syncingListeners中添加传入的processorListener,并根据listenersStarte判断是否调用processorListener的pop和run方法
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
// 向listeners和syncingListeners中添加传入的processorListener
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
distribute
// 根据传入的sync判断,循环调用syncingListeners或listeners中的processorListener.run方法,向每个processorListener中传入obj
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
run
// 启动所有listeners中的processorListener
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待stopCh
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
//循环关闭所有 listeners中的processorListener.addCh
for _, listener := range p.listeners中的 {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
// Wait for all .pop() and .run() to stop
// 等待所有的pop(),run()执行完毕
p.wg.Wait()
}
shouldResync
// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
// 根据listeners中processorListener.shouldResync判断当前processorListener是否需要同步,需要则把当前processorListener放入syncingListeners中
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
// 循环listeners
for _, listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing.
// 调用shouldResync判断当前processorListener是否需要同步
if listener.shouldResync判断当前(now) {
resyncNeeded = true
// 放入syncingListeners中
p.syncingListeners = append(p.syncingListeners, listener)
// 计算当前processorListener的下一次同步时间
listener.determineNextResync(now)
}
}
// 如果所有的listeners都没有到resync时间, 那该sharedProcessor对象的shouldResync会返回false. 否则会返回true.
return resyncNeeded
}
resyncCheckPeriodChanged
// resyncCheckPeriodChanged的作用是根据resyncCheckPeriod会重新生成一下每个listener自己的resyncPeriod.
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
// 1. 如果自己要求的requestedResyncPeriod为0或被要求的resyncCheckPeriod其中一个是0, 则返回0.
// 2. 则返回两个其中最大的一个.
func determineResyncPeriod(desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
return 0
}
if desired < check {
klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
return check
}
return desired
}