1、reflector成员变量

A、listerWatcher用于获取和监控资源,lister可以获取对象的全量,watcher可以获取对象的增量(变化)。系统会周期性的执行list-watch的流程,一旦过程中失败就要重新执行流程,这个重新执行的周期就是period指定的 B、expectedType规定了监控对象的类型,非此类型的对象将会被忽略。 实例化后的expectedType类型的对象会被添加到store中; C、kubernetes资源在api_server中都是有版本的,对象的任何除了修改(添加、删除、更新)都会造成资源版本更新,所以lastSyncResourceVersion就是指的这个版本

D、 如果使用者需要定期同步全量对象,那么Reflector就会定期产生全量对象的同步事件给DeltaFIFO

  1. type Reflector struct {
  2. // 反射器的名字
  3. name string
  4. // 反射的类型名称
  5. expectedTypeName string
  6. expectedType reflect.Type
  7. expectedGVK *schema.GroupVersionKind
  8. // 存储,就是DeltaFIFO
  9. store Store
  10. listerWatcher ListerWatcher
  11. // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
  12. // 这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
  13. backoffManager wait.BackoffManager
  14. // 重新同步的周期,很多人肯定认为这个同步周期指的是从api_server的同步周期
  15. // 其实这里面同步指的是shared_informer使用者需要定期同步全量对象
  16. // 主要防止部分obj,用户worker处理发生异常,需要将LocalStore的对象全量的同步给用户
  17. resyncPeriod time.Duration
  18. ShouldResync func() bool
  19. clock clock.Clock
  20. paginatedResult bool
  21. // 最后一次同步的资源版本
  22. lastSyncResourceVersion string
  23. // 同步的时候发生了410错误码
  24. isLastSyncResourceVersionGone bool
  25. // 还专门为最后一次同步的资源版本弄了个锁
  26. lastSyncResourceVersionMutex sync.RWMutex
  27. WatchListPageSize int64
  28. }

