DeltaFIFO是一个生产者-消费者队列,informer中将Reflector用作为DeltaFIFO的生产者,而消费者则是调用Pop()方法的对象,主要作用是存储资源的变化过程 ,供之后的组件调用 client-go版本:https://github.com/kubernetes/client-go/blob/release-1.18 代码位置:https://github.com/kubernetes/client-go/blob/release-1.18/tools/cache/delta_fifo.go

类及结构

继承关系

11177530-110d90f146bb3b22.png

实体类s

DeltaFIFO,Deltas,Delta,DeltaType

  • DeltaFIFO
    • items:数据的主要存储位置
      • key:通过keyFunc方法生成的id
      • value:Deltas
    • queue:存放通过keyFunc方法生成的id的slice
    • keyFunc:生成odj对应唯一id的方法
    • knownObjects:本地缓存,一般是一个Indexer
    • populated:判断第一次操作DeltaFIFO是通过Add/Update/Delete还是Replace
      • Add/Update/Delete则populated为true
      • Replace则populated为false
    • closed:标记DeltaFifo是否为关闭状态
  • Deltas
    • 一个存放Delta的slice
  • Delta
    • Type:DeltaType,obj的变更类型
    • Object:资源的具体对象obj
  • DeltaType
    • add,update,delete,sync等资源的变更类型
  1. type DeltaFIFO struct {
  2. // lock/cond protects access to 'items' and 'queue'.
  3. lock sync.RWMutex
  4. cond sync.Cond
  5. // We depend on the property that items in the set are in
  6. // the queue and vice versa, and that all Deltas in this
  7. // map have at least one Delta.
  8. items map[string]Deltas
  9. queue []string
  10. // populated is true if the first batch of items inserted by Replace() has been populated
  11. // or Delete/Add/Update was called first.
  12. populated bool
  13. // initialPopulationCount is the number of items inserted by the first call of Replace()
  14. initialPopulationCount int
  15. // keyFunc is used to make the key used for queued item
  16. // insertion and retrieval, and should be deterministic.
  17. keyFunc KeyFunc
  18. // knownObjects list keys that are "known", for the
  19. // purpose of figuring out which items have been deleted
  20. // when Replace() or Delete() is called.
  21. knownObjects KeyListerGetter
  22. // Indication the queue is closed.
  23. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  24. // Currently, not used to gate any of CRED operations.
  25. closed bool
  26. closedLock sync.Mutex
  27. }
  28. // Deltas is a list of one or more 'Delta's to an individual object.
  29. // The oldest delta is at index 0, the newest delta is the last one.
  30. type Deltas []Delta
  31. // Delta is the type stored by a DeltaFIFO. It tells you what change
  32. // happened, and the object's state after* that change.
  33. //
  34. // [*] Unless the change is a deletion, and then you'll get the final
  35. // state of the object before it was deleted.
  36. type Delta struct {
  37. Type DeltaType
  38. Object interface{}
  39. }
  40. // DeltaType is the type of a change (addition, deletion, etc)
  41. type DeltaType string
  42. // Change type definition
  43. const (
  44. Added DeltaType = "Added"
  45. Updated DeltaType = "Updated"
  46. Deleted DeltaType = "Deleted"
  47. // The other types are obvious. You'll get Sync deltas when:
  48. // * A watch expires/errors out and a new list/watch cycle is started.
  49. // * You've turned on periodic syncs.
  50. // (Anything that trigger's DeltaFIFO's Replace() method.)
  51. Sync DeltaType = "Sync"
  52. )

DeletedFinalStateUnknown

当一个obj被删除了, 但是这个程序这边由于某种原因miss了这次deletion event, 那么假如在做同步操作时, 从服务器获取的列表中已经没有了这个obj, 因为该程序没有接收到deletion event, 所以该obj在本地缓存中依然存在, 所以此时会给这个obj构造成这个DeletedFinalStateUnknown类型.

// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
    Key string
    Obj interface{}
}

主要方法

图示说明代码地址:https://github.com/ddddx0/customTestClientGO/tree/master/delta_fifo

Add/Update/Delete/queueActionLocked

