官方文档：https://openkruise.io/zh-cn/docs/uniteddeployment.html ；UnitedDeployment 是蚂蚁 CafeDeployment 的开源版本。

0、蚂蚁原生架构图

WX20201224-154127@2x.png
WX20201224-154136@2x.png

1、Spec定义

  1. type UnitedDeploymentSpec struct {
  2. // pod数量(所有可用区的所有pod数量)
  3. Replicas *int32 `json:"replicas,omitempty"`
  4. Selector *metav1.LabelSelector `json:"selector"`
  5. // subset模板(主要指定可用区workload期望信息)
  6. Template SubsetTemplate `json:"template,omitempty"`
  7. // 描述pod在每个subset之间的分布(pod的拓扑关系)
  8. Topology Topology `json:"topology,omitempty"`
  9. // 更新策略(当Template发生变化的时候采取的策略)
  10. UpdateStrategy UnitedDeploymentUpdateStrategy `json:"updateStrategy,omitempty"`
  11. // 历史版本缓存数量,默认为10
  12. RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
  13. }
  14. // subset的workload类型,可以为StatefulSet、AdvancedStatefulSet、CloneSet其中一种
  15. // 其中StatefulSet为k8s原生资源,另外两种为kruise自定义crd
  16. type SubsetTemplate struct {
  17. StatefulSetTemplate *StatefulSetTemplateSpec `json:"statefulSetTemplate,omitempty"`
  18. AdvancedStatefulSetTemplate *AdvancedStatefulSetTemplateSpec `json:"advancedStatefulSetTemplate,omitempty"`
  19. CloneSetTemplate *CloneSetTemplateSpec `json:"cloneSetTemplate,omitempty"`
  20. }
  21. // Subset支持的底层crd对象
  22. type StatefulSetTemplateSpec struct {
  23. metav1.ObjectMeta `json:"metadata,omitempty"`
  24. Spec appsv1.StatefulSetSpec `json:"spec"`
  25. }
  26. type AdvancedStatefulSetTemplateSpec struct {
  27. metav1.ObjectMeta `json:"metadata,omitempty"`
  28. Spec StatefulSetSpec `json:"spec"`
  29. }
  30. type CloneSetTemplateSpec struct {
  31. metav1.ObjectMeta `json:"metadata,omitempty"`
  32. Spec CloneSetSpec `json:"spec"`
  33. }
  34. // 更新策略
  35. type UnitedDeploymentUpdateStrategy struct {
  36. Type UpdateStrategyType `json:"type,omitempty"`
  37. ManualUpdate *ManualUpdate `json:"manualUpdate,omitempty"`
  38. }
  // ManualUpdate holds one partition per subset. Each key is a subset (zone)
  // name, and each value is the partition handed to that subset's workload when
  // it is created or updated. The number of pods in that subset that are rolled
  // to the new revision is its replica count minus the value.
  type ManualUpdate struct {
  	Partitions map[string]int32 `json:"partitions,omitempty"`
  }
  44. // 描述pod分布(拓扑结构)
  45. type Topology struct {
  46. Subsets []Subset `json:"subsets,omitempty"`
  47. }
  48. // subset的详细信息
  49. type Subset struct {
  50. // 生成workload名称前缀,形如<deployment-name>-<subset-name>-
  51. // 在一个UnitedDeployment里面不能重复
  52. Name string `json:"name"`
  53. // subset选择哪个node,不可以更改
  54. // 通常会给不同可用区的node打上相应的标签
  55. // 对应蚂蚁架构图里面的datacenter=dc-a
  56. NodeSelectorTerm corev1.NodeSelectorTerm `json:"nodeSelectorTerm,omitempty"`
  57. // 容忍度,不可以更改
  58. Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
  59. // 每个子网创建的pod的数量
  60. // 可以为整数,也可以为百分比。比如若为10%,则整个UnitedDeployment有10%的pod需要分配到此subset
  61. // 若为空,则UnitedDeployment Controller将会平均分配pod到每个subset
  62. Replicas *intstr.IntOrString `json:"replicas,omitempty"`
  63. }

2、调谐函数

  1. func (r *ReconcileUnitedDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {
  2. // fetch UnitedDeployment
  3. // 若不存在、发生异常、删除事件,直接返回
  4. instance := &appsv1alpha1.UnitedDeployment{}
  5. err := r.Get(context.TODO(), request.NamespacedName, instance)
  6. if err != nil {
  7. if errors.IsNotFound(err) {
  8. return reconcile.Result{}, nil
  9. }
  10. return reconcile.Result{}, err
  11. }
  12. if instance.DeletionTimestamp != nil {
  13. return reconcile.Result{}, nil
  14. }
  15. // 深copy当前状态
  16. oldStatus := instance.Status.DeepCopy()
  17. // 获取当前版本和要更新的版本
  18. // 若当前版本为空,则当前版本等于要更新的版本
  19. // 当Template发生改变之后,会生成一个新的ControllerRevision
  20. currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance)
  21. if err != nil {
  22. return reconcile.Result{}, err
  23. }
  24. // 获取subset控制器接口
  25. control, subsetType := r.getSubsetControls(instance)
  26. // 设置期望的Revision名称
  27. expectedRevision := currentRevision.Name
  28. if updatedRevision != nil {
  29. expectedRevision = updatedRevision.Name
  30. }
  31. // 根据当前版本获取subset状态并根据subset name进行分组
  32. nameToSubset, err := r.getNameToSubset(instance, control, expectedRevision)
  33. if err != nil {
  34. return reconcile.Result{}, nil
  35. }
  36. // 分配下一次每个subset的副本数量
  37. nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)
  38. if err != nil {
  39. return reconcile.Result{}, nil
  40. }
  41. // 计算下一个要灰度的机器旧版本保留数量
  42. nextPartitions := calcNextPartitions(instance, nextReplicas)
  43. // 根据nextReplicas,nextPartitions更新所有的subset
  44. newStatus, err := r.manageSubsets(instance, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)
  45. // 更新UnitedDeployment状态
  46. return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control)
  47. }

3、revision工具函数

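constructUnitedDeploymentRevisions 的主要流程：1、获取 UnitedDeployment 拥有的历史版本列表，并按版本号排序；2、清除过期版本，保留数量由 RevisionHistoryLimit 参数决定；3、根据当前 Template 构造一个新版本（此时尚未持久化）；4、若历史版本中已有内容相同的版本：如果它就是最新版本则直接复用，否则只需把该旧版本的 revision 号更新为新的版本号，无须再创建新的 ControllerRevision；5、若没有内容相同的历史版本，则创建一个新的 ControllerRevision。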

ControllerRevision 的名称形如：`<ud名称>-<hash值>`

WX20201225-161205@2x.png

  1. // 组织UnitedDeployment所有的版本信息
  2. func (r *ReconcileUnitedDeployment) constructUnitedDeploymentRevisions(ud *appsalphav1.UnitedDeployment) (*apps.ControllerRevision, *apps.ControllerRevision, *[]*apps.ControllerRevision, int32, error) {
  3. var currentRevision, updateRevision *apps.ControllerRevision
  4. revisions, err := r.controlledHistories(ud)
  5. if err != nil {
  6. if ud.Status.CollisionCount == nil {
  7. return currentRevision, updateRevision, nil, 0, err
  8. }
  9. return currentRevision, updateRevision, nil, *ud.Status.CollisionCount, err
  10. }
  11. history.SortControllerRevisions(revisions)
  12. cleanedRevision, err := r.cleanExpiredRevision(ud, &revisions)
  13. if err != nil {
  14. if ud.Status.CollisionCount == nil {
  15. return currentRevision, updateRevision, nil, 0, err
  16. }
  17. return currentRevision, updateRevision, nil, *ud.Status.CollisionCount, err
  18. }
  19. revisions = *cleanedRevision
  20. var collisionCount int32
  21. if ud.Status.CollisionCount != nil {
  22. collisionCount = *ud.Status.CollisionCount
  23. }
  24. updateRevision, err = r.newRevision(ud, nextRevision(revisions), &collisionCount)
  25. if err != nil {
  26. return nil, nil, nil, collisionCount, err
  27. }
  28. equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
  29. equalCount := len(equalRevisions)
  30. revisionCount := len(revisions)
  31. if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
  32. updateRevision = revisions[revisionCount-1]
  33. } else if equalCount > 0 {
  34. equalRevisions[equalCount-1].Revision = updateRevision.Revision
  35. err := r.Client.Update(context.TODO(), equalRevisions[equalCount-1])
  36. if err != nil {
  37. return nil, nil, nil, collisionCount, err
  38. }
  39. updateRevision = equalRevisions[equalCount-1]
  40. } else {
  41. updateRevision, err = r.createControllerRevision(ud, updateRevision, &collisionCount)
  42. if err != nil {
  43. return nil, nil, nil, collisionCount, err
  44. }
  45. }
  46. for i := range revisions {
  47. if revisions[i].Name == ud.Status.CurrentRevision {
  48. currentRevision = revisions[i]
  49. }
  50. }
  51. if currentRevision == nil {
  52. currentRevision = updateRevision
  53. }
  54. return currentRevision, updateRevision, &revisions, collisionCount, nil
  55. }
  56. // 获取所有的历史controller revision
  57. func (r *ReconcileUnitedDeployment) controlledHistories(ud *appsalphav1.UnitedDeployment) ([]*apps.ControllerRevision, error) {
  58. // 根据key=value进行过滤,但是不代表label就完全一样
  59. // 比如:
  60. // 1、app=test
  61. // 2、app=test;az=bj
  62. // 因此对于第二种情况,若controller ref相等,则也进行认领
  63. selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
  64. if err != nil {
  65. return nil, err
  66. }
  67. histories := &apps.ControllerRevisionList{}
  68. err = r.Client.List(context.TODO(), histories, &client.ListOptions{LabelSelector: selector})
  69. if err != nil {
  70. return nil, err
  71. }
  72. cm, err := refmanager.New(r.Client, ud.Spec.Selector, ud, r.scheme)
  73. if err != nil {
  74. return nil, err
  75. }
  76. mts := make([]metav1.Object, len(histories.Items))
  77. for i, h := range histories.Items {
  78. mts[i] = h.DeepCopy()
  79. }
  80. claims, err := cm.ClaimOwnedObjects(mts)
  81. if err != nil {
  82. return nil, err
  83. }
  84. claimHistories := make([]*apps.ControllerRevision, len(claims))
  85. for i, mt := range claims {
  86. claimHistories[i] = mt.(*apps.ControllerRevision)
  87. }
  88. return claimHistories, nil
  89. }
  90. // 清除过期的revision
  91. // 并返回清除过过期版本的revisions
  92. func (r *ReconcileUnitedDeployment) cleanExpiredRevision(ud *appsalphav1.UnitedDeployment, sortedRevisions *[]*apps.ControllerRevision) (*[]*apps.ControllerRevision, error) {
  93. exceedNum := len(*sortedRevisions) - int(*ud.Spec.RevisionHistoryLimit)
  94. if exceedNum <= 0 {
  95. return sortedRevisions, nil
  96. }
  97. live := map[string]bool{}
  98. live[ud.Status.CurrentRevision] = true
  99. if ud.Status.UpdateStatus != nil {
  100. live[ud.Status.UpdateStatus.UpdatedRevision] = true
  101. }
  102. for i, revision := range *sortedRevisions {
  103. if _, exist := live[revision.Name]; exist {
  104. continue
  105. }
  106. if i >= exceedNum {
  107. break
  108. }
  109. if err := r.Client.Delete(context.TODO(), revision); err != nil {
  110. return sortedRevisions, err
  111. }
  112. }
  113. cleanedRevisions := (*sortedRevisions)[exceedNum:]
  114. return &cleanedRevisions, nil
  115. }
  116. // 新建一个controller revision
  117. func (r *ReconcileUnitedDeployment) createControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) {
  118. if collisionCount == nil {
  119. return nil, fmt.Errorf("collisionCount should not be nil")
  120. }
  121. clone := revision.DeepCopy()
  122. var err error
  123. for {
  124. hash := history.HashControllerRevision(revision, collisionCount)
  125. clone.Name = history.ControllerRevisionName(parent.GetName(), hash)
  126. err = r.Client.Create(context.TODO(), clone)
  127. if errors.IsAlreadyExists(err) {
  128. exists := &apps.ControllerRevision{}
  129. err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: parent.GetNamespace(), Name: clone.Name}, exists)
  130. if err != nil {
  131. return nil, err
  132. }
  133. if bytes.Equal(exists.Data.Raw, clone.Data.Raw) {
  134. return exists, nil
  135. }
  136. *collisionCount++
  137. continue
  138. }
  139. return clone, err
  140. }
  141. }
  142. // 初始化一个revision
  143. func (r *ReconcileUnitedDeployment) newRevision(ud *appsalphav1.UnitedDeployment, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
  144. patch, err := getUnitedDeploymentPatch(ud)
  145. if err != nil {
  146. return nil, err
  147. }
  148. gvk, err := apiutil.GVKForObject(ud, r.scheme)
  149. if err != nil {
  150. return nil, err
  151. }
  152. var selectedLabels map[string]string
  153. if ud.Spec.Template.StatefulSetTemplate != nil {
  154. selectedLabels = ud.Spec.Template.StatefulSetTemplate.Labels
  155. } else if ud.Spec.Template.AdvancedStatefulSetTemplate != nil {
  156. selectedLabels = ud.Spec.Template.AdvancedStatefulSetTemplate.Labels
  157. }
  158. cr, err := history.NewControllerRevision(ud,
  159. gvk,
  160. selectedLabels,
  161. runtime.RawExtension{Raw: patch},
  162. revision,
  163. collisionCount)
  164. if err != nil {
  165. return nil, err
  166. }
  167. cr.Namespace = ud.Namespace
  168. return cr, nil
  169. }
  170. // 获取下一个版本号,最新的版本号+1
  171. func nextRevision(revisions []*apps.ControllerRevision) int64 {
  172. count := len(revisions)
  173. if count <= 0 {
  174. return 1
  175. }
  176. return revisions[count-1].Revision + 1
  177. }
  178. // 获取需要Patch的内容
  179. func getUnitedDeploymentPatch(ud *appsalphav1.UnitedDeployment) ([]byte, error) {
  180. dsBytes, err := json.Marshal(ud)
  181. if err != nil {
  182. return nil, err
  183. }
  184. var raw map[string]interface{}
  185. err = json.Unmarshal(dsBytes, &raw)
  186. if err != nil {
  187. return nil, err
  188. }
  189. objCopy := make(map[string]interface{})
  190. specCopy := make(map[string]interface{})
  191. // Create a patch of the UnitedDeployment that replaces spec.template
  192. spec := raw["spec"].(map[string]interface{})
  193. template := spec["template"].(map[string]interface{})
  194. specCopy["template"] = template
  195. template["$patch"] = "replace"
  196. objCopy["spec"] = specCopy
  197. patch, err := json.Marshal(objCopy)
  198. return patch, err
  199. }

WX20201225-154536@2x.png

4、subset工具函数

UnitedDeployment（ud）相关的 subset 工具函数

  1. func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) {
  2. subSets, err := control.GetAllSubsets(instance, expectedRevision)
  3. if err != nil {
  4. return nil, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
  5. }
  6. nameToSubsets := r.classifySubsetBySubsetName(instance, subSets)
  7. nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control)
  8. if err != nil {
  9. return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err)
  10. }
  11. return nameToSubset, nil
  12. }
  13. // 按照Subset Name进行分组
  14. // 之所以是切片,应该是考虑有可能有Subset删除失败的场景
  15. func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(_ *appsv1alpha1.UnitedDeployment, subsets []*Subset) map[string][]*Subset {
  16. mapping := map[string][]*Subset{}
  17. for _, ss := range subsets {
  18. subSetName, err := getSubsetNameFrom(ss)
  19. if err != nil {
  20. continue
  21. }
  22. mapping[subSetName] = append(mapping[subSetName], ss)
  23. }
  24. return mapping
  25. }
  26. // Subset去重
  27. func (r *ReconcileUnitedDeployment) deleteDupSubset(_ *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {
  28. nameToSubset := map[string]*Subset{}
  29. for name, subsets := range nameToSubsets {
  30. if len(subsets) > 1 {
  31. for _, subset := range subsets[1:] {
  32. if err := control.DeleteSubset(subset); err != nil {
  33. if errors.IsNotFound(err) {
  34. continue
  35. }
  36. return &nameToSubset, err
  37. }
  38. }
  39. }
  40. if len(subsets) > 0 {
  41. nameToSubset[name] = subsets[0]
  42. }
  43. }
  44. return &nameToSubset, nil
  45. }

subset control interface

  1. // 获取所有的Subset,并进行Subset认领
  2. func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) {
  3. selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
  4. if err != nil {
  5. return nil, err
  6. }
  7. setList := m.adapter.NewResourceListObject()
  8. err = m.Client.List(context.TODO(), setList, &client.ListOptions{LabelSelector: selector})
  9. if err != nil {
  10. return nil, err
  11. }
  12. manager, err := refmanager.New(m.Client, ud.Spec.Selector, ud, m.scheme)
  13. if err != nil {
  14. return nil, err
  15. }
  16. v := reflect.ValueOf(setList).Elem().FieldByName("Items")
  17. selected := make([]metav1.Object, v.Len())
  18. for i := 0; i < v.Len(); i++ {
  19. selected[i] = v.Index(i).Addr().Interface().(metav1.Object)
  20. }
  21. claimedSets, err := manager.ClaimOwnedObjects(selected)
  22. if err != nil {
  23. return nil, err
  24. }
  25. for _, claimedSet := range claimedSets {
  26. subSet, err := m.convertToSubset(claimedSet, updatedRevision)
  27. if err != nil {
  28. return nil, err
  29. }
  30. subSets = append(subSets, subSet)
  31. }
  32. return subSets, nil
  33. }
  34. func (m *SubsetControl) CreateSubset(ud *alpha1.UnitedDeployment, subsetName string, revision string, replicas, partition int32) error {
  35. set := m.adapter.NewResourceObject()
  36. if err := m.adapter.ApplySubsetTemplate(ud, subsetName, revision, replicas, partition, set); err != nil {
  37. return err
  38. }
  39. return m.Create(context.TODO(), set)
  40. }
  41. func (m *SubsetControl) UpdateSubset(subset *Subset, ud *alpha1.UnitedDeployment, revision string, replicas, partition int32) error {
  42. set := m.adapter.NewResourceObject()
  43. var updateError error
  44. for i := 0; i < updateRetries; i++ {
  45. getError := m.Client.Get(context.TODO(), m.objectKey(&subset.ObjectMeta), set)
  46. if getError != nil {
  47. return getError
  48. }
  49. if err := m.adapter.ApplySubsetTemplate(ud, subset.Spec.SubsetName, revision, replicas, partition, set); err != nil {
  50. return err
  51. }
  52. updateError = m.Client.Update(context.TODO(), set)
  53. if updateError == nil {
  54. break
  55. }
  56. }
  57. if updateError != nil {
  58. return updateError
  59. }
  60. return m.adapter.PostUpdate(ud, set, revision, partition)
  61. }
  62. func (m *SubsetControl) DeleteSubset(subSet *Subset) error {
  63. set := subSet.Spec.SubsetRef.Resources[0].(runtime.Object)
  64. return m.Delete(context.TODO(), set, client.PropagationPolicy(metav1.DeletePropagationBackground))
  65. }
  66. func (m *SubsetControl) GetSubsetFailure(subset *Subset) *string {
  67. return m.adapter.GetSubsetFailure()
  68. }
  69. func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {
  70. return m.adapter.IsExpected(subSet.Spec.SubsetRef.Resources[0], revision)
  71. }
  72. // 转化obj为Subset
  73. func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {
  74. subSetName, err := getSubsetNameFrom(set)
  75. if err != nil {
  76. return nil, err
  77. }
  78. subset := &Subset{}
  79. subset.ObjectMeta = metav1.ObjectMeta{
  80. Name: set.GetName(),
  81. GenerateName: set.GetGenerateName(),
  82. Namespace: set.GetNamespace(),
  83. SelfLink: set.GetSelfLink(),
  84. UID: set.GetUID(),
  85. ResourceVersion: set.GetResourceVersion(),
  86. Generation: set.GetGeneration(),
  87. CreationTimestamp: set.GetCreationTimestamp(),
  88. DeletionTimestamp: set.GetDeletionTimestamp(),
  89. DeletionGracePeriodSeconds: set.GetDeletionGracePeriodSeconds(),
  90. Labels: set.GetLabels(),
  91. Annotations: set.GetAnnotations(),
  92. OwnerReferences: set.GetOwnerReferences(),
  93. Finalizers: set.GetFinalizers(),
  94. ClusterName: set.GetClusterName(),
  95. }
  96. subset.Spec.SubsetName = subSetName
  97. specReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision)
  98. if err != nil {
  99. return subset, err
  100. }
  101. if specReplicas != nil {
  102. subset.Spec.Replicas = *specReplicas
  103. }
  104. if specPartition != nil {
  105. subset.Spec.UpdateStrategy.Partition = *specPartition
  106. }
  107. subset.Status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)
  108. subset.Status.Replicas = statusReplicas
  109. subset.Status.ReadyReplicas = statusReadyReplicas
  110. subset.Status.UpdatedReplicas = statusUpdatedReplicas
  111. subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas
  112. subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
  113. return subset, nil
  114. }
  115. func (m *SubsetControl) objectKey(objMeta *metav1.ObjectMeta) client.ObjectKey {
  116. return types.NamespacedName{
  117. Namespace: objMeta.Namespace,
  118. Name: objMeta.Name,
  119. }
  120. }

5、adapter工具函数(此处以CloneSet为例)

  1. type CloneSetAdapter struct {
  2. client.Client
  3. Scheme *runtime.Scheme
  4. }
  5. func (a *CloneSetAdapter) NewResourceObject() runtime.Object {
  6. return &alpha1.CloneSet{}
  7. }
  8. func (a *CloneSetAdapter) NewResourceListObject() runtime.Object {
  9. return &alpha1.CloneSetList{}
  10. }
  11. func (a *CloneSetAdapter) GetObjectMeta(obj metav1.Object) *metav1.ObjectMeta {
  12. return &obj.(*alpha1.CloneSet).ObjectMeta
  13. }
  14. func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
  15. return obj.(*alpha1.CloneSet).Status.ObservedGeneration
  16. }
  17. func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
  18. set := obj.(*alpha1.CloneSet)
  19. var pods []*corev1.Pod
  20. pods, err = a.getCloneSetPods(set)
  21. if err != nil {
  22. return
  23. }
  24. specReplicas = set.Spec.Replicas
  25. if set.Spec.UpdateStrategy.Partition != nil {
  26. specPartition = set.Spec.UpdateStrategy.Partition
  27. }
  28. statusReplicas = set.Status.Replicas
  29. statusReadyReplicas = set.Status.ReadyReplicas
  30. statusUpdatedReplicas, statusUpdatedReadyReplicas = calculateUpdatedReplicas(pods, updatedRevision)
  31. return
  32. }
  33. func (a *CloneSetAdapter) GetSubsetFailure() *string {
  34. return nil
  35. }
  36. func (a *CloneSetAdapter) ApplySubsetTemplate(ud *alpha1.UnitedDeployment, subsetName, revision string, replicas, partition int32, obj runtime.Object) error {
  37. set := obj.(*alpha1.CloneSet)
  38. var subSetConfig *alpha1.Subset
  39. for _, subset := range ud.Spec.Topology.Subsets {
  40. if subset.Name == subsetName {
  41. subSetConfig = &subset
  42. break
  43. }
  44. }
  45. if subSetConfig == nil {
  46. return fmt.Errorf("fail to find subset config %s", subsetName)
  47. }
  48. set.Namespace = ud.Namespace
  49. if set.Labels == nil {
  50. set.Labels = map[string]string{}
  51. }
  52. for k, v := range ud.Spec.Template.CloneSetTemplate.Labels {
  53. set.Labels[k] = v
  54. }
  55. for k, v := range ud.Spec.Selector.MatchLabels {
  56. set.Labels[k] = v
  57. }
  58. set.Labels[alpha1.ControllerRevisionHashLabelKey] = revision
  59. // record the subset name as a label
  60. set.Labels[alpha1.SubSetNameLabelKey] = subsetName
  61. if set.Annotations == nil {
  62. set.Annotations = map[string]string{}
  63. }
  64. for k, v := range ud.Spec.Template.CloneSetTemplate.Annotations {
  65. set.Annotations[k] = v
  66. }
  67. set.GenerateName = getSubsetPrefix(ud.Name, subsetName)
  68. selectors := ud.Spec.Selector.DeepCopy()
  69. selectors.MatchLabels[alpha1.SubSetNameLabelKey] = subsetName
  70. if err := controllerutil.SetControllerReference(ud, set, a.Scheme); err != nil {
  71. return err
  72. }
  73. set.Spec.Selector = selectors
  74. set.Spec.Replicas = &replicas
  75. set.Spec.UpdateStrategy = ud.Spec.Template.CloneSetTemplate.Spec.UpdateStrategy
  76. set.Spec.UpdateStrategy.Partition = &partition
  77. set.Spec.Template = *ud.Spec.Template.CloneSetTemplate.Spec.Template.DeepCopy()
  78. if set.Spec.Template.Labels == nil {
  79. set.Spec.Template.Labels = map[string]string{}
  80. }
  81. set.Spec.Template.Labels[alpha1.SubSetNameLabelKey] = subsetName
  82. set.Spec.Template.Labels[alpha1.ControllerRevisionHashLabelKey] = revision
  83. set.Spec.RevisionHistoryLimit = ud.Spec.Template.CloneSetTemplate.Spec.RevisionHistoryLimit
  84. set.Spec.VolumeClaimTemplates = ud.Spec.Template.CloneSetTemplate.Spec.VolumeClaimTemplates
  85. attachNodeAffinity(&set.Spec.Template.Spec, subSetConfig)
  86. attachTolerations(&set.Spec.Template.Spec, subSetConfig)
  87. return nil
  88. }
  89. func (a *CloneSetAdapter) PostUpdate(ud *alpha1.UnitedDeployment, obj runtime.Object, revision string, partition int32) error {
  90. return nil
  91. }
  92. func (a *CloneSetAdapter) IsExpected(obj metav1.Object, revision string) bool {
  93. return obj.GetLabels()[appsv1.ControllerRevisionHashLabelKey] != revision
  94. }
  95. func (a *CloneSetAdapter) getCloneSetPods(set *alpha1.CloneSet) ([]*corev1.Pod, error) {
  96. selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  97. if err != nil {
  98. return nil, err
  99. }
  100. podList := &corev1.PodList{}
  101. err = a.Client.List(context.TODO(), podList, &client.ListOptions{LabelSelector: selector})
  102. if err != nil {
  103. return nil, err
  104. }
  105. manager, err := refmanager.New(a.Client, set.Spec.Selector, set, a.Scheme)
  106. if err != nil {
  107. return nil, err
  108. }
  109. selected := make([]metav1.Object, len(podList.Items))
  110. for i, pod := range podList.Items {
  111. selected[i] = pod.DeepCopy()
  112. }
  113. claimed, err := manager.ClaimOwnedObjects(selected)
  114. if err != nil {
  115. return nil, err
  116. }
  117. claimedPods := make([]*corev1.Pod, len(claimed))
  118. for i, pod := range claimed {
  119. claimedPods[i] = pod.(*corev1.Pod)
  120. }
  121. return claimedPods, nil
  122. }

6、allocator工具函数

根据拓扑给每个subset分配副本数量

  1. func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {
  2. subsetInfos := getSubsetInfos(nameToSubset, ud)
  3. specifiedReplicas := getSpecifiedSubsetReplicas(ud)
  4. return subsetInfos.SortToAllocator().AllocateReplicas(*ud.Spec.Replicas, specifiedReplicas)
  5. }
  // nameToReplicas is the allocator's working record for one subset:
  //   - SubsetName is the subset's name;
  //   - Replicas is the count currently assigned to it;
  //   - Specified is true when the user fixed that count explicitly.
  type nameToReplicas struct {
  	SubsetName string
  	Replicas   int32
  	Specified  bool
  }

  // subsetInfos is a sortable list of per-subset records. It is ordered by
  // Replicas ascending, with ties broken by SubsetName, so allocation is
  // deterministic.
  type subsetInfos []*nameToReplicas

  // Get returns the i-th record.
  func (n subsetInfos) Get(i int) *nameToReplicas {
  	return []*nameToReplicas(n)[i]
  }

  func (n subsetInfos) Len() int {
  	return len(n)
  }

  func (n subsetInfos) Less(i, j int) bool {
  	if n[i].Replicas != n[j].Replicas {
  		return n[i].Replicas < n[j].Replicas
  	}
  	return strings.Compare(n[i].SubsetName, n[j].SubsetName) < 0
  }

  func (n subsetInfos) Swap(i, j int) {
  	n[i], n[j] = n[j], n[i]
  }

  // SortToAllocator sorts the records in place and wraps them in an allocator.
  func (n subsetInfos) SortToAllocator() *replicasAllocator {
  	sort.Sort(n)
  	return &replicasAllocator{subsets: &n}
  }

  // replicasAllocator distributes a UnitedDeployment's replicas over subsets.
  type replicasAllocator struct {
  	subsets *subsetInfos
  }

  // AllocateReplicas assigns replicas to every subset and returns the result
  // keyed by subset name. specifiedSubsetReplicas holds the user-fixed counts.
  // nil means nothing is specified. An error is returned when the specified
  // counts cannot be reconciled with replicas.
  func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
  	*map[string]int32, error) {
  	if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {
  		return nil, err
  	}
  	return s.normalAllocate(replicas, specifiedSubsetReplicas), nil
  }

  // validateReplicas rejects specified counts that exceed replicas. It also
  // rejects counts that fall short of replicas when every subset is specified,
  // since no subset would be left to absorb the difference. nil limits are
  // always valid.
  func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
  	if subsetReplicasLimits == nil {
  		return nil
  	}

  	var specifiedReplicas int32
  	for _, limit := range *subsetReplicasLimits {
  		specifiedReplicas += limit
  	}

  	if specifiedReplicas > replicas {
  		return fmt.Errorf("specified subsets' replica (%d) is greater than UnitedDeployment replica (%d)",
  			specifiedReplicas, replicas)
  	} else if specifiedReplicas < replicas {
  		specifiedCount := 0
  		for _, subset := range *s.subsets {
  			if _, exist := (*subsetReplicasLimits)[subset.SubsetName]; exist {
  				specifiedCount++
  			}
  		}
  		if specifiedCount == len(*s.subsets) {
  			return fmt.Errorf("specified subsets' replica (%d) is less than UnitedDeployment replica (%d)",
  				specifiedReplicas, replicas)
  		}
  	}
  	return nil
  }

  // normalAllocate first applies the specified counts. It then spreads the
  // remaining replicas evenly over the unspecified subsets. A nil
  // specifiedSubsetReplicas is treated as "nothing specified". Previously it
  // was dereferenced unconditionally and panicked, although validateReplicas
  // accepts nil.
  func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
  	var specified map[string]int32 // lookups in a nil map are safe
  	if specifiedSubsetReplicas != nil {
  		specified = *specifiedSubsetReplicas
  	}

  	var specifiedReplicas int32
  	specifiedSubsetCount := 0
  	// Step 1: apply replicas to specified subsets and mark them as specified.
  	for _, subset := range *s.subsets {
  		if replicas, exist := specified[subset.SubsetName]; exist {
  			specifiedReplicas += replicas
  			subset.Replicas = replicas
  			subset.Specified = true
  			specifiedSubsetCount++
  		}
  	}

  	// Step 2: spread the rest evenly over the unspecified subsets. The walk
  	// goes from the end of the sorted list, so the remainder (one extra
  	// replica each) goes to the subsets that sort last: those that already had
  	// the most replicas, ties broken by name.
  	leftSubsetCount := len(*s.subsets) - specifiedSubsetCount
  	if leftSubsetCount != 0 {
  		allocatableReplicas := expectedReplicas - specifiedReplicas
  		average := int(allocatableReplicas) / leftSubsetCount
  		remainder := int(allocatableReplicas) % leftSubsetCount

  		for i := len(*s.subsets) - 1; i >= 0; i-- {
  			subset := (*s.subsets)[i]
  			if subset.Specified {
  				continue
  			}
  			if remainder > 0 {
  				subset.Replicas = int32(average + 1)
  				remainder--
  			} else {
  				subset.Replicas = int32(average)
  			}
  			leftSubsetCount--
  			if leftSubsetCount == 0 {
  				break
  			}
  		}
  	}
  	return s.toSubsetReplicaMap()
  }

  // toSubsetReplicaMap snapshots the current assignment keyed by subset name.
  func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
  	allocatedReplicas := make(map[string]int32, len(*s.subsets))
  	for _, subset := range *s.subsets {
  		allocatedReplicas[subset.SubsetName] = subset.Replicas
  	}
  	return &allocatedReplicas
  }

  // String renders the assignment as " name -> n;" pairs. Note that it re-sorts
  // the records in place first.
  func (s *replicasAllocator) String() string {
  	result := ""
  	sort.Sort(s.subsets)
  	for _, subset := range *s.subsets {
  		result = fmt.Sprintf("%s %s -> %d;", result, subset.SubsetName, subset.Replicas)
  	}
  	return result
  }
  118. func getSpecifiedSubsetReplicas(ud *appsv1alpha1.UnitedDeployment) *map[string]int32 {
  119. replicaLimits := map[string]int32{}
  120. if ud.Spec.Topology.Subsets == nil {
  121. return &replicaLimits
  122. }
  123. for _, subsetDef := range ud.Spec.Topology.Subsets {
  124. if subsetDef.Replicas == nil {
  125. continue
  126. }
  127. if specifiedReplicas, err := ParseSubsetReplicas(*ud.Spec.Replicas, *subsetDef.Replicas); err == nil {
  128. replicaLimits[subsetDef.Name] = specifiedReplicas
  129. }
  130. }
  131. return &replicaLimits
  132. }
  133. func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) *subsetInfos {
  134. infos := make(subsetInfos, len(ud.Spec.Topology.Subsets))
  135. for idx, subsetDef := range ud.Spec.Topology.Subsets {
  136. var replicas int32
  137. if subset, exist := (*nameToSubset)[subsetDef.Name]; exist {
  138. replicas = subset.Spec.Replicas
  139. }
  140. infos[idx] = &nameToReplicas{SubsetName: subsetDef.Name, Replicas: replicas}
  141. }
  142. return &infos
  143. }

7、计算状态工具函数

  1. func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) {
  2. newStatus = ud.Status.DeepCopy()
  3. exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)
  4. if err != nil {
  5. SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error()))
  6. return newStatus, fmt.Errorf("fail to manage Subset provision: %s", err)
  7. }
  8. if provisioned {
  9. SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionTrue, "", ""))
  10. }
  11. expectedRevision := currentRevision
  12. if updatedRevision != nil {
  13. expectedRevision = updatedRevision
  14. }
  15. var needUpdate []string
  16. for _, name := range exists.List() {
  17. subset := (*nameToSubset)[name]
  18. if r.subSetControls[subsetType].IsExpected(subset, expectedRevision.Name) ||
  19. subset.Spec.Replicas != (*nextReplicas)[name] ||
  20. subset.Spec.UpdateStrategy.Partition != (*nextPartitions)[name] {
  21. needUpdate = append(needUpdate, name)
  22. }
  23. }
  24. if len(needUpdate) > 0 {
  25. _, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error {
  26. cell := needUpdate[index]
  27. subset := (*nameToSubset)[cell]
  28. replicas := (*nextReplicas)[cell]
  29. partition := (*nextPartitions)[cell]
  30. updateSubsetErr := r.subSetControls[subsetType].UpdateSubset(subset, ud, expectedRevision.Name, replicas, partition)
  31. return updateSubsetErr
  32. })
  33. }
  34. if updateErr == nil {
  35. SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))
  36. } else {
  37. SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
  38. }
  39. return
  40. }
  41. func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (sets.String, bool, error) {
  42. expectedSubsets := sets.String{}
  43. gotSubsets := sets.String{}
  44. for _, subset := range ud.Spec.Topology.Subsets {
  45. expectedSubsets.Insert(subset.Name)
  46. }
  47. for subsetName := range *nameToSubset {
  48. gotSubsets.Insert(subsetName)
  49. }
  50. var creates []string
  51. for _, expectSubset := range expectedSubsets.List() {
  52. if gotSubsets.Has(expectSubset) {
  53. continue
  54. }
  55. creates = append(creates, expectSubset)
  56. }
  57. var deletes []string
  58. for _, gotSubset := range gotSubsets.List() {
  59. if expectedSubsets.Has(gotSubset) {
  60. continue
  61. }
  62. deletes = append(deletes, gotSubset)
  63. }
  64. revision := currentRevision.Name
  65. if updatedRevision != nil {
  66. revision = updatedRevision.Name
  67. }
  68. var errs []error
  69. // manage creating
  70. if len(creates) > 0 {
  71. createdSubsets := make([]string, len(creates))
  72. for i, subset := range creates {
  73. createdSubsets[i] = subset
  74. }
  75. var createdErr error
  76. _, createdErr = util.SlowStartBatch(len(creates), slowStartInitialBatchSize, func(idx int) error {
  77. subsetName := createdSubsets[idx]
  78. replicas := (*nextReplicas)[subsetName]
  79. partition := (*nextPartitions)[subsetName]
  80. err := r.subSetControls[subsetType].CreateSubset(ud, subsetName, revision, replicas, partition)
  81. if err != nil {
  82. if !errors.IsTimeout(err) {
  83. return fmt.Errorf("fail to create Subset (%s) %s: %s", subsetType, subsetName, err.Error())
  84. }
  85. }
  86. return nil
  87. })
  88. errs = append(errs, createdErr)
  89. }
  90. // manage deleting
  91. if len(deletes) > 0 {
  92. var deleteErrs []error
  93. for _, subsetName := range deletes {
  94. subset := (*nameToSubset)[subsetName]
  95. if err := r.subSetControls[subsetType].DeleteSubset(subset); err != nil {
  96. deleteErrs = append(deleteErrs, fmt.Errorf("fail to delete Subset (%s) %s/%s for %s: %s", subsetType, subset.Namespace, subset.Name, subsetName, err))
  97. }
  98. }
  99. if len(deleteErrs) > 0 {
  100. errs = append(errs, deleteErrs...)
  101. }
  102. }
  103. // clean the other kind of subsets
  104. cleaned := false
  105. for t, control := range r.subSetControls {
  106. if t == subsetType {
  107. continue
  108. }
  109. subsets, err := control.GetAllSubsets(ud, revision)
  110. if err != nil {
  111. errs = append(errs, fmt.Errorf("fail to list Subset of other type %s for UnitedDeployment %s/%s: %s", t, ud.Namespace, ud.Name, err))
  112. continue
  113. }
  114. for _, subset := range subsets {
  115. cleaned = true
  116. if err := control.DeleteSubset(subset); err != nil {
  117. errs = append(errs, fmt.Errorf("fail to delete Subset %s of other type %s for UnitedDeployment %s/%s: %s", subset.Name, t, ud.Namespace, ud.Name, err))
  118. continue
  119. }
  120. }
  121. }
  122. return expectedSubsets.Intersection(gotSubsets), len(creates) > 0 || len(deletes) > 0 || cleaned, utilerrors.NewAggregate(errs)
  123. }

8、整体逻辑