Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,触发 onUpdate 回调来让事件重新处理。

Reflector的ListAndWatch会启动一个gorouting,执行resync。

1.先执行sharedProcessor的shouldResync,将注册了resync的listener加入到sharedProcessor的syncingListeners中
2.再执行DeltaFIFO的resync,通过syncKeyLocked分发sync事件到sharedProcessor的syncingListeners中

go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup }()
for {
select {
case <-resyncCh:
case <-stopCh:
return case <-cancelCh:
return }
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof(“%s: forcing resync”, r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err return }
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()

这是sharedProcessor的shouldResync

func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()

p.syncingListeners = []*processorListener{}

resyncNeeded := false now := p.clock.Now()
for , listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any // listeners that are going to be resyncing. _if listener.shouldResync(now) {
resyncNeeded = true p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded}

这是DeltaFIFO的resync

func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()

if f.knownObjects == nil {
return nil }

keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err }
}
return nil}