0、前置概念
0.1、readinessGates
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate
1、Spec定义
// CloneSetSpec is the desired state of a CloneSet.
type CloneSetSpec struct {
// Replicas is the desired number of pods. Per the original notes it
// defaults to 1.
Replicas *int32 `json:"replicas,omitempty"`
// Selector is the label selector of the CloneSet; the reconciler converts
// it with metav1.LabelSelectorAsSelector.
Selector *metav1.LabelSelector `json:"selector"`
// Template is the pod template.
Template v1.PodTemplateSpec `json:"template"`
// VolumeClaimTemplates are the PVC templates.
VolumeClaimTemplates []v1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`
// ScaleStrategy controls scale out / scale in.
ScaleStrategy CloneSetScaleStrategy `json:"scaleStrategy,omitempty"`
// UpdateStrategy controls how pods are updated.
UpdateStrategy CloneSetUpdateStrategy `json:"updateStrategy,omitempty"`
// RevisionHistoryLimit is the number of historical revisions to keep.
// Per the original notes it defaults to 10.
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
// MinReadySeconds is how long a pod must stay ready before it is
// considered available. Defaults to 0.
MinReadySeconds int32 `json:"minReadySeconds,omitempty"`
// Lifecycle defines custom hooks for deletion and in-place update.
Lifecycle *Lifecycle `json:"lifecycle,omitempty"`
}
// CloneSetScaleStrategy configures scaling behavior.
type CloneSetScaleStrategy struct {
// PodsToDelete names the pods to delete when scaling in.
// If the replica count is unchanged, the named pods are deleted and
// replacements are created.
PodsToDelete []string `json:"podsToDelete,omitempty"`
}
// CloneSetUpdateStrategy controls how pods are moved to the update revision.
type CloneSetUpdateStrategy struct {
// Type is the update strategy. The original notes list ReCreate,
// InPlaceIfPossible and InPlaceOnly.
Type CloneSetUpdateStrategyType `json:"type,omitempty"`
// Partition is the number of pods to keep on the old revision.
// Per the original notes it defaults to 0.
Partition *int32 `json:"partition,omitempty"`
// MaxUnavailable is the maximum number of unavailable pods.
// Per the original notes it defaults to 20%.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// MaxSurge is the maximum number of extra pods during a rollout.
// When maxSurge is set, the controller first creates maxSurge extra pods
// (the pod total becomes replicas+maxSurge) and then starts updating the
// existing pods. Once the number of new-revision pods satisfies partition,
// the controller deletes the extra maxSurge pods so that the final pod
// count matches replicas.
// maxSurge must not be combined with the InPlaceOnly update type.
// With InPlaceIfPossible, the controller first creates the maxSurge extra
// pods and then updates the existing pods in place.
MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
// Paused reports whether updating is paused.
Paused bool `json:"paused,omitempty"`
// PriorityStrategy defines the update priority of pods.
PriorityStrategy *UpdatePriorityStrategy `json:"priorityStrategy,omitempty"`
// ScatterStrategy spreads the update across pods by the given terms.
ScatterStrategy CloneSetUpdateScatterStrategy `json:"scatterStrategy,omitempty"`
// InPlaceUpdateStrategy configures in-place updates.
InPlaceUpdateStrategy *InPlaceUpdateStrategy `json:"inPlaceUpdateStrategy,omitempty"`
}
// CloneSetUpdateScatterStrategy is the list of scatter terms used to spread
// an update across pods.
type CloneSetUpdateScatterStrategy []CloneSetUpdateScatterTerm
// CloneSetUpdateScatterTerm is a single key/value scatter rule.
// NOTE(review): presumably Key/Value refer to a pod label — confirm against
// the scatter sorter implementation, which is not shown here.
type CloneSetUpdateScatterTerm struct {
Key string `json:"key"`
Value string `json:"value"`
}
// CloneSetUpdateStrategyType is the string name of an update strategy
// (see CloneSetUpdateStrategy.Type).
type CloneSetUpdateStrategyType string
// InPlaceUpdateStrategy holds the options for in-place updates.
type InPlaceUpdateStrategy struct {
// GracePeriodSeconds is a grace period, in seconds, applied during an
// in-place update. Its exact semantics are defined by the in-place update
// controller, which is not shown here.
GracePeriodSeconds int32 `json:"gracePeriodSeconds,omitempty"`
}
2、调谐函数
// doReconcile runs one reconcile pass for the CloneSet named by request.
//
// It fetches the CloneSet, waits for outstanding scale / update /
// resource-version expectations, collects and claims the owned pods and PVCs,
// resolves the current and update revisions, runs syncCloneSet, then updates
// the status and trims podsToDelete and the revision history.
//
// The returned Result carries a RequeueAfter when the pass has to be retried
// later; the returned error is the sync error, or an earlier fatal error.
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
	// Fetch the CloneSet.
	instance := &appsv1alpha1.CloneSet{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		// Not found means the object was deleted: drop its expectations.
		if errors.IsNotFound(err) {
			clonesetutils.ScaleExpectations.DeleteExpectations(request.String())
			clonesetutils.UpdateExpectations.DeleteExpectations(request.String())
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}

	coreControl := clonesetcore.New(instance)
	// Per the original notes this is an experimental feature that currently
	// always reports false.
	if coreControl.IsInitializing() {
		return reconcile.Result{}, nil
	}

	// Build the label selector.
	selector, err := metav1.LabelSelectorAsSelector(instance.Spec.Selector)
	if err != nil {
		// Retrying cannot repair an invalid selector, so no error is returned
		// (which would requeue); log it so the stalled CloneSet is diagnosable.
		klog.Errorf("Error converting CloneSet %s selector: %v", request, err)
		return reconcile.Result{}, nil
	}

	// If a previous scale operation has not been observed yet, skip this pass
	// and come back when the expectation would time out.
	if scaleSatisfied, unsatisfiedDuration, _ := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
		if unsatisfiedDuration >= expectations.ExpectationTimeout {
			klog.Warningf("Scale expectation unsatisfied overtime for %s, overtime=%v", request, unsatisfiedDuration)
			return reconcile.Result{}, nil
		}
		return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
	}

	// Get the pods / PVCs owned by this CloneSet.
	filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Claim pods: release pods whose labels no longer match and adopt
	// matching orphans.
	filteredPods, err = r.claimPods(instance, filteredPods)
	if err != nil {
		return reconcile.Result{}, err
	}

	// List historical revisions. A CloneSet keeps its history as
	// ControllerRevision objects, unlike a Deployment, which keeps old specs
	// through its ReplicaSets.
	revisions, err := r.controllerHistory.ListControllerRevisions(instance, selector)
	if err != nil {
		return reconcile.Result{}, err
	}
	history.SortControllerRevisions(revisions)

	// Resolve the revision the pods currently run and the revision to update
	// to. Per the original notes, when there is no current revision the
	// update revision is used as the current one.
	currentRevision, updateRevision, collisionCount, err := r.getActiveRevisions(instance, revisions, clonesetutils.GetPodsRevisions(filteredPods))
	if err != nil {
		return reconcile.Result{}, err
	}

	// This pass may have been triggered by a pod change, so check whether any
	// pod has reached the update revision. It is done here rather than in the
	// event handler because pods must be claimed first.
	for _, pod := range filteredPods {
		clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
	}

	// If an in-flight update has not been observed yet, skip this pass.
	if updateSatisfied, unsatisfiedDuration, _ := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
		if unsatisfiedDuration >= expectations.ExpectationTimeout {
			klog.Warningf("Update expectation unsatisfied overtime for %s, overtime=%v", request, unsatisfiedDuration)
			return reconcile.Result{}, nil
		}
		return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
	}

	// After the controller patches a pod it records an expectation on that
	// pod (ResourceVersionExpectations.Expect). Until the changed pod is seen
	// here, skip the pass to avoid useless loops while user hooks run.
	for _, pod := range filteredPods {
		if isSatisfied, unsatisfiedDuration := clonesetutils.ResourceVersionExpectations.IsSatisfied(pod); !isSatisfied {
			if unsatisfiedDuration >= expectations.ExpectationTimeout {
				klog.Warningf("ResourceVersion expectation unsatisfied overtime for %s, overtime=%v", request, unsatisfiedDuration)
				return reconcile.Result{}, nil
			}
			return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
		}
	}

	// Initialize the new status.
	newStatus := appsv1alpha1.CloneSetStatus{
		ObservedGeneration: instance.Generation,
		UpdateRevision:     updateRevision.Name,
		CollisionCount:     new(int32),
		LabelSelector:      selector.String(),
	}
	*newStatus.CollisionCount = collisionCount

	// Scale and update.
	delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)

	// Persist the CloneSet status.
	if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
		return reconcile.Result{}, err
	}

	// Drop podsToDelete entries that name pods which do not exist.
	// Best effort: a failure is only logged.
	if err = r.truncatePodsToDelete(instance, filteredPods); err != nil {
		klog.Warningf("Failed to truncate podsToDelete for %s: %v", request, err)
	}

	// Delete ControllerRevisions beyond the configured history limit.
	// Best effort: a failure is only logged.
	if err = r.truncateHistory(instance, filteredPods, revisions, currentRevision, updateRevision); err != nil {
		klog.Errorf("Failed to truncate history for %s: %v", request, err)
	}

	// If the sync succeeded, MinReadySeconds is set, and some ready pods are
	// not yet available, requeue after MinReadySeconds (or sooner, if the
	// sync already asked for a shorter delay) to refresh the status.
	if syncErr == nil && instance.Spec.MinReadySeconds > 0 && newStatus.AvailableReplicas != newStatus.ReadyReplicas {
		minReadyDuration := time.Second * time.Duration(instance.Spec.MinReadySeconds)
		if delayDuration == 0 || minReadyDuration < delayDuration {
			delayDuration = minReadyDuration
		}
	}
	return reconcile.Result{RequeueAfter: delayDuration}, syncErr
}
3、同步函数
3.1、同步函数
// syncCloneSet performs scaling and then, if no scaling action was taken,
// updating for one CloneSet.
//
// Failures are recorded as conditions on newStatus. The returned duration is
// the requeue delay requested by the update step (zero if that step was not
// reached). The returned error is the scale error if there is one, otherwise
// the update error.
func (r *ReconcileCloneSet) syncCloneSet(
	instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
	currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
	var requeueAfter time.Duration

	// Nothing to do for a CloneSet that is being deleted.
	if instance.DeletionTimestamp != nil {
		return requeueAfter, nil
	}

	// Rebuild the CloneSet objects as of the current and update revisions.
	currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
	if err != nil {
		return requeueAfter, err
	}
	updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
	if err != nil {
		return requeueAfter, err
	}

	// Step 1: scale.
	scaling, scaleErr := r.scaleControl.Manage(currentSet, updateSet, currentRevision.Name, updateRevision.Name, filteredPods, filteredPVCs)
	if scaleErr != nil {
		newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
			Type:               appsv1alpha1.CloneSetConditionFailedScale,
			Status:             v1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
			Message:            scaleErr.Error(),
		})
	}
	// If the scale step changed something, stop here; updating waits for the
	// next pass.
	if scaling {
		return requeueAfter, scaleErr
	}

	// Step 2: update.
	requeueAfter, updateErr := r.updateControl.Manage(updateSet, updateRevision, revisions, filteredPods, filteredPVCs)
	if updateErr != nil {
		newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
			Type:               appsv1alpha1.CloneSetConditionFailedUpdate,
			Status:             v1.ConditionTrue,
			LastTransitionTime: metav1.Now(),
			Message:            updateErr.Error(),
		})
	}

	// A scale error takes precedence over an update error.
	if scaleErr != nil {
		return requeueAfter, scaleErr
	}
	return requeueAfter, updateErr
}
3.2、扩容
3.2.1、扩容核心接口
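1、首先通过 managePreparingDelete 和 deletePods 处理计划删除的 Pod(用户指定删除的 Pod 以及处于 PreparingDelete 状态的 Pod);2、然后再根据副本数、partition 和 maxSurge 计算差值(calculateDiffs),据此创建或删除 Pod。只要其中某一步对 Pod 做了变更,就直接返回,等待下一轮调谐。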
// Manage reconciles the number of pods of the CloneSet.
//
// It first handles pods that are planned for deletion (specified by the user
// or already in the PreparingDelete state), then creates or deletes pods
// according to the diff computed by calculateDiffs. It returns true when any
// pod was changed, created or deleted in this call.
func (r *realControl) Manage(
	currentCS, updateCS *appsv1alpha1.CloneSet,
	currentRevision, updateRevision string,
	pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (bool, error) {
	if updateCS.Spec.Replicas == nil {
		return false, fmt.Errorf("spec.Replicas is nil")
	}
	if ctrl := clonesetcore.New(updateCS); !ctrl.IsReadyToScale() {
		return false, nil
	}

	// Pods planned for deletion: those specified by the user and those already
	// in PreparingDelete. The two sets may overlap, hence the merge.
	specified, preDeleting := getPlannedDeletedPods(updateCS, pods)
	plannedDeletes := util.MergePods(specified, preDeleting)
	if len(plannedDeletes) > 0 {
		// Return as soon as any pod was patched or deleted.
		modified, err := r.managePreparingDelete(updateCS, pods, preDeleting, len(plannedDeletes))
		if err != nil || modified {
			return modified, err
		}
		modified, err = r.deletePods(updateCS, plannedDeletes, pvcs)
		if err != nil || modified {
			return modified, err
		}
	}

	// Split pods by revision and compute how many pods are missing or extra.
	updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
	diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))

	switch {
	case diff < 0:
		// Too few pods: create -diff of them.
		missing := -diff
		// How many of those must be created with the current (old) revision,
		// i.e. when fewer old-revision pods exist than partition asks for.
		missingCurrent := 0
		if currentRevDiff < 0 {
			missingCurrent = -currentRevDiff
		}

		ids := getOrGenAvailableIDs(missing, pods, pvcs)

		pvcNames := sets.NewString()
		for _, pvc := range pvcs {
			pvcNames.Insert(pvc.Name)
		}

		return r.createPods(missing, missingCurrent,
			currentCS, updateCS, currentRevision, updateRevision, ids.List(), pvcNames)

	case diff > 0:
		// Too many pods. If planned deletions are still pending, let them
		// finish before choosing more pods to delete.
		if len(plannedDeletes) > 0 {
			return false, nil
		}
		victims := choosePodsToDelete(diff, currentRevDiff, notUpdatedPods, updatedPods)
		return r.deletePods(updateCS, victims, pvcs)
	}

	return false, nil
}
// managePreparingDelete moves pods out of the PreparingDelete state when
// deleting all planned pods would leave fewer pods than spec.replicas.
//
// numToDelete is the number of pods currently planned for deletion. Pods that
// are explicitly specified for deletion are never restored. It returns true
// when at least one pod was patched.
func (r *realControl) managePreparingDelete(cs *appsv1alpha1.CloneSet, pods, podsInPreDelete []*v1.Pod, numToDelete int) (bool, error) {
	// shortfall is how many pods would be missing once every planned deletion
	// has completed; that many PreparingDelete pods go back to Normal.
	shortfall := int(*cs.Spec.Replicas) - len(pods) + numToDelete

	modified := false
	for _, pod := range podsInPreDelete {
		if shortfall <= 0 {
			break
		}
		if isPodSpecifiedDelete(cs, pod) {
			continue
		}

		patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStateNormal)
		if err != nil {
			return modified, err
		}
		if patched {
			modified = true
			clonesetutils.ResourceVersionExpectations.Expect(pod)
		}
		// The pod counts against the shortfall whether or not a patch was
		// needed.
		shortfall--
	}
	return modified, nil
}
// createPods builds expectedCreations new pods (expectedCurrentCreations of
// them on the current revision) and creates them through DoItSlowly.
//
// A create expectation is registered for every pod up front and cleared again
// for each pod that was not created. It returns true when at least one pod was
// created, together with the error reported by DoItSlowly.
func (r *realControl) createPods(
	expectedCreations, expectedCurrentCreations int,
	currentCS, updateCS *appsv1alpha1.CloneSet,
	currentRevision, updateRevision string,
	availableIDs []string, existingPVCNames sets.String,
) (bool, error) {
	coreControl := clonesetcore.New(updateCS)
	newPods, err := coreControl.NewVersionedPods(currentCS, updateCS, currentRevision, updateRevision,
		expectedCreations, expectedCurrentCreations, availableIDs)
	if err != nil {
		return false, err
	}

	// Queue every pod and expect its creation before any create call is made.
	queue := make(chan *v1.Pod, len(newPods))
	for _, newPod := range newPods {
		clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, newPod.Name)
		queue <- newPod
	}

	var created int64
	var succeeded sync.Map
	_, err = clonesetutils.DoItSlowly(len(newPods), initialBatchSize, func() error {
		pod := <-queue

		// Pods labeled with the current revision are created from the current
		// CloneSet; all others from the update CloneSet.
		owner := updateCS
		if pod.Labels[apps.ControllerRevisionHashLabelKey] == currentRevision {
			owner = currentCS
		}
		lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateNormal)(pod)

		if createErr := r.createOnePod(owner, pod, existingPVCNames); createErr != nil {
			return createErr
		}

		atomic.AddInt64(&created, 1)
		succeeded.Store(pod.Name, struct{}{})
		return nil
	})

	// Cancel the expectation of every pod that was not created.
	for _, newPod := range newPods {
		if _, ok := succeeded.Load(newPod.Name); ok {
			continue
		}
		clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, newPod.Name)
	}

	return created > 0, err
}
// createOnePod creates the PVCs of pod that do not exist yet and then the pod
// itself, emitting an event for each failure and for the successful pod
// creation. It stops at the first error.
func (r *realControl) createOnePod(cs *appsv1alpha1.CloneSet, pod *v1.Pod, existingPVCNames sets.String) error {
	for _, claim := range clonesetutils.GetPersistentVolumeClaims(cs, pod) {
		if existingPVCNames.Has(claim.Name) {
			continue
		}

		// Expect the PVC creation; take the expectation back if it fails.
		clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Create, claim.Name)
		if err := r.Create(context.TODO(), &claim); err != nil {
			clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Create, claim.Name)
			r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pvc: %v, pvc: %v", err, util.DumpJSON(claim))
			return err
		}
	}

	err := r.Create(context.TODO(), pod)
	if err != nil {
		r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedCreate", "failed to create pod: %v, pod: %v", err, util.DumpJSON(pod))
		return err
	}

	r.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulCreate", "succeed to create pod %s", pod.Name)
	return nil
}
// deletePods deletes the given pods together with the PVCs that carry the
// same instance-ID label.
//
// A pod for which the PreDelete lifecycle hook applies is not deleted; it is
// only patched to the PreparingDelete state. The function returns true when
// any pod was patched or deleted. On error it still reports the
// modifications made before the failure, so the caller knows its pod list is
// stale.
func (r *realControl) deletePods(cs *appsv1alpha1.CloneSet, podsToDelete []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (bool, error) {
	var modified bool
	for _, pod := range podsToDelete {
		if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.PreDelete, pod) {
			if patched, err := lifecycle.PatchPodLifecycle(r, pod, appsv1alpha1.LifecycleStatePreparingDelete); err != nil {
				// Report earlier modifications, like the other error paths.
				return modified, err
			} else if patched {
				modified = true
				clonesetutils.ResourceVersionExpectations.Expect(pod)
			}
			continue
		}

		// Expect the deletion; take the expectation back if the call fails.
		clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
		if err := r.Delete(context.TODO(), pod); err != nil {
			clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pod.Name)
			r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
			return modified, err
		}
		modified = true
		r.recorder.Event(cs, v1.EventTypeNormal, "SuccessfulDelete", fmt.Sprintf("succeed to delete pod %s", pod.Name))

		// Delete the PVCs whose instance-ID label matches the pod's.
		for _, pvc := range pvcs {
			if pvc.Labels[appsv1alpha1.CloneSetInstanceID] != pod.Labels[appsv1alpha1.CloneSetInstanceID] {
				continue
			}

			clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
			if err := r.Delete(context.TODO(), pvc); err != nil {
				clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
				r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to delete pvc %s: %v", pvc.Name, err)
				return modified, err
			}
		}
	}
	return modified, nil
}
3.2.2、扩容工具函数
// isPodSpecifiedDelete reports whether pod has been explicitly chosen for
// deletion, which is the case when either
//  1. specifieddelete.IsSpecifiedDelete reports true for it (per the original
//     notes this checks the apps.kruise.io/specified-delete label), or
//  2. its name is listed in cs.Spec.ScaleStrategy.PodsToDelete.
func isPodSpecifiedDelete(cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
	if specifieddelete.IsSpecifiedDelete(pod) {
		return true
	}

	names := cs.Spec.ScaleStrategy.PodsToDelete
	for i := range names {
		if names[i] == pod.Name {
			return true
		}
	}
	return false
}
// getPlannedDeletedPods returns, in this order, the pods that are explicitly
// specified for deletion and the pods whose lifecycle state is
// PreparingDelete. A pod may appear in both results.
func getPlannedDeletedPods(cs *appsv1alpha1.CloneSet, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) {
	var (
		specified   []*v1.Pod
		preDeleting []*v1.Pod
	)
	for _, p := range pods {
		if isPodSpecifiedDelete(cs, p) {
			specified = append(specified, p)
		}
		// Per the original notes the state is read from the
		// lifecycle.apps.kruise.io/state label.
		state := lifecycle.GetPodLifecycleState(p)
		if state == appsv1alpha1.LifecycleStatePreparingDelete {
			preDeleting = append(preDeleting, p)
		}
	}
	return specified, preDeleting
}
// getOrGenAvailableIDs returns num distinct instance IDs for new pods.
//
// IDs carried by PVCs but not by any pod are reused first; the remaining IDs
// are generated randomly and never equal an ID already carried by a pod or
// PVC, nor another ID returned by this call.
func getOrGenAvailableIDs(num int, pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) sets.String {
	existingIDs := sets.NewString()
	availableIDs := sets.NewString()

	// Every PVC's ID exists and is a candidate for reuse...
	for _, pvc := range pvcs {
		if id := pvc.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {
			existingIDs.Insert(id)
			availableIDs.Insert(id)
		}
	}
	// ...unless a pod is still using it.
	for _, pod := range pods {
		if id := pod.Labels[appsv1alpha1.CloneSetInstanceID]; len(id) > 0 {
			existingIDs.Insert(id)
			availableIDs.Delete(id)
		}
	}

	retIDs := sets.NewString()
	for i := 0; i < num; i++ {
		id := getOrGenInstanceID(existingIDs, availableIDs)
		// Record the ID so a later random ID in this batch cannot repeat it;
		// otherwise the set would silently hold fewer than num IDs.
		existingIDs.Insert(id)
		retIDs.Insert(id)
	}
	return retIDs
}
// getOrGenInstanceID returns one instance ID: an ID popped from availableIDs
// when one is available, otherwise a random ID of LengthOfInstanceID
// characters that is not in existingIDs.
func getOrGenInstanceID(existingIDs, availableIDs sets.String) string {
	if reused, _ := availableIDs.PopAny(); len(reused) > 0 {
		return reused
	}

	// Nothing to reuse: draw random IDs until one is unused.
	for {
		if candidate := rand.String(LengthOfInstanceID); !existingIDs.Has(candidate) {
			return candidate
		}
	}
}
// calculateDiffs computes how far the pod counts are from the desired state.
//
// totalDiff is totalPods minus the desired total (replicas plus the surge in
// effect): negative means pods must be created, positive means pods must be
// deleted. currentRevDiff is the number of not-updated pods beyond
// min(partition, replicas); it stays 0 when the revisions are consistent or no
// partition is set.
func calculateDiffs(cs *appsv1alpha1.CloneSet, revConsistent bool, totalPods int, notUpdatedPods int) (totalDiff int, currentRevDiff int) {
	replicas := int(*cs.Spec.Replicas)
	strategy := cs.Spec.UpdateStrategy

	// Surge only applies while an update is in progress; for pure scaling it
	// stays 0.
	surge := 0
	if !revConsistent {
		if strategy.Partition != nil {
			currentRevDiff = notUpdatedPods - integer.IntMin(int(*strategy.Partition), replicas)
		}
		// Use maxSurge only while more old-revision pods remain than
		// partition allows, and never more surge than that excess.
		if currentRevDiff > 0 && strategy.MaxSurge != nil {
			surge, _ = intstrutil.GetValueFromIntOrPercent(strategy.MaxSurge, replicas, true)
			surge = integer.IntMin(surge, currentRevDiff)
		}
	}

	totalDiff = totalPods - replicas - surge
	return totalDiff, currentRevDiff
}
// choosePodsToDelete picks totalDiff pods to delete, preferring pods that are
// not yet updated.
//
//   - If currentRevDiff >= totalDiff, all pods come from notUpdatedPods.
//   - Else if currentRevDiff > 0, currentRevDiff pods come from notUpdatedPods
//     and the remainder from updatedPods.
//   - Otherwise all pods come from updatedPods.
//
// When fewer pods are taken than a group holds, the group is sorted in place
// with kubecontroller.ActivePods and the leading pods are taken.
func choosePodsToDelete(totalDiff int, currentRevDiff int, notUpdatedPods, updatedPods []*v1.Pod) []*v1.Pod {
	choose := func(pods []*v1.Pod, diff int) []*v1.Pod {
		if diff < len(pods) {
			sort.Sort(kubecontroller.ActivePods(pods))
		} else if diff > len(pods) {
			klog.Warningf("Diff > len(pods) in choosePodsToDelete func which is not expected.")
			return pods
		}
		return pods[:diff]
	}

	var podsToDelete []*v1.Pod
	if currentRevDiff >= totalDiff {
		podsToDelete = choose(notUpdatedPods, totalDiff)
	} else if currentRevDiff > 0 {
		fromOld := choose(notUpdatedPods, currentRevDiff)
		fromNew := choose(updatedPods, totalDiff-currentRevDiff)
		// Build a fresh slice: fromOld is a prefix of notUpdatedPods with
		// spare capacity, so appending to it directly would overwrite the
		// caller's remaining notUpdatedPods elements.
		podsToDelete = make([]*v1.Pod, 0, len(fromOld)+len(fromNew))
		podsToDelete = append(podsToDelete, fromOld...)
		podsToDelete = append(podsToDelete, fromNew...)
	} else {
		podsToDelete = choose(updatedPods, totalDiff)
	}
	return podsToDelete
}
3.3、升级
3.3.1、升级核心接口
// Manage moves pods of the CloneSet toward updateRevision.
//
// It first refreshes the lifecycle state of every pod and returns early if
// any pod was patched. Otherwise it collects the pods waiting for update,
// orders them, limits them to the count allowed by calculateUpdateCount and
// updates them one by one. The returned duration is the requeue delay
// collected from the refresh and update steps.
func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
	updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
	requeueDuration := requeueduration.Duration{}
	coreControl := clonesetcore.New(cs)

	if cs.Spec.UpdateStrategy.Paused {
		return requeueDuration.Get(), nil
	}

	// Refresh the state of all pods.
	anyPatched := false
	for _, pod := range pods {
		patched, duration, err := c.refreshPodState(cs, coreControl, pod)
		if err != nil {
			return 0, err
		}
		if duration > 0 {
			requeueDuration.Update(duration)
		}
		anyPatched = anyPatched || patched
	}
	if anyPatched {
		return requeueDuration.Get(), nil
	}

	// Collect the indexes of pods waiting for update: not paused, not yet on
	// the update revision, and not in the PreparingDelete or Updated state.
	var candidates []int
	for i, pod := range pods {
		if coreControl.IsPodUpdatePaused(pod) {
			continue
		}
		if clonesetutils.GetPodRevision(pod) == updateRevision.Name {
			continue
		}
		state := lifecycle.GetPodLifecycleState(pod)
		if state == appsv1alpha1.LifecycleStatePreparingDelete || state == appsv1alpha1.LifecycleStateUpdated {
			continue
		}
		candidates = append(candidates, i)
	}

	// Order the candidates by the priority and scatter strategies.
	candidates = sortUpdateIndexes(coreControl, cs.Spec.UpdateStrategy, pods, candidates)

	// Limit the candidates to the number that may be updated right now.
	allowed := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), candidates, pods)
	if allowed < len(candidates) {
		candidates = candidates[:allowed]
	}

	// Update the selected pods.
	for _, idx := range candidates {
		duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pods[idx], pvcs)
		if err != nil {
			return requeueDuration.Get(), err
		}
		if duration > 0 {
			requeueDuration.Update(duration)
		}
	}
	return requeueDuration.Get(), nil
}
3.3.2、升级工具函数
// refreshPodState refreshes the in-place update status of pod and advances
// its lifecycle state when the current step has finished.
//
// It returns whether the pod's lifecycle state was patched, the delay
// reported by the in-place refresh, and any error.
func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {
	opts := coreControl.GetUpdateOptions()
	res := c.inplaceControl.Refresh(pod, opts)
	if res.RefreshErr != nil {
		return false, 0, res.RefreshErr
	}

	hooks := cs.Spec.Lifecycle

	// Decide the next lifecycle state; an empty value means "no change".
	var next appsv1alpha1.LifecycleStateType
	switch lifecycle.GetPodLifecycleState(pod) {
	case appsv1alpha1.LifecycleStateUpdating:
		// Use the customized completion check when one is configured.
		completed := inplaceupdate.CheckInPlaceUpdateCompleted
		if opts != nil && opts.CustomizeCheckUpdateCompleted != nil {
			completed = opts.CustomizeCheckUpdateCompleted
		}
		if completed(pod) == nil {
			next = appsv1alpha1.LifecycleStateNormal
			if hooks != nil && !lifecycle.IsPodHooked(hooks.InPlaceUpdate, pod) {
				next = appsv1alpha1.LifecycleStateUpdated
			}
		}
	case appsv1alpha1.LifecycleStateUpdated:
		if hooks == nil || hooks.InPlaceUpdate == nil || lifecycle.IsPodHooked(hooks.InPlaceUpdate, pod) {
			next = appsv1alpha1.LifecycleStateNormal
		}
	}

	if next == "" {
		return false, res.DelayDuration, nil
	}

	patched, err := lifecycle.PatchPodLifecycle(c, pod, next)
	if err != nil {
		return false, 0, err
	}
	if patched {
		clonesetutils.ResourceVersionExpectations.Expect(pod)
	}
	return patched, res.DelayDuration, nil
}
// updatePod updates a single pod to updateRevision.
//
// With the InPlaceIfPossible or InPlaceOnly strategy it tries an in-place
// update first: if the in-place update hook applies to the pod, the pod is
// only patched to PreparingUpdate; otherwise the in-place update is
// performed. If the pod cannot be updated in place, InPlaceOnly fails with an
// error, while every other case falls back to marking the pod as specified
// for deletion so that it gets recreated.
func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
	updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
	pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
	strategyType := cs.Spec.UpdateStrategy.Type
	inPlaceOnly := strategyType == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType

	if inPlaceOnly || strategyType == appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType {
		// Find the revision the pod currently runs (nil when it is unknown).
		var oldRevision *apps.ControllerRevision
		for i := range revisions {
			if revisions[i].Name == clonesetutils.GetPodRevision(pod) {
				oldRevision = revisions[i]
				break
			}
		}

		if c.inplaceControl.CanUpdateInPlace(oldRevision, updateRevision, coreControl.GetUpdateOptions()) {
			// A hooked pod is first moved to PreparingUpdate; the actual
			// update waits for a later pass.
			if cs.Spec.Lifecycle != nil && lifecycle.IsPodHooked(cs.Spec.Lifecycle.InPlaceUpdate, pod) {
				patched, err := lifecycle.PatchPodLifecycle(c, pod, appsv1alpha1.LifecycleStatePreparingUpdate)
				if err != nil {
					return 0, err
				}
				if patched {
					clonesetutils.ResourceVersionExpectations.Expect(pod)
				}
				return 0, nil
			}

			// NOTE(review): opts is dereferenced without a nil check here,
			// while refreshPodState guards against nil — confirm that
			// GetUpdateOptions never returns nil.
			opts := coreControl.GetUpdateOptions()
			opts.AdditionalFuncs = append(opts.AdditionalFuncs, lifecycle.SetPodLifecycle(appsv1alpha1.LifecycleStateUpdating))
			res := c.inplaceControl.Update(pod, oldRevision, updateRevision, opts)
			if res.InPlaceUpdate {
				if res.UpdateErr != nil {
					return res.DelayDuration, res.UpdateErr
				}
				clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
				return res.DelayDuration, nil
			}
		}

		if inPlaceOnly {
			return 0, fmt.Errorf("find Pod %s update strategy is InPlaceOnly but can not update in-place", pod.Name)
		}
	}

	// Recreate path: mark the pod as specified for deletion (per the original
	// notes this sets the apps.kruise.io/specified-delete label to "true").
	patched, err := specifieddelete.PatchPodSpecifiedDelete(c, pod, "true")
	if err != nil {
		return 0, err
	}
	if patched {
		clonesetutils.ResourceVersionExpectations.Expect(pod)
	}
	return 0, nil
}
// sortUpdateIndexes orders the indexes of the pods waiting for update.
//
// The indexes are sorted with the core control's default sort function, then
// by the priority strategy and the scatter strategy when those are set.
// Finally pods in the PreparingUpdate state are moved to the front while the
// order established by the earlier steps is preserved within each group.
func sortUpdateIndexes(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, pods []*v1.Pod, waitUpdateIndexes []int) []int {
	// Default order.
	sort.Slice(waitUpdateIndexes, coreControl.GetPodsSortFunc(pods, waitUpdateIndexes))

	if strategy.PriorityStrategy != nil {
		waitUpdateIndexes = updatesort.NewPrioritySorter(strategy.PriorityStrategy).Sort(pods, waitUpdateIndexes)
	}
	if strategy.ScatterStrategy != nil {
		waitUpdateIndexes = updatesort.NewScatterSorter(strategy.ScatterStrategy).Sort(pods, waitUpdateIndexes)
	}

	// PreparingUpdate first. The comparator treats all pods with the same
	// PreparingUpdate flag as equal, so the sort must be stable; sort.Slice
	// could scramble the priority/scatter order computed above.
	sort.SliceStable(waitUpdateIndexes, func(i, j int) bool {
		preparingUpdateI := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[i]]) == appsv1alpha1.LifecycleStatePreparingUpdate
		preparingUpdateJ := lifecycle.GetPodLifecycleState(pods[waitUpdateIndexes[j]]) == appsv1alpha1.LifecycleStatePreparingUpdate
		return preparingUpdateI && !preparingUpdateJ
	})
	return waitUpdateIndexes
}
// calculateUpdateCount returns how many of the leading pods in
// waitUpdateIndexes may be updated now.
//
// The last `partition` entries are excluded so that they stay on the old
// revision. Among the remaining entries, a pod that is already not ready is
// always counted, while a ready pod is counted only while the number of
// not-ready pods is below maxUnavailable plus the surge currently in use
// (len(pods) - totalReplicas). Counting stops at the first ready pod that
// would exceed that budget.
func calculateUpdateCount(coreControl clonesetcore.Control, strategy appsv1alpha1.CloneSetUpdateStrategy, minReadySeconds int32, totalReplicas int, waitUpdateIndexes []int, pods []*v1.Pod) int {
	partition := 0
	if strategy.Partition != nil {
		partition = int(*strategy.Partition)
	}

	// Keep `partition` pods on the old revision.
	eligible := len(waitUpdateIndexes) - partition
	if eligible <= 0 {
		return 0
	}
	waitUpdateIndexes = waitUpdateIndexes[:eligible]

	// maxUnavailable is rounded up only when no surge is configured.
	roundUp := true
	if strategy.MaxSurge != nil {
		maxSurge, _ := intstrutil.GetValueFromIntOrPercent(strategy.MaxSurge, totalReplicas, true)
		roundUp = maxSurge == 0
	}
	defaultMaxUnavailable := intstrutil.FromString(appsv1alpha1.DefaultCloneSetMaxUnavailable)
	maxUnavailable, _ := intstrutil.GetValueFromIntOrPercent(
		intstrutil.ValueOrDefault(strategy.MaxUnavailable, defaultMaxUnavailable), totalReplicas, roundUp)

	// Pods beyond the desired replica count enlarge the budget.
	budget := maxUnavailable + (len(pods) - totalReplicas)

	notReady := 0
	for _, p := range pods {
		if !isPodReady(coreControl, p, minReadySeconds) {
			notReady++
		}
	}

	updateCount := 0
	for _, idx := range waitUpdateIndexes {
		if isPodReady(coreControl, pods[idx], minReadySeconds) {
			// Updating a ready pod makes one more pod unavailable.
			if notReady >= budget {
				break
			}
			notReady++
		}
		updateCount++
	}
	return updateCount
}
// isPodReady reports whether pod counts as ready for update accounting.
//
// The pod's lifecycle state must be empty or Normal; the rest of the decision
// is delegated to coreControl.IsPodUpdateReady. Per the original notes that
// check requires the pod to be Running, available with respect to
// minReadySeconds, and to have the InPlaceUpdateReady condition set to True.
func isPodReady(coreControl clonesetcore.Control, pod *v1.Pod, minReadySeconds int32) bool {
	state := lifecycle.GetPodLifecycleState(pod)
	if state == "" || state == appsv1alpha1.LifecycleStateNormal {
		return coreControl.IsPodUpdateReady(pod, minReadySeconds)
	}
	return false
}