Pre
Kubernetes separates volume attach/detach from the kubelet by running an Attach/Detach Controller independently of the kubelet; the kubelet itself now only keeps the volume mount/unmount work. This avoids storage getting stuck when a Node (that is, its kubelet) becomes unavailable.
A kubelet startup flag (--enable-controller-attach-detach, enabled by default) controls whether this responsibility is handed over to the controller, keeping compatibility with the old behavior.
Interface
```go
type VolumeManager interface {
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
	WaitForAttachAndMount(pod *v1.Pod) error
	WaitForUnmount(pod *v1.Pod) error
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
	GetVolumesInUse() []v1.UniqueVolumeName
	ReconcilerStatesHasBeenSynced() bool
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
```
A quick look at the VolumeManager interface shows that, apart from Run, the methods are helpers that expose volume-related information, such as which volumes are mounted for a pod.
So the main thing to look at is the Run method. (The fully annotated interface and a short usage sketch follow.)
```go
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Starts the volume manager and all the asynchronous loops that it controls
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// WaitForAttachAndMount processes the volumes referenced in the specified
	// pod and blocks until they are all attached and mounted (reflected in
	// actual state of the world).
	// An error is returned if all volumes are not attached and mounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForAttachAndMount(pod *v1.Pod) error

	// WaitForUnmount processes the volumes referenced in the specified
	// pod and blocks until they are all unmounted (reflected in the actual
	// state of the world).
	// An error is returned if all volumes are not unmounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForUnmount(pod *v1.Pod) error

	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are successfully attached and
	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are either successfully attached
	// and mounted or are "uncertain", i.e. a volume plugin may be mounting
	// them right now. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetExtraSupplementalGroupsForPod returns a list of the extra
	// supplemental groups for the Pod. These extra supplemental groups come
	// from annotations on persistent volumes that the pod depends on.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

	// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
	// interface and are currently in use according to the actual and desired
	// state of the world caches. A volume is considered "in use" as soon as it
	// is added to the desired state of world, indicating it *should* be
	// attached to this node and remains "in use" until it is removed from both
	// the desired state of the world and the actual state of the world, or it
	// has been unmounted (as indicated in actual state of world).
	GetVolumesInUse() []v1.UniqueVolumeName

	// ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
	// has been synced at least once after kubelet starts so that it is safe to update mounted
	// volume list retrieved from actual state.
	ReconcilerStatesHasBeenSynced() bool

	// VolumeIsAttached returns true if the given volume is attached to this
	// node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

	// Marks the specified volume as having successfully been reported as "in
	// use" in the nodes's volume status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
```
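As a usage sketch (hedged, not copied from the kubelet source): a caller such as the pod sync loop blocks on a pod's volumes via WaitForAttachAndMount before starting containers. The function name below is hypothetical.

```go
// waitForPodVolumes is a hypothetical caller of the VolumeManager interface.
func waitForPodVolumes(vm volumemanager.VolumeManager, pod *v1.Pod) error {
	// Blocks until every volume referenced by the pod is attached and mounted
	// (reflected in the actual state of the world), or fails once the
	// internal podAttachAndMountTimeout is exceeded.
	if err := vm.WaitForAttachAndMount(pod); err != nil {
		return fmt.Errorf("unable to attach or mount volumes for pod %q: %w", pod.Name, err)
	}
	return nil
}
```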
Structure
```go
// volumeManager implements the VolumeManager interface
type volumeManager struct {
	// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
	// communicate with the API server to fetch PV and PVC objects
	kubeClient clientset.Interface

	// volumePluginMgr is the volume plugin manager used to access volume
	// plugins. It must be pre-initialized.
	volumePluginMgr *volume.VolumePluginMgr

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the volume manager: i.e. what volumes should be
	// attached and which pods are referencing the volumes).
	// The data structure is populated by the desired state of the world
	// populator using the kubelet pod manager.
	desiredStateOfWorld cache.DesiredStateOfWorld

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager: i.e. which volumes are attached to
	// this node and what pods the volumes are mounted to.
	// The data structure is populated upon successful completion of attach,
	// detach, mount, and unmount actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld

	// operationExecutor is used to start asynchronous attach, detach, mount,
	// and unmount operations.
	operationExecutor operationexecutor.OperationExecutor

	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
	// detach, mount, and unmount operations using the operationExecutor.
	reconciler reconciler.Reconciler

	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the desiredStateOfWorld using the kubelet PodManager.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

	// csiMigratedPluginManager keeps track of CSI migration status of plugins
	csiMigratedPluginManager csimigration.PluginManager

	// intreeToCSITranslator translates in-tree volume specs to CSI
	intreeToCSITranslator csimigration.InTreeToCSITranslator
}
```
Of these fields, csiMigratedPluginManager, volumePluginMgr, intreeToCSITranslator, operationExecutor, and kubeClient are not part of the core logic; they are the executors of the actual work. The key structures are desiredStateOfWorld, actualStateOfWorld, reconciler, and desiredStateOfWorldPopulator, and the way they interact.
Explanation
- desiredStateOfWorld: the desired state of the world, DSW for short. It describes the desired mount relationships between Pods and Volumes.
- actualStateOfWorld: the actual state of the world, ASW for short, i.e. the state the volumes are really in right now. The actual state does not necessarily match the desired state; it represents what is currently happening.

Overview
Event source: pod data is synced into the desiredStateOfWorld by the desiredStateOfWorldPopulator, which reads it from the PodManager (the kubelet manager that keeps the real pod data and status up to date by polling).
Event handling: the reconciler's Run method reconciles continuously, ensuring that the volume/pod relationships recorded in the desiredStateOfWorld are carried through into the actualStateOfWorld.
Restart and leftover cleanup: besides triggering mount/unmount operations for the executors to carry out, the reconciler also reconstructs volumes after a kubelet restart and cleans up leftovers (volumes whose plugins do not support reconstruction are cleaned up once they are no longer in use). A minimal sketch of this reconcile pattern follows.
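To make the DSW/ASW interplay concrete, here is a minimal, self-contained sketch of the reconcile pattern. The types and function names are invented for illustration only; the real reconciler works on much richer cache structures and triggers asynchronous operations through the OperationExecutor.

```go
// mountKey is a toy stand-in for the (pod, volume) relationship tracked in
// the DSW and ASW caches. Not a kubelet type.
type mountKey struct {
	pod    string
	volume string
}

// reconcileOnce drives the actual state toward the desired state:
// unmount what is no longer desired, then mount what is missing.
func reconcileOnce(desired, actual map[mountKey]bool,
	mount func(mountKey) error, unmount func(mountKey) error) {
	// Unmount anything that is mounted but no longer desired.
	for k := range actual {
		if !desired[k] {
			if err := unmount(k); err == nil {
				delete(actual, k)
			}
		}
	}
	// Mount anything that is desired but not yet mounted.
	for k := range desired {
		if !actual[k] {
			if err := mount(k); err == nil {
				actual[k] = true
			}
		}
	}
}
```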
Run (entry point)
- `go vm.volumePluginMgr.Run(stopCh)`: starts the informer for CSIDriver objects.
- `go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)`: starts the loop that keeps fetching Pods from the PodManager, processes the volumes those Pods reference, and adds them to the DSW.
- `go vm.reconciler.Run(stopCh)`: starts the VolumeManager's main reconcile loop.

```go
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	if vm.kubeClient != nil {
		// start informer for CSIDriver
		go vm.volumePluginMgr.Run(stopCh)
	}

	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).InfoS("The desired_state_of_world populator starts")

	klog.InfoS("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(stopCh)

	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

	<-stopCh
	klog.InfoS("Shutting down Kubelet Volume Manager")
}
```
Reconcile
Data structures
```go
type podVolume struct {
	podName        volumetypes.UniquePodName
	volumeSpecName string
	volumePath     string
	pluginName     string
	volumeMode     v1.PersistentVolumeMode
}
```
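For context, podVolume entries are produced by scanning the kubelet pod directories on disk. Below is a simplified sketch of that directory walk, assuming the usual layout /var/lib/kubelet/pods/&lt;podUID&gt;/volumes/&lt;pluginName&gt;/&lt;volumeSpecName&gt;; the real getVolumesFromPodDir also handles block devices and volume subpaths, so treat this helper name and its details as illustrative only.

```go
// listPodVolumeDirs is a hypothetical, simplified version of the directory walk.
func listPodVolumeDirs(kubeletPodsDir string) ([]podVolume, error) {
	var volumes []podVolume
	podDirs, err := os.ReadDir(kubeletPodsDir)
	if err != nil {
		return nil, err
	}
	for _, podDir := range podDirs {
		volumesDir := filepath.Join(kubeletPodsDir, podDir.Name(), "volumes")
		pluginDirs, err := os.ReadDir(volumesDir)
		if err != nil {
			continue // this pod has no volumes directory
		}
		for _, pluginDir := range pluginDirs {
			volumeDirs, err := os.ReadDir(filepath.Join(volumesDir, pluginDir.Name()))
			if err != nil {
				continue
			}
			for _, volumeDir := range volumeDirs {
				volumes = append(volumes, podVolume{
					podName:        volumetypes.UniquePodName(podDir.Name()),
					volumeSpecName: volumeDir.Name(),
					volumePath:     filepath.Join(volumesDir, pluginDir.Name(), volumeDir.Name()),
					pluginName:     pluginDir.Name(),
					// Assumption for the sketch: filesystem mode only; the real
					// code also detects block-mode volumes.
					volumeMode: v1.PersistentVolumeFilesystem,
				})
			}
		}
	}
	return volumes, nil
}
```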
State synchronization
When does the VolumeManager synchronize its state? The answer is in the reconciler's loop function:
```go
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()

		// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
		// desired state of world does not contain a complete list of pods.
		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
			klog.InfoS("Reconciler: start to sync state")
			rc.sync()
		}
	}
}
```
Here rc.sync() is the synchronization function. Its two guarding conditions are:
- The populator has added the pods known to the PodManager: looking at the implementation, this becomes true once `Populator.Run` has completed its first populator loop after all pod sources are ready.

```go
rc.populatorHasAddedPods = vm.desiredStateOfWorldPopulator.HasAddedPods

func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
	dswp.hasAddedPodsLock.RLock()
	defer dswp.hasAddedPodsLock.RUnlock()
	return dswp.hasAddedPods
}

// hasAddedPods is set to true in Run.
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
	klog.InfoS("Desired state populator starts to run")
	wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
		done := sourcesReady.AllReady()
		dswp.populatorLoop()
		return done, nil
	}, stopCh)
	dswp.hasAddedPodsLock.Lock()
	dswp.hasAddedPods = true
	dswp.hasAddedPodsLock.Unlock()
	wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
```
- The state has not been synced yet:

```go
func (rc *reconciler) StatesHasBeenSynced() bool {
	return !rc.timeOfLastSync.IsZero()
}
```
Syncing only after the populator has added all existing pods guarantees that the desired state of world contains the complete list of pods; otherwise the reconstruct process might clean up volumes that are still in use.
sync() details
sync walks every volume directory under the kubelet pod directories. A volume found on disk but missing from the ASW is reconstructed: if it is still referenced in the DSW it is only marked as in use; otherwise the reconstructed information is added to the ASW so the reconciler can handle it later. Volumes that cannot be reconstructed (some volume plugins do not support reconstruction) and are no longer needed by any pod have their mount points cleaned up.
```go
func (rc *reconciler) sync() {
	defer rc.updateLastSyncTime()
	rc.syncStates()
}

func (rc *reconciler) syncStates() {
	// Get volumes information by reading the pod's directory
	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
	if err != nil {
		klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
		return
	}
	volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
	volumeNeedReport := []v1.UniqueVolumeName{}
	for _, volume := range podVolumes {
		if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
			klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
			// There is nothing to reconstruct
			continue
		}
		volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)

		reconstructedVolume, err := rc.reconstructVolume(volume)
		if err != nil {
			if volumeInDSW {
				// Some pod needs the volume, don't clean it up and hope that
				// reconcile() calls SetUp and reconstructs the volume in ASW.
				klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
				continue
			}
			// No pod needs the volume.
			klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
			rc.cleanupMounts(volume)
			continue
		}
		if volumeInDSW {
			// Some pod needs the volume. And it exists on disk. Some previous
			// kubelet must have created the directory, therefore it must have
			// reported the volume as in use. Mark the volume as in use also in
			// this new kubelet so reconcile() calls SetUp and re-mounts the
			// volume if it's necessary.
			volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
			klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
			continue
		}
		// There is no pod that uses the volume.
		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
			klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
		}
		klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
		volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
	}

	if len(volumesNeedUpdate) > 0 {
		if err = rc.updateStates(volumesNeedUpdate); err != nil {
			klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
		}
	}
	if len(volumeNeedReport) > 0 {
		rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
	}
}
```
Working
The remaining work is then handled by reconcile():
```go
func (rc *reconciler) reconcile() {
	// Unmounts are triggered before mounts so that a volume that was
	// referenced by a pod that was deleted and is now referenced by another
	// pod is unmounted from the first pod before being mounted to the new
	// pod.
	rc.unmountVolumes()

	// Next we mount required volumes. This function could also trigger
	// attach if kubelet is responsible for attaching volumes.
	// If underlying PVC was resized while in-use then this function also handles volume
	// resizing.
	rc.mountOrAttachVolumes()

	// Ensure devices that should be detached/unmounted are detached/unmounted.
	rc.unmountDetachDevices()
}
```
- Unmount volumes that should no longer be mounted.
- Mount the required volumes, attaching them to the node first if the kubelet is responsible for attaching.
- Unmount or detach devices that are no longer needed.
Function details
```go
func (rc *reconciler) unmountVolumes() {
	// Ensure volumes that should be unmounted are unmounted.
	for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
		if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
			// Volume is mounted, unmount it
			klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
			err := rc.operationExecutor.UnmountVolume(mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
			if err != nil && !isExpectedError(err) {
				klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
			}
			if err == nil {
				klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
			}
		}
	}
}
```
It iterates over every volume mounted according to the ASW and checks whether the DSW still records a relationship between that volume and its pod; if it does not, the volume is unmounted.
```go
func (rc *reconciler) mountOrAttachVolumes() {
	// Ensure volumes that should be attached/mounted are attached/mounted.
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
		volumeToMount.DevicePath = devicePath
		if cache.IsVolumeNotAttachedError(err) {
			rc.waitForVolumeAttach(volumeToMount)
		} else if !volMounted || cache.IsRemountRequiredError(err) {
			rc.mountAttachedVolumes(volumeToMount, err)
		} else if cache.IsFSResizeRequiredError(err) {
			fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
			rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
		}
	}
}
```
It iterates over the volumes in the DSW that need to be mounted and decides, for each one, which operation is required:

- If the volume is not attached yet (IsVolumeNotAttachedError), wait for the attach. :::info This is where the AD Controller mentioned at the beginning comes back in: if the AD Controller is enabled, it performs the Attach and the kubelet only checks whether the Attach succeeded; otherwise the kubelet performs the Attach itself. :::
- If the volume is attached but not yet mounted (or a remount is required), mount it.
- If a resize is required, call rc.expandVolume to expand the filesystem.

A self-contained toy sketch of this decision tree follows.
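All names in this sketch (volumeState, decideAction, and their fields) are hypothetical and only mirror the order of checks above; they are not kubelet APIs.

```go
// volumeState is a simplified view of one pod/volume pair.
type volumeState struct {
	attached        bool // is the volume attached to this node?
	mounted         bool // is it mounted into the pod?
	fsResizePending bool // did the PVC grow while in use?
}

// decideAction mirrors the order of checks in mountOrAttachVolumes.
func decideAction(s volumeState, controllerAttachDetachEnabled bool) string {
	switch {
	case !s.attached:
		if controllerAttachDetachEnabled {
			// The AD controller owns attach; kubelet only verifies it happened.
			return "verify that the controller attached the volume"
		}
		return "kubelet attaches the volume itself"
	case !s.mounted:
		return "mount the attached volume for the pod"
	case s.fsResizePending:
		return "expand the filesystem to the new size"
	default:
		return "nothing to do"
	}
}
```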
```go
func (rc *reconciler) unmountDetachDevices() {
	for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
		// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
		if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
			!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
			if attachedVolume.DeviceMayBeMounted() {
				// Volume is globally mounted to device, unmount it
				klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
				err := rc.operationExecutor.UnmountDevice(attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
				if err != nil && !isExpectedError(err) {
					klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
				}
				if err == nil {
					klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
				}
			} else {
				// Volume is attached to node, detach it
				// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
				if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
					rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
					klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
				} else {
					// Only detach if kubelet detach is enabled
					klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
					err := rc.operationExecutor.DetachVolume(attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
					if err != nil && !isExpectedError(err) {
						klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
					}
					if err == nil {
						klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
					}
				}
			}
		}
	}
}
```
- Again the loop starts from the ASW, this time over volumes whose mounts have been removed (GetUnmountedVolumes), filtering out volumes that the DSW says should still exist as well as volumes with a pending operation (for example a mount in progress).
- For the volumes that remain, the device is unmounted if it is still globally mounted; otherwise the volume is detached. As before, if the AD Controller is enabled the Detach is not executed by the kubelet itself; the kubelet only marks the volume as detached.
