Volume Manager read link

Pre

Kubernetes 通过独立于 kubelet 运行的 Attach/Detach Controller(下文简称 AD Controller),把卷的 attach/detach 操作从 kubelet 中分离了出来;新版本的 kubelet 仅保留 volume 的挂载(mount)和卸载(unmount)功能,以解决 Node(即 kubelet)不可用时导致的存储问题。

kubelet 启动时可以通过 flag(`--enable-controller-attach-detach`)选择是否把这部分功能交给 AD Controller,以兼容旧版本。

Interface

  1. type VolumeManager interface {
  2. Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
  3. WaitForAttachAndMount(pod *v1.Pod) error
  4. WaitForUnmount(pod *v1.Pod) error
  5. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  6. GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  7. GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
  8. GetVolumesInUse() []v1.UniqueVolumeName
  9. ReconcilerStatesHasBeenSynced() bool
  10. VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
  11. MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
  12. }

快速浏览一下 VolumeManager 的接口,可以看到除了 Run 方法外,其他都是工具函数,用于获取 volume 的相关信息(如挂载状态等)。

于是主要关注 Run 方法。

  1. // VolumeManager runs a set of asynchronous loops that figure out which volumes
  2. // need to be attached/mounted/unmounted/detached based on the pods scheduled on
  3. // this node and makes it so.
  4. type VolumeManager interface {
  5. // Starts the volume manager and all the asynchronous loops that it controls
  6. Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
  7. // WaitForAttachAndMount processes the volumes referenced in the specified
  8. // pod and blocks until they are all attached and mounted (reflected in
  9. // actual state of the world).
  10. // An error is returned if all volumes are not attached and mounted within
  11. // the duration defined in podAttachAndMountTimeout.
  12. WaitForAttachAndMount(pod *v1.Pod) error
  13. // WaitForUnmount processes the volumes referenced in the specified
  14. // pod and blocks until they are all unmounted (reflected in the actual
  15. // state of the world).
  16. // An error is returned if all volumes are not unmounted within
  17. // the duration defined in podAttachAndMountTimeout.
  18. WaitForUnmount(pod *v1.Pod) error
  19. // GetMountedVolumesForPod returns a VolumeMap containing the volumes
  20. // referenced by the specified pod that are successfully attached and
  21. // mounted. The key in the map is the OuterVolumeSpecName (i.e.
  22. // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
  23. // volumes.
  24. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  25. // GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes
  26. // referenced by the specified pod that are either successfully attached
  27. // and mounted or are "uncertain", i.e. a volume plugin may be mounting
  28. // them right now. The key in the map is the OuterVolumeSpecName (i.e.
  29. // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
  30. // volumes.
  31. GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  32. // GetExtraSupplementalGroupsForPod returns a list of the extra
  33. // supplemental groups for the Pod. These extra supplemental groups come
  34. // from annotations on persistent volumes that the pod depends on.
  35. GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
  36. // GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
  37. // interface and are currently in use according to the actual and desired
  38. // state of the world caches. A volume is considered "in use" as soon as it
  39. // is added to the desired state of world, indicating it *should* be
  40. // attached to this node and remains "in use" until it is removed from both
  41. // the desired state of the world and the actual state of the world, or it
  42. // has been unmounted (as indicated in actual state of world).
  43. GetVolumesInUse() []v1.UniqueVolumeName
  44. // ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
  45. // has been synced at least once after kubelet starts so that it is safe to update mounted
  46. // volume list retrieved from actual state.
  47. ReconcilerStatesHasBeenSynced() bool
  48. // VolumeIsAttached returns true if the given volume is attached to this
  49. // node.
  50. VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
  51. // Marks the specified volume as having successfully been reported as "in
  52. // use" in the nodes's volume status.
  53. MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
  54. }

Structure

  1. // volumeManager implements the VolumeManager interface
  2. type volumeManager struct {
  3. // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
  4. // communicate with the API server to fetch PV and PVC objects
  5. kubeClient clientset.Interface
  6. // volumePluginMgr is the volume plugin manager used to access volume
  7. // plugins. It must be pre-initialized.
  8. volumePluginMgr *volume.VolumePluginMgr
  9. // desiredStateOfWorld is a data structure containing the desired state of
  10. // the world according to the volume manager: i.e. what volumes should be
  11. // attached and which pods are referencing the volumes).
  12. // The data structure is populated by the desired state of the world
  13. // populator using the kubelet pod manager.
  14. desiredStateOfWorld cache.DesiredStateOfWorld
  15. // actualStateOfWorld is a data structure containing the actual state of
  16. // the world according to the manager: i.e. which volumes are attached to
  17. // this node and what pods the volumes are mounted to.
  18. // The data structure is populated upon successful completion of attach,
  19. // detach, mount, and unmount actions triggered by the reconciler.
  20. actualStateOfWorld cache.ActualStateOfWorld
  21. // operationExecutor is used to start asynchronous attach, detach, mount,
  22. // and unmount operations.
  23. operationExecutor operationexecutor.OperationExecutor
  24. // reconciler runs an asynchronous periodic loop to reconcile the
  25. // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
  26. // detach, mount, and unmount operations using the operationExecutor.
  27. reconciler reconciler.Reconciler
  28. // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
  29. // populate the desiredStateOfWorld using the kubelet PodManager.
  30. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
  31. // csiMigratedPluginManager keeps track of CSI migration status of plugins
  32. csiMigratedPluginManager csimigration.PluginManager
  33. // intreeToCSITranslator translates in-tree volume specs to CSI
  34. intreeToCSITranslator csimigration.InTreeToCSITranslator
  35. }

其中 csiMigratedPluginManager、volumePluginMgr、intreeToCSITranslator、operationExecutor、kubeClient 和核心逻辑无关,是功能的执行者。需要重点关注的是 desiredStateOfWorld、actualStateOfWorld、reconciler、desiredStateOfWorldPopulator 这四个数据结构之间的交互。

Explanation

  • desiredStateOfWorld: 期望的世界状态,简称 DSW,描述期望的 Pod 和 Volume 的挂载关系。
  • actualStateOfWorld: 实际世界状态,简称 ASW,即此时 volume 的实际状态。实际状态未必和期望状态一致,它代表的是“现在进行时”。

    Overview

    /
    Volume Manager - 图1

事件来源:pod 的数据由 desiredStateOfWorldPopulator 从 PodManager(PodManager 是 kubelet 中的 manager,它通过轮询维护着真实的 pod 数据及状态)同步到 desiredStateOfWorld 中。
事件处理:由 reconciler 的 Run 方法持续 reconcile,把记录在 desiredStateOfWorld 中的 volume 与 pod 的关系落实并同步到 actualStateOfWorld 中。
重启与残留清理:reconciler 除了负责触发挂载事件并交由实际程序处理之外,还会进行卷的重建和清理工作(对于不支持重建的卷插件,会清理其残留的挂载点)。

Run(入口)

  1. go vm.volumePluginMgr.Run(stopCh):为 CSIDriver 启动 informer(仅当 kubeClient 不为 nil 时)。
  2. go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh):开启循环,不断从 PodManager 中拿到 Pod 信息并处理这些 Pod 里的 Volume 信息,将它们加入到 DSW 中。
  3. go vm.reconciler.Run(stopCh):开始 VolumeManager 的主要调谐循环。

    1. func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    2. defer runtime.HandleCrash()
    3. if vm.kubeClient != nil {
    4. // start informer for CSIDriver
    5. go vm.volumePluginMgr.Run(stopCh)
    6. }
    7. go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    8. klog.V(2).InfoS("The desired_state_of_world populator starts")
    9. klog.InfoS("Starting Kubelet Volume Manager")
    10. go vm.reconciler.Run(stopCh)
    11. metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
    12. <-stopCh
    13. klog.InfoS("Shutting down Kubelet Volume Manager")
    14. }

    Reconcile

    数据结构

    1. type podVolume struct {
    2. podName volumetypes.UniquePodName
    3. volumeSpecName string
    4. volumePath string
    5. pluginName string
    6. volumeMode v1.PersistentVolumeMode
    7. }

    Pod 和 Volume 的关系,包含必要的处理信息。

    状态同步

    VolumeManager 什么时候同步状态呢?可以在 reconciler 的 Run 所驱动的循环函数 reconciliationLoopFunc 中找到答案:

    1. func (rc *reconciler) reconciliationLoopFunc() func() {
    2. return func() {
    3. rc.reconcile()
    4. // Sync the state with the reality once after all existing pods are added to the desired state from all sources.
    5. // Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
    6. // desired state of world does not contain a complete list of pods.
    7. if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
    8. klog.InfoS("Reconciler: start to sync state")
    9. rc.sync()
    10. }
    11. }
    12. }

    其中 rc.sync()是同步函数。两个条件分别是

  • populator 已经把 podManager 中相关的 Pod 添加到 DSW:查看实现可以知道,并不是 Populator.Run 一开始运行这个函数就返回 true,而是要等所有 source 都就绪(sourcesReady.AllReady() 返回 true)、并在此之后完成一轮 populatorLoop(即 wait.PollUntil 返回)之后,hasAddedPods 才会被置为 true。

    ```go
    rc.populatorHasAddedPods = vm.desiredStateOfWorldPopulator.HasAddedPods

    func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
        dswp.hasAddedPodsLock.RLock()
        defer dswp.hasAddedPodsLock.RUnlock()
        return dswp.hasAddedPods
    }

    // 在 Run 函数中置为 true。
    func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
        // Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
        klog.InfoS("Desired state populator starts to run")
        wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
            done := sourcesReady.AllReady()
            dswp.populatorLoop()
            return done, nil
        }, stopCh)
        dswp.hasAddedPodsLock.Lock()
        dswp.hasAddedPods = true
        dswp.hasAddedPodsLock.Unlock()
        wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
    }
    ```

  • 还没有同步过状态。

    ```go
    func (rc *reconciler) StatesHasBeenSynced() bool {
        return !rc.timeOfLastSync.IsZero()
    }
    ```

等到 populator 把所有来源的 Pod 都添加完之后再进行同步,可以保证此时 desired state of world 中已经包含完整的 pod 列表,其意义是避免重建 volume 时误清理仍在被 pod 使用的 volume。

sync() 详情

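sync 会将 Pod 目录中所有的 volume 文件夹遍历一遍:已经存在于 ASW 中的 volume 直接跳过;不在 ASW 中的 volume 会被尝试重建相关信息。重建成功时,如果该 volume 在 DSW 中(仍有 Pod 需要它),就在 DSW 中把它标记为已上报 in use,交给后续的 reconcile() 处理;如果不在 DSW 中,就把重建出的信息更新到 ASW 中。重建失败时(一些 volume plugin 不支持重建),如果该 volume 在 DSW 中则跳过、不做清理;如果已经没有 Pod 需要它(不再被使用),则清理它的挂载点。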

  1. func (rc *reconciler) sync() {
  2. defer rc.updateLastSyncTime()
  3. rc.syncStates()
  4. }
  5. func (rc *reconciler) syncStates() {
  6. // Get volumes information by reading the pod's directory
  7. podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
  8. if err != nil {
  9. klog.ErrorS(err, "Cannot get volumes from disk, skip sync states for volume reconstruction")
  10. return
  11. }
  12. volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
  13. volumeNeedReport := []v1.UniqueVolumeName{}
  14. for _, volume := range podVolumes {
  15. if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
  16. klog.V(4).InfoS("Volume exists in actual state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
  17. // There is nothing to reconstruct
  18. continue
  19. }
  20. volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)
  21. reconstructedVolume, err := rc.reconstructVolume(volume)
  22. if err != nil {
  23. if volumeInDSW {
  24. // Some pod needs the volume, don't clean it up and hope that
  25. // reconcile() calls SetUp and reconstructs the volume in ASW.
  26. klog.V(4).InfoS("Volume exists in desired state, skip cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
  27. continue
  28. }
  29. // No pod needs the volume.
  30. klog.InfoS("Could not construct volume information, cleaning up mounts", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName, "err", err)
  31. rc.cleanupMounts(volume)
  32. continue
  33. }
  34. if volumeInDSW {
  35. // Some pod needs the volume. And it exists on disk. Some previous
  36. // kubelet must have created the directory, therefore it must have
  37. // reported the volume as in use. Mark the volume as in use also in
  38. // this new kubelet so reconcile() calls SetUp and re-mounts the
  39. // volume if it's necessary.
  40. volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
  41. klog.V(4).InfoS("Volume exists in desired state, marking as InUse", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
  42. continue
  43. }
  44. // There is no pod that uses the volume.
  45. if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
  46. klog.InfoS("Volume is in pending operation, skip cleaning up mounts")
  47. }
  48. klog.V(2).InfoS("Reconciler sync states: could not find pod information in desired state, update it in actual state", "reconstructedVolume", reconstructedVolume)
  49. volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
  50. }
  51. if len(volumesNeedUpdate) > 0 {
  52. if err = rc.updateStates(volumesNeedUpdate); err != nil {
  53. klog.ErrorS(err, "Error occurred during reconstruct volume from disk")
  54. }
  55. }
  56. if len(volumeNeedReport) > 0 {
  57. rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
  58. }
  59. }

Working

那么剩余的工作则由 reconcile() 负责,它依次执行下面三个步骤:

  1. func (rc *reconciler) reconcile() {
  2. // Unmounts are triggered before mounts so that a volume that was
  3. // referenced by a pod that was deleted and is now referenced by another
  4. // pod is unmounted from the first pod before being mounted to the new
  5. // pod.
  6. rc.unmountVolumes()
  7. // Next we mount required volumes. This function could also trigger
  8. // attach if kubelet is responsible for attaching volumes.
  9. // If underlying PVC was resized while in-use then this function also handles volume
  10. // resizing.
  11. rc.mountOrAttachVolumes()
  12. // Ensure devices that should be detached/unmounted are detached/unmounted.
  13. rc.unmountDetachDevices()
  14. }
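  1. 取消卷挂载(unmount):先把不再需要的卷从 pod 上卸载。
  2. 挂载所需的卷(mount)到 pod;如果由 kubelet 负责 attach,还会先把卷附加(attach)到节点;如果底层 PVC 在使用中被扩容,也在这里处理扩容。
  3. 卸载设备(unmount device)或把卷从节点上取消附加(detach)。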

    函数细节

    1. func (rc *reconciler) unmountVolumes() {
    2. // Ensure volumes that should be unmounted are unmounted.
    3. for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
    4. if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
    5. // Volume is mounted, unmount it
    6. klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
    7. err := rc.operationExecutor.UnmountVolume(
    8. mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
    9. if err != nil && !isExpectedError(err) {
    10. klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
    11. }
    12. if err == nil {
    13. klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
    14. }
    15. }
    16. }
    17. }

    遍历 ASW 中所有已经挂载的卷,比较 DSW 是否含有该卷和对应 Pod 的关系,很明显,如果没有,就把他给取消挂载。

    1. func (rc *reconciler) mountOrAttachVolumes() {
    2. // Ensure volumes that should be attached/mounted are attached/mounted.
    3. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
    4. volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.PersistentVolumeSize)
    5. volumeToMount.DevicePath = devicePath
    6. if cache.IsVolumeNotAttachedError(err) {
    7. rc.waitForVolumeAttach(volumeToMount)
    8. } else if !volMounted || cache.IsRemountRequiredError(err) {
    9. rc.mountAttachedVolumes(volumeToMount, err)
    10. } else if cache.IsFSResizeRequiredError(err) {
    11. fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
    12. rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
    13. }
    14. }
    15. }

    遍历 DSW 中需要被挂载(Mount)的卷,判断该卷是否要进行挂载操作

  4. 判断卷是否尚未 Attach(即返回的是 VolumeNotAttached 错误),如果是,就调用 rc.waitForVolumeAttach 等待其 Attach 完成。

     :::info
     注意这里就和我们开头提到的 AD Controller 相关了。会判断是否开启 AD Controller,如果开启了就由其做 Attach 操作,这里只做 Attach 是否成功的检查。如果没有开启,就由这里自己去 Attach。
     :::

  5. 如果卷已经 Attach 但还没挂载(Mount),或者需要重新挂载(remount),就调用 rc.mountAttachedVolumes 挂载这个卷。

  6. 如果是要求 Resize,则调用 rc.expandVolume 进行扩容操作。
  1. func (rc *reconciler) unmountDetachDevices() {
  2. for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
  3. // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
  4. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
  5. !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
  6. if attachedVolume.DeviceMayBeMounted() {
  7. // Volume is globally mounted to device, unmount it
  8. klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
  9. err := rc.operationExecutor.UnmountDevice(
  10. attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
  11. if err != nil && !isExpectedError(err) {
  12. klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  13. }
  14. if err == nil {
  15. klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
  16. }
  17. } else {
  18. // Volume is attached to node, detach it
  19. // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
  20. if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
  21. rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
  22. klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
  23. } else {
  24. // Only detach if kubelet detach is enabled
  25. klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
  26. err := rc.operationExecutor.DetachVolume(
  27. attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
  28. if err != nil && !isExpectedError(err) {
  29. klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
  30. }
  31. if err == nil {
  32. klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
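  1. 还是从 ASW 中遍历所有已经取消挂载(unmounted)的卷,排除仍在 DSW 中声明需要的卷,也排除正在进行操作(如 Mount)的卷。
  2. 对过滤后剩下的卷:如果设备可能仍处于全局挂载状态(DeviceMayBeMounted),先执行 UnmountDevice;否则进行 Detach 操作。和前文一样,如果开启了 AD Controller(或者该卷的插件不支持 attach),Detach 操作不会由 kubelet 直接执行,kubelet 只是在 ASW 中把该卷标记为已 detach。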