官方文档:https://openkruise.io/zh-cn/docs/cloneset.html

0、前置概念

0.1、readinessGates

https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate

1、Spec定义

  1. type CloneSetSpec struct {
  2. // 副本数量,默认为1
  3. Replicas *int32 `json:"replicas,omitempty"`
  4. Selector *metav1.LabelSelector `json:"selector"`
  5. // pod模板
  6. Template v1.PodTemplateSpec `json:"template"`
  7. // 存储模板
  8. VolumeClaimTemplates []v1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
  9. // 扩/缩容策略
  10. ScaleStrategy CloneSetScaleStrategy `json:"scaleStrategy,omitempty"`
  11. // 更新策略
  12. UpdateStrategy CloneSetUpdateStrategy `json:"updateStrategy,omitempty"`
  13. // 历史版本保留数量,默认为10
  14. RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
  15. // pod从ready状态持续该时间,才认为pod是可用状态
  16. // 默认为0
  17. MinReadySeconds int32 `json:"minReadySeconds,omitempty"`
  18. // 删除和原地升级自定义hook
  19. Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
  20. }
  // CloneSetScaleStrategy controls scale-out / scale-in of a CloneSet.
  type CloneSetScaleStrategy struct {
  	// PodsToDelete names specific pods to remove when scaling in.
  	// If the replica count is unchanged, the listed pods are deleted and
  	// replacements are created.
  	PodsToDelete []string `json:"podsToDelete,omitempty"`
  }
  26. // 升级策略
  27. type CloneSetUpdateStrategy struct {
  28. // 升级策略
  29. // ReCreate、InPlaceIfPossible、InPlaceOnly
  30. Type CloneSetUpdateStrategyType `json:"type,omitempty"`
  31. // 保留旧版本的数量,默认为0
  32. Partition *int32 `json:"partition,omitempty"`
  33. // 最大不可用数量,默认20%
  34. MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
  35. // 最大弹性数量
  36. // 如果发布的时候设置了 maxSurge,控制器会先多扩出来 maxSurge 数量的 Pod(此时 Pod 总数为 (replicas+maxSurge)),
  37. // 然后再开始发布存量的 Pod。 然后,当新版本 Pod 数量已经满足 partition 要求之后,控制器会再把多余的 maxSurge 数量的 Pod 删除掉,
  38. // 保证最终的 Pod 数量符合 replicas。
  39. // 要说明的是,maxSurge 不允许配合 InPlaceOnly 更新模式使用。
  40. // 另外,如果是与 InPlaceIfPossible 策略配合使用,控制器会先扩出来 maxSurge 数量的 Pod,
  41. // 再对存量 Pod 做原地升级。
  42. MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
  43. // CloneSet是否暂停
  44. Paused bool `json:"paused,omitempty"`
  45. // 更新优先级
  46. PriorityStrategy *UpdatePriorityStrategy `json:"priorityStrategy,omitempty"`
  47. // 打散方式更新
  48. ScatterStrategy CloneSetUpdateScatterStrategy `json:"scatterStrategy,omitempty"`
  49. // 原地更新策略
  50. InPlaceUpdateStrategy *InPlaceUpdateStrategy `json:"inPlaceUpdateStrategy,omitempty"`
  51. }
  // CloneSetUpdateScatterStrategy is a list of label terms used to scatter the update order.
  type CloneSetUpdateScatterStrategy []CloneSetUpdateScatterTerm
  // CloneSetUpdateScatterTerm is one label key/value pair to scatter on.
  type CloneSetUpdateScatterTerm struct {
  	Key   string `json:"key"`
  	Value string `json:"value"`
  }
  // CloneSetUpdateStrategyType is the update strategy type
  // (ReCreate, InPlaceIfPossible or InPlaceOnly).
  type CloneSetUpdateStrategyType string
  // InPlaceUpdateStrategy configures in-place updates.
  type InPlaceUpdateStrategy struct {
  	// GracePeriodSeconds: NOTE(review) presumably the delay between marking a
  	// pod not-ready and performing the in-place update. Confirm against the
  	// OpenKruise API docs.
  	GracePeriodSeconds int32 `json:"gracePeriodSeconds,omitempty"`
  }

