0、前置概念
0.1、readinessGates
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate
1、Spec定义
// CloneSetSpec describes the desired state of a CloneSet.
type CloneSetSpec struct {
	// Replicas is the desired number of pods. Defaults to 1.
	Replicas *int32 `json:"replicas,omitempty"`

	// Selector is the label selector for the pods of this CloneSet.
	Selector *metav1.LabelSelector `json:"selector"`

	// Template is the pod template.
	Template v1.PodTemplateSpec `json:"template"`

	// VolumeClaimTemplates are the storage (PVC) templates.
	VolumeClaimTemplates []v1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

	// ScaleStrategy is the scale-out / scale-in strategy.
	ScaleStrategy CloneSetScaleStrategy `json:"scaleStrategy,omitempty"`

	// UpdateStrategy is the update strategy.
	UpdateStrategy CloneSetUpdateStrategy `json:"updateStrategy,omitempty"`

	// RevisionHistoryLimit is the number of historical revisions to keep.
	// Defaults to 10.
	RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

	// MinReadySeconds is how long a pod must stay ready before it is
	// considered available. Defaults to 0.
	MinReadySeconds int32 `json:"minReadySeconds,omitempty"`

	// Lifecycle holds the custom hooks for deletion and in-place update.
	Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
}

// CloneSetScaleStrategy configures how a CloneSet is scaled.
type CloneSetScaleStrategy struct {
	// PodsToDelete names the pods to delete when scaling in.
	// If the replica count has not changed, the named pods are deleted and
	// then recreated.
	PodsToDelete []string `json:"podsToDelete,omitempty"`
}

// CloneSetUpdateStrategy is the update (upgrade) strategy of a CloneSet.
type CloneSetUpdateStrategy struct {
	// Type is the update strategy type:
	// ReCreate, InPlaceIfPossible or InPlaceOnly.
	Type CloneSetUpdateStrategyType `json:"type,omitempty"`

	// Partition is the number of pods to keep on the old revision.
	// Defaults to 0.
	Partition *int32 `json:"partition,omitempty"`

	// MaxUnavailable is the maximum number of unavailable pods.
	// Defaults to 20%.
	MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

	// MaxSurge is the maximum number of extra pods allowed during an update.
	// When maxSurge is set, the controller first creates maxSurge additional
	// pods (the total becomes replicas+maxSurge) and only then starts
	// updating the existing pods. Once the number of new-revision pods
	// satisfies partition, the controller deletes the surplus maxSurge pods
	// so that the final pod count matches replicas.
	// maxSurge must not be combined with the InPlaceOnly update type.
	// When combined with InPlaceIfPossible, the controller first creates
	// maxSurge pods and then updates the existing pods in place.
	MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`

	// Paused indicates whether the CloneSet is paused.
	Paused bool `json:"paused,omitempty"`

	// PriorityStrategy defines the update priority.
	PriorityStrategy *UpdatePriorityStrategy `json:"priorityStrategy,omitempty"`

	// ScatterStrategy defines how updates are scattered across pods.
	ScatterStrategy CloneSetUpdateScatterStrategy `json:"scatterStrategy,omitempty"`

	// InPlaceUpdateStrategy is the in-place update strategy.
	InPlaceUpdateStrategy *InPlaceUpdateStrategy `json:"inPlaceUpdateStrategy,omitempty"`
}

// CloneSetUpdateScatterStrategy is a list of scatter terms.
type CloneSetUpdateScatterStrategy []CloneSetUpdateScatterTerm

// CloneSetUpdateScatterTerm is a single key/value scatter term.
type CloneSetUpdateScatterTerm struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// CloneSetUpdateStrategyType is the string name of an update strategy type.
type CloneSetUpdateStrategyType string

// InPlaceUpdateStrategy configures in-place updates.
type InPlaceUpdateStrategy struct {
	GracePeriodSeconds int32 `json:"gracePeriodSeconds,omitempty"`
}
2、调谐函数
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {// fetchinstance := &appsv1alpha1.CloneSet{}err := r.Get(context.TODO(), request.NamespacedName, instance)if err != nil {// 删除事件if errors.IsNotFound(err) {clonesetutils.ScaleExpectations.DeleteExpectations(request.String())clonesetutils.UpdateExpectations.DeleteExpectations(request.String())return reconcile.Result{}, nil}return reconcile.Result{}, err}coreControl := clonesetcore.New(instance)// 测试特性,永远返回falseif coreControl.IsInitializing() {return reconcile.Result{}, nil}// 构造选择器selector, err := metav1.LabelSelectorAsSelector(instance.Spec.Selector)if err != nil {return reconcile.Result{}, nil}// 判断是否处在扩容状态// 若还处在扩容状态,则跳过本次循环if scaleSatisfied, unsatisfiedDuration, _ := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {if unsatisfiedDuration >= expectations.ExpectationTimeout {return reconcile.Result{}, nil}return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil}// 通过ownerRef索引获取此CloneSet关联的pod/pvcfilteredPods, filteredPVCs, err := r.getOwnedResource(instance)if err != nil {return reconcile.Result{}, err}// 认领pod,释放label不符合的pod/认领label符合的孤儿podfilteredPods, err = r.claimPods(instance, filteredPods)if err != nil {return reconcile.Result{}, err}// 获取历史版本// cloneset的历史版本是通过ControllerRevison维护,// 不同于Deployment通过直接维护repliset来维护历史Specrevisions, err := r.controllerHistory.ListControllerRevisions(instance, selector)if err != nil {return reconcile.Result{}, err}// 版本排序history.SortControllerRevisions(revisions)// 获取当前pod的Spec版本和即将要更新的Spec版本// 若当前Spec版本为nil,则当前Spec版本为即将要更新的Spec版本currentRevision, updateRevision, collisionCount, err := r.getActiveRevisions(instance, revisions, clonesetutils.GetPodsRevisions(filteredPods))if err != nil {return reconcile.Result{}, err}// 有可能是pod变更触发controller循环// 需要判断一下是否有pod已经变更了版本// 此处不在回调函数触发,是因为pod必须先要经过认领,// 为了减少认领操作,统一到ControllerLoop进行处理for _, pod := range 
filteredPods {clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)}// 是否正在进行的更新还没有完成if updateSatisfied, unsatisfiedDuration, _ := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {if unsatisfiedDuration >= expectations.ExpectationTimeout {return reconcile.Result{}, nil}return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil}for _, pod := range filteredPods {// 必须等待客户自定义回调全部处理完成才能进行下一轮同步// ResourceVersionExpectations是为了减少等待用户hook而无效的loop// 若进入了等待用户定义hook的状态,则Except Pod,只有当Pod发生变化才假定用户自定义hook完成// 否则跳过loopif isSatisfied, unsatisfiedDuration := clonesetutils.ResourceVersionExpectations.IsSatisfied(pod); !isSatisfied {if unsatisfiedDuration >= expectations.ExpectationTimeout {return reconcile.Result{}, nil}return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil}}// 初始化新的StatusnewStatus := appsv1alpha1.CloneSetStatus{ObservedGeneration: instance.Generation,UpdateRevision: updateRevision.Name,CollisionCount: new(int32),LabelSelector: selector.String(),}*newStatus.CollisionCount = collisionCount// 开始进行扩/缩容、升级delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)// 更新CloneSet状态if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {return reconcile.Result{}, err}// 对于用户指定的不存在的pod进行修正// 针对PodsToDelete参数if err = r.truncatePodsToDelete(instance, filteredPods); err != nil {klog.Warningf("Failed to truncate podsToDelete for %s: %v", request, err)}// 删除超过给定缓存历史Spec数量的旧版本ControllerRevisionif err = r.truncateHistory(instance, filteredPods, revisions, currentRevision, updateRevision); err != nil {klog.Errorf("Failed to truncate history for %s: %v", request, err)}// 若符合同步成功、给定启动时间、可用数量少于期望数量// 则给定启动时间之后再将给Controller Key入对,更新Statusif syncErr == nil && instance.Spec.MinReadySeconds > 0 
&& newStatus.AvailableReplicas != newStatus.ReadyReplicas {minReadyDuration := time.Second * time.Duration(instance.Spec.MinReadySeconds)if delayDuration == 0 || minReadyDuration < delayDuration {delayDuration = minReadyDuration}}return reconcile.Result{RequeueAfter: delayDuration}, syncErr}
3、同步函数
3.1、同步函数
func (r *ReconcileCloneSet) syncCloneSet(instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,) (time.Duration, error) {// 若已经被删除,则直接returnvar delayDuration time.Durationif instance.DeletionTimestamp != nil {return delayDuration, nil}// 根据Revision复原CloneSet对象currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)if err != nil {return delayDuration, err}updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)if err != nil {return delayDuration, err}var scaling boolvar podsScaleErr errorvar podsUpdateErr error// 开始扩容scaling, podsScaleErr = r.scaleControl.Manage(currentSet, updateSet, currentRevision.Name, updateRevision.Name, filteredPods, filteredPVCs)if podsScaleErr != nil {newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{Type: appsv1alpha1.CloneSetConditionFailedScale,Status: v1.ConditionTrue,LastTransitionTime: metav1.Now(),Message: podsScaleErr.Error(),})err = podsScaleErr}// 若处在扩容状态,则直接return// 否则才进行更新if scaling {return delayDuration, podsScaleErr}// 开始更新delayDuration, podsUpdateErr = r.updateControl.Manage(updateSet, updateRevision, revisions, filteredPods, filteredPVCs)if podsUpdateErr != nil {newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{Type: appsv1alpha1.CloneSetConditionFailedUpdate,Status: v1.ConditionTrue,LastTransitionTime: metav1.Now(),Message: podsUpdateErr.Error(),})if err == nil {err = podsUpdateErr}}return delayDuration, err}
3.2、扩容
3.2.1、扩容核心接口
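1、首先通过 managePreparingDelete 和 deletePods 处理用户指定删除(PodsToDelete / specified-delete 标签)以及处于 PreparingDelete 状态的 Pod,只要发生了变更就直接返回;2、其次才结合 partition、maxSurge 计算 diff,按需创建或删除 Pod,完成副本数量的控制。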
func (r *realControl) Manage(currentCS, updateCS *appsv1alpha1.CloneSet,currentRevision, updateRevision string,pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,) (bool, error) {if updateCS.Spec.Replicas == nil {return false, fmt.Errorf("spec.Replicas is nil")}coreControl := clonesetcore.New(updateCS)if !coreControl.IsReadyToScale() {return false, nil}// 获取计划要删除的pod// 包含用户通过PodToDelete指定和已经处于PreDelete状态的podpodsSpecifiedToDelete, podsInPreDelete := getPlannedDeletedPods(updateCS, pods)// 由于用户指定要删除的pod和PreDelete状态的pod有可能有重合,因此需要merge// 下面的逻辑只要发生了pod变更或者删除操作,就直接返回podsToDelete := util.MergePods(podsSpecifiedToDelete, podsInPreDelete)if len(podsToDelete) > 0 {if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, len(podsToDelete)); err != nil || modified {return modified, err}if modified, err := r.deletePods(updateCS, podsToDelete, pvcs); err != nil || modified {return modified, err}}// 根据版本获取已经更新的pod和还未更新的pod,并通过旧版本保留数和弹性数量计算diff值updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))// 小于0则需要创建podif diff < 0 {// 需要创建的总数expectedCreations := diff * -1// 缺少的当前的版本的数量// 针对当前旧版本数量少于给定保留旧版本数量的情况expectedCurrentCreations := 0if currentRevDiff < 0 {expectedCurrentCreations = currentRevDiff * -1}// 可用实例idavailableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)// 存在的pvc名字existingPVCNames := sets.NewString()for _, pvc := range pvcs {existingPVCNames.Insert(pvc.Name)}// 创建podreturn r.createPods(expectedCreations, expectedCurrentCreations,currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)} else if diff > 0 {// 否则删除pod// 若当前存在要删除的pod,则先等待删除完成,再删除剩余podif len(podsToDelete) > 0 {return false, nil}// 选择要删除的podpodsToDelete := choosePodsToDelete(diff, currentRevDiff, notUpdatedPods, updatedPods)return r.deletePods(updateCS, podsToDelete, pvcs)}return false, nil}func (r 
*realControl) managePreparingDelete(cs *appsv1alpha1.CloneSet, pods, podsInPreDelete []*v1.Pod, numToDelete int) (bool, error) {// diff为还需要删除的数量// 若要删除的数量,删除完成后副本数量达不到期望,将PreDelete的pod转为Normal状态diff := int(*cs.Spec.Replicas) - len(pods) + numToDeletevar modified boolfor _, pod := range podsInPreDelete {if diff <= 0 {return modified, nil}if isPodSpecifiedDelete(cs, pod) {continue}if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStateNormal); err != nil {return modified, err} else if patched {modified = trueclonesetutils.ResourceVersionExpectations.Expect(pod)}diff--}return modified, nil}func (r *realControl) createPods(expectedCreations, expectedCurrentCreations int,currentCS, updateCS *appsv1alpha1.CloneSet,currentRevision, updateRevision string,availableIDs []string, existingPVCNames sets.String,) (bool, error) {coreControl := clonesetcore.New(updateCS)newPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,expectedCreations, expectedCurrentCreations, availableIDs)if err != nil {return false, err}podsCreationChan := make(chan *v1.Pod, len(newPods))for _, p := range newPods {clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)podsCreationChan <- p}var created int64successPodNames := sync.Map{}_, err = clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {pod := <-podsCreationChancs := updateCSif pod.Labels[apps.ControllerRevisionHashLabelKey] == currentRevision {cs = currentCS}lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateNormal)(pod)var createErr errorif createErr = r.createOnePod(cs, pod, existingPVCNames); createErr != nil {return createErr}atomic.AddInt64(&created, 1)successPodNames.Store(pod.Name, struct{}{})return nil})for _, pod := range newPods {if _, ok := successPodNames.Load(pod.Name); !ok {clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, 
pod.Name)}}if created == 0 {return false, err}return true, err}func (r *realControl) createOnePod(cs *appsv1alpha1.CloneSet, pod *v1.Pod, existingPVCNames sets.String) error {claims := clonesetutils.GetPersistentVolumeClaims(cs, pod)for _, c := range claims {if existingPVCNames.Has(c.Name) {continue}clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)if err := r.Create(context.TODO(), &c); err != nil {clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Create, c.Name)r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pvc: %v, pvc: %v", err, util.DumpJSON(c))return err}}if err := r.Create(context.TODO(), pod); err != nil {r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pod: %v, pod: %v", err, util.DumpJSON(pod))return err}r.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulCreate", "succeed to create pod %s", pod.Name)return nil}func (r *realControl) deletePods(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (bool, error) {var modified boolfor _, pod := range podsToDelete {if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.PreDelete, pod) {if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStatePreparingDelete); err != nil {return false, err} else if patched {modified = trueclonesetutils.ResourceVersionExpectations.Expect(pod)}continue}clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)if err := r.Delete(context.TODO(), pod); err != nil {clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)return modified, err}modified = truer.recorder.Event(cs, v1.EventTypeNormal, "SuccessfulDelete", fmt.Sprintf("succeed to delete 
pod %s", pod.Name))for _, pvc := range pvcs {if pvc.Labels[appsv1alpha1.CloneSetInstanceID] != pod.Labels[appsv1alpha1.CloneSetInstanceID] {continue}clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)if err := r.Delete(context.TODO(), pvc); err != nil {clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)return modified, err}}}return modified, nil}
3.2.2、扩容工具函数
// 判断pod是否被指定删除// 1、判断是否有apps.kruise.io/specified-delete Label// 2、是否出现在PodsToDelete参数中func isPodSpecifiedDelete(cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {if specifieddelete.IsSpecifiedDelete(pod) {return true}for _, name := range cs.Spec.ScaleStrategy.PodsToDelete {if name == pod.Name {return true}}return false}// 获取被指定删除的pod和处于PreDelete状态的Podfunc getPlannedDeletedPods(cs *appsv1alpha1.CloneSet, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) {var podsSpecifiedToDelete []*v1.Podvar podsInPreDelete []*v1.Podfor _, pod := range pods {if isPodSpecifiedDelete(cs, pod) {podsSpecifiedToDelete = append(podsSpecifiedToDelete, pod)}// 根据lifecycle.apps.kruise.io/state Label进行校验if lifecycle.GetPodLifecycleState(pod) == appsv1alpha1.LifecycleStatePreparingDelete {podsInPreDelete = append(podsInPreDelete, pod)}}return podsSpecifiedToDelete, podsInPreDelete}// 获取可用id,如果可以复用,则进行复用func getOrGenAvailableIDs(num int, pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) sets.String {existingIDs := sets.NewString()availableIDs := sets.NewString()for _, pvc := range pvcs {if id := pvc.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {existingIDs.Insert(id)availableIDs.Insert(id)}}for _, pod := range pods {if id := pod.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {existingIDs.Insert(id)availableIDs.Delete(id)}}retIDs := sets.NewString()for i := 0; i < num; i++ {id := getOrGenInstanceID(existingIDs, availableIDs)retIDs.Insert(id)}return retIDs}// 获取或者生成一个InstanceIDfunc getOrGenInstanceID(existingIDs, availableIDs sets.String) string {id, _ := availableIDs.PopAny()if len(id) == 0 {for {id = rand.String(LengthOfInstanceID)if !existingIDs.Has(id) {break}}}return id}// 计算diff(若小于0,则需要创建;否则需要删除)func calculateDiffs(cs *appsv1alpha1.CloneSet, revConsistent bool, totalPods int, notUpdatedPods int) (totalDiff int, currentRevDiff int) {var maxSurge int// 若同时有升级,则需要考虑保留旧版本数量if !revConsistent {if cs.Spec.UpdateStrategy.Partition != nil {currentRevDiff = notUpdatedPods - 
integer.IntMin(int(*cs.Spec.UpdateStrategy.Partition), int(*cs.Spec.Replicas))}if currentRevDiff > 0 {if cs.Spec.UpdateStrategy.MaxSurge != nil {maxSurge, _ = intstrutil.GetValueFromIntOrPercent(cs.Spec.UpdateStrategy.MaxSurge, int(*cs.Spec.Replicas), true)maxSurge = integer.IntMin(maxSurge, currentRevDiff)}}}// 若只是扩缩容,则减去副本数量(弹性扩容数量为0)totalDiff = totalPods - int(*cs.Spec.Replicas) - maxSurgereturn}// 选择要删除的podfunc choosePodsToDelete(totalDiff int, currentRevDiff int, notUpdatedPods, updatedPods []*v1.Pod) []*v1.Pod {choose := func(pods []*v1.Pod, diff int) []*v1.Pod {if diff < len(pods) {sort.Sort(kubecontroller.ActivePods(pods))} else if diff > len(pods) {klog.Warningf("Diff > len(pods) in choosePodsToDelete func which is not expected.")return pods}return pods[:diff]}var podsToDelete []*v1.Podif currentRevDiff >= totalDiff {// 若旧版本diff数量大于totalDiff// 则直接从旧版本中删除totalDiff podpodsToDelete = choose(notUpdatedPods, totalDiff)} else if currentRevDiff > 0 {// 否则先从notUpdatedPods删除currentRevDiff pods// 然后从updatedPods删除剩余podspodsToDelete = choose(notUpdatedPods, currentRevDiff)podsToDelete = append(podsToDelete, choose(updatedPods, totalDiff-currentRevDiff)...)} else {// 从updatedPods删除totalDiff podspodsToDelete = choose(updatedPods, totalDiff)}return podsToDelete}
3.3、升级
3.3.1、升级核心接口
func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,) (time.Duration, error) {requeueDuration := requeueduration.Duration{}coreControl := clonesetcore.New(cs)if cs.Spec.UpdateStrategy.Paused {return requeueDuration.Get(), nil}// 刷新所有的pod状态var modified boolfor _, pod := range pods {patched, duration, err := c.refreshPodState(cs, coreControl, pod)if err != nil {return 0, err} else if duration > 0 {requeueDuration.Update(duration)}if patched {modified = true}}if modified {return requeueDuration.Get(), nil}// 找到所有等待更新的pod// 满足:// 1、当前pod的revision不为给定的revision// 2、不处于PreparingDelete、Updated状态var waitUpdateIndexes []intfor i := range pods {if coreControl.IsPodUpdatePaused(pods[i]) {continue}if clonesetutils.GetPodRevision(pods[i]) != updateRevision.Name {switch lifecycle.GetPodLifecycleState(pods[i]) {case appsv1alpha1.LifecycleStatePreparingDelete, appsv1alpha1.LifecycleStateUpdated:default:waitUpdateIndexes = append(waitUpdateIndexes, i)}}}// 根据优先级策略和打散策略给待升级的pod进行排序waitUpdateIndexes = sortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, waitUpdateIndexes)// 计算最大可以扩容的数量needToUpdateCount := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), waitUpdateIndexes, pods)if needToUpdateCount < len(waitUpdateIndexes) {waitUpdateIndexes = waitUpdateIndexes[:needToUpdateCount]}// 更新podfor _, idx := range waitUpdateIndexes {pod := pods[idx]if duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pod, pvcs); err != nil {return requeueDuration.Get(), err} else if duration > 0 {requeueDuration.Update(duration)}}return requeueDuration.Get(), nil}
3.3.2、升级工具函数
func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {opts := coreControl.GetUpdateOptions()res := c.inplaceControl.Refresh(pod, opts)if res.RefreshErr != nil {return false, 0, res.RefreshErr}var state appsv1alpha1.LifecycleStateTypeswitch lifecycle.GetPodLifecycleState(pod) {case appsv1alpha1.LifecycleStateUpdating:checkFunc := inplaceupdate.CheckInPlaceUpdateCompletedif opts != nil && opts.CustomizeCheckUpdateCompleted != nil {checkFunc = opts.CustomizeCheckUpdateCompleted}if checkFunc(pod) == nil {if cs.Spec.Lifecycle != nil && !lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {state = appsv1alpha1.LifecycleStateUpdated} else {state = appsv1alpha1.LifecycleStateNormal}}case appsv1alpha1.LifecycleStateUpdated:if cs.Spec.Lifecycle == nil ||cs.Spec.Lifecycle.InPlaceUpdate == nil ||lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {state = appsv1alpha1.LifecycleStateNormal}}if state != "" {if patched, err := lifecycle.PatchPodLifecycle(c, pod, state); err != nil {return false, 0, err} else if patched {clonesetutils.ResourceVersionExpectations.Expect(pod)return true, res.DelayDuration, nil}}return false, res.DelayDuration, nil}func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,) (time.Duration, error) {if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType ||cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {var oldRevision *apps.ControllerRevisionfor _, r := range revisions {if r.Name == clonesetutils.GetPodRevision(pod) {oldRevision = rbreak}}if c.inplaceControl.CanUpdateInPlace(oldRevision, updateRevision, coreControl.GetUpdateOptions()) {if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) 
{if patched, err := lifecycle.PatchPodLifecycle(c, pod, appsv1alpha1.LifecycleStatePreparingUpdate); err != nil {return 0, err} else if patched {clonesetutils.ResourceVersionExpectations.Expect(pod)}return 0, nil}opts := coreControl.GetUpdateOptions()opts.AdditionalFuncs = append(opts.AdditionalFuncs, lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateUpdating))res := c.inplaceControl.Update(pod, oldRevision, updateRevision, opts)if res.InPlaceUpdate {if res.UpdateErr == nil {clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)return res.DelayDuration, nil}return res.DelayDuration, res.UpdateErr}}if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {return 0, fmt.Errorf("find Pod %s update strategy is InPlaceOnly but can not update in-place", pod.Name)}}// 若更新策略不为原地升级类型,直接修改Pod labels apps.kruise.io/specified-delete为trueif patched, err := specifieddelete.PatchPodSpecifiedDelete(c, pod, "true"); err != nil {return 0, err} else if patched {clonesetutils.ResourceVersionExpectations.Expect(pod)}return 0, nil}func sortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, pods []*v1.Pod, waitUpdateIndexes []int) []int {sort.Slice(waitUpdateIndexes, coreControl.GetPodsSortFunc(pods, waitUpdateIndexes))if strategy.PriorityStrategy != nil {waitUpdateIndexes = updatesort.NewPrioritySorter(strategy.PriorityStrategy).Sort(pods, waitUpdateIndexes)}if strategy.ScatterStrategy != nil {waitUpdateIndexes = updatesort.NewScatterSorter(strategy.ScatterStrategy).Sort(pods, waitUpdateIndexes)}sort.Slice(waitUpdateIndexes, func(i, j int) bool {preparingUpdateI := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[i]]) == appsv1alpha1.LifecycleStatePreparingUpdatepreparingUpdateJ := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[j]]) == appsv1alpha1.LifecycleStatePreparingUpdateif preparingUpdateI != preparingUpdateJ {return preparingUpdateI}return 
false})return waitUpdateIndexes}// 计算要更新的数量func calculateUpdateCount(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, minReadySeconds int32, totalReplicas int, waitUpdateIndexes []int, pods []*v1.Pod) int {partition := 0if strategy.Partition != nil {partition = int(*strategy.Partition)}// 保留旧版本数量if len(waitUpdateIndexes)-partition <= 0 {return 0}waitUpdateIndexes = waitUpdateIndexes[:(len(waitUpdateIndexes) - partition)]roundUp := trueif strategy.MaxSurge != nil {maxSurge, _ := intstrutil.GetValueFromIntOrPercent(strategy.MaxSurge, totalReplicas, true)roundUp = maxSurge == 0}maxUnavailable, _ := intstrutil.GetValueFromIntOrPercent(intstrutil.ValueOrDefault(strategy.MaxUnavailable, intstrutil.FromString(appsv1alpha1.DefaultCloneSetMaxUnavailable)), totalReplicas, roundUp)usedSurge := len(pods) - totalReplicas// 可升级数量理论上为:// 已经在升级但是状态不为Ready的 ➕ 等待升级的数量var notReadyCount, updateCount intfor _, p := range pods {if !isPodReady(coreControl, p, minReadySeconds) {notReadyCount++}}for _, i := range waitUpdateIndexes {if isPodReady(coreControl, pods[i], minReadySeconds) {if notReadyCount >= (maxUnavailable + usedSurge) {break} else {notReadyCount++}}updateCount++}return updateCount}// pod ready要求// 1、lifecycle state为Normal// 2、pod.Status.Phase == v1.PodRunning// 3、pod是Available状态// 4、condition InPlaceUpdateReady值为Truefunc isPodReady(coreControl clonesetcore.Control, pod *v1.Pod, minReadySeconds int32) bool {state := lifecycle.GetPodLifecycleState(pod)if state != "" && state != appsv1alpha1.LifecycleStateNormal {return false}return coreControl.IsPodUpdateReady(pod, minReadySeconds)}
