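Kubernetes uses the Informer mechanism to deliver resource events in a timely, reliable, and ordered way. Other Kubernetes components, such as kube-controller-manager and kube-scheduler, communicate with the Kubernetes API Server mainly through the Informer mechanism provided by client-go.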

Its architecture is shown in the figure below:

[Figure: client-go Informer architecture]
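The main flow is:

  1. The Reflector lists all objects of the watched resource type in the cluster (List) and pushes them into the DeltaFIFO queue.
  2. The Reflector then keeps watching the cluster for events on that resource (Watch).
  3. Whenever an event arrives, the Reflector adds it to the DeltaFIFO.
  4. The controller pops events from the DeltaFIFO one by one and passes them to the informer's callback, HandleDeltas.
  5. HandleDeltas updates the Indexer, the local cache of objects.
  6. distribute dispatches each change to the registered event handlers.
  7. Resync periodically re-queues the objects in the local cache into the DeltaFIFO.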

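For example, when a pod named demo is deleted, the flow is:

  1. The Reflector receives the Deleted event for pod demo through Watch and adds it to the DeltaFIFO.
  2. The event is popped from the DeltaFIFO and passed to HandleDeltas, which removes the pod from the Indexer (the local store).
  3. HandleDeltas then calls distribute to send a delete notification to the registered event handlers.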

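As shown above, the main components are:
(1) Reflector
The Reflector performs List & Watch on a specified Kubernetes resource. When Watch observes a change to that resource, it produces the corresponding event, such as Added, Updated, or Deleted, and puts the resource object into the DeltaFIFO.
(2) DeltaFIFO
DeltaFIFO is a queue of resource objects; for each object it records the changes (Deltas) that have not been processed yet.
(3) Indexer
Indexer is client-go's local store for resource objects, with built-in indexing. The controller stores the objects it pops from the DeltaFIFO into the Indexer, so the data in the Indexer is kept consistent with the data in etcd as events arrive. client-go can therefore read resource objects from this local store instead of querying the API Server (and, behind it, etcd) every time, which reduces the load on both.
(4) Informer
Informer is a core toolkit in client-go. In the Kubernetes source code, when a component needs to List/Get Kubernetes objects, it almost always uses the Lister() method of an Informer instance (which provides Get and List) and rarely calls the Kubernetes API directly. The most basic function of an Informer is to List/Get Kubernetes objects.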

Informer

Every resource type in client-go has its own Informer, and each one implements the Informer() and Lister() methods. PodInformer, for example (staging\src\k8s.io\client-go\informers\core\v1\pod.go):

```go
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
```

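Defining an Informer per resource lets you watch the events of each resource separately. For Pods, for example, whenever a Pod is added, deleted, or modified, the Watch in client-go receives the change promptly.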

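1. Shared Informer

The informer for a given resource may be requested many times in one process. For example, both the Deployment controller and the ReplicaSet controller need a Pod informer. If every informer had its own Reflector, many Reflectors would do exactly the same work and put excessive load on the API server. client-go therefore provides the shared informer mechanism: all informers for the same resource share a single Reflector.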

The informers field is defined in sharedInformerFactory as follows (staging\src\k8s.io\client-go\informers\factory.go):

```go
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration
	informers        map[reflect.Type]cache.SharedIndexInformer
	// startedInformers is used for tracking which informers have been started.
	// This allows Start() to be called multiple times safely.
	startedInformers map[reflect.Type]bool
}
```

The informers field is a map whose key is the resource type (reflect.Type) and whose value is the SharedIndexInformer for that type.

The InformerFor method registers the informer for each resource type (staging\src\k8s.io\client-go\informers\factory.go):

```go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
```
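  • First, get the resource type; if an informer for that type already exists, return it directly.
  • Otherwise, look up the resync period; if no custom period is set for the type, use the default.
  • Finally, create a new informer with newFunc and store it in the informers map.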

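Finally, the factory's Start method launches every informer in f.informers that has not been started yet in its own goroutine, where it keeps running until stopCh is closed: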

```go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
```
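Why Start can safely be called more than once may be easier to see in a minimal sketch. `runner` and `toyFactory` are hypothetical; `runner.Run` only counts how often it was started and then blocks on the stop channel, roughly as an informer's Run does. The started map is what turns repeated Start calls into no-ops.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical stand-in for an informer with a Run(stopCh) method.
type runner struct {
	mu   sync.Mutex
	runs int
}

func (r *runner) Run(stopCh <-chan struct{}) {
	r.mu.Lock()
	r.runs++
	r.mu.Unlock()
	<-stopCh // block until asked to stop
}

type toyFactory struct {
	lock      sync.Mutex
	informers map[string]*runner
	started   map[string]bool
	wg        sync.WaitGroup
}

// Start launches each not-yet-started informer in its own goroutine,
// so calling Start repeatedly never starts an informer twice.
func (f *toyFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for name, inf := range f.informers {
		if !f.started[name] {
			f.wg.Add(1)
			go func(r *runner) {
				defer f.wg.Done()
				r.Run(stopCh)
			}(inf)
			f.started[name] = true
		}
	}
}

func main() {
	f := &toyFactory{
		informers: map[string]*runner{"pods": {}, "nodes": {}},
		started:   map[string]bool{},
	}
	stop := make(chan struct{})
	f.Start(stop)
	f.Start(stop) // no-op for informers that are already running
	close(stop)
	f.wg.Wait()
	fmt.Println(f.informers["pods"].runs, f.informers["nodes"].runs)
}
```

It prints `1 1`; without the started map, the second Start would launch a second goroutine per informer.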

Reflector

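The Reflector watches cluster resources through List & Watch, which has two parts. List enumerates the resources in the cluster and is a regular, short-lived HTTP request. Watch listens for resource events and runs over a long-lived HTTP connection.

The Reflector struct is defined as follows (code path: staging\src\k8s.io\client-go\tools\cache\reflector.go):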

```go
type Reflector struct {
	name                                 string
	expectedTypeName                     string
	expectedType                         reflect.Type
	expectedGVK                          *schema.GroupVersionKind
	store                                Store
	listerWatcher                        ListerWatcher
	backoffManager                       wait.BackoffManager
	resyncPeriod                         time.Duration
	ShouldResync                         func() bool
	clock                                clock.Clock
	paginatedResult                      bool
	lastSyncResourceVersion              string
	isLastSyncResourceVersionUnavailable bool
	lastSyncResourceVersionMutex         sync.RWMutex
	WatchListPageSize                    int64
	watchErrorHandler                    WatchErrorHandler
}
```

The Reflector holds a listerWatcher field of type ListerWatcher, an interface defined as follows (code path: staging\src\k8s.io\client-go\tools\cache\listwatch.go):

```go
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	Lister
	Watcher
}
```

Lister and Watcher are themselves interfaces that require a List and a Watch method, respectively:

```go
// Lister is any object that knows how to perform an initial list.
type Lister interface {
	List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
	Watch(options metav1.ListOptions) (watch.Interface, error)
}
```

ListWatch implements both methods:

```go
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}

// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	// ListWatch is used in Reflector, which already supports pagination.
	// Don't paginate here to avoid duplication.
	return lw.ListFunc(options)
}

// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}
```

ListFunc and WatchFunc are two function types:

```go
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
```
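The ListWatch type is an adapter: a struct of function fields that satisfies the ListerWatcher interface by delegating to those functions. The sketch below reproduces that pattern with simplified, hypothetical signatures (strings and channels instead of metav1.ListOptions, runtime.Object, and watch.Interface), so it runs without client-go.

```go
package main

import "fmt"

// Hypothetical simplified signatures standing in for the client-go ones.
type ListFunc func(resourceVersion string) ([]string, string, error)
type WatchFunc func(resourceVersion string) (<-chan string, error)

type ListerWatcher interface {
	List(resourceVersion string) ([]string, string, error)
	Watch(resourceVersion string) (<-chan string, error)
}

// ListWatch satisfies ListerWatcher by delegating to its function fields,
// the same adapter pattern as cache.ListWatch.
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
}

func (lw *ListWatch) List(rv string) ([]string, string, error) { return lw.ListFunc(rv) }
func (lw *ListWatch) Watch(rv string) (<-chan string, error)     { return lw.WatchFunc(rv) }

func main() {
	var lw ListerWatcher = &ListWatch{
		ListFunc: func(string) ([]string, string, error) {
			return []string{"default/demo"}, "100", nil
		},
		WatchFunc: func(rv string) (<-chan string, error) {
			ch := make(chan string, 1)
			ch <- "MODIFIED default/demo after rv " + rv
			close(ch)
			return ch, nil
		},
	}
	items, rv, _ := lw.List("")
	ch, _ := lw.Watch(rv) // watch resumes from the version returned by List
	fmt.Println(items, rv, <-ch)
}
```

The design lets each generated informer supply its List and Watch logic as closures over a ClientSet, while the Reflector depends only on the interface.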

The Reflector drives all of this from its ListAndWatch method (staging\src\k8s.io\client-go\tools\cache\reflector.go):

(1) Simplified code for listing cluster resources:

```go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// ...
	var resourceVersion string
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			switch {
			case r.WatchListPageSize != 0:
				pager.PageSize = r.WatchListPageSize
			case r.paginatedResult:
			case options.ResourceVersion != "" && options.ResourceVersion != "0":
				pager.PageSize = 0
			}
			list, paginatedResult, err = pager.List(context.Background(), options)
			if isExpiredError(err) || isTooLargeResourceVersionError(err) {
				r.setIsLastSyncResourceVersionUnavailable(true)
				list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
			}
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}
		if options.ResourceVersion == "0" && paginatedResult {
			r.paginatedResult = true
		}
		r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		initTrace.Step("Objects extracted")
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		initTrace.Step("SyncWith done")
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}
	// ...
}
```

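(1) r.listerWatcher.List(opts), wrapped in a pager, fetches all resources.
(2) listMetaInterface.GetResourceVersion() gets the resource version of the list.
(3) meta.ExtractList(list) converts the list result into item form, turning a runtime.Object into a []runtime.Object.
(4) r.syncWith(items, resourceVersion) stores the objects and the resource version into the DeltaFIFO (it calls the store's Replace method).
(5) r.setLastSyncResourceVersion(resourceVersion) records the latest resource version.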

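So the role of List is to fetch every object of the specified resource when the Reflector starts and load them into the DeltaFIFO, from which they reach the local cache.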

(2) Simplified code for watching cluster resources:

```go
for {
	select {
	case <-stopCh:
		return nil
	default:
	}
	timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
	options = metav1.ListOptions{
		ResourceVersion:     resourceVersion,
		TimeoutSeconds:      &timeoutSeconds,
		AllowWatchBookmarks: true,
	}
	// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
	start := r.clock.Now()
	w, err := r.listerWatcher.Watch(options)
	if err != nil {
		if utilnet.IsConnectionRefused(err) {
			time.Sleep(time.Second)
			continue
		}
		return err
	}
	if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
		if err != errorStopRequested {
			switch {
			case isExpiredError(err):
				klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
			default:
				klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
			}
		}
		return nil
	}
}
```

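(1) metav1.ListOptions is built with the resourceVersion obtained so far, a randomized timeout, and bookmarks enabled, so the Watch resumes from the last known version.
(2) r.listerWatcher.Watch(options) starts watching. For a Pod informer this calls the WatchFunc defined by the Pod informer, which uses the ClientSet to open a long-lived connection to the Kubernetes API Server and watch change events for the resource.
(3) r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh) handles the changes. On Added, Modified (update), and Deleted events it applies the object to the DeltaFIFO and updates the ResourceVersion; Bookmark events only update the ResourceVersion. The code: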

```go
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
```
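The core of watchHandler (switch on the event type, apply it to the store, advance the resourceVersion on every event) can be reduced to the following sketch. `Event`, `EventType`, and `handleWatch` are hypothetical simplifications; the store is a plain map from key to resourceVersion rather than a DeltaFIFO, and type/GVK checks and error events are omitted.

```go
package main

import "fmt"

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
)

// Event is a hypothetical simplified watch event.
type Event struct {
	Type EventType
	Key  string
	RV   string
}

// handleWatch applies events until the channel closes: Added/Modified write
// the object, Deleted removes it, Bookmark changes nothing. In every case the
// resourceVersion moves forward, as in watchHandler.
func handleWatch(events <-chan Event, store map[string]string, rv *string) int {
	count := 0
	for ev := range events {
		switch ev.Type {
		case Added, Modified:
			store[ev.Key] = ev.RV
		case Deleted:
			delete(store, ev.Key)
		case Bookmark:
			// no object change; only the resourceVersion advances
		}
		*rv = ev.RV
		count++
	}
	return count
}

func main() {
	events := make(chan Event, 4)
	events <- Event{Added, "default/demo", "101"}
	events <- Event{Modified, "default/demo", "102"}
	events <- Event{Bookmark, "", "105"}
	events <- Event{Deleted, "default/demo", "106"}
	close(events)
	store := map[string]string{}
	rv := "100"
	n := handleWatch(events, store, &rv)
	fmt.Println(n, len(store), rv)
}
```

It prints `4 0 106`. Keeping the latest resourceVersion is what lets the outer loop restart the Watch without missing events.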

DeltaFIFO

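DeltaFIFO is easiest to understand in two parts. FIFO is a first-in, first-out queue with the basic queue operations such as Add, Update, Delete, List, Pop, and Close. Delta is a store of resource objects that also records the type of each operation, such as Added, Updated, Deleted, and Sync.

The DeltaFIFO struct is defined as follows (staging\src\k8s.io\client-go\tools\cache\delta_fifo.go):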

```go
type DeltaFIFO struct {
	lock                   sync.RWMutex
	cond                   sync.Cond
	items                  map[string]Deltas
	queue                  []string
	populated              bool
	initialPopulationCount int
	keyFunc                KeyFunc
	knownObjects           KeyListerGetter
	closed                 bool
	emitDeltaTypeReplaced  bool
}
```

queue stores object keys in order, and items maps each key to its Deltas. Deltas is a slice of Delta, defined as:

```go
type Delta struct {
	Type   DeltaType
	Object interface{}
}
```

DeltaFIFO has a producer and a consumer: the producer is the Reflector, which calls methods such as Add, and the consumer is the controller, which calls Pop.

1. Producer

For Added, Updated, and Deleted events alike, DeltaFIFO calls queueActionLocked, the key function of its implementation (staging\src\k8s.io\client-go\tools\cache\delta_fifo.go):

```go
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)
	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		delete(f.items, id)
	}
	return nil
}
```

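(1) f.KeyOf(obj) computes the object's key.
(2) append(f.items[id], Delta{actionType, obj}) appends the new Delta to that key's Deltas.
(3) dedupDeltas(newDeltas) removes duplicates (two consecutive Deleted deltas are collapsed into one).
(4) If the key was not in items yet, it is appended to queue; f.items[id] = newDeltas stores the new Deltas, and f.cond.Broadcast() wakes all consumers blocked in Pop.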
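The producer side can be modeled in a few lines to show the two-structure layout: a key enters queue only once, while all of its changes accumulate in items. `toyFIFO`, `queueAction`, and `dedup` are hypothetical simplifications; `dedup` only handles the deletion case (keeping the newer of two trailing Deleted deltas), and the real function's special handling of DeletedFinalStateUnknown is left out.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object string
}

// toyFIFO keeps the DeltaFIFO layout: queue holds keys in arrival order,
// items maps each key to its accumulated Deltas.
type toyFIFO struct {
	items map[string][]Delta
	queue []string
}

// dedup collapses two trailing Deleted deltas into one, keeping the newer.
func dedup(ds []Delta) []Delta {
	n := len(ds)
	if n >= 2 && ds[n-1].Type == Deleted && ds[n-2].Type == Deleted {
		ds[n-2] = ds[n-1]
		return ds[:n-1]
	}
	return ds
}

func (f *toyFIFO) queueAction(t DeltaType, key, obj string) {
	newDeltas := dedup(append(f.items[key], Delta{t, obj}))
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key) // a key is queued only once
	}
	f.items[key] = newDeltas
}

func main() {
	f := &toyFIFO{items: map[string][]Delta{}}
	f.queueAction(Added, "default/demo", "v1")
	f.queueAction(Updated, "default/demo", "v2")
	f.queueAction(Added, "default/other", "v1")
	f.queueAction(Deleted, "default/demo", "v2")
	f.queueAction(Deleted, "default/demo", "v2-final")
	fmt.Println(f.queue, len(f.items["default/demo"]))
}
```

It prints `[default/demo default/other] 3`: demo holds Added, Updated, and a single Deleted, and each key appears in the queue once.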

2. Consumer

Pop is the consumer method: it takes the oldest object from the head of the DeltaFIFO. Pop must be passed a process function, the callback that receives and handles the object:

```go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}
```

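(1) If the queue is empty, Pop blocks on f.cond.Wait() until data arrives, or returns ErrFIFOClosed if the queue has been closed.
(2) f.queue[0] takes the key at the head, and f.queue = f.queue[1:] removes it from the queue.
(3) item, ok := f.items[id] checks that the key still has Deltas; if so, they are removed from items and passed to process(item).
(4) If process returns an ErrRequeue error, the item is put back into the queue with addIfNotPresent.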

Pop itself is driven by the controller's processLoop method (staging\src\k8s.io\client-go\tools\cache\controller.go):

```go
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
```

Each popped item is passed to the process callback, HandleDeltas (staging\src\k8s.io\client-go\tools\cache\shared_informer.go):

```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
```

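As the process callback, HandleDeltas walks the Deltas from oldest to newest. For Sync, Replaced, Added, and Updated it adds or updates the object in the Indexer (a thread-safe store), and for Deleted it removes it. It then calls distribute to send the corresponding add, update, or delete notification to the listeners registered on the SharedInformer.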
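The decision logic of HandleDeltas (add versus update depending on whether the key is already cached, delete, then distribute) is sketched below. `handleDeltas`, `notification`, and the map-based indexer are hypothetical simplifications. Replaced deltas are omitted, and every listener receives Sync updates flagged with `IsSync`, whereas the real processor only forwards sync notifications to listeners that requested resync.

```go
package main

import "fmt"

type DeltaType string

const (
	Sync    DeltaType = "Sync"
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type DeltaType
	Key  string
	Obj  string
}

// notification is a hypothetical stand-in for add/update/deleteNotification.
type notification struct {
	Kind     string
	Old, New string
	IsSync   bool
}

// handleDeltas walks the deltas oldest-first and updates the indexer, then
// "distributes" each change to every listener.
func handleDeltas(indexer map[string]string, deltas []Delta, listeners []func(notification)) {
	distribute := func(n notification) {
		for _, l := range listeners {
			l(n)
		}
	}
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, ok := indexer[d.Key]; ok {
				indexer[d.Key] = d.Obj // already cached: treat as update
				distribute(notification{Kind: "update", Old: old, New: d.Obj, IsSync: d.Type == Sync})
			} else {
				indexer[d.Key] = d.Obj // not cached yet: treat as add
				distribute(notification{Kind: "add", New: d.Obj})
			}
		case Deleted:
			delete(indexer, d.Key)
			distribute(notification{Kind: "delete", Old: d.Obj})
		}
	}
}

func main() {
	indexer := map[string]string{}
	var kinds []string
	logger := func(n notification) { kinds = append(kinds, n.Kind) }
	handleDeltas(indexer, []Delta{
		{Added, "default/demo", "v1"},
		{Updated, "default/demo", "v2"},
		{Deleted, "default/demo", "v2"},
	}, []func(notification){logger})
	fmt.Println(kinds, len(indexer))
}
```

It prints `[add update delete] 0`. Note that an Added delta for a key that is already cached still produces an update notification, because the decision is based on the cache, not on the delta type.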

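3. Resync mechanism

Resync re-queues the objects in the Indexer's local store into the DeltaFIFO, marking them with the Sync operation type. Resync is triggered periodically by the Reflector, with the period set by the resyncPeriod parameter passed to NewReflector. The code (staging\src\k8s.io\client-go\tools\cache\delta_fifo.go):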

```go
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
	if f.knownObjects == nil {
		return nil
	}
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
```

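(1) f.knownObjects.ListKeys() lists the keys of all objects in the local store.
(2) For each key, f.syncKeyLocked(k) checks whether the DeltaFIFO already holds pending Deltas for that object; if it does not, f.queueActionLocked(Sync, obj) adds a Sync Delta for it.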
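The skip rule in syncKeyLocked (never enqueue a Sync for an object that already has pending Deltas, since those are newer than the cached copy) is the important detail. This is a minimal sketch with hypothetical names: `resync` works on plain maps, `known` plays the role of knownObjects, and the sort is only there to make the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// resync enqueues a Sync delta for every key in the local store (known)
// unless that key already has pending deltas, mirroring Resync/syncKeyLocked
// on a simplified, map-based FIFO.
func resync(known map[string]string, items map[string][]string, queue *[]string) {
	keys := make([]string, 0, len(known))
	for k := range known {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example only
	for _, k := range keys {
		if len(items[k]) > 0 {
			continue // newer deltas are already waiting; skip the stale Sync
		}
		*queue = append(*queue, k)
		items[k] = []string{"Sync:" + known[k]}
	}
}

func main() {
	known := map[string]string{"default/a": "v1", "default/b": "v3"}
	items := map[string][]string{"default/b": {"Updated:v4"}}
	queue := []string{"default/b"}
	resync(known, items, &queue)
	fmt.Println(queue, items["default/a"], items["default/b"])
}
```

It prints `[default/b default/a] [Sync:v1] [Updated:v4]`: only the idle object default/a gets a Sync delta.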

Indexer

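Indexer is client-go's local store for resource objects, with built-in indexing. The controller's processLoop pops objects from the DeltaFIFO and, through HandleDeltas, stores them in the Indexer. The data in the Indexer is kept consistent with the data in etcd. client-go can conveniently read resource objects from this local store instead of going to the API Server (and thus etcd) every time, which reduces the load on the Kubernetes API Server and etcd.

Indexer is built on top of threadSafeMap: it wraps threadSafeMap and inherits its functionality.

1. threadSafeMap

ThreadSafeMap is an in-memory store; its data is never written to local disk. Every add, delete, update, and read takes the lock (a write lock for changes, a read lock for reads) to keep the data consistent. ThreadSafeMap stores resource objects in a map. Its data structure is (staging\src\k8s.io\client-go\tools\cache\thread_safe_store.go):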

```go
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}
	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}
```

items holds the resource objects themselves.

Every add, update, and delete calls updateIndices or deleteFromIndices to keep the indices in sync:

```go
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.deleteFromIndices(obj, key)
		delete(c.items, key)
	}
}
```

updateIndices and deleteFromIndices are implemented as follows:

```go
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	// name: the index name; indexFunc: the index function
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(newObj) // compute the indexed values for the object
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name] // look up the Index for this index name
		if index == nil {        // if it doesn't exist yet, create a new Index map
			index = Index{}
			c.indices[name] = index
		}
		// indexValue: a single indexed value
		for _, indexValue := range indexValues {
			set := index[indexValue] // get the set of keys for this value
			if set == nil {          // if no set exists for this value, create one
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key) // add the object's key to the set
		}
	}
}

// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(obj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name]
		if index == nil {
			continue
		}
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set != nil {
				set.Delete(key)
				// If we don't delete the set when zero, indices with high cardinality
				// short lived resources can cause memory to increase over time from
				// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
				if len(set) == 0 {
					delete(index, indexValue)
				}
			}
		}
	}
}
```

Four data structures are involved: Index, Indices, IndexFunc, and Indexers. They are defined as:

```go
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
```

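Indexers refers to IndexFunc, and Indices refers to Index. Their relationship is shown below:
[Figure: relationship between Indexers/IndexFunc and Indices/Index]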

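  • Index holds the cached data. It is a map from an indexed value to a sets.String, where sets.String is a map[string]struct{} holding object keys; the empty struct{} values take no memory.
  • Indices stores the Index of each index: the key is the index name and the value is an Index as above.
  • IndexFunc is an index function: given an object, it returns the list of indexed values for that object.
  • Indexers stores the index functions: the key is the index name and the value is an IndexFunc as above.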

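Put together, the relationships look roughly like this:
[Figure: overall relationship of Indexers, IndexFunc, Indices, and Index]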

Index lookups on the cache go through the ByIndex method; the caller is (staging\src\k8s.io\client-go\tools\cache\store.go):

```go
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
	return c.cacheStorage.ByIndex(indexName, indexKey)
}
```

ByIndex is implemented as follows (staging\src\k8s.io\client-go\tools\cache\thread_safe_store.go):

```go
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	index := c.indices[indexName]
	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}
	return list, nil
}
```

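ByIndex takes two arguments: indexName (the index name) and indexedValue (the value to look up; the caller in store.go names it indexKey). It first looks up the index function in c.indexers and returns an error if none is registered, then fetches that index's Index from c.indices, takes the set of keys stored under indexedValue, and returns the corresponding objects from c.items.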
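The interplay of items, Indexers, and Indices can be tried out in a cut-down, single-goroutine model of threadSafeMap. Everything here is hypothetical: `store`, `add`, `remove`, and `namespaceIndex` are illustrative names, objects are plain "namespace/name" strings, a plain `map[string]struct{}` replaces sets.String, and locking is omitted. The update path still removes stale index entries before adding new ones, and empty sets are deleted, as in the client-go code above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Simplified index types; Index uses map[string]struct{} instead of sets.String.
type IndexFunc func(obj interface{}) ([]string, error)
type Indexers map[string]IndexFunc
type Index map[string]map[string]struct{}
type Indices map[string]Index

// store is a hypothetical, lock-free cut-down threadSafeMap.
type store struct {
	items    map[string]interface{}
	indexers Indexers
	indices  Indices
}

func (s *store) add(key string, obj interface{}) {
	if old, ok := s.items[key]; ok {
		s.deleteFromIndices(old, key) // drop stale index entries first
	}
	s.items[key] = obj
	s.updateIndices(obj, key)
}

func (s *store) remove(key string) {
	if obj, ok := s.items[key]; ok {
		s.deleteFromIndices(obj, key)
		delete(s.items, key)
	}
}

func (s *store) updateIndices(obj interface{}, key string) {
	for name, fn := range s.indexers {
		values, err := fn(obj)
		if err != nil {
			panic(err) // client-go also panics when an IndexFunc fails
		}
		index := s.indices[name]
		if index == nil {
			index = Index{}
			s.indices[name] = index
		}
		for _, v := range values {
			if index[v] == nil {
				index[v] = map[string]struct{}{}
			}
			index[v][key] = struct{}{}
		}
	}
}

func (s *store) deleteFromIndices(obj interface{}, key string) {
	for name, fn := range s.indexers {
		values, err := fn(obj)
		if err != nil {
			panic(err)
		}
		index := s.indices[name]
		for _, v := range values {
			if set := index[v]; set != nil {
				delete(set, key)
				if len(set) == 0 {
					delete(index, v) // avoid leaking empty sets
				}
			}
		}
	}
}

// ByIndex returns the objects whose indexed values under indexName include value.
func (s *store) ByIndex(indexName, value string) ([]interface{}, error) {
	if _, ok := s.indexers[indexName]; !ok {
		return nil, fmt.Errorf("index with name %s does not exist", indexName)
	}
	keys := make([]string, 0)
	for k := range s.indices[indexName][value] {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example
	out := make([]interface{}, 0, len(keys))
	for _, k := range keys {
		out = append(out, s.items[k])
	}
	return out, nil
}

// namespaceIndex is a hypothetical IndexFunc for "namespace/name" string objects.
func namespaceIndex(obj interface{}) ([]string, error) {
	key, ok := obj.(string)
	if !ok {
		return nil, fmt.Errorf("unexpected object type %T", obj)
	}
	return []string{strings.SplitN(key, "/", 2)[0]}, nil
}

func main() {
	s := &store{items: map[string]interface{}{}, indexers: Indexers{"namespace": namespaceIndex}, indices: Indices{}}
	for _, k := range []string{"default/a", "default/b", "kube-system/dns"} {
		s.add(k, k)
	}
	s.remove("default/a")
	objs, _ := s.ByIndex("namespace", "default")
	fmt.Println(objs)
}
```

It prints `[default/b]`: after default/a is removed, the "default" entry of the namespace index only holds default/b.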

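The cached data in an Index is a Set. A Set is like a slice except that it never contains duplicate elements. Go's standard library has no Set type, but the keys of a Go map are unique, so Kubernetes uses map keys as a Set (map[string]struct{}) to get de-duplication.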
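A map-backed set in the style of sets.String takes only a few lines. `StringSet` and its methods below are a hypothetical minimal version written for illustration, not the k8s.io/apimachinery API; the zero-size struct{} value means only the keys carry data.

```go
package main

import (
	"fmt"
	"sort"
)

// StringSet is a hypothetical minimal set: map keys are unique and
// struct{} values occupy zero bytes.
type StringSet map[string]struct{}

func (s StringSet) Insert(items ...string) {
	for _, it := range items {
		s[it] = struct{}{} // inserting an existing key is a no-op
	}
}

func (s StringSet) Has(item string) bool {
	_, ok := s[item]
	return ok
}

// List returns the elements in sorted order (map iteration order is random).
func (s StringSet) List() []string {
	out := make([]string, 0, len(s))
	for k := range s {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	s := StringSet{}
	s.Insert("default/a", "default/b", "default/a")
	fmt.Println(len(s), s.Has("default/b"), s.List())
}
```

It prints `2 true [default/a default/b]`: the duplicate insert of default/a is absorbed by the map.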