2、核心循环

  1. // 定期调用ListAndWatch方法
  2. // 这里面我们不用关心wait.BackoffUntil是如何实现的,只要知道他调用函数f会被backoffManager周期执行一次
  3. func (r *Reflector) Run(stopCh <-chan struct{}) {
  4. wait.BackoffUntil(func() {
  5. if err := r.ListAndWatch(stopCh); err != nil {
  6. utilruntime.HandleError(err)
  7. }
  8. }, r.backoffManager, true, stopCh)
  9. }
  10. // 真正的核心函数
  11. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  12. // 很多存储类的系统都是这样设计的,数据采用版本的方式记录,数据每变化(添加、删除、更新)都会触发版本更新,
  13. // 这样的做法可以避免全量数据访问。以api_server资源监控为例,只要监控比缓存中资源版本大的对象就可以了,
  14. // 把变化的部分更新到缓存中就可以达到与api_server一致的效果,一般资源的初始版本为0,从0版本开始列举就是全量的对象了
  15. // 此处先获取缓存里面的最新版本号
  16. var resourceVersion string
  17. options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
  18. // 1、通过list获取资源
  19. if err := func() error {
  20. var list runtime.Object
  21. var paginatedResult bool
  22. var err error
  23. listCh := make(chan struct{}, 1)
  24. panicCh := make(chan interface{}, 1)
  25. go func() {
  26. defer func() {
  27. if r := recover(); r != nil {
  28. panicCh <- r
  29. }
  30. }()
  31. p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
  32. return r.listerWatcher.List(opts)
  33. }))
  34. switch {
  35. case r.WatchListPageSize != 0:
  36. p.PageSize = r.WatchListPageSize
  37. case r.paginatedResult:
  38. case options.ResourceVersion != "" && options.ResourceVersion != "0":
  39. p.PageSize = 0
  40. }
  41. list, err = p.List(context.Background(), options)
  42. if isExpiredError(err) {
  43. r.setIsLastSyncResourceVersionExpired(true)
  44. list, err = p.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
  45. }
  46. close(listCh)
  47. }()
  48. select {
  49. case <-stopCh:
  50. return nil
  51. case r := <-panicCh:
  52. panic(r)
  53. case <-listCh:
  54. }
  55. if err != nil {
  56. return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
  57. }
  58. if options.ResourceVersion == "0" && paginatedResult {
  59. r.paginatedResult = true
  60. }
  61. r.setIsLastSyncResourceVersionExpired(false) // list was successful
  62. listMetaInterface, err := meta.ListAccessor(list)
  63. if err != nil {
  64. return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
  65. }
  66. resourceVersion = listMetaInterface.GetResourceVersion()
  67. items, err := meta.ExtractList(list)
  68. if err != nil {
  69. return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
  70. }
  71. if err := r.syncWith(items, resourceVersion); err != nil {
  72. return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
  73. }
  74. r.setLastSyncResourceVersion(resourceVersion)
  75. return nil
  76. }(); err != nil {
  77. return err
  78. }
  79. // 2、下面要启动一个后台协程实现定期的同步操作,这个同步就是将SharedInformer里面的对象全量以同步事件的方式通知使用者
  80. // 我们暂且称之为“后台同步协程”,Run()函数退出需要后台同步协程退出,所以下面的cancelCh就是干这个用的
  81. // 利用defer close(cancelCh)实现的,而resyncErrC是后台同步协程反向通知Run()函数的报错通道
  82. // 当后台同步协程出错,Run()函数接收到信号就可以退出了
  83. resyncErrC := make(chan error, 1)
  84. cancelCh := make(chan struct{})
  85. defer close(cancelCh)
  86. go func() {
  87. resyncCh, cleanup := r.resyncChan()
  88. defer func() {
  89. cleanup()
  90. }()
  91. for {
  92. select {
  93. case <-resyncCh:
  94. case <-stopCh:
  95. return
  96. case <-cancelCh:
  97. return
  98. }
  99. if r.ShouldResync == nil || r.ShouldResync() {
  100. klog.V(4).Infof("%s: forcing resync", r.name)
  101. if err := r.store.Resync(); err != nil {
  102. resyncErrC <- err
  103. return
  104. }
  105. }
  106. cleanup()
  107. resyncCh, cleanup = r.resyncChan()
  108. }
  109. }()
  110. // 3、前面已经列举了全量对象,接下来就是watch的逻辑了
  111. for {
  112. // 如果有退出信号就立刻返回,否则就会往下走,因为有default
  113. select {
  114. case <-stopCh:
  115. return nil
  116. default:
  117. }
  118. // 计算watch的超时时间
  119. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  120. // 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
  121. // 并设置超时时间
  122. options = metav1.ListOptions{
  123. ResourceVersion: resourceVersion,
  124. TimeoutSeconds: &timeoutSeconds,
  125. AllowWatchBookmarks: true,
  126. }
  127. start := r.clock.Now()
  128. // 开始监控对象
  129. w, err := r.listerWatcher.Watch(options)
  130. // watch产生错误了,大部分错误就要退出函数然后再重新来一遍流程
  131. if err != nil {
  132. switch {
  133. case isExpiredError(err):
  134. // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
  135. // has a semantic that it returns data at least as fresh as provided RV.
  136. // So first try to LIST with setting RV to resource version of last observed object.
  137. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  138. case err == io.EOF:
  139. case err == io.ErrUnexpectedEOF:
  140. klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
  141. default:
  142. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
  143. }
  144. // 类似于网络拒绝连接的错误要等一会儿再试,因为可能网络繁忙
  145. // 这种时候不需要重新list资源,重启watch资源即可
  146. if utilnet.IsConnectionRefused(err) {
  147. time.Sleep(time.Second)
  148. continue
  149. }
  150. return nil
  151. }
  152. // watch返回是流,api_server会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
  153. // 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncErrC和stopCh
  154. // 用于异步通知退出或者后台同步协程错误
  155. if err := r.watchHandler(start, w, &resourceVersion, resyncErrC, stopCh); err != nil {
  156. if err != errorStopRequested {
  157. switch {
  158. case isExpiredError(err):
  159. // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
  160. // has a semantic that it returns data at least as fresh as provided RV.
  161. // So first try to LIST with setting RV to resource version of last observed object.
  162. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  163. default:
  164. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
  165. }
  166. }
  167. return nil
  168. }
  169. }
  170. }

