ReadLink

  • configmap manager
  • pkg/kubelet/secret/secret_manager.go

    Configmap Manager

    1. // Manager interface provides methods for Kubelet to manage ConfigMap.
    2. type Manager interface {
    3. // Get configmap by configmap namespace and name.
    4. GetConfigMap(namespace, name string) (*v1.ConfigMap, error)
    5. // WARNING: Register/UnregisterPod functions should be efficient,
    6. // i.e. should not block on network operations.
    7. // RegisterPod registers all configmaps from a given pod.
    8. RegisterPod(pod *v1.Pod)
    9. // UnregisterPod unregisters configmaps from a given pod that are not
    10. // used by any other registered pod.
    11. UnregisterPod(pod *v1.Pod)
    12. }

    接口非常简单。

  1. GetConfigMap : 通过 namespace 和 name 获取对应 ConfigMap 对象。
  2. RegisterPod(pod *v1.Pod):把指定 Pod 对象 yaml 指定的 configmap 注册到 Controller 中管理
  3. UnregisterPod(pod *v1.Pod):把指定 Pod 对象 yaml 指定的 configmap 从 Controller 中注册管理中删除,注意 ConfigMap 需要没有任何其他已注册的 Pod 引用(即无被依赖项)才可以删除

当前代码中有两种 manager 的实现

  • NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager:该实现有两点逻辑

    • 当一个 Pod 创建或者更新时,所有的 configmap 缓存都失效。
    • GetObject() 调用首先从本地缓存获取,失败则访问 APISever 并刷新 configmap 的缓存。
      1. // NewCachingConfigMapManager creates a manager that keeps a cache of all configmaps
      2. // necessary for registered pods.
      3. // It implement the following logic:
      4. // - whenever a pod is create or updated, the cached versions of all configmaps
      5. // are invalidated
      6. // - every GetObject() call tries to fetch the value from local cache; if it is
      7. // not there, invalidated or too old, we fetch it from apiserver and refresh the
      8. // value in cache; otherwise it is just fetched from cache
      9. func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager {
      10. getConfigMap := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
      11. return kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, opts)
      12. }
      13. configMapStore := manager.NewObjectStore(getConfigMap, clock.RealClock{}, getTTL, defaultTTL)
      14. return &configMapManager{
      15. manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames),
      16. }
      17. }
  • NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager

    • 当一个 Pod 创建或者更新时,会对指定该 Pod 引用的资源,并且该资源未被其他 Pod 引用进行独立的 watch。
    • GetObject() 调用首先从本地缓存获取 ```go // NewWatchingConfigMapManager creates a manager that keeps a cache of all configmaps // necessary for registered pods. // It implements the following logic: // - whenever a pod is created or updated, we start individual watches for all // referenced objects that aren’t referenced from other registered pods // - every GetObject() returns a value from local cache propagated via watches func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager { listConfigMap := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CoreV1().ConfigMaps(namespace).List(context.TODO(), opts) } watchConfigMap := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.CoreV1().ConfigMaps(namespace).Watch(context.TODO(), opts) } newConfigMap := func() runtime.Object { return &v1.ConfigMap{} } isImmutable := func(object runtime.Object) bool { if configMap, ok := object.(*v1.ConfigMap); ok {
      1. return configMap.Immutable != nil && *configMap.Immutable
      } return false } gr := corev1.Resource(“configmap”) return &configMapManager{ manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames), } }
  1. <a name="gEtqm"></a>
  2. ## Secret Manager
  3. secret manager 除了资源类型和 configmap 不一样,其他逻辑相同,所以仅列出两种 secret manager 的初始化函数。<br />[/](https://sourcegraph.com/github.com/kubernetes/kubernetes)[pkg /](https://sourcegraph.com/github.com/kubernetes/kubernetes/-/tree/pkg)[kubelet /](https://sourcegraph.com/github.com/kubernetes/kubernetes/-/tree/pkg/kubelet)[secret /](https://sourcegraph.com/github.com/kubernetes/kubernetes/-/tree/pkg/kubelet/secret)[secret_manager.go](https://sourcegraph.com/github.com/kubernetes/kubernetes/-/blob/pkg/kubelet/secret/secret_manager.go)
  4. ```go
  5. // NewCachingSecretManager creates a manager that keeps a cache of all secrets
  6. // necessary for registered pods.
  7. // It implements the following logic:
  8. // - whenever a pod is created or updated, the cached versions of all secrets
  9. // are invalidated
  10. // - every GetObject() call tries to fetch the value from local cache; if it is
  11. // not there, invalidated or too old, we fetch it from apiserver and refresh the
  12. // value in cache; otherwise it is just fetched from cache
  13. func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager {
  14. getSecret := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
  15. return kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, opts)
  16. }
  17. secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL)
  18. return &secretManager{
  19. manager: manager.NewCacheBasedManager(secretStore, getSecretNames),
  20. }
  21. }
  22. // NewWatchingSecretManager creates a manager that keeps a cache of all secrets
  23. // necessary for registered pods.
  24. // It implements the following logic:
  25. // - whenever a pod is created or updated, we start individual watches for all
  26. // referenced objects that aren't referenced from other registered pods
  27. // - every GetObject() returns a value from local cache propagated via watches
  28. func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval time.Duration) Manager {
  29. listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
  30. return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
  31. }
  32. watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
  33. return kubeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
  34. }
  35. newSecret := func() runtime.Object {
  36. return &v1.Secret{}
  37. }
  38. isImmutable := func(object runtime.Object) bool {
  39. if secret, ok := object.(*v1.Secret); ok {
  40. return secret.Immutable != nil && *secret.Immutable
  41. }
  42. return false
  43. }
  44. gr := corev1.Resource("secret")
  45. return &secretManager{
  46. manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
  47. }
  48. }

