Reflector watches a specified resource type so that every change is reflected into the given store.
Code version: https://github.com/kubernetes/client-go/tree/release-1.18
Code location: https://github.com/kubernetes/client-go/blob/release-1.18/tools/cache/reflector.go

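  • name: name of the reflector
  • expectedTypeName: display name of the expected resource type; the stringified expectedGVK if it is set, otherwise the stringified expectedType
  • expectedType: expected type of the watched resource
  • expectedGVK: expected GroupVersionKind of the watched resource (used for unstructured objects)
  • store: holds the objects obtained from kube-apiserver; usually a DeltaFIFO
  • listerWatcher: lists resources from kube-apiserver and watches them for changes
  • backoffManager: backoff policy that sets the retry interval when kube-apiserver is unstable
  • resyncPeriod: resync interval
  • ShouldResync: invoked periodically; whenever it returns true, the store's Resync() is called, i.e. it decides whether to keep resyncing after the initial sync
  • lastSyncResourceVersion: resourceVersion last observed when syncing with the store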
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string
    // The name of the type we expect to place in the store. The name
    // will be the stringification of expectedGVK if provided, and the
    // stringification of expectedType otherwise. It is for display
    // only, and should not be used for parsing or comparison.
    expectedTypeName string
    // An example object of the type we expect to place in the store.
    // Only the type needs to be right, except that when that is
    // `unstructured.Unstructured` the object's `"apiVersion"` and
    // `"kind"` must also be right.
    expectedType reflect.Type
    // The GVK of the object we expect to place in the store if unstructured.
    expectedGVK *schema.GroupVersionKind
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // backoff manages backoff of ListWatch
    backoffManager wait.BackoffManager
    resyncPeriod   time.Duration
    // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // paginatedResult defines whether pagination should be forced for list calls.
    // It is set based on the result of the initial list call.
    paginatedResult bool
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
    // failed with an HTTP 410 (Gone) status code.
    isLastSyncResourceVersionGone bool
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
    // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
    // it will turn off pagination to allow serving them from watch cache.
    // NOTE: It should be used carefully as paginated lists are always served directly from
    // etcd, which is significantly less efficient and may lead to serious performance and
    // scalability problems.
    WatchListPageSize int64
}
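To make the expectedType / expectedTypeName pair concrete, here is a minimal stdlib-only sketch. `Pod` and `expectedTypeInfo` are hypothetical stand-ins, not client-go names, and the unstructured/GVK branch is left out. It only shows that the display name is the type's string form and that the later check in watchHandler is an exact reflect.Type comparison.

```go
package main

import (
	"fmt"
	"reflect"
)

// Pod is a hypothetical stand-in for a typed API object such as v1.Pod.
type Pod struct{ Name string }

// expectedTypeInfo is a hypothetical, simplified helper: it derives the
// expected reflect.Type and its display name from an example object.
// The unstructured/GVK case handled by client-go is deliberately omitted.
func expectedTypeInfo(example interface{}) (reflect.Type, string) {
	if example == nil {
		// Placeholder display name when no expected type is given.
		return nil, "<unspecified>"
	}
	t := reflect.TypeOf(example)
	return t, t.String()
}

func main() {
	t, name := expectedTypeInfo(&Pod{})
	fmt.Println(name) // *main.Pod

	// An exact reflect.Type comparison, as in watchHandler's expectedType check.
	fmt.Println(reflect.TypeOf(&Pod{Name: "a"}) == t) // true
	fmt.Println(reflect.TypeOf(Pod{}) == t)           // false: value vs pointer type
}
```

Note that a pointer type and its value type do not match, so the example object passed to NewReflector has to have exactly the type the ListerWatcher returns.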

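Methods

NewReflector & NewNamedReflector

NewReflector

  • Overview
    • Creates a new Reflector that keeps the given store in sync with the server's contents for the given resource
    • The Reflector only puts objects of type expectedType into the store, unless expectedType is nil
    • If resyncPeriod is non-zero, the Reflector periodically consults its ShouldResync function to decide whether to invoke the store's Resync operation; a nil ShouldResync means always yes. This lets you use a Reflector both to periodically process everything and to incrementally process the things that change.
  • Calls NewNamedReflector, with a name derived from the call site

NewNamedReflector

  • Builds the Reflector from the given arguments, with an exponential backoffManager (800ms initial interval, 30s cap, reset after 2 minutes without backoff, factor 2.0, jitter 1.0) and a real clock, then records the expected type via setExpectedType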
// NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes".  This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    realClock := &clock.RealClock{}
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
        // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
        // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
        backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
        resyncPeriod:   resyncPeriod,
        clock:          realClock,
    }
    r.setExpectedType(expectedType)
    return r
}

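Run

Overview

Calls ListAndWatch repeatedly, waiting between runs for the interval computed by backoffManager

Source analysis

Run

  • Calls wait.BackoffUntil with a closure that runs ListAndWatch (sliding = true)

wait.BackoffUntil

  • Returns when stopCh is closed
  • If sliding is false, the backoff timer is obtained before f runs, so the period includes f's runtime; if sliding is true, it is obtained after f runs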
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
    var t clock.Timer
    for {
        select {
        case <-stopCh:
            return
        default:
        }

        if !sliding {
            t = backoff.Backoff()
        }

        func() {
            defer runtime.HandleCrash()
            f()
        }()

        if sliding {
            t = backoff.Backoff()
        }

        // NOTE: b/c there is no priority selection in golang
        // it is possible for this to race, meaning we could
        // trigger t.C and stopCh, and t.C select falls through.
        // In order to mitigate we re-check stopCh at the beginning
        // of every loop to prevent extra executions of f().
        select {
        case <-stopCh:
            return
        case <-t.C():
        }
    }
}

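ListAndWatch

Overview

ListAndWatch first lists all items and records the resource version at the moment of the call, then watches the specified resource for all changes after that resource version. Meanwhile it adds Sync deltas to the store every resyncPeriod, and it hands the watched events to watchHandler for processing.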
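ListAndWatch builds its initial ListOptions from r.relistResourceVersion(), whose body is not shown in this article. Going by the release-1.18 source, the rule appears to be: "" after a 410 Gone/Expired error, "0" for the first list, and the last synced version otherwise. The sketch below restates that rule as a standalone function; `relistRV` is a hypothetical name, and the real method reads the Reflector's fields under lastSyncResourceVersionMutex.

```go
package main

import "fmt"

// relistRV is a hypothetical standalone restatement of the resource-version
// choice for (re)listing. lastRV is the last synced RV; expired reports
// whether the previous list/watch with it failed with 410 Gone / Expired.
func relistRV(lastRV string, expired bool) string {
	if expired {
		// "" requests a consistent read served from etcd.
		return ""
	}
	if lastRV == "" {
		// Initial list: "0" allows serving from the apiserver watch cache.
		return "0"
	}
	// Otherwise list data at least as fresh as the last observed version.
	return lastRV
}

func main() {
	fmt.Printf("%q\n", relistRV("", false))      // "0"
	fmt.Printf("%q\n", relistRV("12345", false)) // "12345"
	fmt.Printf("%q\n", relistRV("12345", true))  // ""
}
```

This is also why the pagination comment in the code only trusts paginatedResult for requests with ResourceVersion="0": that value is used only for the initial list.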

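Source analysis

  1. List all objects of the specified type from kube-apiserver
  2. Process the list
    1. Check that the result is a list object (meta.ListAccessor)
    2. Read the list's resource version
    3. Extract the items into an object slice
    4. Call syncWith
      1. syncWith copies the items into a new []interface{} and calls store.Replace to put them into the store
    5. Record the last synced resource version
  3. Start a goroutine that waits on the timer returned by resyncChan and, every resyncPeriod, calls Resync if ShouldResync allows it
    1. Call store.Resync, which (for a DeltaFIFO) queues Sync deltas for the objects in the store
  4. Loop: watch for changes after resourceVersion and call watchHandler
    1. Call r.listerWatcher.Watch to obtain a watch.Interface
    2. Call watchHandler to process the events it delivers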
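Step 2.4 (syncWith) is not part of the code below. Here is a minimal sketch of its shape, assuming a toy store: `toyStore`, whose objects are plain strings keyed by themselves, is hypothetical, and the real syncWith takes []runtime.Object rather than []interface{}. The point is that the whole list goes to the store in a single Replace call together with the list's resource version.

```go
package main

import "fmt"

// toyStore is a hypothetical minimal store keyed by object name; a real
// Reflector usually writes into a DeltaFIFO instead.
type toyStore struct {
	items map[string]interface{}
	rv    string
}

// Replace swaps the whole content in one step, like Store.Replace does
// with the result of a full LIST.
func (s *toyStore) Replace(list []interface{}, resourceVersion string) error {
	next := make(map[string]interface{}, len(list))
	for _, obj := range list {
		name, ok := obj.(string) // toy objects are just their names
		if !ok {
			return fmt.Errorf("unexpected object %#v", obj)
		}
		next[name] = obj
	}
	s.items, s.rv = next, resourceVersion
	return nil
}

// syncWith mirrors the shape of Reflector.syncWith: copy the listed items
// into a []interface{} and hand them to Replace with the list RV.
func syncWith(store *toyStore, items []interface{}, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return store.Replace(found, resourceVersion)
}

func main() {
	s := &toyStore{items: map[string]interface{}{"stale": "stale"}}
	if err := syncWith(s, []interface{}{"pod-a", "pod-b"}, "100"); err != nil {
		panic(err)
	}
	fmt.Println(len(s.items), s.rv) // 2 100
}
```

The toy store simply drops the "stale" entry that is missing from the new list; a real DeltaFIFO records that difference as deltas for its consumers instead of overwriting a map.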
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // == 1
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                //
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                //
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0
            }

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) {
                r.setIsLastSyncResourceVersionExpired(true)
                // Retry immediately if the resource version used to list is expired.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, or the full list might fail because the
                // resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
                // to recover and ensure the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
        }

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        //
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true
        }

        // == 2
        r.setIsLastSyncResourceVersionExpired(false) // list was successful
        initTrace.Step("Objects listed")
        // == 2.a
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        // == 2.b
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        // == 2.c
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // == 2.d
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        // == 2.e
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    // == 3
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                // == 3.a
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    // == 4
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }
        // == 4.a
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch {
            case isExpiredError(err):
                // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
                // has a semantic that it returns data at least as fresh as provided RV.
                // So first try to LIST with setting RV to resource version of last observed object.
                klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
            case err == io.EOF:
                // watch closed normally
            case err == io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return nil
        }

        // == 4.b
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

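watchHandler

Overview

watchHandler loops over the events delivered by w, applies each object to the store with the store method that matches the event type, and keeps *resourceVersion up to date.

Source analysis

  1. If stopCh is closed, return errorStopRequested
  2. If errc receives an error (a failed Resync from the resync goroutine), return it
  3. Receive an event from w.ResultChan()
    1. If the channel has been closed, break out of the loop
    2. If event.Type is watch.Error, convert event.Object into an API error and return it
    3. If expectedType is set, check that the object has that type; otherwise log and skip the event
    4. If expectedGVK is set, check the object's GroupVersionKind; otherwise log and skip the event
    5. Get the object's metadata accessor with meta.Accessor; if that fails, log and skip the event
    6. Read the new resource version newResourceVersion
    7. Apply the event to the store according to event.Type
      1. watch.Added calls store.Add()
      2. watch.Modified calls store.Update()
      3. watch.Deleted calls store.Delete()
      4. Update *resourceVersion and lastSyncResourceVersion to the new version (for watch.Bookmark this is the only effect) and increment eventCount
  4. After the loop ends, return an error if the watch lasted less than 1 second and no events were received (eventCount == 0)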
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        // == 1
        case <-stopCh:
            return errorStopRequested
        // == 2    
        case err := <-errc:
            return err
        // == 3
        case event, ok := <-w.ResultChan():
            // == 3.a
            if !ok {
                break loop
            }
            // == 3.b
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            // == 3.c
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            // == 3.d
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
            // == 3.e
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            // == 3.f
            newResourceVersion := meta.GetResourceVersion()
            // == 3.g
            switch event.Type {
            // == 3.g.i    
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // == 3.g.ii
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // == 3.g.iii
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // == 3.g.iv
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    // == 4
    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}
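The dispatch logic of watchHandler can be exercised without an API server. In the sketch below `Event`, `EventType` and `handleEvents` are hypothetical stand-ins for watch.Event and watchHandler, the store is a plain map from object name to resource version, and the type/GVK checks and stop/error channels are left out. It keeps the two behaviours worth noticing: every event, including a Bookmark, advances *rv, and a watch that closes within a second with no events is reported as an error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// EventType and Event are hypothetical stand-ins for watch.EventType/watch.Event.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
)

type Event struct {
	Type            EventType
	Name            string // object key (toy)
	ResourceVersion string
}

// handleEvents sketches watchHandler's core: apply each event to a map store,
// always advance *rv, and flag a "very short watch" that closed within a
// second without delivering anything.
func handleEvents(events <-chan Event, store map[string]string, rv *string) error {
	start := time.Now()
	count := 0
	for ev := range events {
		switch ev.Type {
		case Added, Modified:
			store[ev.Name] = ev.ResourceVersion
		case Deleted:
			delete(store, ev.Name)
		case Bookmark:
			// only the resource version moves forward
		default:
			fmt.Println("unknown event type", ev.Type)
		}
		*rv = ev.ResourceVersion
		count++
	}
	if time.Since(start) < time.Second && count == 0 {
		return errors.New("very short watch: closed in under a second with no events")
	}
	return nil
}

func main() {
	ch := make(chan Event, 4)
	ch <- Event{Added, "pod-a", "101"}
	ch <- Event{Modified, "pod-a", "102"}
	ch <- Event{Deleted, "pod-a", "103"}
	ch <- Event{Bookmark, "", "104"}
	close(ch)

	store := map[string]string{}
	rv := "100"
	err := handleEvents(ch, store, &rv)
	fmt.Println(len(store), rv, err) // 0 104 <nil>

	empty := make(chan Event)
	close(empty)
	fmt.Println(handleEvents(empty, store, &rv) != nil) // true
}
```

The second call shows the "very short watch" case; in ListAndWatch such an error only ends the current run, and Run's BackoffUntil loop starts a new list after the backoff interval.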

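Overall flow

Run

  • Calls wait.BackoffUntil, i.e. repeatedly calls ListAndWatch with a backoff interval between runs

ListAndWatch

  • Lists the current state of all resources from kube-apiserver into the store, starts a goroutine that periodically calls Resync, then loops creating watches and calling watchHandler on each

watchHandler

  • Loops over the watch results and applies each event to the store with the matching method (Add/Update/Delete)

Summary

  • The Reflector fetches the events for the specified resource from kube-apiserver and stores them in the store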

[Figure: Reflector.png]