2、调谐函数

  1. func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
  2. // fetch
  3. instance := &appsv1alpha1.CloneSet{}
  4. err := r.Get(context.TODO(), request.NamespacedName, instance)
  5. if err != nil {
  6. // 删除事件
  7. if errors.IsNotFound(err) {
  8. clonesetutils.ScaleExpectations.DeleteExpectations(request.String())
  9. clonesetutils.UpdateExpectations.DeleteExpectations(request.String())
  10. return reconcile.Result{}, nil
  11. }
  12. return reconcile.Result{}, err
  13. }
  14. coreControl := clonesetcore.New(instance)
  15. // 测试特性,永远返回false
  16. if coreControl.IsInitializing() {
  17. return reconcile.Result{}, nil
  18. }
  19. // 构造选择器
  20. selector, err := metav1.LabelSelectorAsSelector(instance.Spec.Selector)
  21. if err != nil {
  22. return reconcile.Result{}, nil
  23. }
  24. // 判断是否处在扩容状态
  25. // 若还处在扩容状态,则跳过本次循环
  26. if scaleSatisfied, unsatisfiedDuration, _ := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
  27. if unsatisfiedDuration >= expectations.ExpectationTimeout {
  28. return reconcile.Result{}, nil
  29. }
  30. return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
  31. }
  32. // 通过ownerRef索引获取此CloneSet关联的pod/pvc
  33. filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
  34. if err != nil {
  35. return reconcile.Result{}, err
  36. }
  37. // 认领pod,释放label不符合的pod/认领label符合的孤儿pod
  38. filteredPods, err = r.claimPods(instance, filteredPods)
  39. if err != nil {
  40. return reconcile.Result{}, err
  41. }
  42. // 获取历史版本
  43. // cloneset的历史版本是通过ControllerRevison维护,
  44. // 不同于Deployment通过直接维护repliset来维护历史Spec
  45. revisions, err := r.controllerHistory.ListControllerRevisions(instance, selector)
  46. if err != nil {
  47. return reconcile.Result{}, err
  48. }
  49. // 版本排序
  50. history.SortControllerRevisions(revisions)
  51. // 获取当前pod的Spec版本和即将要更新的Spec版本
  52. // 若当前Spec版本为nil,则当前Spec版本为即将要更新的Spec版本
  53. currentRevision, updateRevision, collisionCount, err := r.getActiveRevisions(instance, revisions, clonesetutils.GetPodsRevisions(filteredPods))
  54. if err != nil {
  55. return reconcile.Result{}, err
  56. }
  57. // 有可能是pod变更触发controller循环
  58. // 需要判断一下是否有pod已经变更了版本
  59. // 此处不在回调函数触发,是因为pod必须先要经过认领,
  60. // 为了减少认领操作,统一到ControllerLoop进行处理
  61. for _, pod := range filteredPods {
  62. clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
  63. }
  64. // 是否正在进行的更新还没有完成
  65. if updateSatisfied, unsatisfiedDuration, _ := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
  66. if unsatisfiedDuration >= expectations.ExpectationTimeout {
  67. return reconcile.Result{}, nil
  68. }
  69. return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
  70. }
  71. for _, pod := range filteredPods {
  72. // 必须等待客户自定义回调全部处理完成才能进行下一轮同步
  73. // ResourceVersionExpectations是为了减少等待用户hook而无效的loop
  74. // 若进入了等待用户定义hook的状态,则Except Pod,只有当Pod发生变化才假定用户自定义hook完成
  75. // 否则跳过loop
  76. if isSatisfied, unsatisfiedDuration := clonesetutils.ResourceVersionExpectations.IsSatisfied(pod); !isSatisfied {
  77. if unsatisfiedDuration >= expectations.ExpectationTimeout {
  78. return reconcile.Result{}, nil
  79. }
  80. return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
  81. }
  82. }
  83. // 初始化新的Status
  84. newStatus := appsv1alpha1.CloneSetStatus{
  85. ObservedGeneration: instance.Generation,
  86. UpdateRevision: updateRevision.Name,
  87. CollisionCount: new(int32),
  88. LabelSelector: selector.String(),
  89. }
  90. *newStatus.CollisionCount = collisionCount
  91. // 开始进行扩/缩容、升级
  92. delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
  93. // 更新CloneSet状态
  94. if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
  95. return reconcile.Result{}, err
  96. }
  97. // 对于用户指定的不存在的pod进行修正
  98. // 针对PodsToDelete参数
  99. if err = r.truncatePodsToDelete(instance, filteredPods); err != nil {
  100. klog.Warningf("Failed to truncate podsToDelete for %s: %v", request, err)
  101. }
  102. // 删除超过给定缓存历史Spec数量的旧版本ControllerRevision
  103. if err = r.truncateHistory(instance, filteredPods, revisions, currentRevision, updateRevision); err != nil {
  104. klog.Errorf("Failed to truncate history for %s: %v", request, err)
  105. }
  106. // 若符合同步成功、给定启动时间、可用数量少于期望数量
  107. // 则给定启动时间之后再将给Controller Key入对,更新Status
  108. if syncErr == nil && instance.Spec.MinReadySeconds > 0 && newStatus.AvailableReplicas != newStatus.ReadyReplicas {
  109. minReadyDuration := time.Second * time.Duration(instance.Spec.MinReadySeconds)
  110. if delayDuration == 0 || minReadyDuration < delayDuration {
  111. delayDuration = minReadyDuration
  112. }
  113. }
  114. return reconcile.Result{RequeueAfter: delayDuration}, syncErr
  115. }

