参考 https://www.jianshu.com/p/76e7b1a57d2c

接口与实现类

Store: 接口定义了基本的方法.
Indexer: 在 Store 的基础上添加了几个关于 index 的方法.
ThreadSafeStore: 定义了一系列方法, 与 Indexer 中所有方法(会包括 Store 中的方法)的最大区别是它有 key.
threadSafeMap: 是 ThreadSafeStore 的一个实现类.
cache: 是 Indexer 或 Store 的一个实现类, 它会根据 keyFunc 生成该 obj 对应的一个 key, 然后调用 ThreadSafeStore 的方法.

UML 关系图

Store&Indexer.svg

Store

  1. # tools/cache/store.go
  2. type Store interface {
  3. // 直接操作数据
  4. Add(obj interface{}) error
  5. Update(obj interface{}) error
  6. Delete(obj interface{}) error
  7. List() []interface{}
  8. ListKeys() []string
  9. Get(obj interface{}) (item interface{}, exists bool, err error)
  10. GetByKey(key string) (item interface{}, exists bool, err error)
  11. Replace([]interface{}, string) error
  12. Resync() error
  13. }

Indexer

# tools/cache/index.go
type Indexer interface {
  // 从 Store 中继承操作数据的接口
   Store
   // 操作数据对应的 index
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexedValue string) ([]string, error)
   ListIndexFuncValues(indexName string) []string
   ByIndex(indexName, indexedValue string) ([]interface{}, error)
   GetIndexers() Indexers
   AddIndexers(newIndexers Indexers) error
}

cache

cache 是 Indexer 接口的实现类, 那么自然也是 Store 接口的实现类, 可以看到 cacheStorage 是一个 ThreadSafeStore 的对象, 而 ThreadSafeStore 是一个根据 key 来操作的类, 所以 cache 中有一个为 obj 生成唯一 key 的 keyFunc 方法, 然后就可以调用 ThreadSafeStore 的对应方法.

# tools/cache/store.go
type cache struct {
   // 实际存放数据的数据结构
   cacheStorage ThreadSafeStore
   // 用于计算 key 的方法
   keyFunc KeyFunc
}

ThreadSafeStore

这里与 Store 有点区别的是 ThreadSafeStore 与 index 无关的全部都是针对 key 的操作, 而 index 方面的操作都是与 Indexer 方法意义

# tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
   //操作数据
   Add(key string, obj interface{})
   Update(key string, obj interface{})
   Delete(key string)
   Get(key string) (item interface{}, exists bool)
   List() []interface{}
   ListKeys() []string
   Replace(map[string]interface{}, string)
   // 操作 index 与数据 key 的对应关系 map
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexKey string) ([]string, error)
   ListIndexFuncValues(name string) []string
   ByIndex(indexName, indexKey string) ([]interface{}, error)
   GetIndexers() Indexers
   AddIndexers(newIndexers Indexers) error
   Resync() error
}

threadSafeMap

真正逻辑实现核心

# tools/cache/thread_safe_store.go
type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}

   // 存放 计算对象 index 方法 的 map
   indexers Indexers
   // 存放 对象的 key 与 index 对应关系 的 map
   indices Indices
}
// 存放 IndexFunc 方法的 map,key 为方法名,value 为 func
type Indexers map[string]IndexFunc
// 根据 obj 中的数据计算并返回 string 类型 index 的方法
type IndexFunc func(obj interface{}) ([]string, error)

// 存放用 IndexFunc 生成索引值的 map,key 为 IndexFunc 方法在 Indexers 中的 key,value 为同一 IndexFunc 生成的所有索引值的 set
type Indices map[string]Index
// 存放同一个 IndexFunc 生成的所有索引值的 set
type Index map[string]sets.String

Indexer

TestIndex

利用 tools/cache/store_test.go 中的 TestIndex 方法分析 Indexer 工作流程

# tools/cache/store_test.go
// 官方提供的 Index 测试方法入口
func TestIndex(t *testing.T) {
   // NewIndexer 初始化 Indexer,并传入 doTestIndex 中进行测试
   doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers()))
}
//初始化一个 Indexers
func testStoreIndexers() Indexers {
   indexers := Indexers{}
   indexers["by_val"] = testStoreIndexFunc
   return indexers
}
// 根据传入的 obj 返回 value 作为 index
func testStoreIndexFunc(obj interface{}) ([]string, error) {
   return []string{obj.(testStoreObject).val}, nil
}

TestIndex 主要做了 2 件事
1, 调用 NewIndexer 初始化一个 Indexer
2, 传入 Indexer,并在 doTestIndex 中测试 Indexer 的各种方法

NewIndexer 方法

构造一个 cache 作为 Indexer 作为的实现类

Indexer 初始化过程图

Snipaste_2020-04-14_20-18-30.jpg

源码分析