cache_based_manager

/pkg /kubelet /util /manager /cache_based_manager.go

  1. // cacheBasedManager keeps a store with objects necessary
  2. // for registered pods. Different implementations of the store
  3. // may result in different semantics for freshness of objects
  4. // (e.g. ttl-based implementation vs watch-based implementation).
  5. type cacheBasedManager struct {
  6. objectStore Store
  7. getReferencedObjects func(*v1.Pod) sets.String
  8. lock sync.Mutex
  9. registeredPods map[objectKey]*v1.Pod
  10. }

该 manager 代码位于 /pkg /kubelet /util /manager /cache_based_manager.go,属于通用的 Manager 结构体工具,用于保留注册的 Pod 所必要引用的 kubernetes 对象(objects)
如何做到的呢?
通过 getReferencedObjects 字段,一个可以传入的成员函数,自定义实现用于从 v1.Pod 对象中获取到对应对象(或一组对象)的 name。流程如下:

  1. func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
  2. // 1. 获取名字
  3. names := c.getReferencedObjects(pod)
  4. c.lock.Lock()
  5. defer c.lock.Unlock()
  6. // 2. 给每一个名字和 pod 的命名空间一起添加到 c.objectStore 中存储
  7. for name := range names {
  8. c.objectStore.AddReference(pod.Namespace, name)
  9. }
  10. // 3. 检查是否之前已经注册了该 Pod
  11. var prev *v1.Pod
  12. key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
  13. prev = c.registeredPods[key]
  14. // 4. 用新注册的 pod 替换之前存储的注册 Pod 的信息.
  15. c.registeredPods[key] = pod
  16. // 5. 删除旧 Pod 在 c.objectStore 中的引用信息,这是因为在上面第二步 Add 到 c.objectStore
  17. // 中时,这些资源的引用次数又新增了一次,但实际上只是同一个 Pod 的引用,自然需要删除,当然,也有
  18. // 可能新 Pod 已经不再引用目标资源了, Delete 函数在下面也处理这个情况
  19. if prev != nil {
  20. for name := range c.getReferencedObjects(prev) {
  21. // On an update, the .Add() call above will have re-incremented the
  22. // ref count of any existing object, so any objects that are in both
  23. // names and prev need to have their ref counts decremented. Any that
  24. // are only in prev need to be completely removed. This unconditional
  25. // call takes care of both cases.
  26. c.objectStore.DeleteReference(prev.Namespace, name)
  27. }
  28. }
  29. }
  1. 获取名字
  2. 给每一个名字和 pod 的命名空间一起添加到 c.objectStore 中存储
  3. 检查是否之前已经注册了该 Pod
  4. 用新注册的 pod 替换之前存储的注册 Pod 的信息.
  5. 删除旧 Pod 在 c.objectStore 中的引用信息,这是因为在上面第二步 Add 到 c.objectStore 中时,这些资源的引用次数又新增了一次,但实际上只是同一个 Pod 的引用,自然需要删除,当然,也有可能新 Pod 已经不再引用目标资源了, Delete 函数在下面也处理这个情况

    ttl ObjectStore

    cache_based 的 objectStore 通过 ttl 设置缓存有效期。

    watch_based_manager

    /pkg /kubelet /util /manager /watch_based_manager.go
    可以看到,watch_based_manager 最后使用了 NewCacheBasedManager ,所以 watch_based_manager 和 cache_based_manager 不同的是 ObjectStore 字段。

    1. // NewWatchBasedManager creates a manager that keeps a cache of all objects
    2. // necessary for registered pods.
    3. // It implements the following logic:
    4. // - whenever a pod is created or updated, we start individual watches for all
    5. // referenced objects that aren't referenced from other registered pods
    6. // - every GetObject() returns a value from local cache propagated via watches
    7. func NewWatchBasedManager(
    8. listObject listObjectFunc,
    9. watchObject watchObjectFunc,
    10. newObject newObjectFunc,
    11. isImmutable isImmutableFunc,
    12. groupResource schema.GroupResource,
    13. resyncInterval time.Duration,
    14. getReferencedObjects func(*v1.Pod) sets.String) Manager {
    15. // If a configmap/secret is used as a volume, the volumeManager will visit the objectCacheItem every resyncInterval cycle,
    16. // We just want to stop the objectCacheItem referenced by environment variables,
    17. // So, maxIdleTime is set to an integer multiple of resyncInterval,
    18. // We currently set it to 5 times.
    19. maxIdleTime := resyncInterval * 5
    20. // TODO propagate stopCh from the higher level.
    21. objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
    22. return NewCacheBasedManager(objectStore, getReferencedObjects)
    23. }

    watch_based_manager 通过 watch 而不是简单的 ttl 去确认或者刷新缓存。