3、同步函数

3.1、同步函数

  1. func (r *ReconcileCloneSet) syncCloneSet(
  2. instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
  3. currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
  4. filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
  5. ) (time.Duration, error) {
  6. // 若已经被删除,则直接return
  7. var delayDuration time.Duration
  8. if instance.DeletionTimestamp != nil {
  9. return delayDuration, nil
  10. }
  11. // 根据Revision复原CloneSet对象
  12. currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
  13. if err != nil {
  14. return delayDuration, err
  15. }
  16. updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
  17. if err != nil {
  18. return delayDuration, err
  19. }
  20. var scaling bool
  21. var podsScaleErr error
  22. var podsUpdateErr error
  23. // 开始扩容
  24. scaling, podsScaleErr = r.scaleControl.Manage(currentSet, updateSet, currentRevision.Name, updateRevision.Name, filteredPods, filteredPVCs)
  25. if podsScaleErr != nil {
  26. newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
  27. Type: appsv1alpha1.CloneSetConditionFailedScale,
  28. Status: v1.ConditionTrue,
  29. LastTransitionTime: metav1.Now(),
  30. Message: podsScaleErr.Error(),
  31. })
  32. err = podsScaleErr
  33. }
  34. // 若处在扩容状态,则直接return
  35. // 否则才进行更新
  36. if scaling {
  37. return delayDuration, podsScaleErr
  38. }
  39. // 开始更新
  40. delayDuration, podsUpdateErr = r.updateControl.Manage(updateSet, updateRevision, revisions, filteredPods, filteredPVCs)
  41. if podsUpdateErr != nil {
  42. newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
  43. Type: appsv1alpha1.CloneSetConditionFailedUpdate,
  44. Status: v1.ConditionTrue,
  45. LastTransitionTime: metav1.Now(),
  46. Message: podsUpdateErr.Error(),
  47. })
  48. if err == nil {
  49. err = podsUpdateErr
  50. }
  51. }
  52. return delayDuration, err
  53. }

3.2、扩容

3.2.1、扩容核心接口

