How Pod status changes and probe result changes are detected

004. kubelet container status synchronization management
Code path: /pkg/kubelet/status/status_manager.go

Interface definitions

PodStatusProvider: provides a method for getting a PodStatus
Manager: holds the latest Pod status and is also responsible for syncing updates to the API Server

  // PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
  // that need to introspect status.
  type PodStatusProvider interface {
      // GetPodStatus returns the cached status for the provided pod UID, as well as whether it
      // was a cache hit.
      GetPodStatus(uid types.UID) (v1.PodStatus, bool)
  }

  // Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
  // the latest v1.PodStatus. It also syncs updates back to the API server.
  type Manager interface {
      PodStatusProvider
      // Start the API server status sync loop.
      Start()
      // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
      SetPodStatus(pod *v1.Pod, status v1.PodStatus)
      // SetContainerReadiness updates the cached container status with the given readiness, and
      // triggers a status update.
      SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
      // SetContainerStartup updates the cached container status with the given startup, and
      // triggers a status update.
      SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
      // TerminatePod resets the container status for the provided pod to terminated and triggers
      // a status update.
      TerminatePod(pod *v1.Pod)
      // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
      // the provided podUIDs.
      RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
  }
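
To make the cache-hit contract of GetPodStatus concrete, here is a minimal sketch of an in-memory provider. It is not the kubelet implementation: UID and PodStatus are simplified stand-ins for types.UID and v1.PodStatus, and cacheProvider with its set helper is a hypothetical name introduced only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// UID and PodStatus are simplified stand-ins for types.UID and v1.PodStatus.
type UID string

type PodStatus struct {
	Phase string
}

// PodStatusProvider has the same shape as the interface above, using the stand-in types.
type PodStatusProvider interface {
	GetPodStatus(uid UID) (PodStatus, bool)
}

// cacheProvider is a hypothetical in-memory implementation guarded by a read-write lock.
type cacheProvider struct {
	mu       sync.RWMutex
	statuses map[UID]PodStatus
}

func newCacheProvider() *cacheProvider {
	return &cacheProvider{statuses: make(map[UID]PodStatus)}
}

// set stores a status for the given UID (illustrative helper, not part of the interface).
func (c *cacheProvider) set(uid UID, s PodStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.statuses[uid] = s
}

// GetPodStatus returns the cached status and whether the lookup was a cache hit.
func (c *cacheProvider) GetPodStatus(uid UID) (PodStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.statuses[uid]
	return s, ok
}

func main() {
	c := newCacheProvider()
	c.set("pod-a", PodStatus{Phase: "Running"})

	var p PodStatusProvider = c
	s, ok := p.GetPodStatus("pod-a")
	fmt.Println(s.Phase, ok)

	_, ok = p.GetPodStatus("missing")
	fmt.Println(ok)
}
```

Callers should check the boolean before using the returned status, since a miss yields the zero value.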

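Status Manager and Kubelet interaction diagram

Note that every one of these paths eventually flows into updateStatusInternal, so that function is clearly the core logic to focus on.
Status manager - Figure 1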
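
The funnel into updateStatusInternal can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the kubelet code: the types are stand-ins, SetReadiness and updateInternal are hypothetical simplifications of the real setters and of updateStatusInternal, and the sketch assumes that a full channel is tolerated because a later periodic sync would pick up the cached status.

```go
package main

import (
	"fmt"
	"sync"
)

// UID and PodStatus are simplified stand-ins for types.UID and v1.PodStatus.
type UID string

type PodStatus struct {
	Phase string
	Ready bool
}

type versionedStatus struct {
	status  PodStatus
	version uint64
}

type syncRequest struct {
	uid    UID
	status versionedStatus
}

type manager struct {
	mu       sync.Mutex
	statuses map[UID]versionedStatus
	updates  chan syncRequest
}

func newManager(buffer int) *manager {
	return &manager{
		statuses: make(map[UID]versionedStatus),
		updates:  make(chan syncRequest, buffer),
	}
}

// SetPodStatus is one entry point; it delegates to updateInternal.
func (m *manager) SetPodStatus(uid UID, s PodStatus) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.updateInternal(uid, s)
}

// SetReadiness is a second entry point; it edits a copy of the cached status
// and delegates to the same internal function.
func (m *manager) SetReadiness(uid UID, ready bool) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	s := m.statuses[uid].status
	s.Ready = ready
	return m.updateInternal(uid, s)
}

// updateInternal must be called with m.mu held. It reports whether the cache changed.
func (m *manager) updateInternal(uid UID, s PodStatus) bool {
	old, found := m.statuses[uid]
	if found && old.status == s {
		return false // nothing changed, so no write is triggered
	}
	next := versionedStatus{status: s, version: old.version + 1}
	m.statuses[uid] = next
	select {
	case m.updates <- syncRequest{uid: uid, status: next}:
	default:
		// Channel full: this sketch assumes a later batch sync sends the cached status.
	}
	return true
}

func main() {
	m := newManager(4)
	fmt.Println(m.SetPodStatus("pod-a", PodStatus{Phase: "Running"}))
	fmt.Println(m.SetPodStatus("pod-a", PodStatus{Phase: "Running"}))
	fmt.Println(m.SetReadiness("pod-a", true))
	fmt.Println(len(m.updates), m.statuses["pod-a"].version)
}
```

Keeping the change detection, version bump, and enqueue in one function means every setter gets the same "write only on change" behavior for free.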

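Structs

PodStatus: PodStatus represents information about the status of a pod. The status may lag behind the actual state of the system, especially when the node hosting the pod cannot contact the control plane. (Note: PodStatus has many fields.)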

  // PodStatus represents information about the status of a pod. Status may trail the actual
  // state of a system, especially if the node that hosts the pod cannot contact the control
  // plane.
  type PodStatus struct {
      // The phase of a Pod is a simple, high-level summary of where the Pod is in its lifecycle.
      // The conditions array, the reason and message fields, and the individual container status
      // arrays contain more detail about the pod's status.
      // There are five possible phase values:
      //
      // Pending: The pod has been accepted by the Kubernetes system, but one or more of the
      // container images has not been created. This includes time before being scheduled as
      // well as time spent downloading images over the network, which could take a while.
      // Running: The pod has been bound to a node, and all of the containers have been created.
      // At least one container is still running, or is in the process of starting or restarting.
      // Succeeded: All containers in the pod have terminated in success, and will not be restarted.
      // Failed: All containers in the pod have terminated, and at least one container has
      // terminated in failure. The container either exited with non-zero status or was terminated
      // by the system.
      // Unknown: For some reason the state of the pod could not be obtained, typically due to an
      // error in communicating with the host of the pod.
      //
      // More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#pod-phase
      // +optional
      Phase PodPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase,casttype=PodPhase"`
      // Current service state of pod.
      // More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#pod-conditions
      // +optional
      // +patchMergeKey=type
      // +patchStrategy=merge
      Conditions []PodCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,2,rep,name=conditions"`
      // A human readable message indicating details about why the pod is in this condition.
      // +optional
      Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
      // A brief CamelCase message indicating details about why the pod is in this state.
      // e.g. 'Evicted'
      // +optional
      Reason string `json:"reason,omitempty" protobuf:"bytes,4,opt,name=reason"`
      // nominatedNodeName is set only when this pod preempts other pods on the node, but it cannot be
      // scheduled right away as preemption victims receive their graceful termination periods.
      // This field does not guarantee that the pod will be scheduled on this node. Scheduler may decide
      // to place the pod elsewhere if other nodes become available sooner. Scheduler may also decide to
      // give the resources on this node to a higher priority pod that is created after preemption.
      // As a result, this field may be different than PodSpec.nodeName when the pod is
      // scheduled.
      // +optional
      NominatedNodeName string `json:"nominatedNodeName,omitempty" protobuf:"bytes,11,opt,name=nominatedNodeName"`
      // IP address of the host to which the pod is assigned. Empty if not yet scheduled.
      // +optional
      HostIP string `json:"hostIP,omitempty" protobuf:"bytes,5,opt,name=hostIP"`
      // IP address allocated to the pod. Routable at least within the cluster.
      // Empty if not yet allocated.
      // +optional
      PodIP string `json:"podIP,omitempty" protobuf:"bytes,6,opt,name=podIP"`
      // podIPs holds the IP addresses allocated to the pod. If this field is specified, the 0th entry must
      // match the podIP field. Pods may be allocated at most 1 value for each of IPv4 and IPv6. This list
      // is empty if no IPs have been allocated yet.
      // +optional
      // +patchStrategy=merge
      // +patchMergeKey=ip
      PodIPs []PodIP `json:"podIPs,omitempty" protobuf:"bytes,12,rep,name=podIPs" patchStrategy:"merge" patchMergeKey:"ip"`
      // RFC 3339 date and time at which the object was acknowledged by the Kubelet.
      // This is before the Kubelet pulled the container image(s) for the pod.
      // +optional
      StartTime *metav1.Time `json:"startTime,omitempty" protobuf:"bytes,7,opt,name=startTime"`
      // The list has one entry per init container in the manifest. The most recent successful
      // init container will have ready = true, the most recently started container will have
      // startTime set.
      // More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#pod-and-container-status
      InitContainerStatuses []ContainerStatus `json:"initContainerStatuses,omitempty" protobuf:"bytes,10,rep,name=initContainerStatuses"`
      // The list has one entry per container in the manifest.
      // More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#pod-and-container-status
      // +optional
      ContainerStatuses []ContainerStatus `json:"containerStatuses,omitempty" protobuf:"bytes,8,rep,name=containerStatuses"`
      // The Quality of Service (QOS) classification assigned to the pod based on resource requirements
      // See PodQOSClass type for available QOS classes
      // More info: https://git.k8s.io/community/contributors/design-proposals/node/resource-qos.md
      // +optional
      QOSClass PodQOSClass `json:"qosClass,omitempty" protobuf:"bytes,9,rep,name=qosClass"`
      // Status for any ephemeral containers that have run in this pod.
      // This field is beta-level and available on clusters that haven't disabled the EphemeralContainers feature gate.
      // +optional
      EphemeralContainerStatuses []ContainerStatus `json:"ephemeralContainerStatuses,omitempty" protobuf:"bytes,13,rep,name=ephemeralContainerStatuses"`
  }

versionedPodStatus: wraps v1.PodStatus and adds a version number, which ensures that stale statuses are not sent to the API Server

  // A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
  // not sent to the API server.
  type versionedPodStatus struct {
      status v1.PodStatus
      // Monotonically increasing version number (per pod).
      version uint64
      // Pod name & namespace, for sending updates to API server.
      podName      string
      podNamespace string
  }
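
One way to picture how a per-pod version keeps stale statuses away from the API server is the sketch below. It is an illustration under assumptions rather than the kubelet code: syncer, needsUpdate, and syncPod are simplified hypothetical names, the API call is replaced by appending to a slice, and the only check modeled is "has a version at least this new already been sent".

```go
package main

import "fmt"

// UID is a simplified stand-in for types.UID.
type UID string

// versionedStatus is a reduced stand-in for versionedPodStatus.
type versionedStatus struct {
	phase   string
	version uint64
}

// syncer records the latest version sent per pod.
// In this sketch it is only ever used from a single goroutine.
type syncer struct {
	sentVersions map[UID]uint64
	sent         []string // stand-in for writes to the API server
}

// needsUpdate reports whether st is newer than anything already sent for uid.
func (s *syncer) needsUpdate(uid UID, st versionedStatus) bool {
	latest, ok := s.sentVersions[uid]
	return !ok || latest < st.version
}

// syncPod sends st unless it is stale, and reports whether a write happened.
func (s *syncer) syncPod(uid UID, st versionedStatus) bool {
	if !s.needsUpdate(uid, st) {
		return false
	}
	s.sent = append(s.sent, fmt.Sprintf("%s@v%d=%s", uid, st.version, st.phase))
	s.sentVersions[uid] = st.version
	return true
}

func main() {
	s := &syncer{sentVersions: make(map[UID]uint64)}
	fmt.Println(s.syncPod("pod-a", versionedStatus{phase: "Running", version: 2}))
	fmt.Println(s.syncPod("pod-a", versionedStatus{phase: "Pending", version: 1}))
	fmt.Println(s.syncPod("pod-a", versionedStatus{phase: "Succeeded", version: 3}))
	fmt.Println(s.sent)
}
```

Because the version only grows, a delayed request carrying an older status is dropped instead of overwriting newer state.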

podStatusSyncRequest: a wrapper struct for a pod status sync request

  type podStatusSyncRequest struct {
      podUID types.UID
      status versionedPodStatus
  }

manager: a thread-safe instance that syncs Pod statuses to the API Server and writes only when the status has changed

  // Updates pod statuses in apiserver. Writes only when new status has changed.
  // All methods are thread-safe.
  type manager struct {
      kubeClient clientset.Interface
      podManager kubepod.Manager
      // Map from pod UID to sync status of the corresponding pod.
      podStatuses      map[types.UID]versionedPodStatus
      podStatusesLock  sync.RWMutex
      podStatusChannel chan podStatusSyncRequest
      // Map from (mirror) pod UID to latest status version successfully sent to the API server.
      // apiStatusVersions must only be accessed from the sync thread.
      apiStatusVersions map[kubetypes.MirrorPodUID]uint64
      podDeletionSafety PodDeletionSafetyProvider
  }

Core logic

Status manager - Figure 2
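
Since the figure is not reproduced here, the sketch below shows one plausible shape for a sync loop that the Start method could launch: a single goroutine that handles individual requests from a channel and runs a full batch pass on a timer. This is an assumption-laden illustration, not the kubelet code; runSyncLoop, demo, and the callback parameters are hypothetical names, and the stop channel exists only so that the example terminates.

```go
package main

import (
	"fmt"
	"time"
)

// runSyncLoop handles one-off sync requests and periodic batch syncs on a single
// goroutine, so state touched only by the callbacks needs no extra locking.
func runSyncLoop(
	updates <-chan string,
	tick <-chan time.Time,
	stop <-chan struct{},
	syncPod func(uid string),
	syncBatch func(),
) {
	for {
		select {
		case uid := <-updates:
			syncPod(uid)
		case <-tick:
			syncBatch()
		case <-stop:
			return
		}
	}
}

// demo drives the loop with one request and one tick, then stops it.
func demo() []string {
	updates := make(chan string)  // unbuffered: a send returns once the loop has received
	tick := make(chan time.Time)  // a real loop would use a time.Ticker's channel
	stop := make(chan struct{})
	done := make(chan struct{})

	var events []string
	go func() {
		defer close(done)
		runSyncLoop(updates, tick, stop,
			func(uid string) { events = append(events, "sync "+uid) },
			func() { events = append(events, "batch") },
		)
	}()

	updates <- "pod-a"
	tick <- time.Now()
	close(stop)
	<-done // after this, the loop goroutine has exited and events is safe to read
	return events
}

func main() {
	fmt.Println(demo())
}
```

Confining all API writes to one goroutine is what makes a rule like "apiStatusVersions must only be accessed from the sync thread" workable.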

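Exercise

PR needed: the logic of needsReconcile needs to be simplified

Help Wanted

Description: the UID of the static pod has to be passed in here, because the pod manager only supports looking up a mirror pod from its static pod.

Readers can use this as a starting point for a practice PR.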

  // needsReconcile compares the given status with the status in the pod manager (which
  // in fact comes from apiserver), returns whether the status needs to be reconciled with
  // the apiserver. Now when pod status is inconsistent between apiserver and kubelet,
  // kubelet should forcibly send an update to reconcile the inconsistence, because kubelet
  // should be the source of truth of pod status.
  // NOTE(random-liu): It's simpler to pass in mirror pod uid and get mirror pod by uid, but
  // now the pod manager only supports getting mirror pod by static pod, so we have to pass
  // static pod uid here.
  // TODO(random-liu): Simplify the logic when mirror pod manager is added.
  func (m *manager) needsReconcile(uid types.UID, status v1.PodStatus) bool {
      // The pod could be a static pod, so we should translate first.
      pod, ok := m.podManager.GetPodByUID(uid)
      if !ok {
          klog.V(4).InfoS("Pod has been deleted, no need to reconcile", "podUID", string(uid))
          return false
      }
      // If the pod is a static pod, we should check its mirror pod, because only status in mirror pod is meaningful to us.
      if kubetypes.IsStaticPod(pod) {
          mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod)
          if !ok {
              klog.V(4).InfoS("Static pod has no corresponding mirror pod, no need to reconcile", "pod", klog.KObj(pod))
              return false
          }
          pod = mirrorPod
      }
      podStatus := pod.Status.DeepCopy()
      normalizeStatus(pod, podStatus)
      if isPodStatusByKubeletEqual(podStatus, &status) {
          // If the status from the source is the same with the cached status,
          // reconcile is not needed. Just return.
          return false
      }
      klog.V(3).InfoS("Pod status is inconsistent with cached status for pod, a reconciliation should be triggered",
          "pod", klog.KObj(pod),
          "statusDiff", diff.ObjectDiff(podStatus, &status))
      return true
  }
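
Before attempting the simplification, it can help to pin down the behavior that must be preserved. The sketch below restates the decision flow of needsReconcile with stand-in types so the cases can be exercised in isolation. Everything in it is hypothetical scaffolding: Pod, fakePodManager, and the single Phase field replace the real pod object, pod manager, and the normalized status comparison.

```go
package main

import "fmt"

// UID is a simplified stand-in for types.UID.
type UID string

// Pod is a reduced stand-in for v1.Pod; Phase stands in for the whole status.
type Pod struct {
	UID    UID
	Static bool
	Phase  string
}

// fakePodManager is a hypothetical stand-in for the pod manager.
type fakePodManager struct {
	pods    map[UID]*Pod
	mirrors map[UID]*Pod // keyed by static pod UID, mirroring "mirror pod by static pod"
}

// needsReconcile follows the same steps as the function above:
// look up the pod, translate static to mirror, then compare statuses.
func needsReconcile(pm *fakePodManager, uid UID, cachedPhase string) bool {
	pod, ok := pm.pods[uid]
	if !ok {
		return false // pod deleted: nothing to reconcile
	}
	if pod.Static {
		mirror, ok := pm.mirrors[pod.UID]
		if !ok {
			return false // static pod without a mirror pod: nothing to reconcile
		}
		pod = mirror
	}
	return pod.Phase != cachedPhase
}

func main() {
	pm := &fakePodManager{
		pods: map[UID]*Pod{
			"regular":   {UID: "regular", Phase: "Running"},
			"static":    {UID: "static", Static: true, Phase: "Running"},
			"no-mirror": {UID: "no-mirror", Static: true, Phase: "Running"},
		},
		mirrors: map[UID]*Pod{
			"static": {UID: "mirror-of-static", Phase: "Pending"},
		},
	}
	fmt.Println(needsReconcile(pm, "deleted", "Running"))
	fmt.Println(needsReconcile(pm, "regular", "Running"))
	fmt.Println(needsReconcile(pm, "static", "Running"))
	fmt.Println(needsReconcile(pm, "no-mirror", "Failed"))
}
```

A refactoring that looks up the mirror pod directly by its own UID would need to keep all four of these outcomes unchanged.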