3、工具函数

  1. // 返回最新的资源版本号
  2. func (r *Reflector) LastSyncResourceVersion() string {
  3. r.lastSyncResourceVersionMutex.RLock()
  4. defer r.lastSyncResourceVersionMutex.RUnlock()
  5. return r.lastSyncResourceVersion
  6. }
  7. // 定时通知Informer使用者同步数据
  8. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  9. if r.resyncPeriod == 0 {
  10. return neverExitWatch, func() bool { return false }
  11. }
  12. t := r.clock.NewTimer(r.resyncPeriod)
  13. return t.C(), t.Stop
  14. }
  15. // 实现api_server全量对象的同步
  16. // 将数据传给delta_fifo队列
  17. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  18. found := make([]interface{}, 0, len(items))
  19. for _, item := range items {
  20. found = append(found, item)
  21. }
  22. return r.store.Replace(found, resourceVersion)
  23. }
  24. // watch到obj的处理器
  25. func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errC chan error, stopCh <-chan struct{}) error {
  26. eventCount := 0
  27. // 有异常退出需要关闭w
  28. defer w.Stop()
  29. // 这里就开始无限循环的从chan中读取资源的变化,也可以理解为资源的增量变化,同时还要监控各种信号
  30. loop:
  31. for {
  32. select {
  33. case <-stopCh:
  34. return errorStopRequested
  35. case err := <-errC:
  36. return err
  37. case event, ok := <-w.ResultChan():
  38. // 如果w返回ok为false,则break loop
  39. if !ok {
  40. break loop
  41. }
  42. // 看来event可以作为错误的返回值,挺有意思,而不是通过关闭chan,这种方式可以传递错误信息,关闭chan做不到
  43. if event.Type == watch.Error {
  44. return apierrors.FromObject(event.Object)
  45. }
  46. if r.expectedType != nil {
  47. // 不是预期类型则跳过
  48. if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
  49. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
  50. continue
  51. }
  52. }
  53. // 验证gvk
  54. if r.expectedGVK != nil {
  55. if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
  56. utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
  57. continue
  58. }
  59. }
  60. // 和list操作相似,也要获取对象的版本,要更新缓存中的版本,下次watch就可以忽略这些资源了
  61. m, err := meta.Accessor(event.Object)
  62. if err != nil {
  63. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  64. continue
  65. }
  66. newResourceVersion := m.GetResourceVersion()
  67. // 调用store(delta_fifo队列)根据event_type执行相应的操作
  68. switch event.Type {
  69. case watch.Added:
  70. err := r.store.Add(event.Object)
  71. if err != nil {
  72. utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
  73. }
  74. case watch.Modified:
  75. err := r.store.Update(event.Object)
  76. if err != nil {
  77. utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
  78. }
  79. case watch.Deleted:
  80. err := r.store.Delete(event.Object)
  81. if err != nil {
  82. utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
  83. }
  84. case watch.Bookmark:
  85. default:
  86. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  87. }
  88. // 更新最新资源版本
  89. *resourceVersion = newResourceVersion
  90. r.setLastSyncResourceVersion(newResourceVersion)
  91. eventCount++
  92. }
  93. }
  94. watchDuration := r.clock.Since(start)
  95. // 若watch时间小于1秒或者监听的event为0,则异常退出
  96. if watchDuration < 1*time.Second && eventCount == 0 {
  97. return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
  98. }
  99. return nil
  100. }
  101. // 设置最新的资源版本
  102. func (r *Reflector) setLastSyncResourceVersion(v string) {
  103. r.lastSyncResourceVersionMutex.Lock()
  104. defer r.lastSyncResourceVersionMutex.Unlock()
  105. r.lastSyncResourceVersion = v
  106. }
  107. func (r *Reflector) relistResourceVersion() string {
  108. r.lastSyncResourceVersionMutex.RLock()
  109. defer r.lastSyncResourceVersionMutex.RUnlock()
  110. if r.isLastSyncResourceVersionGone {
  111. return ""
  112. }
  113. if r.lastSyncResourceVersion == "" {
  114. return "0"
  115. }
  116. return r.lastSyncResourceVersion
  117. }
  118. func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) {
  119. r.lastSyncResourceVersionMutex.Lock()
  120. defer r.lastSyncResourceVersionMutex.Unlock()
  121. r.isLastSyncResourceVersionGone = isExpired
  122. }
  123. // 设置想要监听的类型
  124. func (r *Reflector) setExpectedType(expectedType interface{}) {
  125. r.expectedType = reflect.TypeOf(expectedType)
  126. if r.expectedType == nil {
  127. r.expectedTypeName = defaultExpectedTypeName
  128. return
  129. }
  130. r.expectedTypeName = r.expectedType.String()
  131. if obj, ok := expectedType.(*unstructured.Unstructured); ok {
  132. gvk := obj.GroupVersionKind()
  133. if gvk.Empty() {
  134. klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
  135. return
  136. }
  137. r.expectedGVK = &gvk
  138. r.expectedTypeName = gvk.String()
  139. }
  140. }
  141. func isExpiredError(err error) bool {
  142. return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
  143. }