1、首先处理计划删除的pod:通过managePreparingDelete和deletePods函数进行副本数量的控制;
2、其次才根据partition、maxSurge等升级相关参数计算diff,进行pod的创建或删除。

  1. func (r *realControl) Manage(
  2. currentCS, updateCS *appsv1alpha1.CloneSet,
  3. currentRevision, updateRevision string,
  4. pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
  5. ) (bool, error) {
  6. if updateCS.Spec.Replicas == nil {
  7. return false, fmt.Errorf("spec.Replicas is nil")
  8. }
  9. coreControl := clonesetcore.New(updateCS)
  10. if !coreControl.IsReadyToScale() {
  11. return false, nil
  12. }
  13. // 获取计划要删除的pod
  14. // 包含用户通过PodToDelete指定和已经处于PreDelete状态的pod
  15. podsSpecifiedToDelete, podsInPreDelete := getPlannedDeletedPods(updateCS, pods)
  16. // 由于用户指定要删除的pod和PreDelete状态的pod有可能有重合,因此需要merge
  17. // 下面的逻辑只要发生了pod变更或者删除操作,就直接返回
  18. podsToDelete := util.MergePods(podsSpecifiedToDelete, podsInPreDelete)
  19. if len(podsToDelete) > 0 {
  20. if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, len(podsToDelete)); err != nil || modified {
  21. return modified, err
  22. }
  23. if modified, err := r.deletePods(updateCS, podsToDelete, pvcs); err != nil || modified {
  24. return modified, err
  25. }
  26. }
  27. // 根据版本获取已经更新的pod和还未更新的pod,并通过旧版本保留数和弹性数量计算diff值
  28. updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
  29. diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))
  30. // 小于0则需要创建pod
  31. if diff < 0 {
  32. // 需要创建的总数
  33. expectedCreations := diff * -1
  34. // 缺少的当前的版本的数量
  35. // 针对当前旧版本数量少于给定保留旧版本数量的情况
  36. expectedCurrentCreations := 0
  37. if currentRevDiff < 0 {
  38. expectedCurrentCreations = currentRevDiff * -1
  39. }
  40. // 可用实例id
  41. availableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)
  42. // 存在的pvc名字
  43. existingPVCNames := sets.NewString()
  44. for _, pvc := range pvcs {
  45. existingPVCNames.Insert(pvc.Name)
  46. }
  47. // 创建pod
  48. return r.createPods(expectedCreations, expectedCurrentCreations,
  49. currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
  50. } else if diff > 0 {
  51. // 否则删除pod
  52. // 若当前存在要删除的pod,则先等待删除完成,再删除剩余pod
  53. if len(podsToDelete) > 0 {
  54. return false, nil
  55. }
  56. // 选择要删除的pod
  57. podsToDelete := choosePodsToDelete(diff, currentRevDiff, notUpdatedPods, updatedPods)
  58. return r.deletePods(updateCS, podsToDelete, pvcs)
  59. }
  60. return false, nil
  61. }
  62. func (r *realControl) managePreparingDelete(cs *appsv1alpha1.CloneSet, pods, podsInPreDelete []*v1.Pod, numToDelete int) (bool, error) {
  63. // diff为还需要删除的数量
  64. // 若要删除的数量,删除完成后副本数量达不到期望,将PreDelete的pod转为Normal状态
  65. diff := int(*cs.Spec.Replicas) - len(pods) + numToDelete
  66. var modified bool
  67. for _, pod := range podsInPreDelete {
  68. if diff <= 0 {
  69. return modified, nil
  70. }
  71. if isPodSpecifiedDelete(cs, pod) {
  72. continue
  73. }
  74. if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStateNormal); err != nil {
  75. return modified, err
  76. } else if patched {
  77. modified = true
  78. clonesetutils.ResourceVersionExpectations.Expect(pod)
  79. }
  80. diff--
  81. }
  82. return modified, nil
  83. }
  84. func (r *realControl) createPods(
  85. expectedCreations, expectedCurrentCreations int,
  86. currentCS, updateCS *appsv1alpha1.CloneSet,
  87. currentRevision, updateRevision string,
  88. availableIDs []string, existingPVCNames sets.String,
  89. ) (bool, error) {
  90. coreControl := clonesetcore.New(updateCS)
  91. newPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
  92. expectedCreations, expectedCurrentCreations, availableIDs)
  93. if err != nil {
  94. return false, err
  95. }
  96. podsCreationChan := make(chan *v1.Pod, len(newPods))
  97. for _, p := range newPods {
  98. clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)
  99. podsCreationChan <- p
  100. }
  101. var created int64
  102. successPodNames := sync.Map{}
  103. _, err = clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
  104. pod := <-podsCreationChan
  105. cs := updateCS
  106. if pod.Labels[apps.ControllerRevisionHashLabelKey] == currentRevision {
  107. cs = currentCS
  108. }
  109. lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateNormal)(pod)
  110. var createErr error
  111. if createErr = r.createOnePod(cs, pod, existingPVCNames); createErr != nil {
  112. return createErr
  113. }
  114. atomic.AddInt64(&created, 1)
  115. successPodNames.Store(pod.Name, struct{}{})
  116. return nil
  117. })
  118. for _, pod := range newPods {
  119. if _, ok := successPodNames.Load(pod.Name); !ok {
  120. clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, pod.Name)
  121. }
  122. }
  123. if created == 0 {
  124. return false, err
  125. }
  126. return true, err
  127. }
  128. func (r *realControl) createOnePod(cs *appsv1alpha1.CloneSet, pod *v1.Pod, existingPVCNames sets.String) error {
  129. claims := clonesetutils.GetPersistentVolumeClaims(cs, pod)
  130. for _, c := range claims {
  131. if existingPVCNames.Has(c.Name) {
  132. continue
  133. }
  134. clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
  135. if err := r.Create(context.TODO(), &c); err != nil {
  136. clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)
  137. r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pvc: %v, pvc: %v", err, util.DumpJSON(c))
  138. return err
  139. }
  140. }
  141. if err := r.Create(context.TODO(), pod); err != nil {
  142. r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pod: %v, pod: %v", err, util.DumpJSON(pod))
  143. return err
  144. }
  145. r.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulCreate", "succeed to create pod %s", pod.Name)
  146. return nil
  147. }
  148. func (r *realControl) deletePods(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (bool, error) {
  149. var modified bool
  150. for _, pod := range podsToDelete {
  151. if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.PreDelete, pod) {
  152. if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStatePreparingDelete); err != nil {
  153. return false, err
  154. } else if patched {
  155. modified = true
  156. clonesetutils.ResourceVersionExpectations.Expect(pod)
  157. }
  158. continue
  159. }
  160. clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
  161. if err := r.Delete(context.TODO(), pod); err != nil {
  162. clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
  163. r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
  164. return modified, err
  165. }
  166. modified = true
  167. r.recorder.Event(cs, v1.EventTypeNormal, "SuccessfulDelete", fmt.Sprintf("succeed to delete pod %s", pod.Name))
  168. for _, pvc := range pvcs {
  169. if pvc.Labels[appsv1alpha1.CloneSetInstanceID] != pod.Labels[appsv1alpha1.CloneSetInstanceID] {
  170. continue
  171. }
  172. clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
  173. if err := r.Delete(context.TODO(), pvc); err != nil {
  174. clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
  175. r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)
  176. return modified, err
  177. }
  178. }
  179. }
  180. return modified, nil
  181. }

