类与接口
继承关系图示
Controller
// Controller is the low-level controller interface: Run moves objects from
// the Config's ListerWatcher into the Config's Queue and processes the items
// popped from that Queue; the remaining methods expose sync state.
type Controller interface {
	// Run does two things. One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue. The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc. Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})
	// HasSynced delegates to the Config's Queue
	HasSynced() bool
	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string
}
SharedInformer
// SharedInformer is the interface of an informer whose event stream can be
// consumed by several user-supplied ResourceEventHandlers.
type SharedInformer interface {
	// AddEventHandler registers a user-defined event handler.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod registers a user-defined event handler
	// together with the resync period requested for that handler.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's Store. In the sharedIndexInformer
	// implementation this is the indexer (the local cache), not the DeltaFIFO.
	GetStore() Store
	// GetController returns a Controller for this informer. In the
	// sharedIndexInformer implementation this is a dummyController that wraps
	// the informer.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	// In other words: the ResourceVersion last processed for the object type
	// this SharedInformer handles.
	LastSyncResourceVersion() string
}
SharedIndexInformer
// SharedIndexInformer extends SharedInformer with access to, and
// configuration of, the informer's Indexer.
type SharedIndexInformer interface {
	// Embeds the full SharedInformer interface.
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's Indexer.
	GetIndexer() Indexer
}
sharedIndexInformer
// sharedIndexInformer is the SharedIndexInformer implementation returned by
// NewSharedIndexInformer.
type sharedIndexInformer struct {
	// indexer is the local cache. It is handed to the DeltaFIFO as
	// KnownObjects when Run constructs the queue.
	indexer Indexer
	// controller is created from a Config inside Run; it is nil before Run
	// has been called.
	controller Controller
	// processor holds the registered listeners and distributes notifications
	// to them.
	processor *sharedProcessor
	// cacheMutationDetector is started by Run and is fed every object that
	// HandleDeltas adds to or updates in the cache.
	// NOTE(review): presumably used to detect illegal mutation of cached
	// objects — confirm against the MutationDetector implementation.
	cacheMutationDetector MutationDetector
	// listerWatcher is passed to the controller Config as its ListerWatcher.
	listerWatcher ListerWatcher
	// objectType is the type of object this informer expects to receive.
	objectType runtime.Object
	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock
	// started is set once Run has created the controller; stopped is set when
	// Run returns. Both are guarded by startedLock.
	started, stopped bool
	startedLock sync.Mutex
	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	// HandleDeltas must hold this lock while it distributes objects to the
	// listeners, so whoever else holds it pauses distribution until the lock
	// is released.
	blockDeltas sync.Mutex
}
主要方法
NewSharedInformer&NewSharedIndexInformer
初始化SharedIndexInformer,构造SharedIndexInformer所必须的前置对象
// 构造一个SharedInformer,实际是调用NewSharedIndexInformer返回SharedIndexInformer
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer,实际是返回一个 {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
// NewSharedIndexInformer builds a SharedIndexInformer and every object it
// needs up front: a real clock shared by the informer and its processor, an
// Indexer used as the local cache, and a cache mutation detector named after
// the example object's Go type.
//
// defaultEventHandlerResyncPeriod seeds both the informer's resyncCheckPeriod
// and the default resync period applied to handlers added via AddEventHandler.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	rc := &clock.RealClock{}

	// The Indexer acts as the informer's local cache.
	localCache := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
	detector := NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject))

	informer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: rc},
		indexer:                         localCache,
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           detector,
		clock:                           rc,
	}
	return informer
}
AddEventHandler&AddEventHandlerWithResyncPeriod
增加一个用户自定义的EventHandler实现类,即增加一个使用该EventHandler作为处理方法的listener,并和本地缓存indexer同步数据
// AddEventHandler registers handler with the informer's default resync
// period by delegating to AddEventHandlerWithResyncPeriod.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	defaultPeriod := s.defaultEventHandlerResyncPeriod
	s.AddEventHandlerWithResyncPeriod(handler, defaultPeriod)
}
// minimumResyncPeriod is the smallest positive resync period a handler may
// use; AddEventHandlerWithResyncPeriod raises smaller positive values to it.
const minimumResyncPeriod = 1 * time.Second
// AddEventHandlerWithResyncPeriod registers handler as a new listener on the
// informer, using resyncPeriod as the listener's requested resync interval.
//
// Behaviour:
//   - If the informer has already stopped, the handler is not added; this is
//     only logged.
//   - A positive resyncPeriod below minimumResyncPeriod is raised to
//     minimumResyncPeriod.
//   - A positive resyncPeriod below the informer's resyncCheckPeriod is raised
//     to resyncCheckPeriod when the informer has already started; otherwise
//     resyncCheckPeriod is lowered to resyncPeriod and the processor is told
//     so it can recompute the listeners' resync periods.
//   - If the informer is already running, the new listener is added while
//     event distribution is blocked and is sent an add notification for every
//     object currently in the local cache.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	// A stopped informer accepts no new listeners.
	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	// Normalise the requested resync period.
	if resyncPeriod > 0 {
		// Clamp to the minimum allowed value (1s).
		if resyncPeriod < minimumResyncPeriod {
			// %v prints durations in human-readable form (e.g. "1s") instead
			// of a raw nanosecond count.
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				// Too late to change the check period: the handler has to
				// settle for the informer's resyncCheckPeriod.
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	// Create the listener that will feed notifications to handler.
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	// Not started yet: nothing is being distributed, so just register it.
	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// Already started. To let the listener join safely:
	// 1. take blockDeltas so HandleDeltas cannot distribute concurrently,
	// 2. register the listener with the processor,
	// 3. send it an add notification for every object in the local cache,
	//    because the new listener has seen none of them yet,
	// 4. release the lock (deferred).
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
Run&HandleDeltas
Run
- 构造运行sharedIndexInformer所需的数据结构与前置对象,并调用HandleDeltas作为Config.Process,即用以处理从DeltaFIFO中pop出的obj(具体细节参照Config&Controller)
- 启动cacheMutationDetector,sharedProcessor,controller,开始从api-server中获取数据
// Run builds the DeltaFIFO, Config and controller for this informer, starts
// the cache mutation detector and the processor, and then runs the controller
// with stopCh. It returns after s.controller.Run returns.
//
// Deferred calls run in reverse order of registration, so on return the
// informer is first marked stopped, then processorStopCh is closed, then Run
// waits for the goroutines started through wg, and finally HandleCrash runs.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Build the DeltaFIFO that stores incoming objects; s.indexer (the local
	// cache) is supplied as its KnownObjects.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects: s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	// Build the Config the controller needs: fifo is the Queue and
	// s.HandleDeltas is the Process function.
	cfg := &Config{
		Queue: fifo,
		ListerWatcher: s.listerWatcher,
		ObjectType: s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError: false,
		ShouldResync: s.processor.shouldResync,
		Process: s.HandleDeltas,
	}
	// Create the controller and flip `started` under startedLock; the closure
	// scopes the lock to just these assignments.
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// Build the controller from cfg and store it on the informer.
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		// Mark the informer as started.
		s.started = true
	}()
	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait() // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	// Start the cache mutation detector.
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// Start the sharedProcessor.
	// NOTE(review): presumably this starts every registered listener, which
	// then moves notifications from addCh to nextCh — confirm in sharedProcessor.run.
	wg.StartWithChannel(processorStopCh, s.processor.run)
	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// Run the controller. Per the Controller interface comment, this runs a
	// Reflector that pumps objects from the ListerWatcher into the Queue and
	// repeatedly Pops from it, processing each item with HandleDeltas.
	s.controller.Run(stopCh)
}
HandleDeltas
处理从DeltaFIFO中pop出的Deltas,根据类型放入processor.listeners或syncingListeners中,并调用listener.add方法放入addCh中
// HandleDeltas applies one batch of Deltas (as popped from the DeltaFIFO) to
// the local cache and notifies the processor about each change.
//
// For Sync, Replaced, Added and Updated deltas the object is registered with
// the cache mutation detector and then either updated in the indexer (when
// the indexer already has it) or added to it. For Deleted deltas the object
// is removed from the indexer. The first indexer error aborts processing and
// is returned. obj must be of type Deltas; any other type panics on the type
// assertion.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	// Hold blockDeltas for the whole batch so a late-joining handler cannot
	// interleave with distribution.
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// Deltas are processed from oldest to newest.
	for _, delta := range obj.(Deltas) {
		current := delta.Object

		switch delta.Type {
		case Deleted:
			// Drop the object from the local cache, then notify.
			if err := s.indexer.Delete(current); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: current}, false)

		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(current)

			previous, found, lookupErr := s.indexer.Get(current)
			if lookupErr != nil || !found {
				// Not in the cache (or the lookup failed): treat it as an add.
				if err := s.indexer.Add(current); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: current}, false)
				continue
			}

			// Already cached: overwrite the old object with the new one.
			if err := s.indexer.Update(current); err != nil {
				return err
			}

			// A Sync delta is always a resync. A Replaced delta counts as one
			// only when both resource versions are readable and equal.
			resync := delta.Type == Sync
			if delta.Type == Replaced {
				if newMeta, err := meta.Accessor(current); err == nil {
					if oldMeta, err := meta.Accessor(previous); err == nil {
						resync = newMeta.GetResourceVersion() == oldMeta.GetResourceVersion()
					}
				}
			}
			s.processor.distribute(updateNotification{oldObj: previous, newObj: current}, resync)
		}
	}
	return nil
}
GetController
获取controller(返回的是包装了当前sharedIndexInformer的dummyController)
// GetController returns a Controller for this informer: a freshly allocated
// dummyController that wraps s.
func (s *sharedIndexInformer) GetController() Controller {
	wrapper := &dummyController{informer: s}
	return wrapper
}
HasSynced
判断是否已完成同步(即本地缓存是否已至少收到过一次完整的LIST结果)
// HasSynced reports the sync state of the underlying controller. It returns
// false while no controller has been created yet; otherwise it returns
// whatever the controller's HasSynced reports. The controller is read under
// startedLock.
func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if ctrl := s.controller; ctrl != nil {
		return ctrl.HasSynced()
	}
	return false
}
LastSyncResourceVersion
获取最后一次同步的资源版本
// LastSyncResourceVersion returns the controller's last synced resource
// version, or the empty string while no controller has been created yet. The
// controller is read under startedLock.
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if ctrl := s.controller; ctrl != nil {
		return ctrl.LastSyncResourceVersion()
	}
	return ""
}
GetStore
获取store,返回当前sharedIndexInformer.indexer
// GetStore returns the informer's indexer (its local cache) as a Store.
func (s *sharedIndexInformer) GetStore() Store {
	var store Store = s.indexer
	return store
}
GetIndexer
获取indexer,返回当前sharedIndexInformer.indexer
// GetIndexer returns the informer's indexer (its local cache).
func (s *sharedIndexInformer) GetIndexer() Indexer {
	idx := s.indexer
	return idx
}
AddIndexers
在sharedIndexInformer没有启动的情况下,向indexer中批量添加IndexFunc
// AddIndexers adds the given indexers to the informer's indexer. This is only
// allowed before the informer has started; afterwards an error is returned
// and nothing is added. The started flag is read under startedLock.
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if !s.started {
		// Not running yet: forward the request to the indexer.
		return s.indexer.AddIndexers(indexers)
	}
	return fmt.Errorf("informer has already started")
}
总流程图示
- listWatch负责从api-server获取数据
- Reflector调用Run方法,通过listWatch从api-server中获取数据并放入DeltaFIFO中
- DeltaFIFO以Indexer作为本地缓存
- 调用controller.processLoop,即循环从DeltaFIFO中Pop出obj,并交给HandleDeltas方法处理
- HandleDeltas方法中调用distribute方法把obj转化成notification存入sharedProcessor中的processorListener中(调用processorListener.add)
- 在processorListener用pop方法把notification从addCh中存入nextCh中
- 最后调用processorListener.run方法,调用自定义handler处理消息