DeltaFIFO
是一个生产者-消费者队列,informer
中将Reflector
用作为DeltaFIFO
的生产者,而消费者则是调用Pop()方法的对象,主要作用是存储资源的变化过程 ,供之后的组件调用 client-go版本:https://github.com/kubernetes/client-go/blob/release-1.18 代码位置:https://github.com/kubernetes/client-go/blob/release-1.18/tools/cache/delta_fifo.go
类及结构
继承关系
实体类s
DeltaFIFO,Deltas,Delta,DeltaType
- DeltaFIFO
- items:数据的主要存储位置
- key:通过
keyFunc
方法生成的id- value:
Deltas
- queue:存放通过
keyFunc
方法生成的id的slice- keyFunc:生成odj对应唯一id的方法
- knownObjects:本地缓存,一般是一个Indexer
- populated:判断第一次操作
DeltaFIFO
是通过Add/Update/Delete还是Replace
- Add/Update/Delete则populated为true
- Replace则populated为false
- closed:标记DeltaFifo是否为关闭状态
- Deltas
- 一个存放
Delta
的slice- Delta
- Type:
DeltaType
,obj的变更类型- Object:资源的具体对象obj
- DeltaType
- add,update,delete,sync等资源的变更类型
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{}
}
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
DeletedFinalStateUnknown
当一个
obj
被删除了, 但是这个程序这边由于某种原因miss
了这次deletion event
, 那么假如在做同步操作时, 从服务器获取的列表中已经没有了这个obj
, 因为该程序没有接收到deletion event
, 所以该obj
在本地缓存中依然存在, 所以此时会给这个obj
构造成这个DeletedFinalStateUnknown
类型.
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}
主要方法
图示说明代码地址:https://github.com/ddddx0/customTestClientGO/tree/master/delta_fifo
Add/Update/Delete/queueActionLocked
概述
- Add/Update/Delete 方法主要是通过
keyFunc
生成的id,往items
和queue
中插入描述变更过程的数据(Delta,Deltas)
- items中不存在该id则向queue中都插入指定类型数据
- queue中插入string类型的id
- items中插入Deltas([]Delta),Delta由变更类型(added/updated/deleted)和传入obj构成
- items中存在该id对应的Deltas,则取出Deltas并把新Deltas拼接上去
- 若Deltas的最后2个Delta都为Delete操作,则合并为一个
- Add/Update/Delete 方法其实都是对items与queue增量操作,除了检查性的删除之外,并没有对DeltaFifo进行主动删除操作,所以这些方法可以理解为对DeltaFifo的添加操作,对不同的变更类型添加对应的Delta
源码分析
Add/Update
- Add
- 加锁
- 调用queueActionLocked()方法,传入Added表示变更类型为添加
- Update
- 加锁
- 调用queueActionLocked()方法,传入Updated表示变更类型为更新
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
Delete
- 1:通过
keyFunc
生成obj对应id- 2:若缓存为空,且
items
中没有key为id的对象,则直接返回,没有该对象,则无法进行删除变更- 3:若缓存不为空,但
items
和缓存中都没有改id的对象,则返回,没有对象无法进行删除变更- 4:若不是以上情况,调用queueActionLocked()方法,传入Updated表示变更类型为删除
// Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example.
func (f *DeltaFIFO) Delete(obj interface{}) error {
// == 1
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// == 2
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
} else {
// == 3
// We only want to skip the "deletion" action if the object doesn't
// exist in knownObjects and it doesn't have corresponding item in items.
// Note that even if there is a "deletion" action in items, we can ignore it,
// because it will be deduped automatically in "queueActionLocked"
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
}
// == 4
return f.queueActionLocked(Deleted, obj)
}
queueActionLocked
- 1:通过
keyFunc
生成obj对应id- 2:以传入的变更类型和obj构建一个Delta对象,并取出items中该id对应的Deltas,一起拼接成新Deltas>>
newDeltas
- 3:若
newDeltas
中最后2个元素的事件都为delete,则合并为一个元素- 4:若
newDeltas
长度大于0,且items中没有该id对应的Deltas,则说明之前没有关于该obj的变更,则在queue
中添加该id,并在items
中添加key为id,value为newDeltas的数据,并唤醒所有cond变量所在线程(主要是供Pop方法使用)- 5:若
newDeltas
长度不大于0,则从items中删除该id对应的Deltas
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// == 1
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// == 2
newDeltas := append(f.items[id], Delta{actionType, obj})
// == 3
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
// == 4
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// == 5
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
图示说明
初始化一个DeltasFIFO,并调用Add,Update,Delete方法
func TestDeltaFIFO_addUpdateDelete(t *testing.T) {
f := cache.NewDeltaFIFO(testFifoObjectKeyFunc, nil)
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("foo", 12))
f.Delete(mkFifoObj("foo", 15))
}
初始化DeltaFIFO
初始化一个
items
,queue
,knowObjects
都为空的DeltaFIFO
调用Add
- items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Add,{foo,10}}对象
- queue中添加了foo
调用Update
- items中key为foo的数组中,添加了一个{Update,{foo,12}}
调用Delete
items中key为foo的数组中,添加了一个{Delete,{foo,15}}
小结
- Add/Update/Delete 方法主要是通过
keyFunc
生成的id,往items
和queue
中插入描述变更过程的数据(Delta,Deltas)
- items中不存在该id则向queue中都插入指定类型数据
- queue中插入string类型的id
- items中插入Deltas([]Delta),Delta由变更类型(added/updated/deleted)和传入obj构成
- items中存在该id对应的Deltas,则取出Deltas并把新Deltas拼接上去
- 若Deltas的最后2个Delta都为Delete操作,则合并为一个
- Add/Update/Delete 方法其实都是对items与queue增量操作,除了检查性的删除之外,并没有对DeltaFifo进行主动删除操作,所以这些方法可以理解为对DeltaFifo的添加操作,对不同的变更类型添加对应的Delta
Replace
概述
- Replace方法需要传入一个
list
,若knowsObjects
或items
中存在list
中不存在的obj,则对这个obj对应的Deltas中添加一个Delta{delete,DeletedFinalStateUnknown},即通过对knowsObjects
和items
的检查,往items
中添加变更操作- Replace方法也是对DeltaFifo的增量操作,除检查性删除之外,没有做删除操作
源码分析
遍历新list
- 把id放入
keys
中- 以list中的元素作为obj,向
items
中增加一个变更类型为Sync的Delta
- 若
knowsObjects
为空,说明不存在缓存,则遍历items
,向list
中不存在的Deltas后拼接一个{delete,DeletedFinalStateUnknown},并记录操作元素数量- 遍历
items
,k
为obj的id,olditem
为Deltas
- 若
keys
中存在当前obj的id,则说明list中的元素已存在与items中,则不做操作- 若不存在,则取出
olditem
中最后一个Delta,作为删除变更的obj- 调用queueActionLocked方法,以olditem中最后一个元素作为obj,构建一个变更类型为delete的DeletedFinalStateUnknown
initialPopulationCount
标记当前replace方法对items的操作次数,即插入数据数量(不一定相等,因为queueActionLocked方法中会把结尾为2次delete的Delta合并)
knowsObjects
不为空,则遍历knowsObjects
的keys所组成的sliceknowsKeys
,向list
中不存在的Deltas后拼接一个{delete,DeletedFinalStateUnknown},并记录操作元素数量- 若
knowsKeys
中存在当前obj的id,则说明list中的元素已存在与items中,则不做操作- 从
knowsObjects
中取出当前id对应的obj,若对选obj存在且不为空,- 记录操作数
- 调用queueActionLocked方法,传入一个变更类型为delete的DeletedFinalStateUnknown
initialPopulationCount
标记当前replace方法对items的操作次数,即插入数据数量(不一定相等,因为queueActionLocked方法中会把结尾为2次delete的Delta合并)
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// == 1
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// == 1.a
keys.Insert(key)
// == 1.b
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// == 2
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
// == 2.a
for k, oldItem := range f.items {
// == 2.a.i
if keys.Has(k) {
continue
}
var deletedObj interface{}
// == 2.a.ii
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
// == 2.a.iii
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// == 2.b
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// == 3
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// == 3.a
if keys.Has(k) {
continue
}
// == 3.b
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// == 3.c
queuedDeletions++
// == 3.d
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// == 3.f
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
图示说明
- 初始化一个带有knowsObjects的DeltaFIFO
- 调用Delete
- 调用Replace
func TestDeltaFIFO_replace(t *testing.T) {
f := cache.NewDeltaFIFO(
testFifoObjectKeyFunc,
customObjects{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)},
)
f.Delete(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
}
初始化DeltaFIFO
初始化一个
items
和queue
为空,knowObjects
中有3个元素的DeltaFIFO
调用Delete
- items中添加了key为baz,value为Delta数组的一组键值,Delta数组中添加了一个{Delete,{baz,10}}
- queue中添加了baz
调用Replace
未调用Replace时
- replaceList为调用Replace时传入的数组
先循环replaceList,对所有元素插入Sync变更
- knowsObjects不为空时,循环knowObjects,对所有replaceList中不存在的元素,插入一个DeletedFinalStateUnknown
- baz对应数组中最后2个Delta都是Delete变更,所以只保留前一个
Resync
概述
- Resync方法即对所有
knowObjects
中存在,items
中不存在的obj进行同步(调用queueActionLocked方法增加Sync变更),即同步knowObjects和items- Replace方法也是对DeltaFifo的增量操作,除检查性删除之外,没有做删除操作
源码分析
向
knownObjects
中每个obj对应的items里添加一个Sync变更类型的Delta
- 循环
knownObjects
,对每个id调用syncKeyLocked方法- 若
knownObjects
中存在对应obj
- 计算obj对应id,若items[id]数组大于0,则返回nil即不插入Sync变更
- 否则,调用queueActionLocked插入Sync变更
// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
if f.knownObjects == nil {
return nil
}
keys := f.knownObjects.ListKeys()
// == 1
for _, k := range keys {
// == 2
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
// ==2.a
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
// If we are doing Resync() and there is already an event queued for that object,
// we ignore the Resync for it. This is to avoid the race, in which the resync
// comes with the previous value of object (since queueing an event for the object
// doesn't trigger changing the underlying store <knownObjects>.
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}
// == 2.b
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
图示说明
初始化DeltaFIFO,并调用Delete,Resync
func TestDeltaFIFO_resync(t *testing.T) {
f := cache.NewDeltaFIFO(
testFifoObjectKeyFunc,
customObjects{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)},
)
f.Delete(mkFifoObj("foo", 10))
f.Resync()
}
初始化DeltaFIFO
初始化一个
items
和queue
为空,knowObjects
中有3个元素的DeltaFIFO
调用Delete
- items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Delete,{foo,10}}
- queue中添加了foo
调用Resync
为
knowObjects
中存在且items
中不存在的obj向items
中添加Sync变更
Pop
概述
- Pop方法即循环queue中的id,取出items中该id对应的obj数组,若传入的PopProcessFunc方法执行结果为ErrRequeue,则重新在queue和items插入刚取出的元素,否则彻底从queue和items中删除取出元素
- Pop方法不同于以上的方法,Pop方法是真正对DeltaFIFO进行删除操作的方法
源码分析
- 若
queue
长度等于0且closed
为true,则表示DeltaFifo为关闭状态,方法返回- 若
queue
长度等于0但closed
为false,则表示DeltaFifo未关闭,当前线程wait,等待唤醒操作- 若
queue
长度不为0
- 取出
queue
中第一个id,并从queue中删除该id- 若replace计数器
initialPopulationCount
的值大于0,则减一- 从
items
中获取id对应item,若item不存在,则跳过此次循环- 若item存在,这从items中删除对应item与id(相当于队列的pop弹出操作)
- 通过传入的
PopProcessFunc
方法操作弹出的item- 若
PopProcessFunc
方法返回的错误类型为ErrRequeue
则说明是可预见性错误,需要调用addIfNotPresent
方法把item重新添加进items
- 所有操作完成,返回被弹出item
// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// == 1
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
// == 2
f.cond.Wait()
}
// == 3
// == 3.a
id := f.queue[0]
f.queue = f.queue[1:]
// == 3.b
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// == 3.c
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
// == 3.d
delete(f.items, id)
// == 3.e
err := process(item)
// == 3.f
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// == 3.g
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
图示说明
TestDeltaFIFO_pop
- 初始化创建DeltaFIFO
- 调用Add方法
- 调用Update方法
customPopFunc
- 传入Pop方法中用来具体处理obj
func TestDeltaFIFO_pop(t *testing.T) {
f := cache.NewDeltaFIFO(testFifoObjectKeyFunc, nil)
f.Add(mkFifoObj("foo", 10))
_, err := f.Pop(customPopFunc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
// 这里的功能为:如果不为Added变更类型,则打印name并返回成功,否则为pop失败
func customPopFunc(obj interface{}) error {
deltas := obj.(cache.Deltas)
if deltas[len(deltas)-1].Type != Added {
fmt.Println("Added obj : " + deltas[len(deltas)-1].Object.(testFifoObject).name)
return nil
}
return cache.ErrRequeue{Err: nil}
}
初始化DeltaFIFO
初始化一个
items
,queue
,knowObjects
都为空的DeltaFIFO
调用Add&Update
- items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Add,{foo,10}}对象
- queue中添加了foo
- items中添加了key为baz,value为Delta数组的一组键值,Delta数组中添加了一个{Update,{baz,10}}对象
- queue中添加了baz