3.2.2、扩容工具函数

  1. // 判断pod是否被指定删除
  2. // 1、判断是否有apps.kruise.io/specified-delete Label
  3. // 2、是否出现在PodsToDelete参数中
  4. func isPodSpecifiedDelete(cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
  5. if specifieddelete.IsSpecifiedDelete(pod) {
  6. return true
  7. }
  8. for _, name := range cs.Spec.ScaleStrategy.PodsToDelete {
  9. if name == pod.Name {
  10. return true
  11. }
  12. }
  13. return false
  14. }
  15. // 获取被指定删除的pod和处于PreDelete状态的Pod
  16. func getPlannedDeletedPods(cs *appsv1alpha1.CloneSet, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) {
  17. var podsSpecifiedToDelete []*v1.Pod
  18. var podsInPreDelete []*v1.Pod
  19. for _, pod := range pods {
  20. if isPodSpecifiedDelete(cs, pod) {
  21. podsSpecifiedToDelete = append(podsSpecifiedToDelete, pod)
  22. }
  23. // 根据lifecycle.apps.kruise.io/state Label进行校验
  24. if lifecycle.GetPodLifecycleState(pod) == appsv1alpha1.LifecycleStatePreparingDelete {
  25. podsInPreDelete = append(podsInPreDelete, pod)
  26. }
  27. }
  28. return podsSpecifiedToDelete, podsInPreDelete
  29. }
  30. // 获取可用id,如果可以复用,则进行复用
  31. func getOrGenAvailableIDs(num int, pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) sets.String {
  32. existingIDs := sets.NewString()
  33. availableIDs := sets.NewString()
  34. for _, pvc := range pvcs {
  35. if id := pvc.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {
  36. existingIDs.Insert(id)
  37. availableIDs.Insert(id)
  38. }
  39. }
  40. for _, pod := range pods {
  41. if id := pod.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {
  42. existingIDs.Insert(id)
  43. availableIDs.Delete(id)
  44. }
  45. }
  46. retIDs := sets.NewString()
  47. for i := 0; i < num; i++ {
  48. id := getOrGenInstanceID(existingIDs, availableIDs)
  49. retIDs.Insert(id)
  50. }
  51. return retIDs
  52. }
  53. // 获取或者生成一个InstanceID
  54. func getOrGenInstanceID(existingIDs, availableIDs sets.String) string {
  55. id, _ := availableIDs.PopAny()
  56. if len(id) == 0 {
  57. for {
  58. id = rand.String(LengthOfInstanceID)
  59. if !existingIDs.Has(id) {
  60. break
  61. }
  62. }
  63. }
  64. return id
  65. }
  66. // 计算diff(若小于0,则需要创建;否则需要删除)
  67. func calculateDiffs(cs *appsv1alpha1.CloneSet, revConsistent bool, totalPods int, notUpdatedPods int) (totalDiff int, currentRevDiff int) {
  68. var maxSurge int
  69. // 若同时有升级,则需要考虑保留旧版本数量
  70. if !revConsistent {
  71. if cs.Spec.UpdateStrategy.Partition != nil {
  72. currentRevDiff = notUpdatedPods - integer.IntMin(int(*cs.Spec.UpdateStrategy.Partition), int(*cs.Spec.Replicas))
  73. }
  74. if currentRevDiff > 0 {
  75. if cs.Spec.UpdateStrategy.MaxSurge != nil {
  76. maxSurge, _ = intstrutil.GetValueFromIntOrPercent(cs.Spec.UpdateStrategy.MaxSurge, int(*cs.Spec.Replicas), true)
  77. maxSurge = integer.IntMin(maxSurge, currentRevDiff)
  78. }
  79. }
  80. }
  81. // 若只是扩缩容,则减去副本数量(弹性扩容数量为0)
  82. totalDiff = totalPods - int(*cs.Spec.Replicas) - maxSurge
  83. return
  84. }
  85. // 选择要删除的pod
  86. func choosePodsToDelete(totalDiff int, currentRevDiff int, notUpdatedPods, updatedPods []*v1.Pod) []*v1.Pod {
  87. choose := func(pods []*v1.Pod, diff int) []*v1.Pod {
  88. if diff < len(pods) {
  89. sort.Sort(kubecontroller.ActivePods(pods))
  90. } else if diff > len(pods) {
  91. klog.Warningf("Diff > len(pods) in choosePodsToDelete func which is not expected.")
  92. return pods
  93. }
  94. return pods[:diff]
  95. }
  96. var podsToDelete []*v1.Pod
  97. if currentRevDiff >= totalDiff {
  98. // 若旧版本diff数量大于totalDiff
  99. // 则直接从旧版本中删除totalDiff pod
  100. podsToDelete = choose(notUpdatedPods, totalDiff)
  101. } else if currentRevDiff > 0 {
  102. // 否则先从notUpdatedPods删除currentRevDiff pods
  103. // 然后从updatedPods删除剩余pods
  104. podsToDelete = choose(notUpdatedPods, currentRevDiff)
  105. podsToDelete = append(podsToDelete, choose(updatedPods, totalDiff-currentRevDiff)...)
  106. } else {
  107. // 从updatedPods删除totalDiff pods
  108. podsToDelete = choose(updatedPods, totalDiff)
  109. }
  110. return podsToDelete
  111. }