# tools/cache/store.go
// keyFunc 是用以生成 key 的 function,indexers 是用以生成索引值的方法 map 集合
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
   return &cache{ //返回一个 cache,cache 是 Indexer 接口的实现类
      // 调用 NewThreadSafeStore 方法新建一个 threadsafeMap
      cacheStorage: NewThreadSafeStore(indexers, Indices{}),
      // 传入 keyFunc 作为 cache 生成 key 的 function
      keyFunc:      keyFunc,
   }
}
# tools/cache/thread_safe_store.go
// Indexers 是生成索引值方法的集合,indices 是储存 obj 的 key-index 关系的 map
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   // 新建一个 threadSafeMap 作为实际操作核心
   return &threadSafeMap{
      items:    map[string]interface{}{}, //存储数据 obj 的数据结构
      indexers: indexers, //生成索引值方法的集合
      indices:  indices, //储存 obj 的 key-index 关系的 map
   }
}

doTestIndex 方法

测试传入的 Indexer 的各种方法

# tools/cache/store_test.go
func doTestIndex(t *testing.T, indexer Indexer) {
   mkObj := func(id string, val string) testStoreObject {
      return testStoreObject{id: id, val: val}
   }

   // Test Index
   expected := map[string]sets.String{}
   expected["b"] = sets.NewString("a", "c")
   expected["f"] = sets.NewString("e")
   expected["h"] = sets.NewString("g")
   indexer.Add(mkObj("a", "b")) # 调用 cache 的 Add 方法
   indexer.Add(mkObj("c", "b"))
   indexer.Add(mkObj("e", "f"))
   indexer.Add(mkObj("g", "h"))
   {
      for k, v := range expected {
         found := sets.String{}
         indexResults, err := indexer.Index("by_val", mkObj("", k))
         if err != nil {
            t.Errorf("Unexpected error %v", err)
         }
         for _, item := range indexResults {
            found.Insert(item.(testStoreObject).id)
         }
         items := v.List()
         if !found.HasAll(items...) {
            t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List())
         }
      }
   }
}

Add 方法解析

# doTestIndex 中调用 Add 的方法
indexer.Add(mkObj("a", "b"))
# tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
   key, err := c.keyFunc(obj) # 构造时传入的生成 Key 的 func
   if err != nil {
      return KeyError{obj, err}
   }
   c.cacheStorage.Add(key, obj) #调用 threadSafeMap 的 Add 方法
   return nil
}
# tools/cache/thread_safe_store.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
   oldObject := c.items[key] #获取 key 相同的旧 obj
   c.items[key] = obj  # 把新的 obj 存入 items 中
   c.updateIndices(oldObject, obj, key) # 更新 Indices 中的 key 与 index 关系
}

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
   // 删除旧的 key-index 关系
   if oldObj != nil {
      c.deleteFromIndices(oldObj, key)
   }
   // 循环所有的 indexerFunc,indexers 中存放的是方法名与方法的对应 map
   for name, indexFunc := range c.indexers {
      // 调用 indexFunc 方法生成索引 indexValues 
      indexValues, err := indexFunc(newObj)
      if err != nil {
         panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
      }

      // 根据方法名获取对应索引集合,indices 中存放的是方法名与对应生成的 index 的 set
      index := c.indices[name]
      // 若为空,则创建一个新的集合,并把
      if index == nil {
         index = Index{}
         c.indices[name] = index
      }

      for _, indexValue := range indexValues {
         set := index[indexValue]
         if set == nil {
            set = sets.String{}
            index[indexValue] = set
         }
         set.Insert(key)
      }
   }
}

代码流程分析

利用 tools/cache/store_test.go 测试文件分析代码流程

TestCache

利用 TestCache 方法测试 Cache

TestCache

// tools/cache/store_test.go
// 测试类入口
func TestCache(t *testing.T) {
   doTestStore(t, NewStore(testStoreKeyFunc)) #进入 NewStore
}
//返回传入的 testStoreObject 对象的 id 作为 index
func testStoreKeyFunc(obj interface{}) (string, error) {
   return obj.(testStoreObject).id, nil
}
//测试类中自定义的类型,用与测试 cache 和 Store
type testStoreObject struct {
   id  string
   val string
}

NewStore

// tools/cache/store.go Store 接口结构文件,及 cache 类的结构类及继承方法的实现文件
//构造一个 cache,用传入的 keyFunc 作为 index 的生成方法,并新建一个 NewThreadSafeStore 作为实际存储对象
//NewThreadSafeStore 方法中传入
func NewStore(keyFunc KeyFunc) Store {
   return &cache{
      cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
      keyFunc:      keyFunc,
   }
}
//Indexers 为一个 '由一组函数组成的 map',用与在生成 index 时取出对应方法并回调生成 index
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

// TODO
type Indices map[string]Index

type Index map[string]sets.String

NewThreadSafeStore

// tools/cache/thread_safe_store.go ThreadSafeStore 接口的结构和 threadSafeMap 的接口类与继承方法文件
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   return &threadSafeMap{
      items:    map[string]interface{}{}, //实际存储数据的 map
      indexers: indexers,
      indices:  indices,
   }
}
// Add 方法,直接上锁,然后对 items 进行操作,update,delete 方法也是一样
func (c *threadSafeMap) Add(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
   oldObject := c.items[key]
   c.items[key] = obj
   c.updateIndices(oldObject, obj, key)
}