概述

  • Add/Update/Delete 方法主要是通过keyFunc生成的id,往itemsqueue中插入描述变更过程的数据(Delta,Deltas)
    • items中不存在该id则向queue中都插入指定类型数据
      • queue中插入string类型的id
      • items中插入Deltas([]Delta),Delta由变更类型(added/updated/deleted)和传入obj构成
    • items中存在该id对应的Deltas,则取出Deltas并把新Deltas拼接上去
    • 若Deltas的最后2个Delta都为Delete操作,则合并为一个
  • Add/Update/Delete 方法其实都是对items与queue增量操作,除了检查性的删除之外,并没有对DeltaFifo进行主动删除操作,所以这些方法可以理解为对DeltaFifo的添加操作,对不同的变更类型添加对应的Delta

源码分析

Add/Update

  • Add
    • 加锁
    • 调用queueActionLocked()方法,传入Added表示变更类型为添加
  • Update
    • 加锁
    • 调用queueActionLocked()方法,传入Updated表示变更类型为更新
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

Delete

  • 1:通过keyFunc生成obj对应id
  • 2:若缓存为空,且items中没有key为id的对象,则直接返回,没有该对象,则无法进行删除变更
  • 3:若缓存不为空,但items和缓存中都没有改id的对象,则返回,没有对象无法进行删除变更
  • 4:若不是以上情况,调用queueActionLocked()方法,传入Updated表示变更类型为删除
// Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example.
func (f *DeltaFIFO) Delete(obj interface{}) error {
    // == 1
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    // == 2
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    } else {
        // == 3
        // We only want to skip the "deletion" action if the object doesn't
        // exist in knownObjects and it doesn't have corresponding item in items.
        // Note that even if there is a "deletion" action in items, we can ignore it,
        // because it will be deduped automatically in "queueActionLocked"
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    }
    // == 4
    return f.queueActionLocked(Deleted, obj)
}

queueActionLocked

  • 1:通过keyFunc生成obj对应id
  • 2:以传入的变更类型和obj构建一个Delta对象,并取出items中该id对应的Deltas,一起拼接成新Deltas>> newDeltas
  • 3:若newDeltas中最后2个元素的事件都为delete,则合并为一个元素
  • 4:若newDeltas长度大于0,且items中没有该id对应的Deltas,则说明之前没有关于该obj的变更,则在queue中添加该id,并在items中添加key为id,value为newDeltas的数据,并唤醒所有cond变量所在线程(主要是供Pop方法使用)
  • 5:若newDeltas长度不大于0,则从items中删除该id对应的Deltas
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // == 1
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    // == 2
    newDeltas := append(f.items[id], Delta{actionType, obj})
    // == 3 
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        // == 4 
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        // == 5
        // We need to remove this from our map (extra items in the queue are
        // ignored if they are not in the map).
        delete(f.items, id)
    }
    return nil
}

图示说明

初始化一个DeltasFIFO,并调用Add,Update,Delete方法

func TestDeltaFIFO_addUpdateDelete(t *testing.T) {
    f := cache.NewDeltaFIFO(testFifoObjectKeyFunc, nil)
    f.Add(mkFifoObj("foo", 10))
    f.Update(mkFifoObj("foo", 12))
    f.Delete(mkFifoObj("foo", 15))
}

初始化DeltaFIFO

初始化一个items,queue,knowObjects都为空的DeltaFIFO

DeltaFIFO.jpg

调用Add

  • items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Add,{foo,10}}对象
  • queue中添加了foo

Add.jpg

调用Update

  • items中key为foo的数组中,添加了一个{Update,{foo,12}}

Update.jpg

调用Delete

items中key为foo的数组中,添加了一个{Delete,{foo,15}}

Delete.jpg

小结

  • Add/Update/Delete 方法主要是通过keyFunc生成的id,往itemsqueue中插入描述变更过程的数据(Delta,Deltas)
    • items中不存在该id则向queue中都插入指定类型数据
      • queue中插入string类型的id
      • items中插入Deltas([]Delta),Delta由变更类型(added/updated/deleted)和传入obj构成
    • items中存在该id对应的Deltas,则取出Deltas并把新Deltas拼接上去
    • 若Deltas的最后2个Delta都为Delete操作,则合并为一个
  • Add/Update/Delete 方法其实都是对items与queue增量操作,除了检查性的删除之外,并没有对DeltaFifo进行主动删除操作,所以这些方法可以理解为对DeltaFifo的添加操作,对不同的变更类型添加对应的Delta

Replace

概述

  • Replace方法需要传入一个list,若knowsObjectsitems中存在list中不存在的obj,则对这个obj对应的Deltas中添加一个Delta{delete,DeletedFinalStateUnknown},即通过对knowsObjectsitems的检查,往items中添加变更操作
  • Replace方法也是对DeltaFifo的增量操作,除检查性删除之外,没有做删除操作

