StreamWathcer
BroadcastWatcher
Broadcast 就是将一个事件去转发给多个 Watcher,所以 Broadcast 需要存储 watcher 的 map,每个 Watcher 会有各自的 result channel,当事件通过 incoming 到来的时候,会遍历所有的 watcher,将事件转发出去。
map 并不是线程安全的,添加 Wathcer 的时候,如何保证不会发生并发错误。
Workqueue
queue 有下面四个特性
- 按照添加的顺序处理
- 一个项目不会被并发的处理多次,一个项目被添加了多次,但只会处理一次
- 多个消费者,多个生产者,允许一个项目在处理时再次进入队列
- 关闭通知
在 item 正在 processing 的时候,不会将 item 放入 queue 中,当 process 结束之后,才会将 item 重新从 dirty 放入 queue 中,这样就保证了一个项目能够再次进入队列
这样进入队列,不会与第二条中被添加多次,只会处理一次冲突吗?
Indexer
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
return func(obj interface{}) (string, error) {
indexKeys, err := indexFunc(obj)
if err != nil {
return "", err
}
if len(indexKeys) > 1 {
return "", fmt.Errorf("too many keys: %v", indexKeys)
}
if len(indexKeys) == 0 {
return "", fmt.Errorf("unexpected empty indexKeys")
}
return indexKeys[0], nil
}
}
KeyFunc 和 IndexFunc 可以互相转化,IndexFunc 的第一个数值就是 KeyFunc?indexFunc 为什么要产生多个值,是需要将这个数据存储多份吗?在 IndexFunc 和 KeyFunc 重复的情况下,这样存储有什么意义?
Index 和 Key 通常是根据 Obj 的 Namespace + Name 确定的,查询缓存的时候只需要通过 Namespace 和 Resource 就可以知道有没有缓存的资源了。
Informer
ListerWatcher 会先将需要的资源进行 List,然后就进行 Watcher,如果 ListerWatcher 的时候遇到错误,会经过一段的退避时间之后再重新运行。
Watch 的结果会经过 Decode 生成一系列的事件,根据类型发送到 DeltaFIFO 中,最后通过 Process 函数进行处理。
SharedInformer
SharedInformer 会在 Process 中将 Delta 发送给多个 Listener,每个 Listener 有自己的 handler 函数,通过 Delta 的类型去选择对应的处理函数。