Pre
kubernetes 通过独立于 kubelet 运行的 Attach/Detach Controller(AD Controller)分离了卷的 attach/detach 操作,默认情况下 kubelet 只保留 volume 的挂载(mount)与卸载(unmount)功能,以解决 Node(即 kubelet)不可用时导致的存储问题。
kubelet 启动时可以通过 flag(`--enable-controller-attach-detach`)选择是否把 attach/detach 交给 AD Controller,以兼容旧版本。
Interface
// VolumeManager (abridged listing; the fully documented interface follows below).
type VolumeManager interface {
	// Run starts the volume manager and its asynchronous loops.
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
	// WaitForAttachAndMount blocks until the pod's volumes are attached and mounted, or times out.
	WaitForAttachAndMount(pod *v1.Pod) error
	// WaitForUnmount blocks until the pod's volumes are unmounted, or times out.
	WaitForUnmount(pod *v1.Pod) error
	// GetMountedVolumesForPod returns the pod's successfully mounted volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	// GetPossiblyMountedVolumesForPod also includes volumes whose mount state is uncertain.
	GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	// GetExtraSupplementalGroupsForPod returns extra supplemental groups from PV annotations.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
	// GetVolumesInUse returns attachable volumes currently considered "in use".
	GetVolumesInUse() []v1.UniqueVolumeName
	// ReconcilerStatesHasBeenSynced reports whether the reconciler has synced state at least once.
	ReconcilerStatesHasBeenSynced() bool
	// VolumeIsAttached reports whether the volume is attached to this node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
	// MarkVolumesAsReportedInUse records that the volumes were reported "in use" in node status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
快速浏览一下 VolumeManager 的接口,可以看到除了 Run 方法外,其他方法都是辅助性质的:查询卷的挂载/使用状态、等待挂载或卸载完成等。
于是主要关注 Run 方法。
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Run starts the volume manager and all the asynchronous loops that it
	// controls.
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// WaitForAttachAndMount processes the volumes referenced in the specified
	// pod and blocks until they are all attached and mounted (reflected in
	// actual state of the world).
	// An error is returned if not all volumes are attached and mounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForAttachAndMount(pod *v1.Pod) error

	// WaitForUnmount processes the volumes referenced in the specified
	// pod and blocks until they are all unmounted (reflected in the actual
	// state of the world).
	// An error is returned if not all volumes are unmounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForUnmount(pod *v1.Pod) error

	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are successfully attached and
	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are either successfully attached
	// and mounted or are "uncertain", i.e. a volume plugin may be mounting
	// them right now. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetExtraSupplementalGroupsForPod returns a list of the extra
	// supplemental groups for the Pod. These extra supplemental groups come
	// from annotations on persistent volumes that the pod depends on.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

	// GetVolumesInUse returns a list of all volumes that implement the
	// volume.Attacher interface and are currently in use according to the
	// actual and desired state of the world caches. A volume is considered
	// "in use" as soon as it is added to the desired state of world,
	// indicating it *should* be attached to this node, and remains "in use"
	// until it is removed from both the desired state of the world and the
	// actual state of the world, or it has been unmounted (as indicated in
	// actual state of world).
	GetVolumesInUse() []v1.UniqueVolumeName

	// ReconcilerStatesHasBeenSynced returns true only after the actual states
	// in the reconciler have been synced at least once after kubelet starts,
	// so that it is safe to update the mounted volume list retrieved from the
	// actual state.
	ReconcilerStatesHasBeenSynced() bool

	// VolumeIsAttached returns true if the given volume is attached to this
	// node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

	// MarkVolumesAsReportedInUse marks the specified volumes as having been
	// successfully reported as "in use" in the node's volume status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
Structure
// volumeManager implements the VolumeManager interface. Its core is the
// interaction between desiredStateOfWorld, actualStateOfWorld, the
// desiredStateOfWorldPopulator (which fills the desired state) and the
// reconciler (which drives the actual state towards the desired state).
type volumeManager struct {
	// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
	// communicate with the API server to fetch PV and PVC objects.
	// It may be nil; Run only starts the CSIDriver informer when it is set.
	kubeClient clientset.Interface

	// volumePluginMgr is the volume plugin manager used to access volume
	// plugins. It must be pre-initialized.
	volumePluginMgr *volume.VolumePluginMgr

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the volume manager, i.e. what volumes should be
	// attached and which pods are referencing the volumes.
	// The data structure is populated by the desired state of the world
	// populator using the kubelet pod manager.
	desiredStateOfWorld cache.DesiredStateOfWorld

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager, i.e. which volumes are attached to
	// this node and what pods the volumes are mounted to.
	// The data structure is populated upon successful completion of attach,
	// detach, mount, and unmount actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld

	// operationExecutor is used to start asynchronous attach, detach, mount,
	// and unmount operations.
	operationExecutor operationexecutor.OperationExecutor

	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
	// detach, mount, and unmount operations using the operationExecutor.
	reconciler reconciler.Reconciler

	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the desiredStateOfWorld using the kubelet PodManager.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

	// csiMigratedPluginManager keeps track of CSI migration status of plugins.
	csiMigratedPluginManager csimigration.PluginManager

	// intreeToCSITranslator translates in-tree volume specs to CSI.
	intreeToCSITranslator csimigration.InTreeToCSITranslator
}
其中 csiMigratedPluginManager、volumePluginMgr、intreeToCSITranslator、operationExecutor、kubeClient 与核心逻辑关系不大,它们是具体功能的执行者。核心在于 desiredStateOfWorld、actualStateOfWorld、reconciler、desiredStateOfWorldPopulator 之间的交互。
Explanation
desiredStateOfWorld:期望的世界状态,简称 DSW,描述期望的 Pod 与 Volume 的挂载关系。
actualStateOfWorld:实际的世界状态,简称 ASW,即 volume 此刻的实际状态。实际状态未必与期望状态一致,它反映的是当前正在发生的情况。
Overview
/
事件来源:desiredStateOfWorldPopulator 周期性地从 PodManager(kubelet 中维护本节点 Pod 数据及状态的组件)读取 Pod 数据,并同步到 desiredStateOfWorld 中。
事件处理:reconciler 的 Run 方法持续执行 reconcile,把 desiredStateOfWorld 中记录的 volume 与 Pod 的关系同步到 actualStateOfWorld 中。
重启与残留清理:reconciler 除了负责触发挂载/卸载操作、交由 operationExecutor 执行之外,还会进行卷的重建(reconstruct)和清理工作(无法重建的卷,其挂载点会被清理)。
Run(入口)
- `go vm.volumePluginMgr.Run(stopCh)`:为 CSIDriver 启动 informer(仅在 kubeClient 不为 nil 时执行)。
- `go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)`:开启循环,不断从 PodManager 中拿到 Pod 信息,处理这些 Pod 里的 Volume 信息并将它们加入到 DSW 中。
- `go vm.reconciler.Run(stopCh)`
:开始 VolumeManager 的主要调谐循环。
// Run starts the volume manager's background goroutines and then blocks until
// stopCh is closed. It starts:
//   - the CSIDriver informer of volumePluginMgr (only when kubeClient is set),
//   - the desired-state-of-world populator,
//   - the reconciler loop,
//
// and registers the volume manager metrics for the ASW, DSW and plugin manager.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	if vm.kubeClient != nil {
		// start informer for CSIDriver
		go vm.volumePluginMgr.Run(stopCh)
	}

	// Fill the desired state of world from the pod manager.
	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).InfoS("The desired_state_of_world populator starts")

	klog.InfoS("Starting Kubelet Volume Manager")
	// Drive the actual state of world towards the desired state.
	go vm.reconciler.Run(stopCh)

	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

	// Block until shutdown is requested.
	<-stopCh
	klog.InfoS("Shutting down Kubelet Volume Manager")
}
Reconcile
数据结构
// podVolume describes one volume found in a pod's directory on disk.
// syncStates obtains these from getVolumesFromPodDir(rc.kubeletPodsDir) and
// uses podName/volumeSpecName to look the volume up in the ASW and DSW.
type podVolume struct {
	// podName identifies the pod that owns the volume directory.
	podName volumetypes.UniquePodName
	// volumeSpecName is the volume's spec name within the pod.
	volumeSpecName string
	// volumePath is presumably the on-disk path of the volume directory — TODO confirm.
	volumePath string
	// pluginName is presumably the volume plugin name derived from the directory layout — confirm.
	pluginName string
	// volumeMode is the volume's PersistentVolumeMode.
	volumeMode v1.PersistentVolumeMode
}
状态同步
VolumeManager 什么时候同步状态呢?答案在 reconciler 的调谐循环函数 reconciliationLoopFunc 中:
// reconciliationLoopFunc returns the function executed on every iteration of
// the reconciler loop. Each invocation reconciles once. If state has not been
// synced yet and the populator reports that it has added pods, it then also
// syncs the in-memory state with what is found on disk.
func (rc *reconciler) reconciliationLoopFunc() func() {
	iteration := func() {
		rc.reconcile()

		// Syncing before every pod from every source is in the desired state
		// would let the reconstruct process clean up volumes that are still in
		// use, because the desired state would hold an incomplete pod list.
		if !rc.populatorHasAddedPods() || rc.StatesHasBeenSynced() {
			return
		}
		klog.InfoS("Reconciler: start to sync state")
		rc.sync()
	}
	return iteration
}
其中 rc.sync() 是同步函数,只有同时满足以下两个条件时才会执行:
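- populator 已经把 podManager 中的 Pod 添加到 DSW。查看实现可以知道,populator 的 Run 会先轮询等待所有 Pod 来源(sources)就绪,并在就绪后再完整执行一轮 populatorLoop,之后才把 hasAddedPods 置为 true,从此该函数返回 true。注意:并不是 Run 一开始运行就返回 true。
```go
rc.populatorHasAddedPods = vm.desiredStateOfWorldPopulator.HasAddedPods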
func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool { dswp.hasAddedPodsLock.RLock() defer dswp.hasAddedPodsLock.RUnlock() return dswp.hasAddedPods }
// 在 Run 函数中置为 true。 func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly klog.InfoS(“Desired state populator starts to run”) wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) { done := sourcesReady.AllReady() dswp.populatorLoop() return done, nil }, stopCh) dswp.hasAddedPodsLock.Lock() dswp.hasAddedPods = true dswp.hasAddedPodsLock.Unlock() wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh) }
```
- 还没有同步过状态,即 StatesHasBeenSynced() 返回 false(timeOfLastSync 仍为零值):
```go
// StatesHasBeenSynced reports whether timeOfLastSync holds a non-zero time.
// That field is expected to be set by updateLastSyncTime, which sync defers,
// so this turns true after the first sync attempt.
func (rc *reconciler) StatesHasBeenSynced() bool {
	lastSync := rc.timeOfLastSync
	return !lastSync.IsZero()
}
```
之所以要等 populator 添加完所有 Pod 之后才同步,是为了保证此时 DSW 中已包含完整的 Pod 列表。否则重建 volume 时,可能会把仍在被 Pod 使用的卷当作残留而清理掉。
sync() 详情
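sync 会遍历 Pod 目录下所有的 volume 目录,跳过 ASW 中已存在的卷,对其余的卷尝试重建(reconstruct)其信息,然后分情况处理:
- 重建失败(例如卷插件不支持重建),但 DSW 中仍有 Pod 需要该卷:跳过,交给 reconcile() 重新 SetUp;
- 重建失败,且没有 Pod 需要该卷:清理它的挂载点;
- 重建成功,且 DSW 中有 Pod 需要该卷:将其标记为已上报 in use(MarkVolumesReportedInUse),以便 reconcile() 在必要时重新挂载;
- 重建成功,但没有 Pod 使用它:通过 updateStates 把它更新到 ASW(而不是 DSW)中,此后便可由 reconcile() 按正常流程卸载。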
// sync reconstructs volume state from the pod directories on disk
// (syncStates) and then records the sync time via updateLastSyncTime.
//
// updateLastSyncTime is deferred, so the sync time is recorded even when
// syncStates returns early (e.g. when the pod directory cannot be read).
// Because reconciliationLoopFunc only calls sync while StatesHasBeenSynced is
// false, such a failed attempt is presumably not retried — confirm that
// updateLastSyncTime is what sets timeOfLastSync.
func (rc *reconciler) sync() {
	defer rc.updateLastSyncTime()
	rc.syncStates()
}
// syncStates rebuilds volume state from the pod directories on disk.
//
// For every volume found under rc.kubeletPodsDir that is not already in the
// actual state of world (ASW), it tries to reconstruct the volume and then:
//   - reconstruction failed, volume still in the DSW: skip it; reconcile() is
//     expected to call SetUp and reconstruct it in the ASW.
//   - reconstruction failed, volume not in the DSW: clean up its mounts.
//   - reconstructed, volume in the DSW: mark it as reported "in use" in the DSW.
//   - reconstructed, volume not in the DSW: collect it and pass it to
//     updateStates (per the log message, this updates it in the ASW).
//
// If the pod directory cannot be read, the error is logged and nothing is done.
func (rc *reconciler) syncStates() {
	// Get volumes information by reading the pod's directory
	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
	if err != nil {
		klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
		return
	}
	// Reconstructed volumes that no pod needs, keyed by volume name; written
	// in one batch via updateStates after the loop.
	volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
	// Reconstructed volumes that some pod needs; marked as reported in use
	// in the DSW after the loop.
	volumeNeedReport := []v1.UniqueVolumeName{}
	for _, volume := range podVolumes {
		if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
			klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
			// There is nothing to reconstruct
			continue
		}
		volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)

		reconstructedVolume, err := rc.reconstructVolume(volume)
		if err != nil {
			if volumeInDSW {
				// Some pod needs the volume, don't clean it up and hope that
				// reconcile() calls SetUp and reconstructs the volume in ASW.
				klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
				continue
			}
			// No pod needs the volume.
			klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
			rc.cleanupMounts(volume)
			continue
		}
		if volumeInDSW {
			// Some pod needs the volume. And it exists on disk. Some previous
			// kubelet must have created the directory, therefore it must have
			// reported the volume as in use. Mark the volume as in use also in
			// this new kubelet so reconcile() calls SetUp and re-mounts the
			// volume if it's necessary.
			volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
			klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
			continue
		}
		// There is no pod that uses the volume.
		// NOTE(review): despite its log message, this branch does not
		// `continue`; a volume with a pending operation still falls through
		// and is added to volumesNeedUpdate below. Since sync runs only once,
		// adding a `continue` here would leave such a volume untracked in the
		// ASW — confirm the intended behavior before changing it.
		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
			klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
		}
		klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
		volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
	}

	if len(volumesNeedUpdate) > 0 {
		if err = rc.updateStates(volumesNeedUpdate); err != nil {
			klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
		}
	}
	if len(volumeNeedReport) > 0 {
		rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
	}
}
Working
剩余的工作则由 reconcile() 负责,它按顺序执行:
// reconcile runs one reconciliation pass. Its three phases always run in
// this order: unmount volumes no longer needed by pods, mount (and, when
// kubelet is responsible, attach) volumes pods need, then unmount/detach
// devices that are no longer needed.
func (rc *reconciler) reconcile() {
	// Unmounts are triggered before mounts so that a volume that was
	// referenced by a pod that was deleted and is now referenced by another
	// pod is unmounted from the first pod before being mounted to the new
	// pod.
	rc.unmountVolumes()

	// Next we mount required volumes. This function could also trigger
	// attach if kubelet is responsible for attaching volumes.
	// If underlying PVC was resized while in-use then this function also handles volume
	// resizing.
	rc.mountOrAttachVolumes()

	// Ensure devices that should be detached/unmounted are detached/unmounted.
	rc.unmountDetachDevices()
}
- 卸载(unmount)不再被 Pod 需要的卷
- 将 Pod 需要的卷挂载(mount)到 Pod 中,必要时先 attach 到节点,并处理在线扩容(resize)
- 卸载全局挂载的设备(unmount device),并 detach 不再需要的卷
函数细节
// unmountVolumes starts an asynchronous unmount for every volume that the
// actual state reports as mounted to a pod, but whose pod/volume pair is no
// longer present in the desired state. Expected errors from the operation
// executor are not logged.
func (rc *reconciler) unmountVolumes() {
	for _, mounted := range rc.actualStateOfWorld.GetAllMountedVolumes() {
		if rc.desiredStateOfWorld.PodExistsInVolume(mounted.PodName, mounted.VolumeName) {
			// Still wanted by this pod; leave it mounted.
			continue
		}

		klog.V(5).InfoS(mounted.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
		err := rc.operationExecutor.UnmountVolume(mounted.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
		switch {
		case err == nil:
			klog.InfoS(mounted.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
		case !isExpectedError(err):
			klog.ErrorS(err, mounted.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
		}
	}
}
遍历 ASW 中所有已挂载的卷,检查 DSW 中是否仍存在该卷与对应 Pod 的关系;如果不存在,就调用 operationExecutor.UnmountVolume 将该卷从这个 Pod 中卸载。
// mountOrAttachVolumes walks every volume the desired state wants mounted and
// picks one action per volume based on what the actual state reports:
//   - not attached yet: wait for (or, when kubelet owns attach, trigger) attach;
//   - not mounted, or a remount is required: mount it;
//   - filesystem resize required: expand it to the reported size.
func (rc *reconciler) mountOrAttachVolumes() {
	for _, toMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		mounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(toMount.PodName, toMount.VolumeName, toMount.PersistentVolumeSize)
		toMount.DevicePath = devicePath

		switch {
		case cache.IsVolumeNotAttachedError(err):
			rc.waitForVolumeAttach(toMount)
		case !mounted || cache.IsRemountRequiredError(err):
			rc.mountAttachedVolumes(toMount, err)
		case cache.IsFSResizeRequiredError(err):
			resizeErr, _ := err.(cache.FsResizeRequiredError)
			rc.expandVolume(toMount, resizeErr.CurrentSize)
		}
	}
}
遍历 DSW 中需要被挂载(Mount)的卷,根据 ASW 的查询结果决定如何处理:
- 如果卷还没有 attach 到节点(IsVolumeNotAttachedError),就调用 waitForVolumeAttach。
  > 注意这里就和开头提到的 AD Controller 相关了:如果开启了 AD Controller,Attach 由它完成,kubelet 这里只检查 Attach 是否成功;如果没有开启,则由 kubelet 自己执行 Attach。
- 如果卷尚未挂载到该 Pod,或者需要重新挂载(IsRemountRequiredError),就调用 mountAttachedVolumes 进行挂载。
- 如果需要扩容文件系统(IsFSResizeRequiredError),则调用 rc.expandVolume 进行扩容。
// unmountDetachDevices handles volumes that the actual state reports as
// unmounted. A volume is skipped if the desired state still needs it or if an
// operation on it is pending. Otherwise:
//   - if its device may still be mounted globally, the device is unmounted;
//   - else, when the attach/detach controller is enabled or the plugin is not
//     attachable, kubelet only marks the volume as detached in the actual state;
//   - else kubelet detaches the volume itself.
func (rc *reconciler) unmountDetachDevices() {
	for _, attached := range rc.actualStateOfWorld.GetUnmountedVolumes() {
		if rc.desiredStateOfWorld.VolumeExists(attached.VolumeName) {
			// Some pod still wants this volume.
			continue
		}
		// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
		if rc.operationExecutor.IsOperationPending(attached.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
			continue
		}

		switch {
		case attached.DeviceMayBeMounted():
			// The volume is globally mounted to the device; unmount the device.
			klog.V(5).InfoS(attached.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
			err := rc.operationExecutor.UnmountDevice(attached.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
			switch {
			case err == nil:
				klog.InfoS(attached.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
			case !isExpectedError(err):
				klog.ErrorS(err, attached.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
			}

		case rc.controllerAttachDetachEnabled || !attached.PluginIsAttachable:
			// Kubelet is not responsible for detaching, or the plugin is not
			// attachable: only record the detach in the actual state.
			rc.actualStateOfWorld.MarkVolumeAsDetached(attached.VolumeName, attached.NodeName)
			klog.InfoS(attached.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attached.DevicePath)))

		default:
			// Kubelet owns detach for this volume.
			klog.V(5).InfoS(attached.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
			err := rc.operationExecutor.DetachVolume(attached.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
			switch {
			case err == nil:
				klog.InfoS(attached.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
			case !isExpectedError(err):
				klog.ErrorS(err, attached.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
			}
		}
	}
}
- 遍历 ASW 中 GetUnmountedVolumes 返回的卷(即已从 Pod 中卸载的卷),跳过 DSW 中仍然需要的卷,也跳过仍有操作在进行中(例如正在挂载)的卷。
- 对剩下的卷,如果设备可能仍处于全局挂载状态,先执行 UnmountDevice,否则执行 Detach。和前文一样,如果开启了 AD Controller(或卷插件不支持 attach),kubelet 不会直接执行 Detach,只是在 ASW 中把卷标记为已 detach。