官方文档:https://openkruise.io/zh-cn/docs/uniteddeployment.html
UnitedDeployment是蚂蚁CafeDeployment的开源版本。
0、蚂蚁原生架构图
1、Spec定义
// UnitedDeploymentSpec defines the desired state of a UnitedDeployment.
type UnitedDeploymentSpec struct {
	// Replicas is the total number of pods across all subsets (all availability zones).
	Replicas *int32 `json:"replicas,omitempty"`

	// Selector is the label selector the controller uses to list the
	// ControllerRevisions and subset workloads belonging to this UnitedDeployment.
	Selector *metav1.LabelSelector `json:"selector"`

	// Template describes the workload created for every subset, i.e. the
	// desired per-availability-zone workload.
	Template SubsetTemplate `json:"template,omitempty"`

	// Topology describes how pods are distributed across subsets.
	Topology Topology `json:"topology,omitempty"`

	// UpdateStrategy is the strategy applied when Template changes.
	UpdateStrategy UnitedDeploymentUpdateStrategy `json:"updateStrategy,omitempty"`

	// RevisionHistoryLimit is the number of ControllerRevisions kept as history.
	// The documented default is 10 (the defaulting code is not shown here).
	RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
}

// SubsetTemplate selects the workload type used for every subset: one of
// StatefulSet, AdvancedStatefulSet or CloneSet. StatefulSet is the native
// Kubernetes resource; the other two are Kruise custom resources.
type SubsetTemplate struct {
	StatefulSetTemplate         *StatefulSetTemplateSpec         `json:"statefulSetTemplate,omitempty"`
	AdvancedStatefulSetTemplate *AdvancedStatefulSetTemplateSpec `json:"advancedStatefulSetTemplate,omitempty"`
	CloneSetTemplate            *CloneSetTemplateSpec            `json:"cloneSetTemplate,omitempty"`
}

// StatefulSetTemplateSpec is the template for a native StatefulSet subset.
type StatefulSetTemplateSpec struct {
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              appsv1.StatefulSetSpec `json:"spec"`
}

// AdvancedStatefulSetTemplateSpec is the template for a Kruise Advanced StatefulSet subset.
type AdvancedStatefulSetTemplateSpec struct {
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              StatefulSetSpec `json:"spec"`
}

// CloneSetTemplateSpec is the template for a Kruise CloneSet subset.
type CloneSetTemplateSpec struct {
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              CloneSetSpec `json:"spec"`
}

// UnitedDeploymentUpdateStrategy is the update strategy applied when Template changes.
type UnitedDeploymentUpdateStrategy struct {
	Type         UpdateStrategyType `json:"type,omitempty"`
	ManualUpdate *ManualUpdate      `json:"manualUpdate,omitempty"`
}

// ManualUpdate controls a manual, per-subset canary rollout.
type ManualUpdate struct {
	// Partitions maps a subset name to a partition value that is passed to the
	// subset's workload when it is created/updated. Per the original notes,
	// the number of pods rolled out to the new revision in that subset is
	// (replicas - value), i.e. `value` pods stay on the old revision.
	Partitions map[string]int32 `json:"partitions,omitempty"`
}

// Topology describes how pods are distributed across subsets.
type Topology struct {
	Subsets []Subset `json:"subsets,omitempty"`
}

