Broadcast 就是将一个事件去转发给多个 Watcher,所以 Broadcast 需要存储 watcher 的 map,每个 Watcher 会有各自的 result channel,当事件通过 incoming 到来的时候,会遍历所有的 watcher,将事件转发出去。
map 并不是线程安全的,添加 Wathcer 的时候,如何保证不会发生并发错误。
queue 有下面四个特性
- 按照添加的顺序处理
- 一个项目不会被并发的处理多次,一个项目被添加了多次,但只会处理一次
- 多个消费者,多个生产者,允许一个项目在处理时再次进入队列
- 关闭通知
在 item 正在 processing 的时候,不会将 item 放入 queue 中,当 process 结束之后,才会将 item 重新从 dirty 放入 queue 中,这样就保证了一个项目能够再次进入队列
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
return func(obj interface{}) (string, error) {
indexKeys, err := indexFunc(obj)
if err != nil {
return "", err
if len(indexKeys) > 1 {
return "", fmt.Errorf("too many keys: %v", indexKeys)
if len(indexKeys) == 0 {
return "", fmt.Errorf("unexpected empty indexKeys")
return indexKeys[0], nil
KeyFunc 和 IndexFunc 可以互相转化,IndexFunc 的第一个数值就是 KeyFunc?indexFunc 为什么要产生多个值,是需要将这个数据存储多份吗?在 IndexFunc 和 KeyFunc 重复的情况下,这样存储有什么意义?
Index 和 Key 通常是根据 Obj 的 Namespace + Name 确定的,查询缓存的时候只需要通过 Namespace 和 Resource 就可以知道有没有缓存的资源了。
ListerWatcher 会先将需要的资源进行 List,然后就进行 Watcher,如果 ListerWatcher 的时候遇到错误,会经过一段的退避时间之后再重新运行。
Watch 的结果会经过 Decode 生成一系列的事件,根据类型发送到 DeltaFIFO 中,最后通过 Process 函数进行处理。
SharedInformer 会在 Process 中将 Delta 发送给多个 Listener,每个 Listener 有自己的 handler 函数,通过 Delta 的类型去选择对应的处理函数。