
Directory Layout

  pkg/kubelet/nodestatus
  |- setters.go
  |- setters_test.go

Setter

  // Setter modifies the node in-place, and returns an error if the modification failed.
  // Setters may partially mutate the node before returning an error.
  type Setter func(node *v1.Node) error

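The Setter type defines a function that operates on a v1.Node object. A setter may already have modified the Node even when it returns an error.

The definition makes the usage clear: factory functions produce different setters, each responsible for one kind of modification to the Node object. This standardizes how the various pieces of Node status are changed.

The simplest example is func GoRuntime() Setter: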

  // GoRuntime returns a Setter that sets GOOS and GOARCH on the node.
  func GoRuntime() Setter {
      return func(node *v1.Node) error {
          node.Status.NodeInfo.OperatingSystem = goruntime.GOOS
          node.Status.NodeInfo.Architecture = goruntime.GOARCH
          return nil
      }
  }

This pattern resembles the middleware pattern, which readers may find a useful point of comparison.
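To make the factory-plus-closure shape concrete, here is a minimal, self-contained sketch. It assumes a simplified Node struct in place of v1.Node, and the Label factory is a hypothetical addition for illustration; neither is part of the Kubelet code.

```go
package main

import (
	"fmt"
	"runtime"
)

// Node is a hypothetical, simplified stand-in for v1.Node.
type Node struct {
	OperatingSystem string
	Architecture    string
	Labels          map[string]string
}

// Setter mirrors the signature discussed above, but on the simplified Node.
type Setter func(node *Node) error

// GoRuntime returns a Setter that records GOOS and GOARCH on the node.
func GoRuntime() Setter {
	return func(node *Node) error {
		node.OperatingSystem = runtime.GOOS
		node.Architecture = runtime.GOARCH
		return nil
	}
}

// Label is a hypothetical factory: it captures its arguments in a closure
// and returns a Setter that applies them when it is eventually called.
func Label(key, value string) Setter {
	return func(node *Node) error {
		if key == "" {
			return fmt.Errorf("label key must not be empty")
		}
		if node.Labels == nil {
			node.Labels = map[string]string{}
		}
		node.Labels[key] = value
		return nil
	}
}

func main() {
	node := &Node{}
	// Each factory call yields one Setter; applying them is a plain loop.
	for _, set := range []Setter{GoRuntime(), Label("zone", "a")} {
		if err := set(node); err != nil {
			fmt.Println("setter failed:", err)
		}
	}
	fmt.Println(node.OperatingSystem, node.Architecture, node.Labels)
}
```

Because every setter shares one signature, the caller does not need to know what any individual setter does or which inputs it captured.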

Setter List

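Having covered the setter pattern for changing Node status, here are the twelve setters currently in the code.
Each function that returns a Setter documents its parameters in comments; see the code for details on each parameter.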

  • NodeAddress returns a Setter that updates address-related information on the node. This covers fields such as the IP address and the hostname (usually taken from the kubelet's hostname variable).
  • MachineInfo returns a Setter that updates machine-related information on the node. This covers fields such as the maximum number of Pods, the number of Pods per core, and resource capacity.
  • VersionInfo returns a Setter that updates version-related information on the node. This covers the containerRuntime version and the cadvisor version.
  • DaemonEndpoints returns a Setter that updates the daemon endpoints on the node.
  • Images returns a Setter that updates the images on the node.
  • GoRuntime returns a Setter that sets GOOS and GOARCH on the node.
  • ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node. It decides whether the node is Ready by calling error-returning functions on Kubelet fields such as runtimeState.
  • MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
  • PIDPressureCondition returns a Setter that updates the v1.NodePIDPressure condition on the node.
  • DiskPressureCondition returns a Setter that updates the v1.NodeDiskPressure condition on the node.
  • VolumesInUse returns a Setter that updates the volumes in use on the node.
  • VolumeLimits returns a Setter that updates the volume limits on the node.

The arguments to these factory functions are usually fields of the Kubelet, so the setters are naturally constructed by the Kubelet.

Kubelet Node Status

pkg/kubelet/kubelet_node_status.go

Where Setters Are Used

All setters are initialized in the defaultNodeStatusFuncs function, which returns a slice of Setters.

  // defaultNodeStatusFuncs is a factory that generates the default set of
  // setNodeStatus funcs
  func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
      // if cloud is not nil, we expect the cloud resource sync manager to exist
      var nodeAddressesFunc func() ([]v1.NodeAddress, error)
      if kl.cloud != nil {
          nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
      }
      var validateHostFunc func() error
      if kl.appArmorValidator != nil {
          validateHostFunc = kl.appArmorValidator.ValidateHost
      }
      var setters []func(n *v1.Node) error
      setters = append(setters,
          nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
          nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
              kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
          nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
          nodestatus.DaemonEndpoints(kl.daemonEndpoints),
          nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
          nodestatus.GoRuntime(),
      )
      // Volume limits
      setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
      setters = append(setters,
          nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
          nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
          nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
          nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
          nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
          // TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
          // and record state back to the Kubelet runtime object. In the future, I'd like to isolate
          // these side-effects by decoupling the decisions to send events and partial status recording
          // from the Node setters.
          kl.recordNodeSchedulableEvent,
      )
      return setters
  }

The slice is assigned to the kubelet's setNodeStatusFuncs field.

  // Generating the status funcs should be the last thing we do,
  // since this relies on the rest of the Kubelet having been constructed.
  klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

SyncNodeStatus Procedure

How does the Kubelet use these setters? The core is the syncNodeStatus function.

  // syncNodeStatus should be called periodically from a goroutine.
  // It synchronizes node status to master if there is any change or enough time
  // passed from the last sync, registering the kubelet first if necessary.
  func (kl *Kubelet) syncNodeStatus() {
      kl.syncNodeStatusMux.Lock()
      defer kl.syncNodeStatusMux.Unlock()
      if kl.kubeClient == nil || kl.heartbeatClient == nil {
          return
      }
      if kl.registerNode {
          // This will exit immediately if it doesn't need to do anything.
          kl.registerWithAPIServer()
      }
      if err := kl.updateNodeStatus(); err != nil {
          klog.ErrorS(err, "Unable to update node status")
      }
  }

syncNodeStatus is called periodically from a goroutine to synchronize the node status to the master.

Entry Points

It is currently called from three places:

  1. In the Kubelet's Run function, which starts a goroutine that syncs periodically:

       go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)

  2. In the fastStatusUpdateOnce function, which performs a one-off sync:

       func (kl *Kubelet) fastStatusUpdateOnce() {
           for {
               ...
               kl.syncNodeStatus()
               return
           }
       }

  3. In the start() function of nodeshutdownmanager. The call runs in a goroutine and is triggered only after a shutdown event arrives on a channel:

       if isShuttingDown {
           // Update node status and ready condition
           go m.syncNodeStatus()
           m.processShutdownEvent()
       }

Registration: registerWithAPIServer

If the kubelet needs to register, it loops until it has registered with the APIServer.

  for {
      time.Sleep(step)
      step = step * 2
      if step >= 7*time.Second {
          step = 7 * time.Second
      }
      // 1. Build the node object and its information
      node, err := kl.initialNode(context.TODO())
      if err != nil {
          klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
          continue
      }
      klog.InfoS("Attempting to register node", "node", klog.KObj(node))
      // 2. Register with the APIServer
      registered := kl.tryRegisterWithAPIServer(node)
      if registered {
          klog.InfoS("Successfully registered node", "node", klog.KObj(node))
          kl.registrationCompleted = true
          return
      }
  }

The loop body has two steps:

  1. node, err := kl.initialNode(context.TODO()): build the node object and its information.
  2. registered := kl.tryRegisterWithAPIServer(node): register it with the APIServer.

Use Setter

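syncNodeStatus calls updateNodeStatus, which in turn retries tryUpdateNodeStatus; that is where the setters run. Note: the excerpt below omits the volumeManager handling, among other parts.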

  // tryUpdateNodeStatus tries to update node status to master if there is any
  // change or enough time passed from the last sync.
  func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
      originalNode := node.DeepCopy()
      ...
      kl.setNodeStatus(node)
      ...
      // Patch the current status on the API server
      updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
      ...
      return nil
  }

kl.setNodeStatus simply iterates over all the Setter functions described above.

  func (kl *Kubelet) setNodeStatus(node *v1.Node) {
      for i, f := range kl.setNodeStatusFuncs {
          klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
          if err := f(node); err != nil {
              klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
          }
      }
  }
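One consequence of this loop is that a failing setter does not stop the ones after it: the error is logged and iteration continues. The sketch below demonstrates that behavior on a simplified Node. The names applyAll, field, and failing are hypothetical, and applyAll returns the errors rather than logging them purely so the result can be inspected.

```go
package main

import (
	"errors"
	"fmt"
)

// Node is a hypothetical, simplified stand-in for v1.Node.
type Node struct {
	Fields map[string]string
}

type Setter func(node *Node) error

// field is a hypothetical factory for a Setter that writes one key.
func field(key, value string) Setter {
	return func(node *Node) error {
		node.Fields[key] = value
		return nil
	}
}

// failing is a hypothetical Setter that always reports an error.
func failing(node *Node) error {
	return errors.New("probe unavailable")
}

// applyAll follows the shape of setNodeStatus: run every setter in order,
// record failures, and keep going. Unlike the original, which only logs,
// it returns the collected errors.
func applyAll(node *Node, setters []Setter) []error {
	var errs []error
	for i, set := range setters {
		if err := set(node); err != nil {
			errs = append(errs, fmt.Errorf("setter %d: %w", i, err))
		}
	}
	return errs
}

func main() {
	node := &Node{Fields: map[string]string{}}
	errs := applyAll(node, []Setter{
		field("os", "linux"),
		failing, // fails, but the setter after it still runs
		field("arch", "amd64"),
	})
	fmt.Println("errors:", errs)
	fmt.Println("fields:", node.Fields)
}
```

This best-effort design means a partially updated status can still be sent, which matches the Setter contract that a node may be partially mutated when an error is returned.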

Conclusion

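What we covered:

  1. Which functions change the Node status, and the signature convention they follow.
  2. How Setter functions are registered with the Kubelet.
  3. When the Kubelet calls these setters to change the Node status.

Next steps:

  1. Try adding a custom setter function of your own.
  2. The Kubernetes code is not as tidy as its design might suggest. After reading this article and the code, you may have ideas for addressing some of the TODOs; try making the code more decoupled. (Searching the code for "todo" will turn them up.)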