ReadLink
Directory Layout
pkg/kubelet/nodestatus
|- setters.go
|- setters_test.go
Setter
// Setter modifies the node in-place, and returns an error if the modification failed.
// Setters may partially mutate the node before returning an error.
type Setter func(node *v1.Node) error
Setter 函数定义了对 v1.Node 对象进行操作的函数。在返回错误的情况下,也可能对 Node 对象进行更改。
由函数定义可以看明白其使用方式:使用函数生成不同的 setter 用于一类对 Node 对象的修改。这样可以做到规范化修改 Node 的各种状态的效果。
用最简单的 func GoRuntime() Setter
示例如下:
// GoRuntime returns a Setter that sets GOOS and GOARCH on the node.
func GoRuntime() Setter {
return func(node *v1.Node) error {
node.Status.NodeInfo.OperatingSystem = goruntime.GOOS
node.Status.NodeInfo.Architecture = goruntime.GOARCH
return nil
}
}
这种模式属于中间件操作模式,读者可以联系 middleware 理解。
Setter List
上面我们学习了 Node status 更改的 setter 模式,目前代码中有以下十二个 setter:
每一个 Setter 返回函数有明确的注释在入参上,详细了解是哪些入参请参阅代码。
- NodeAddress returns a Setter that updates address-related information on the node.:更新和地址有关的字段,比如 IP 地址、 hostname(通常取 kubelet 中的 hostname 变量)等。
- MachineInfo returns a Setter that updates machine-related information on the node.:更新和主机信息有关的字段,比如 Pod 的最大限量、每个核分配的 Pod 数量、资源限量等。
- VersionInfo returns a Setter that updates version-related information on the node.:containerRuntime 的版本,cadvisor 的版本
- DaemonEndpoints returns a Setter that updates the daemon endpoints on the node.
- Images returns a Setter that updates the images on the node.:更新镜像相关信息
- GoRuntime returns a Setter that sets GOOS and GOARCH on the node.:GOOS GOARCH 信息
- ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.:从 Kubelet 各个字段比如 runtimeState 中的错误返回函数中判断是否 node 处于 Ready 状态。
- MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
- PIDPressureCondition returns a Setter that updates the v1.NodePIDPressure condition on the node.
- DiskPressureCondition returns a Setter that updates the v1.NodeDiskPressure condition on the node.
- VolumesInUse returns a Setter that updates the volumes in use on the node.
- VolumeLimits returns a Setter that updates the volume limits on the node.
Setter 的入参通常是 Kubelet 中的字段,自然使用是通过 Kubelet 去初始化使用。
Kubelet Node Status
/pkg /kubelet /kubelet_node_status.go
Setter 使用处
Setter 在 defaultNodeStatusFuncs 函数中全部初始化完毕,函数返回了一个 Setter 数组。
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
// if cloud is not nil, we expect the cloud resource sync manager to exist
var nodeAddressesFunc func() ([]v1.NodeAddress, error)
if kl.cloud != nil {
nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
}
var validateHostFunc func() error
if kl.appArmorValidator != nil {
validateHostFunc = kl.appArmorValidator.ValidateHost
}
var setters []func(n *v1.Node) error
setters = append(setters,
nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
nodestatus.DaemonEndpoints(kl.daemonEndpoints),
nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
nodestatus.GoRuntime(),
)
// Volume limits
setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
setters = append(setters,
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
// these side-effects by decoupling the decisions to send events and partial status recording
// from the Node setters.
kl.recordNodeSchedulableEvent,
)
return setters
}
该数组会赋值给 kubelet 的 setNodeStatusFuncs 字段。
// Generating the status funcs should be the last thing we do,
// since this relies on the rest of the Kubelet having been constructed.
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
SyncNodeStatus Procedure
Kubelet 如何使用这些 Kubelet 呢?核心是 syncNodeStatus 函数。
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
kl.syncNodeStatusMux.Lock()
defer kl.syncNodeStatusMux.Unlock()
if kl.kubeClient == nil || kl.heartbeatClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithAPIServer()
}
if err := kl.updateNodeStatus(); err != nil {
klog.ErrorS(err, "Unable to update node status")
}
}
syncNodeStatus
函数会在 goroutine 中定期调用,用于同步 node 状态到 master 中。
入口 Entry
目前会在三个地方调用:
在 Kubelet 的 Run 函数中,启动 goroutine 定期同步
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
:在 fastStatusUpdateOnce 函数中进行一次性的同步
func (kl *Kubelet) fastStatusUpdateOnce() {
for {
...
kl.syncNodeStatus()
return
}
}
:在 nodeshutdownmanager 的 start()函数中调用,其实是一个 goroutine 中,等从 channel 中收到 shutdown 事件后才会触发。
if isShuttingDown {
// Update node status and ready condition
go m.syncNodeStatus()
m.processShutdownEvent()
}
注册 RegisterWithAPIserver
如果 kubelet 需要注册,会进行一个 for 循环持续等待注册到 APIServer 中去。
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
// 1. 获取 node 对象及其信息
node, err := kl.initialNode(context.TODO())
if err != nil {
klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
continue
}
klog.InfoS("Attempting to register node", "node", klog.KObj(node))
// 2. 注册到 APIServer 中去
registered := kl.tryRegisterWithAPIServer(node)
if registered {
klog.InfoS("Successfully registered node", "node", klog.KObj(node))
kl.registrationCompleted = true
return
}
}
node, err := kl.initialNode(context.TODO())
: 获取 node 对象及其信息registered := kl.tryRegisterWithAPIServer(node)
:注册到 APIServer 中去
Use Setter
:这里省略但不止省略有关 volumeManager 的处理部分。
// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
originalNode := node.DeepCopy()
...
kl.setNodeStatus(node)
...
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
...
return nil
}
kl.setNodeStatus 只是遍历了刚才我们提到的所有的 Setter 函数而已。
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
for i, f := range kl.setNodeStatusFuncs {
klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
if err := f(node); err != nil {
klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
}
}
}
Conclusion
我们学会了:
- Node Status 的更改函数是哪些,遵循什么规则做函数签名。
- Setter 函数如何注册到 Kubelet 中。
- Kubelet 何时调用这些 setter 用于更改 Node 的状态。
下一步:
- 可以尝试自己添加自定义的 setter 函数
- Kubernetes 的代码并不想设计那样整洁,其中有的 todo 可以在看完本文和代码后有思路更改,尝试让代码更加解耦吧。(在代码中搜索 todo 也可以找到。)