源码分析

遍历新list

  1. 把id放入keys
  2. 以list中的元素作为obj,向items中增加一个变更类型为Sync的Delta
    1. knowsObjects为空,说明不存在缓存,则遍历items,向list中不存在的Deltas后拼接一个{delete,DeletedFinalStateUnknown},并记录操作元素数量
  3. 遍历itemsk为obj的id,olditem为Deltas
    1. keys中存在当前obj的id,则说明list中的元素已存在与items中,则不做操作
    2. 若不存在,则取出olditem中最后一个Delta,作为删除变更的obj
    3. 调用queueActionLocked方法,以olditem中最后一个元素作为obj,构建一个变更类型为delete的DeletedFinalStateUnknown
  4. initialPopulationCount标记当前replace方法对items的操作次数,即插入数据数量(不一定相等,因为queueActionLocked方法中会把结尾为2次delete的Delta合并)
    1. knowsObjects不为空,则遍历knowsObjects的keys所组成的slice knowsKeys,向list中不存在的Deltas后拼接一个{delete,DeletedFinalStateUnknown},并记录操作元素数量
  5. knowsKeys中存在当前obj的id,则说明list中的元素已存在与items中,则不做操作
  6. knowsObjects中取出当前id对应的obj,若对选obj存在且不为空,
  7. 记录操作数
  8. 调用queueActionLocked方法,传入一个变更类型为delete的DeletedFinalStateUnknown
  9. initialPopulationCount标记当前replace方法对items的操作次数,即插入数据数量(不一定相等,因为queueActionLocked方法中会把结尾为2次delete的Delta合并)
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    // == 1
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        // == 1.a
        keys.Insert(key)
        // == 1.b
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    // == 2
    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        queuedDeletions := 0
        // == 2.a
        for k, oldItem := range f.items {
            // == 2.a.i
            if keys.Has(k) {
                continue
            }
            var deletedObj interface{}
            // == 2.a.ii
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            // == 2.a.iii
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        // == 2.b
        if !f.populated {
            f.populated = true
            // While there shouldn't be any queued deletions in the initial
            // population of the queue, it's better to be on the safe side.
            f.initialPopulationCount = len(list) + queuedDeletions
        }

        return nil
    }

    // == 3
    // Detect deletions not already in the queue.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        // == 3.a
        if keys.Has(k) {
            continue
        }
        // == 3.b
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        // == 3.c
        queuedDeletions++
        // == 3.d
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }
    // == 3.f
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(list) + queuedDeletions
    }

    return nil
}

图示说明

  • 初始化一个带有knowsObjects的DeltaFIFO
  • 调用Delete
  • 调用Replace
func TestDeltaFIFO_replace(t *testing.T) {
    f := cache.NewDeltaFIFO(
        testFifoObjectKeyFunc,
        customObjects{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)},
    )
    f.Delete(mkFifoObj("baz", 10))
    f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
}

初始化DeltaFIFO

初始化一个itemsqueue为空,knowObjects中有3个元素的DeltaFIFO

DeltaFIFO.jpg

调用Delete

  • items中添加了key为baz,value为Delta数组的一组键值,Delta数组中添加了一个{Delete,{baz,10}}
  • queue中添加了baz

Delete.jpg

调用Replace

未调用Replace时

  • replaceList为调用Replace时传入的数组

Replace1.jpg

先循环replaceList,对所有元素插入Sync变更

Replace2.jpg

  • knowsObjects不为空时,循环knowObjects,对所有replaceList中不存在的元素,插入一个DeletedFinalStateUnknown
  • baz对应数组中最后2个Delta都是Delete变更,所以只保留前一个

Replace3.jpg

Resync

概述

  • Resync方法即对所有knowObjects中存在,items中不存在的obj进行同步(调用queueActionLocked方法增加Sync变更),即同步knowObjects和items
  • Replace方法也是对DeltaFifo的增量操作,除检查性删除之外,没有做删除操作

源码分析

knownObjects中每个obj对应的items里添加一个Sync变更类型的Delta

  1. 循环knownObjects,对每个id调用syncKeyLocked方法
  2. knownObjects中存在对应obj
    1. 计算obj对应id,若items[id]数组大于0,则返回nil即不插入Sync变更
    2. 否则,调用queueActionLocked插入Sync变更