3.3、升级

3.3.1、升级核心接口

  1. func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
  2. updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
  3. pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
  4. ) (time.Duration, error) {
  5. requeueDuration := requeueduration.Duration{}
  6. coreControl := clonesetcore.New(cs)
  7. if cs.Spec.UpdateStrategy.Paused {
  8. return requeueDuration.Get(), nil
  9. }
  10. // 刷新所有的pod状态
  11. var modified bool
  12. for _, pod := range pods {
  13. patched, duration, err := c.refreshPodState(cs, coreControl, pod)
  14. if err != nil {
  15. return 0, err
  16. } else if duration > 0 {
  17. requeueDuration.Update(duration)
  18. }
  19. if patched {
  20. modified = true
  21. }
  22. }
  23. if modified {
  24. return requeueDuration.Get(), nil
  25. }
  26. // 找到所有等待更新的pod
  27. // 满足:
  28. // 1、当前pod的revision不为给定的revision
  29. // 2、不处于PreparingDelete、Updated状态
  30. var waitUpdateIndexes []int
  31. for i := range pods {
  32. if coreControl.IsPodUpdatePaused(pods[i]) {
  33. continue
  34. }
  35. if clonesetutils.GetPodRevision(pods[i]) != updateRevision.Name {
  36. switch lifecycle.GetPodLifecycleState(pods[i]) {
  37. case appsv1alpha1.LifecycleStatePreparingDelete, appsv1alpha1.LifecycleStateUpdated:
  38. default:
  39. waitUpdateIndexes = append(waitUpdateIndexes, i)
  40. }
  41. }
  42. }
  43. // 根据优先级策略和打散策略给待升级的pod进行排序
  44. waitUpdateIndexes = sortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, waitUpdateIndexes)
  45. // 计算最大可以扩容的数量
  46. needToUpdateCount := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), waitUpdateIndexes, pods)
  47. if needToUpdateCount < len(waitUpdateIndexes) {
  48. waitUpdateIndexes = waitUpdateIndexes[:needToUpdateCount]
  49. }
  50. // 更新pod
  51. for _, idx := range waitUpdateIndexes {
  52. pod := pods[idx]
  53. if duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pod, pvcs); err != nil {
  54. return requeueDuration.Get(), err
  55. } else if duration > 0 {
  56. requeueDuration.Update(duration)
  57. }
  58. }
  59. return requeueDuration.Get(), nil
  60. }

