接口与实现类
Store: 接口定义了基本的方法.
Indexer: 在 Store 的基础上添加了几个关于 index 的方法.
ThreadSafeStore: 定义了一系列方法, 与 Indexer 中所有方法(会包括 Store 中的方法)的最大区别是它有 key.
threadSafeMap: 是 ThreadSafeStore 的一个实现类.
cache: 是 Indexer 或 Store 的一个实现类, 它会根据 keyFunc 生成该 obj 对应的一个 key, 然后调用 ThreadSafeStore 的方法.
UML 关系图
Store
# tools/cache/store.go
type Store interface {
// 直接操作数据
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
Indexer
# tools/cache/index.go
type Indexer interface {
// 从 Store 中继承操作数据的接口
Store
// 操作数据对应的 index
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
cache
cache 是 Indexer 接口的实现类, 那么自然也是 Store 接口的实现类, 可以看到 cacheStorage 是一个 ThreadSafeStore 的对象, 而 ThreadSafeStore 是一个根据 key 来操作的类, 所以 cache 中有一个为 obj 生成唯一 key 的 keyFunc 方法, 然后就可以调用 ThreadSafeStore 的对应方法.
# tools/cache/store.go
type cache struct {
// 实际存放数据的数据结构
cacheStorage ThreadSafeStore
// 用于计算 key 的方法
keyFunc KeyFunc
}
ThreadSafeStore
这里与 Store 有点区别的是 ThreadSafeStore 与 index 无关的全部都是针对 key 的操作, 而 index 方面的操作都是与 Indexer 方法意义
# tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
//操作数据
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
// 操作 index 与数据 key 的对应关系 map
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
threadSafeMap
真正逻辑实现核心
# tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// 存放 计算对象 index 方法 的 map
indexers Indexers
// 存放 对象的 key 与 index 对应关系 的 map
indices Indices
}
// 存放 IndexFunc 方法的 map,key 为方法名,value 为 func
type Indexers map[string]IndexFunc
// 根据 obj 中的数据计算并返回 string 类型 index 的方法
type IndexFunc func(obj interface{}) ([]string, error)
// 存放用 IndexFunc 生成索引值的 map,key 为 IndexFunc 方法在 Indexers 中的 key,value 为同一 IndexFunc 生成的所有索引值的 set
type Indices map[string]Index
// 存放同一个 IndexFunc 生成的所有索引值的 set
type Index map[string]sets.String
Indexer
TestIndex
利用 tools/cache/store_test.go 中的 TestIndex 方法分析 Indexer 工作流程
# tools/cache/store_test.go
// 官方提供的 Index 测试方法入口
func TestIndex(t *testing.T) {
// NewIndexer 初始化 Indexer,并传入 doTestIndex 中进行测试
doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers()))
}
//初始化一个 Indexers
func testStoreIndexers() Indexers {
indexers := Indexers{}
indexers["by_val"] = testStoreIndexFunc
return indexers
}
// 根据传入的 obj 返回 value 作为 index
func testStoreIndexFunc(obj interface{}) ([]string, error) {
return []string{obj.(testStoreObject).val}, nil
}
TestIndex 主要做了 2 件事
1, 调用 NewIndexer 初始化一个 Indexer
2, 传入 Indexer,并在 doTestIndex 中测试 Indexer 的各种方法
NewIndexer 方法
构造一个 cache 作为 Indexer 作为的实现类
Indexer 初始化过程图
源码分析
# tools/cache/store.go
// keyFunc 是用以生成 key 的 function,indexers 是用以生成索引值的方法 map 集合
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{ //返回一个 cache,cache 是 Indexer 接口的实现类
// 调用 NewThreadSafeStore 方法新建一个 threadsafeMap
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
// 传入 keyFunc 作为 cache 生成 key 的 function
keyFunc: keyFunc,
}
}
# tools/cache/thread_safe_store.go
// Indexers 是生成索引值方法的集合,indices 是储存 obj 的 key-index 关系的 map
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
// 新建一个 threadSafeMap 作为实际操作核心
return &threadSafeMap{
items: map[string]interface{}{}, //存储数据 obj 的数据结构
indexers: indexers, //生成索引值方法的集合
indices: indices, //储存 obj 的 key-index 关系的 map
}
}
doTestIndex 方法
测试传入的 Indexer 的各种方法
# tools/cache/store_test.go
func doTestIndex(t *testing.T, indexer Indexer) {
mkObj := func(id string, val string) testStoreObject {
return testStoreObject{id: id, val: val}
}
// Test Index
expected := map[string]sets.String{}
expected["b"] = sets.NewString("a", "c")
expected["f"] = sets.NewString("e")
expected["h"] = sets.NewString("g")
indexer.Add(mkObj("a", "b")) # 调用 cache 的 Add 方法
indexer.Add(mkObj("c", "b"))
indexer.Add(mkObj("e", "f"))
indexer.Add(mkObj("g", "h"))
{
for k, v := range expected {
found := sets.String{}
indexResults, err := indexer.Index("by_val", mkObj("", k))
if err != nil {
t.Errorf("Unexpected error %v", err)
}
for _, item := range indexResults {
found.Insert(item.(testStoreObject).id)
}
items := v.List()
if !found.HasAll(items...) {
t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List())
}
}
}
}
Add 方法解析
# doTestIndex 中调用 Add 的方法
indexer.Add(mkObj("a", "b"))
# tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj) # 构造时传入的生成 Key 的 func
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj) #调用 threadSafeMap 的 Add 方法
return nil
}
# tools/cache/thread_safe_store.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key] #获取 key 相同的旧 obj
c.items[key] = obj # 把新的 obj 存入 items 中
c.updateIndices(oldObject, obj, key) # 更新 Indices 中的 key 与 index 关系
}
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 删除旧的 key-index 关系
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 循环所有的 indexerFunc,indexers 中存放的是方法名与方法的对应 map
for name, indexFunc := range c.indexers {
// 调用 indexFunc 方法生成索引 indexValues
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 根据方法名获取对应索引集合,indices 中存放的是方法名与对应生成的 index 的 set
index := c.indices[name]
// 若为空,则创建一个新的集合,并把
if index == nil {
index = Index{}
c.indices[name] = index
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
代码流程分析
利用 tools/cache/store_test.go 测试文件分析代码流程
TestCache
利用 TestCache 方法测试 Cache
TestCache
// tools/cache/store_test.go
// 测试类入口
func TestCache(t *testing.T) {
doTestStore(t, NewStore(testStoreKeyFunc)) #进入 NewStore
}
//返回传入的 testStoreObject 对象的 id 作为 index
func testStoreKeyFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).id, nil
}
//测试类中自定义的类型,用与测试 cache 和 Store
type testStoreObject struct {
id string
val string
}
NewStore
// tools/cache/store.go Store 接口结构文件,及 cache 类的结构类及继承方法的实现文件
//构造一个 cache,用传入的 keyFunc 作为 index 的生成方法,并新建一个 NewThreadSafeStore 作为实际存储对象
//NewThreadSafeStore 方法中传入
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
//Indexers 为一个 '由一组函数组成的 map',用与在生成 index 时取出对应方法并回调生成 index
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
// TODO
type Indices map[string]Index
type Index map[string]sets.String
NewThreadSafeStore
// tools/cache/thread_safe_store.go ThreadSafeStore 接口的结构和 threadSafeMap 的接口类与继承方法文件
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
items: map[string]interface{}{}, //实际存储数据的 map
indexers: indexers,
indices: indices,
}
}
// Add 方法,直接上锁,然后对 items 进行操作,update,delete 方法也是一样
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}