// Subset describes one subset (typically one availability zone).
type Subset struct {
	// Name is used to build the workload name prefix <deployment-name>-<subset-name>-
	// and must be unique within a UnitedDeployment.
	Name string `json:"name"`

	// NodeSelectorTerm selects the nodes this subset's pods run on; it must not
	// be changed. Nodes in different zones are usually labelled accordingly,
	// e.g. datacenter=dc-a in the Ant Group architecture diagram.
	NodeSelectorTerm corev1.NodeSelectorTerm `json:"nodeSelectorTerm,omitempty"`

	// Tolerations applied to this subset's pods; must not be changed.
	Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

	// Replicas is the number of pods created in this subset. It may be an
	// integer or a percentage: e.g. 10% means 10% of the UnitedDeployment's
	// pods are placed in this subset. If nil, the controller spreads the
	// remaining pods evenly across subsets.
	Replicas *intstr.IntOrString `json:"replicas,omitempty"`
}
2、调谐函数
func (r *ReconcileUnitedDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {// fetch UnitedDeployment// 若不存在、发生异常、删除事件,直接返回instance := &appsv1alpha1.UnitedDeployment{}err := r.Get(context.TODO(), request.NamespacedName, instance)if err != nil {if errors.IsNotFound(err) {return reconcile.Result{}, nil}return reconcile.Result{}, err}if instance.DeletionTimestamp != nil {return reconcile.Result{}, nil}// 深copy当前状态oldStatus := instance.Status.DeepCopy()// 获取当前版本和要更新的版本// 若当前版本为空,则当前版本等于要更新的版本// 当Template发生改变之后,会生成一个新的ControllerRevisioncurrentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance)if err != nil {return reconcile.Result{}, err}// 获取subset控制器接口control, subsetType := r.getSubsetControls(instance)// 设置期望的Revision名称expectedRevision := currentRevision.Nameif updatedRevision != nil {expectedRevision = updatedRevision.Name}// 根据当前版本获取subset状态并根据subset name进行分组nameToSubset, err := r.getNameToSubset(instance, control, expectedRevision)if err != nil {return reconcile.Result{}, nil}// 分配下一次每个subset的副本数量nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)if err != nil {return reconcile.Result{}, nil}// 计算下一个要灰度的机器旧版本保留数量nextPartitions := calcNextPartitions(instance, nextReplicas)// 根据nextReplicas,nextPartitions更新所有的subsetnewStatus, err := r.manageSubsets(instance, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)// 更新UnitedDeployment状态return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control)}
3、revision工具函数
1、获取历史版本列表(按revision号排序) 2、清除过期版本,依赖于RevisionHistoryLimit参数 3、在内存中构造一个新版本(此时尚未提交到API Server) 4、若历史版本中存在与新版本内容相同的版本:若它已是最新版本则直接复用;否则只需将该旧版本的revision号更新为最新,无须再创建一个revision 5、若没有内容重复的版本,则在API Server中创建这个新版本
ControllerRevision的名称形如:<ud名称>-<hash值>

// 组织UnitedDeployment所有的版本信息func (r *ReconcileUnitedDeployment) constructUnitedDeploymentRevisions(ud *appsalphav1.UnitedDeployment) (*apps.ControllerRevision, *apps.ControllerRevision, *[]*apps.ControllerRevision, int32, error) {var currentRevision, updateRevision *apps.ControllerRevisionrevisions, err := r.controlledHistories(ud)if err != nil {if ud.Status.CollisionCount == nil {return currentRevision, updateRevision, nil, 0, err}return currentRevision, updateRevision, nil, *ud.Status.CollisionCount, err}history.SortControllerRevisions(revisions)cleanedRevision, err := r.cleanExpiredRevision(ud, &revisions)if err != nil {if ud.Status.CollisionCount == nil {return currentRevision, updateRevision, nil, 0, err}return currentRevision, updateRevision, nil, *ud.Status.CollisionCount, err}revisions = *cleanedRevisionvar collisionCount int32if ud.Status.CollisionCount != nil {collisionCount = *ud.Status.CollisionCount}updateRevision, err = r.newRevision(ud, nextRevision(revisions), &collisionCount)if err != nil {return nil, nil, nil, collisionCount, err}equalRevisions := history.FindEqualRevisions(revisions, updateRevision)equalCount := len(equalRevisions)revisionCount := len(revisions)if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {updateRevision = revisions[revisionCount-1]} else if equalCount > 0 {equalRevisions[equalCount-1].Revision = updateRevision.Revisionerr := r.Client.Update(context.TODO(), equalRevisions[equalCount-1])if err != nil {return nil, nil, nil, collisionCount, err}updateRevision = equalRevisions[equalCount-1]} else {updateRevision, err = r.createControllerRevision(ud, updateRevision, &collisionCount)if err != nil {return nil, nil, nil, collisionCount, err}}for i := range revisions {if revisions[i].Name == ud.Status.CurrentRevision {currentRevision = revisions[i]}}if currentRevision == nil {currentRevision = updateRevision}return currentRevision, updateRevision, &revisions, collisionCount, nil}// 
获取所有的历史controller revisionfunc (r *ReconcileUnitedDeployment) controlledHistories(ud *appsalphav1.UnitedDeployment) ([]*apps.ControllerRevision, error) {// 根据key=value进行过滤,但是不代表label就完全一样// 比如:// 1、app=test// 2、app=test;az=bj// 因此对于第二种情况,若controller ref相等,则也进行认领selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)if err != nil {return nil, err}histories := &apps.ControllerRevisionList{}err = r.Client.List(context.TODO(), histories, &client.ListOptions{LabelSelector: selector})if err != nil {return nil, err}cm, err := refmanager.New(r.Client, ud.Spec.Selector, ud, r.scheme)if err != nil {return nil, err}mts := make([]metav1.Object, len(histories.Items))for i, h := range histories.Items {mts[i] = h.DeepCopy()}claims, err := cm.ClaimOwnedObjects(mts)if err != nil {return nil, err}claimHistories := make([]*apps.ControllerRevision, len(claims))for i, mt := range claims {claimHistories[i] = mt.(*apps.ControllerRevision)}return claimHistories, nil}// 清除过期的revision// 并返回清除过过期版本的revisionsfunc (r *ReconcileUnitedDeployment) cleanExpiredRevision(ud *appsalphav1.UnitedDeployment, sortedRevisions *[]*apps.ControllerRevision) (*[]*apps.ControllerRevision, error) {exceedNum := len(*sortedRevisions) - int(*ud.Spec.RevisionHistoryLimit)if exceedNum <= 0 {return sortedRevisions, nil}live := map[string]bool{}live[ud.Status.CurrentRevision] = trueif ud.Status.UpdateStatus != nil {live[ud.Status.UpdateStatus.UpdatedRevision] = true}for i, revision := range *sortedRevisions {if _, exist := live[revision.Name]; exist {continue}if i >= exceedNum {break}if err := r.Client.Delete(context.TODO(), revision); err != nil {return sortedRevisions, err}}cleanedRevisions := (*sortedRevisions)[exceedNum:]return &cleanedRevisions, nil}// 新建一个controller revisionfunc (r *ReconcileUnitedDeployment) createControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) {if collisionCount == nil {return nil, 
fmt.Errorf("collisionCount should not be nil")}clone := revision.DeepCopy()var err errorfor {hash := history.HashControllerRevision(revision, collisionCount)clone.Name = history.ControllerRevisionName(parent.GetName(), hash)err = r.Client.Create(context.TODO(), clone)if errors.IsAlreadyExists(err) {exists := &apps.ControllerRevision{}err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: parent.GetNamespace(), Name: clone.Name}, exists)if err != nil {return nil, err}if bytes.Equal(exists.Data.Raw, clone.Data.Raw) {return exists, nil}*collisionCount++continue}return clone, err}}// 初始化一个revisionfunc (r *ReconcileUnitedDeployment) newRevision(ud *appsalphav1.UnitedDeployment, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {patch, err := getUnitedDeploymentPatch(ud)if err != nil {return nil, err}gvk, err := apiutil.GVKForObject(ud, r.scheme)if err != nil {return nil, err}var selectedLabels map[string]stringif ud.Spec.Template.StatefulSetTemplate != nil {selectedLabels = ud.Spec.Template.StatefulSetTemplate.Labels} else if ud.Spec.Template.AdvancedStatefulSetTemplate != nil {selectedLabels = ud.Spec.Template.AdvancedStatefulSetTemplate.Labels}cr, err := history.NewControllerRevision(ud,gvk,selectedLabels,runtime.RawExtension{Raw: patch},revision,collisionCount)if err != nil {return nil, err}cr.Namespace = ud.Namespacereturn cr, nil}// 获取下一个版本号,最新的版本号+1func nextRevision(revisions []*apps.ControllerRevision) int64 {count := len(revisions)if count <= 0 {return 1}return revisions[count-1].Revision + 1}// 获取需要Patch的内容func getUnitedDeploymentPatch(ud *appsalphav1.UnitedDeployment) ([]byte, error) {dsBytes, err := json.Marshal(ud)if err != nil {return nil, err}var raw map[string]interface{}err = json.Unmarshal(dsBytes, &raw)if err != nil {return nil, err}objCopy := make(map[string]interface{})specCopy := make(map[string]interface{})// Create a patch of the UnitedDeployment that replaces spec.templatespec := 
raw["spec"].(map[string]interface{})template := spec["template"].(map[string]interface{})specCopy["template"] = templatetemplate["$patch"] = "replace"objCopy["spec"] = specCopypatch, err := json.Marshal(objCopy)return patch, err}
4、subset工具函数
ud相关subset工具函数
func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) {subSets, err := control.GetAllSubsets(instance, expectedRevision)if err != nil {return nil, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)}nameToSubsets := r.classifySubsetBySubsetName(instance, subSets)nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control)if err != nil {return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)}return nameToSubset, nil}// 按照Subset Name进行分组// 之所以是切片,应该是考虑有可能有Subset删除失败的场景func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(_ *appsv1alpha1.UnitedDeployment, subsets []*Subset) map[string][]*Subset {mapping := map[string][]*Subset{}for _, ss := range subsets {subSetName, err := getSubsetNameFrom(ss)if err != nil {continue}mapping[subSetName] = append(mapping[subSetName], ss)}return mapping}// Subset去重func (r *ReconcileUnitedDeployment) deleteDupSubset(_ *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {nameToSubset := map[string]*Subset{}for name, subsets := range nameToSubsets {if len(subsets) > 1 {for _, subset := range subsets[1:] {if err := control.DeleteSubset(subset); err != nil {if errors.IsNotFound(err) {continue}return &nameToSubset, err}}}if len(subsets) > 0 {nameToSubset[name] = subsets[0]}}return &nameToSubset, nil}
subset control interface
// 获取所有的Subset,并进行Subset认领func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) {selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)if err != nil {return nil, err}setList := m.adapter.NewResourceListObject()err = m.Client.List(context.TODO(), setList, &client.ListOptions{LabelSelector: selector})if err != nil {return nil, err}manager, err := refmanager.New(m.Client, ud.Spec.Selector, ud, m.scheme)if err != nil {return nil, err}v := reflect.ValueOf(setList).Elem().FieldByName("Items")selected := make([]metav1.Object, v.Len())for i := 0; i < v.Len(); i++ {selected[i] = v.Index(i).Addr().Interface().(metav1.Object)}claimedSets, err := manager.ClaimOwnedObjects(selected)if err != nil {return nil, err}for _, claimedSet := range claimedSets {subSet, err := m.convertToSubset(claimedSet, updatedRevision)if err != nil {return nil, err}subSets = append(subSets, subSet)}return subSets, nil}func (m *SubsetControl) CreateSubset(ud *alpha1.UnitedDeployment, subsetName string, revision string, replicas, partition int32) error {set := m.adapter.NewResourceObject()if err := m.adapter.ApplySubsetTemplate(ud, subsetName, revision, replicas, partition, set); err != nil {return err}return m.Create(context.TODO(), set)}func (m *SubsetControl) UpdateSubset(subset *Subset, ud *alpha1.UnitedDeployment, revision string, replicas, partition int32) error {set := m.adapter.NewResourceObject()var updateError errorfor i := 0; i < updateRetries; i++ {getError := m.Client.Get(context.TODO(), m.objectKey(&subset.ObjectMeta), set)if getError != nil {return getError}if err := m.adapter.ApplySubsetTemplate(ud, subset.Spec.SubsetName, revision, replicas, partition, set); err != nil {return err}updateError = m.Client.Update(context.TODO(), set)if updateError == nil {break}}if updateError != nil {return updateError}return m.adapter.PostUpdate(ud, set, revision, partition)}func (m *SubsetControl) DeleteSubset(subSet *Subset) 
error {set := subSet.Spec.SubsetRef.Resources[0].(runtime.Object)return m.Delete(context.TODO(), set, client.PropagationPolicy(metav1.DeletePropagationBackground))}func (m *SubsetControl) GetSubsetFailure(subset *Subset) *string {return m.adapter.GetSubsetFailure()}func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {return m.adapter.IsExpected(subSet.Spec.SubsetRef.Resources[0], revision)}// 转化obj为Subsetfunc (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {subSetName, err := getSubsetNameFrom(set)if err != nil {return nil, err}subset := &Subset{}subset.ObjectMeta = metav1.ObjectMeta{Name: set.GetName(),GenerateName: set.GetGenerateName(),Namespace: set.GetNamespace(),SelfLink: set.GetSelfLink(),UID: set.GetUID(),ResourceVersion: set.GetResourceVersion(),Generation: set.GetGeneration(),CreationTimestamp: set.GetCreationTimestamp(),DeletionTimestamp: set.GetDeletionTimestamp(),DeletionGracePeriodSeconds: set.GetDeletionGracePeriodSeconds(),Labels: set.GetLabels(),Annotations: set.GetAnnotations(),OwnerReferences: set.GetOwnerReferences(),Finalizers: set.GetFinalizers(),ClusterName: set.GetClusterName(),}subset.Spec.SubsetName = subSetNamespecReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision)if err != nil {return subset, err}if specReplicas != nil {subset.Spec.Replicas = *specReplicas}if specPartition != nil {subset.Spec.UpdateStrategy.Partition = *specPartition}subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)subset.Status.Replicas = statusReplicassubset.Status.ReadyReplicas = statusReadyReplicassubset.Status.UpdatedReplicas = statusUpdatedReplicassubset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicassubset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)return subset, nil}func (m *SubsetControl) objectKey(objMeta 
*metav1.ObjectMeta) client.ObjectKey {return types.NamespacedName{Namespace: objMeta.Namespace,Name: objMeta.Name,}}
5、adapter工具函数(此处以CloneSet为例)
type CloneSetAdapter struct {client.ClientScheme *runtime.Scheme}func (a *CloneSetAdapter) NewResourceObject() runtime.Object {return &alpha1.CloneSet{}}func (a *CloneSetAdapter) NewResourceListObject() runtime.Object {return &alpha1.CloneSetList{}}func (a *CloneSetAdapter) GetObjectMeta(obj metav1.Object) *metav1.ObjectMeta {return &obj.(*alpha1.CloneSet).ObjectMeta}func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {return obj.(*alpha1.CloneSet).Status.ObservedGeneration}func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {set := obj.(*alpha1.CloneSet)var pods []*corev1.Podpods, err = a.getCloneSetPods(set)if err != nil {return}specReplicas = set.Spec.Replicasif set.Spec.UpdateStrategy.Partition != nil {specPartition = set.Spec.UpdateStrategy.Partition}statusReplicas = set.Status.ReplicasstatusReadyReplicas = set.Status.ReadyReplicasstatusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)return}func (a *CloneSetAdapter) GetSubsetFailure() *string {return nil}func (a *CloneSetAdapter) ApplySubsetTemplate(ud *alpha1.UnitedDeployment, subsetName, revision string, replicas, partition int32, obj runtime.Object) error {set := obj.(*alpha1.CloneSet)var subSetConfig *alpha1.Subsetfor _, subset := range ud.Spec.Topology.Subsets {if subset.Name == subsetName {subSetConfig = &subsetbreak}}if subSetConfig == nil {return fmt.Errorf("fail to find subset config %s", subsetName)}set.Namespace = ud.Namespaceif set.Labels == nil {set.Labels = map[string]string{}}for k, v := range ud.Spec.Template.CloneSetTemplate.Labels {set.Labels[k] = v}for k, v := range ud.Spec.Selector.MatchLabels {set.Labels[k] = v}set.Labels[alpha1.ControllerRevisionHashLabelKey] = revision// record the subset name as a labelset.Labels[alpha1.SubSetNameLabelKey] = 
subsetNameif set.Annotations == nil {set.Annotations = map[string]string{}}for k, v := range ud.Spec.Template.CloneSetTemplate.Annotations {set.Annotations[k] = v}set.GenerateName = getSubsetPrefix(ud.Name, subsetName)selectors := ud.Spec.Selector.DeepCopy()selectors.MatchLabels[alpha1.SubSetNameLabelKey] = subsetNameif err := controllerutil.SetControllerReference(ud, set, a.Scheme); err != nil {return err}set.Spec.Selector = selectorsset.Spec.Replicas = &replicasset.Spec.UpdateStrategy = ud.Spec.Template.CloneSetTemplate.Spec.UpdateStrategyset.Spec.UpdateStrategy.Partition = &partitionset.Spec.Template = *ud.Spec.Template.CloneSetTemplate.Spec.Template.DeepCopy()if set.Spec.Template.Labels == nil {set.Spec.Template.Labels = map[string]string{}}set.Spec.Template.Labels[alpha1.SubSetNameLabelKey] = subsetNameset.Spec.Template.Labels[alpha1.ControllerRevisionHashLabelKey] = revisionset.Spec.RevisionHistoryLimit = ud.Spec.Template.CloneSetTemplate.Spec.RevisionHistoryLimitset.Spec.VolumeClaimTemplates = ud.Spec.Template.CloneSetTemplate.Spec.VolumeClaimTemplatesattachNodeAffinity(&set.Spec.Template.Spec, subSetConfig)attachTolerations(&set.Spec.Template.Spec, subSetConfig)return nil}func (a *CloneSetAdapter) PostUpdate(ud *alpha1.UnitedDeployment, obj runtime.Object, revision string, partition int32) error {return nil}func (a *CloneSetAdapter) IsExpected(obj metav1.Object, revision string) bool {return obj.GetLabels()[appsv1.ControllerRevisionHashLabelKey] != revision}func (a *CloneSetAdapter) getCloneSetPods(set *alpha1.CloneSet) ([]*corev1.Pod, error) {selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)if err != nil {return nil, err}podList := &corev1.PodList{}err = a.Client.List(context.TODO(), podList, &client.ListOptions{LabelSelector: selector})if err != nil {return nil, err}manager, err := refmanager.New(a.Client, set.Spec.Selector, set, a.Scheme)if err != nil {return nil, err}selected := make([]metav1.Object, len(podList.Items))for i, pod := 
range podList.Items {selected[i] = pod.DeepCopy()}claimed, err := manager.ClaimOwnedObjects(selected)if err != nil {return nil, err}claimedPods := make([]*corev1.Pod, len(claimed))for i, pod := range claimed {claimedPods[i] = pod.(*corev1.Pod)}return claimedPods, nil}
6、allocator工具函数
根据拓扑给每个subset分配副本数量
func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {subsetInfos := getSubsetInfos(nameToSubset, ud)specifiedReplicas := getSpecifiedSubsetReplicas(ud)return subsetInfos.SortToAllocator().AllocateReplicas(*ud.Spec.Replicas, specifiedReplicas)}type nameToReplicas struct {SubsetName stringReplicas int32Specified bool}type subsetInfos []*nameToReplicasfunc (n subsetInfos) Get(i int) *nameToReplicas {return []*nameToReplicas(n)[i]}func (n subsetInfos) Len() int {return len(n)}func (n subsetInfos) Less(i, j int) bool {if n[i].Replicas != n[j].Replicas {return n[i].Replicas < n[j].Replicas}return strings.Compare(n[i].SubsetName, n[j].SubsetName) < 0}func (n subsetInfos) Swap(i, j int) {n[i], n[j] = n[j], n[i]}func (n subsetInfos) SortToAllocator() *replicasAllocator {sort.Sort(n)return &replicasAllocator{subsets: &n}}type replicasAllocator struct {subsets *subsetInfos}func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (*map[string]int32, error) {if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {return nil, err}return s.normalAllocate(replicas, specifiedSubsetReplicas), nil}func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {if subsetReplicasLimits == nil {return nil}var specifiedReplicas int32for _, replicas := range *subsetReplicasLimits {specifiedReplicas += replicas}if specifiedReplicas > replicas {return fmt.Errorf("specified subsets' replica (%d) is greater than UnitedDeployment replica (%d)",specifiedReplicas, replicas)} else if specifiedReplicas < replicas {specifiedCount := 0for _, subset := range *s.subsets {if _, exist := (*subsetReplicasLimits)[subset.SubsetName]; exist {specifiedCount++}}if specifiedCount == len(*s.subsets) {return fmt.Errorf("specified subsets' replica (%d) is less than UnitedDeployment replica (%d)",specifiedReplicas, replicas)}}return 
nil}func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {var specifiedReplicas int32specifiedSubsetCount := 0// Step 1: apply replicas to specified subsets, and mark them as specified = true.for _, subset := range *s.subsets {if replicas, exist := (*specifiedSubsetReplicas)[subset.SubsetName]; exist {specifiedReplicas += replicassubset.Replicas = replicassubset.Specified = truespecifiedSubsetCount++}}// Step 2: averagely allocate the rest replicas to left unspecified subsets.leftSubsetCount := len(*s.subsets) - specifiedSubsetCountif leftSubsetCount != 0 {allocatableReplicas := expectedReplicas - specifiedReplicasaverage := int(allocatableReplicas) / leftSubsetCountremainder := int(allocatableReplicas) % leftSubsetCountfor i := len(*s.subsets) - 1; i >= 0; i-- {subset := (*s.subsets)[i]if subset.Specified {continue}if remainder > 0 {subset.Replicas = int32(average + 1)remainder--} else {subset.Replicas = int32(average)}leftSubsetCount--if leftSubsetCount == 0 {break}}}return s.toSubsetReplicaMap()}func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {allocatedReplicas := map[string]int32{}for _, subset := range *s.subsets {allocatedReplicas[subset.SubsetName] = subset.Replicas}return &allocatedReplicas}func (s *replicasAllocator) String() string {result := ""sort.Sort(s.subsets)for _, subset := range *s.subsets {result = fmt.Sprintf("%s %s -> %d;", result, subset.SubsetName, subset.Replicas)}return result}func getSpecifiedSubsetReplicas(ud *appsv1alpha1.UnitedDeployment) *map[string]int32 {replicaLimits := map[string]int32{}if ud.Spec.Topology.Subsets == nil {return &replicaLimits}for _, subsetDef := range ud.Spec.Topology.Subsets {if subsetDef.Replicas == nil {continue}if specifiedReplicas, err := ParseSubsetReplicas(*ud.Spec.Replicas, *subsetDef.Replicas); err == nil {replicaLimits[subsetDef.Name] = specifiedReplicas}}return &replicaLimits}func getSubsetInfos(nameToSubset 
*map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) *subsetInfos {infos := make(subsetInfos, len(ud.Spec.Topology.Subsets))for idx, subsetDef := range ud.Spec.Topology.Subsets {var replicas int32if subset, exist := (*nameToSubset)[subsetDef.Name]; exist {replicas = subset.Spec.Replicas}infos[idx] = &nameToReplicas{SubsetName: subsetDef.Name, Replicas: replicas}}return &infos}
7、计算状态工具函数
func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) {newStatus = ud.Status.DeepCopy()exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)if err != nil {SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error()))return newStatus, fmt.Errorf("fail to manage Subset provision: %s", err)}if provisioned {SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionTrue, "", ""))}expectedRevision := currentRevisionif updatedRevision != nil {expectedRevision = updatedRevision}var needUpdate []stringfor _, name := range exists.List() {subset := (*nameToSubset)[name]if r.subSetControls[subsetType].IsExpected(subset, expectedRevision.Name) ||subset.Spec.Replicas != (*nextReplicas)[name] ||subset.Spec.UpdateStrategy.Partition != (*nextPartitions)[name] {needUpdate = append(needUpdate, name)}}if len(needUpdate) > 0 {_, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error {cell := needUpdate[index]subset := (*nameToSubset)[cell]replicas := (*nextReplicas)[cell]partition := (*nextPartitions)[cell]updateSubsetErr := r.subSetControls[subsetType].UpdateSubset(subset, ud, expectedRevision.Name, replicas, partition)return updateSubsetErr})}if updateErr == nil {SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))} else {SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", 
updateErr.Error()))}return}func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (sets.String, bool, error) {expectedSubsets := sets.String{}gotSubsets := sets.String{}for _, subset := range ud.Spec.Topology.Subsets {expectedSubsets.Insert(subset.Name)}for subsetName := range *nameToSubset {gotSubsets.Insert(subsetName)}var creates []stringfor _, expectSubset := range expectedSubsets.List() {if gotSubsets.Has(expectSubset) {continue}creates = append(creates, expectSubset)}var deletes []stringfor _, gotSubset := range gotSubsets.List() {if expectedSubsets.Has(gotSubset) {continue}deletes = append(deletes, gotSubset)}revision := currentRevision.Nameif updatedRevision != nil {revision = updatedRevision.Name}var errs []error// manage creatingif len(creates) > 0 {createdSubsets := make([]string, len(creates))for i, subset := range creates {createdSubsets[i] = subset}var createdErr error_, createdErr = util.SlowStartBatch(len(creates), slowStartInitialBatchSize, func(idx int) error {subsetName := createdSubsets[idx]replicas := (*nextReplicas)[subsetName]partition := (*nextPartitions)[subsetName]err := r.subSetControls[subsetType].CreateSubset(ud, subsetName, revision, replicas, partition)if err != nil {if !errors.IsTimeout(err) {return fmt.Errorf("fail to create Subset (%s) %s: %s", subsetType, subsetName, err.Error())}}return nil})errs = append(errs, createdErr)}// manage deletingif len(deletes) > 0 {var deleteErrs []errorfor _, subsetName := range deletes {subset := (*nameToSubset)[subsetName]if err := r.subSetControls[subsetType].DeleteSubset(subset); err != nil {deleteErrs = append(deleteErrs, fmt.Errorf("fail to delete Subset (%s) %s/%s for %s: %s", subsetType, subset.Namespace, subset.Name, subsetName, err))}}if len(deleteErrs) > 0 {errs = append(errs, 
deleteErrs...)}}// clean the other kind of subsetscleaned := falsefor t, control := range r.subSetControls {if t == subsetType {continue}subsets, err := control.GetAllSubsets(ud, revision)if err != nil {errs = append(errs, fmt.Errorf("fail to list Subset of other type %s for UnitedDeployment %s/%s: %s", t, ud.Namespace, ud.Name, err))continue}for _, subset := range subsets {cleaned = trueif err := control.DeleteSubset(subset); err != nil {errs = append(errs, fmt.Errorf("fail to delete Subset %s of other type %s for UnitedDeployment %s/%s: %s", subset.Name, t, ud.Namespace, ud.Name, err))continue}}}return expectedSubsets.Intersection(gotSubsets), len(creates) > 0 || len(deletes) > 0 || cleaned, utilerrors.NewAggregate(errs)}