3.3.2、升级工具函数

  1. func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {
  2. opts := coreControl.GetUpdateOptions()
  3. res := c.inplaceControl.Refresh(pod, opts)
  4. if res.RefreshErr != nil {
  5. return false, 0, res.RefreshErr
  6. }
  7. var state appsv1alpha1.LifecycleStateType
  8. switch lifecycle.GetPodLifecycleState(pod) {
  9. case appsv1alpha1.LifecycleStateUpdating:
  10. checkFunc := inplaceupdate.CheckInPlaceUpdateCompleted
  11. if opts != nil && opts.CustomizeCheckUpdateCompleted != nil {
  12. checkFunc = opts.CustomizeCheckUpdateCompleted
  13. }
  14. if checkFunc(pod) == nil {
  15. if cs.Spec.Lifecycle != nil && !lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
  16. state = appsv1alpha1.LifecycleStateUpdated
  17. } else {
  18. state = appsv1alpha1.LifecycleStateNormal
  19. }
  20. }
  21. case appsv1alpha1.LifecycleStateUpdated:
  22. if cs.Spec.Lifecycle == nil ||
  23. cs.Spec.Lifecycle.InPlaceUpdate == nil ||
  24. lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
  25. state = appsv1alpha1.LifecycleStateNormal
  26. }
  27. }
  28. if state != "" {
  29. if patched, err := lifecycle.PatchPodLifecycle(c, pod, state); err != nil {
  30. return false, 0, err
  31. } else if patched {
  32. clonesetutils.ResourceVersionExpectations.Expect(pod)
  33. return true, res.DelayDuration, nil
  34. }
  35. }
  36. return false, res.DelayDuration, nil
  37. }
  38. func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
  39. updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
  40. pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
  41. ) (time.Duration, error) {
  42. if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType ||
  43. cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {
  44. var oldRevision *apps.ControllerRevision
  45. for _, r := range revisions {
  46. if r.Name == clonesetutils.GetPodRevision(pod) {
  47. oldRevision = r
  48. break
  49. }
  50. }
  51. if c.inplaceControl.CanUpdateInPlace(oldRevision, updateRevision, coreControl.GetUpdateOptions()) {
  52. if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
  53. if patched, err := lifecycle.PatchPodLifecycle(c, pod, appsv1alpha1.LifecycleStatePreparingUpdate); err != nil {
  54. return 0, err
  55. } else if patched {
  56. clonesetutils.ResourceVersionExpectations.Expect(pod)
  57. }
  58. return 0, nil
  59. }
  60. opts := coreControl.GetUpdateOptions()
  61. opts.AdditionalFuncs = append(opts.AdditionalFuncs, lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateUpdating))
  62. res := c.inplaceControl.Update(pod, oldRevision, updateRevision, opts)
  63. if res.InPlaceUpdate {
  64. if res.UpdateErr == nil {
  65. clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
  66. return res.DelayDuration, nil
  67. }
  68. return res.DelayDuration, res.UpdateErr
  69. }
  70. }
  71. if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {
  72. return 0, fmt.Errorf("find Pod %s update strategy is InPlaceOnly but can not update in-place", pod.Name)
  73. }
  74. }
  75. // 若更新策略不为原地升级类型,直接修改Pod labels apps.kruise.io/specified-delete为true
  76. if patched, err := specifieddelete.PatchPodSpecifiedDelete(c, pod, "true"); err != nil {
  77. return 0, err
  78. } else if patched {
  79. clonesetutils.ResourceVersionExpectations.Expect(pod)
  80. }
  81. return 0, nil
  82. }
  83. func sortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, pods []*v1.Pod, waitUpdateIndexes []int) []int {
  84. sort.Slice(waitUpdateIndexes, coreControl.GetPodsSortFunc(pods, waitUpdateIndexes))
  85. if strategy.PriorityStrategy != nil {
  86. waitUpdateIndexes = updatesort.NewPrioritySorter(strategy.PriorityStrategy).Sort(pods, waitUpdateIndexes)
  87. }
  88. if strategy.ScatterStrategy != nil {
  89. waitUpdateIndexes = updatesort.NewScatterSorter(strategy.ScatterStrategy).Sort(pods, waitUpdateIndexes)
  90. }
  91. sort.Slice(waitUpdateIndexes, func(i, j int) bool {
  92. preparingUpdateI := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[i]]) == appsv1alpha1.LifecycleStatePreparingUpdate
  93. preparingUpdateJ := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[j]]) == appsv1alpha1.LifecycleStatePreparingUpdate
  94. if preparingUpdateI != preparingUpdateJ {
  95. return preparingUpdateI
  96. }
  97. return false
  98. })
  99. return waitUpdateIndexes
  100. }
  101. // 计算要更新的数量
  102. func calculateUpdateCount(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, minReadySeconds int32, totalReplicas int, waitUpdateIndexes []int, pods []*v1.Pod) int {
  103. partition := 0
  104. if strategy.Partition != nil {
  105. partition = int(*strategy.Partition)
  106. }
  107. // 保留旧版本数量
  108. if len(waitUpdateIndexes)-partition <= 0 {
  109. return 0
  110. }
  111. waitUpdateIndexes = waitUpdateIndexes[:(len(waitUpdateIndexes) - partition)]
  112. roundUp := true
  113. if strategy.MaxSurge != nil {
  114. maxSurge, _ := intstrutil.GetValueFromIntOrPercent(strategy.MaxSurge, totalReplicas, true)
  115. roundUp = maxSurge == 0
  116. }
  117. maxUnavailable, _ := intstrutil.GetValueFromIntOrPercent(
  118. intstrutil.ValueOrDefault(strategy.MaxUnavailable, intstrutil.FromString(appsv1alpha1.DefaultCloneSetMaxUnavailable)), totalReplicas, roundUp)
  119. usedSurge := len(pods) - totalReplicas
  120. // 可升级数量理论上为:
  121. // 已经在升级但是状态不为Ready的 ➕ 等待升级的数量
  122. var notReadyCount, updateCount int
  123. for _, p := range pods {
  124. if !isPodReady(coreControl, p, minReadySeconds) {
  125. notReadyCount++
  126. }
  127. }
  128. for _, i := range waitUpdateIndexes {
  129. if isPodReady(coreControl, pods[i], minReadySeconds) {
  130. if notReadyCount >= (maxUnavailable + usedSurge) {
  131. break
  132. } else {
  133. notReadyCount++
  134. }
  135. }
  136. updateCount++
  137. }
  138. return updateCount
  139. }
  140. // pod ready要求
  141. // 1、lifecycle state为Normal
  142. // 2、pod.Status.Phase == v1.PodRunning
  143. // 3、pod是Available状态
  144. // 4、condition InPlaceUpdateReady值为True
  145. func isPodReady(coreControl clonesetcore.Control, pod *v1.Pod, minReadySeconds int32) bool {
  146. state := lifecycle.GetPodLifecycleState(pod)
  147. if state != "" && state != appsv1alpha1.LifecycleStateNormal {
  148. return false
  149. }
  150. return coreControl.IsPodUpdateReady(pod, minReadySeconds)
  151. }

3.3.3、原地升级函数

4、整体逻辑