// Resync will send a sync event for each item
func (f *DeltaFIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()

    if f.knownObjects == nil {
        return nil
    }

    keys := f.knownObjects.ListKeys()
    // == 1
    for _, k := range keys {
        // == 2
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
    // ==2.a
    obj, exists, err := f.knownObjects.GetByKey(key)
    if err != nil {
        klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    } else if !exists {
        klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    // If we are doing Resync() and there is already an event queued for that object,
    // we ignore the Resync for it. This is to avoid the race, in which the resync
    // comes with the previous value of object (since queueing an event for the object
    // doesn't trigger changing the underlying store <knownObjects>.
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if len(f.items[id]) > 0 {
        return nil
    }
    // == 2.b
    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}

图示说明

初始化DeltaFIFO,并调用Delete,Resync

func TestDeltaFIFO_resync(t *testing.T) {
    f := cache.NewDeltaFIFO(
        testFifoObjectKeyFunc,
        customObjects{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)},
    )
    f.Delete(mkFifoObj("foo", 10))
    f.Resync()
}

初始化DeltaFIFO

初始化一个itemsqueue为空,knowObjects中有3个元素的DeltaFIFO

DeltaFIFO.jpg

调用Delete

  • items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Delete,{foo,10}}
  • queue中添加了foo

Deleta.jpg

调用Resync

knowObjects中存在且items中不存在的obj向items中添加Sync变更

Resync.jpg

Pop

概述

  • Pop方法即循环queue中的id,取出items中该id对应的obj数组,若传入的PopProcessFunc方法执行结果为ErrRequeue,则重新在queue和items插入刚取出的元素,否则彻底从queue和items中删除取出元素
  • Pop方法不同于以上的方法,Pop方法是真正对DeltaFIFO进行删除操作的方法

源码分析

  1. queue长度等于0且closed为true,则表示DeltaFifo为关闭状态,方法返回
  2. queue长度等于0但closed为false,则表示DeltaFifo未关闭,当前线程wait,等待唤醒操作
  3. queue长度不为0
    1. 取出queue中第一个id,并从queue中删除该id
    2. 若replace计数器initialPopulationCount的值大于0,则减一
    3. items中获取id对应item,若item不存在,则跳过此次循环
    4. 若item存在,这从items中删除对应item与id(相当于队列的pop弹出操作)
    5. 通过传入的PopProcessFunc方法操作弹出的item
    6. PopProcessFunc方法返回的错误类型为ErrRequeue则说明是可预见性错误,需要调用addIfNotPresent方法把item重新添加进items
    7. 所有操作完成,返回被弹出item
// Pop blocks until an item is added to the queue, and then returns it.  If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // == 1
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }
            // == 2
            f.cond.Wait()
        }
        // == 3
        // == 3.a
        id := f.queue[0]
        f.queue = f.queue[1:]
        // == 3.b
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        // == 3.c
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        // == 3.d
        delete(f.items, id)
        // == 3.e
        err := process(item)
        // == 3.f
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // == 3.g
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

图示说明

TestDeltaFIFO_pop

  • 初始化创建DeltaFIFO
  • 调用Add方法
  • 调用Update方法

customPopFunc

  • 传入Pop方法中用来具体处理obj
func TestDeltaFIFO_pop(t *testing.T) {
    f := cache.NewDeltaFIFO(testFifoObjectKeyFunc, nil)

    f.Add(mkFifoObj("foo", 10))
    _, err := f.Pop(customPopFunc)
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
}

// 这里的功能为:如果不为Added变更类型,则打印name并返回成功,否则为pop失败
func customPopFunc(obj interface{}) error {
    deltas := obj.(cache.Deltas)
    if deltas[len(deltas)-1].Type != Added {
        fmt.Println("Added obj : " + deltas[len(deltas)-1].Object.(testFifoObject).name)
        return nil
    }
    return cache.ErrRequeue{Err: nil}
}

初始化DeltaFIFO

初始化一个items,queue,knowObjects都为空的DeltaFIFO

DeltaFIFO.jpg

调用Add&Update

  • items中添加了key为foo,value为Delta数组的一组键值,Delta数组中添加了一个{Add,{foo,10}}对象
  • queue中添加了foo
  • items中添加了key为baz,value为Delta数组的一组键值,Delta数组中添加了一个{Update,{baz,10}}对象
  • queue中添加了baz

Add&Update.jpg

调用Pop

Pop.jpg

参考及借鉴

https://www.jianshu.com/p/095de3ee5f7b