官方文档:https://openkruise.io/zh-cn/docs/uniteddeployment.html
UnitedDeployment 是蚂蚁 CafeDeployment 的开源版本。
0、蚂蚁原生架构图(图略)
1、Spec定义
// UnitedDeploymentSpec is the desired state of a UnitedDeployment.
type UnitedDeploymentSpec struct {
// Replicas is the total number of pods across all subsets (i.e. across all availability zones).
Replicas *int32 `json:"replicas,omitempty"`
// Selector is the label selector used to find the workloads and ControllerRevisions
// that belong to this UnitedDeployment.
Selector *metav1.LabelSelector `json:"selector"`
// Template is the subset template: the desired workload stamped out for each
// subset (availability zone).
Template SubsetTemplate `json:"template,omitempty"`
// Topology describes how pods are distributed across the subsets.
Topology Topology `json:"topology,omitempty"`
// UpdateStrategy is the strategy applied when Template changes.
UpdateStrategy UnitedDeploymentUpdateStrategy `json:"updateStrategy,omitempty"`
// RevisionHistoryLimit is the number of historical ControllerRevisions to keep; defaults to 10.
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
}
// SubsetTemplate selects the workload kind that backs every subset. It is
// meant to hold one of StatefulSet, AdvancedStatefulSet or CloneSet.
// StatefulSet is the native Kubernetes resource; the other two are Kruise CRDs.
type SubsetTemplate struct {
StatefulSetTemplate *StatefulSetTemplateSpec `json:"statefulSetTemplate,omitempty"`
AdvancedStatefulSetTemplate *AdvancedStatefulSetTemplateSpec `json:"advancedStatefulSetTemplate,omitempty"`
CloneSetTemplate *CloneSetTemplateSpec `json:"cloneSetTemplate,omitempty"`
}
// The template specs below pair object metadata with the spec of one of the
// underlying workload kinds a subset can be backed by.
// StatefulSetTemplateSpec is the template of a native Kubernetes StatefulSet.
type StatefulSetTemplateSpec struct {
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec appsv1.StatefulSetSpec `json:"spec"`
}
// AdvancedStatefulSetTemplateSpec is the template of a Kruise Advanced StatefulSet.
type AdvancedStatefulSetTemplateSpec struct {
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec StatefulSetSpec `json:"spec"`
}
// CloneSetTemplateSpec is the template of a Kruise CloneSet.
type CloneSetTemplateSpec struct {
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CloneSetSpec `json:"spec"`
}
// UnitedDeploymentUpdateStrategy is the update strategy of a UnitedDeployment.
type UnitedDeploymentUpdateStrategy struct {
Type UpdateStrategyType `json:"type,omitempty"`
ManualUpdate *ManualUpdate `json:"manualUpdate,omitempty"`
}
// ManualUpdate holds per-subset partitions. Partitions maps a subset
// (availability zone) name to a partition value. The number of pods in that
// subset moved to the new version during a gray release is (pod count - value).
// The value is handed to the workload as its partition when it is created.
type ManualUpdate struct {
Partitions map[string]int32 `json:"partitions,omitempty"`
}
// Topology describes how pods are distributed across subsets.
type Topology struct {
Subsets []Subset `json:"subsets,omitempty"`
}
// Subset describes one subset (typically one availability zone) of the topology.
type Subset struct {
// Name is used to build the workload name prefix, of the form
// <united-deployment-name>-<subset-name>-; it must be unique within a UnitedDeployment.
Name string `json:"name"`
// NodeSelectorTerm selects the nodes this subset's pods are scheduled to; it cannot be changed.
// Nodes in different availability zones are usually labeled accordingly,
// e.g. datacenter=dc-a in the Ant Group architecture diagram.
NodeSelectorTerm corev1.NodeSelectorTerm `json:"nodeSelectorTerm,omitempty"`
// Tolerations are attached to this subset's pods; they cannot be changed.
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
// Replicas is the number of pods created in this subset: either an absolute
// number or a percentage, e.g. 10% places 10% of all the UnitedDeployment's
// pods in this subset. If nil, the controller spreads the remaining pods
// evenly over the subsets that leave Replicas unset.
Replicas *intstr.IntOrString `json:"replicas,omitempty"`
}
2、调谐函数
// Reconcile drives one UnitedDeployment towards its desired state. It:
//  1. resolves the current and update ControllerRevisions (a new revision is
//     produced whenever Spec.Template changes; if there is no current revision
//     yet, the update revision is used as the current one),
//  2. collects the existing subsets keyed by subset name,
//  3. computes the replicas and partitions each subset should have next,
//  4. creates/updates/deletes subsets accordingly, and
//  5. writes the resulting status.
//
// Failures that may be transient are returned so controller-runtime requeues
// the request; they are not silently dropped.
func (r *ReconcileUnitedDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Fetch the UnitedDeployment. A missing object is not an error; any other
	// read failure is returned so the request is retried.
	instance := &appsv1alpha1.UnitedDeployment{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}
	// Objects that are being deleted are left alone.
	if instance.DeletionTimestamp != nil {
		return reconcile.Result{}, nil
	}

	// Keep a snapshot of the status as read, for updateStatus to compare against.
	oldStatus := instance.Status.DeepCopy()

	currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance)
	if err != nil {
		return reconcile.Result{}, err
	}

	control, subsetType := r.getSubsetControls(instance)

	// Subsets are driven towards the update revision when there is one.
	expectedRevision := currentRevision.Name
	if updatedRevision != nil {
		expectedRevision = updatedRevision.Name
	}

	nameToSubset, err := r.getNameToSubset(instance, control, expectedRevision)
	if err != nil {
		// Listing, claiming or de-duplicating subsets failed, possibly transiently.
		// Return the error so the request is requeued instead of being dropped.
		return reconcile.Result{}, err
	}

	nextReplicas, err := GetAllocatedReplicas(nameToSubset, instance)
	if err != nil {
		// The allocator only fails when the specified subset replicas do not
		// add up to Spec.Replicas. Retrying with the same spec cannot fix that,
		// so this is not requeued.
		return reconcile.Result{}, nil
	}

	nextPartitions := calcNextPartitions(instance, nextReplicas)

	// Apply nextReplicas/nextPartitions to every subset. On failure, newStatus
	// still carries the failure conditions, so the status is written first.
	newStatus, manageErr := r.manageSubsets(instance, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)

	result, statusErr := r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control)
	if statusErr != nil {
		return result, statusErr
	}
	if manageErr != nil {
		// Surface the subset management failure so the request is retried.
		return result, manageErr
	}
	return result, nil
}
3、revision工具函数
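constructUnitedDeploymentRevisions 的处理流程:1、获取历史版本(ControllerRevision)列表并按版本号排序;2、根据 RevisionHistoryLimit 参数清除过期版本;3、根据当前 Template 构造一个新版本;4、若历史版本中已有与新版本内容相同的版本:如果它就是最新的版本,则直接复用;否则只需把该旧版本的 Revision 号更新为最新值(相当于回滚),无须再创建新的 ControllerRevision;5、若没有内容相同的历史版本,则创建一个新版本。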
ControllerRevision 的名称形如:<ud 名称>-<hash 值>
// constructUnitedDeploymentRevisions gathers all revision information of a
// UnitedDeployment. It returns:
//   - the current revision: the one named by Status.CurrentRevision, or the
//     update revision when no such revision exists,
//   - the update revision: the revision matching the present Spec.Template.
//     An identical historical revision is reused; if it is not the latest one
//     it is bumped to the newest revision number. Otherwise a new one is created,
//   - the remaining (non-expired) revisions, sorted oldest first,
//   - the collision count used to salt revision hashes.
//
// On error the revision results are nil and the collision count is the value
// read from status (possibly advanced by createControllerRevision).
func (r *ReconcileUnitedDeployment) constructUnitedDeploymentRevisions(ud *appsalphav1.UnitedDeployment) (*apps.ControllerRevision, *apps.ControllerRevision, *[]*apps.ControllerRevision, int32, error) {
	var collisionCount int32
	if ud.Status.CollisionCount != nil {
		collisionCount = *ud.Status.CollisionCount
	}

	revisions, err := r.controlledHistories(ud)
	if err != nil {
		return nil, nil, nil, collisionCount, err
	}
	history.SortControllerRevisions(revisions)

	kept, err := r.cleanExpiredRevision(ud, &revisions)
	if err != nil {
		return nil, nil, nil, collisionCount, err
	}
	revisions = *kept

	candidate, err := r.newRevision(ud, nextRevision(revisions), &collisionCount)
	if err != nil {
		return nil, nil, nil, collisionCount, err
	}

	var updateRevision *apps.ControllerRevision
	equal := history.FindEqualRevisions(revisions, candidate)
	switch n := len(equal); {
	case n > 0 && history.EqualRevision(revisions[len(revisions)-1], equal[n-1]):
		// The latest revision already matches the template: nothing changed.
		updateRevision = revisions[len(revisions)-1]
	case n > 0:
		// An older revision matches (a rollback): promote it by giving it the
		// newest revision number instead of creating a duplicate.
		latestEqual := equal[n-1]
		latestEqual.Revision = candidate.Revision
		if updateErr := r.Client.Update(context.TODO(), latestEqual); updateErr != nil {
			return nil, nil, nil, collisionCount, updateErr
		}
		updateRevision = latestEqual
	default:
		updateRevision, err = r.createControllerRevision(ud, candidate, &collisionCount)
		if err != nil {
			return nil, nil, nil, collisionCount, err
		}
	}

	var currentRevision *apps.ControllerRevision
	for _, rev := range revisions {
		if rev.Name == ud.Status.CurrentRevision {
			currentRevision = rev
		}
	}
	if currentRevision == nil {
		currentRevision = updateRevision
	}
	return currentRevision, updateRevision, &revisions, collisionCount, nil
}
// controlledHistories returns the ControllerRevisions that belong to ud.
//
// Revisions are listed by ud's label selector. A selector match does not mean
// the labels are identical: with selector app=test, both an object labeled
// app=test and one labeled app=test,az=bj match. The candidates are then
// passed to the ref manager's ClaimOwnedObjects, which decides ownership
// (presumably by controller reference, adopting or releasing as needed; see
// refmanager). Only the claimed revisions are returned.
func (r *ReconcileUnitedDeployment) controlledHistories(ud *appsalphav1.UnitedDeployment) ([]*apps.ControllerRevision, error) {
	selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// Restrict the list to ud's own namespace. Owner references cannot cross
	// namespaces, so revisions elsewhere that merely share the labels must
	// never be claimed or adopted.
	histories := &apps.ControllerRevisionList{}
	err = r.Client.List(context.TODO(), histories, &client.ListOptions{
		Namespace:     ud.Namespace,
		LabelSelector: selector,
	})
	if err != nil {
		return nil, err
	}

	cm, err := refmanager.New(r.Client, ud.Spec.Selector, ud, r.scheme)
	if err != nil {
		return nil, err
	}

	candidates := make([]metav1.Object, len(histories.Items))
	for i := range histories.Items {
		candidates[i] = histories.Items[i].DeepCopy()
	}
	claims, err := cm.ClaimOwnedObjects(candidates)
	if err != nil {
		return nil, err
	}

	claimHistories := make([]*apps.ControllerRevision, len(claims))
	for i, obj := range claims {
		claimHistories[i] = obj.(*apps.ControllerRevision)
	}
	return claimHistories, nil
}
// cleanExpiredRevision trims the revision history of ud down to
// Spec.RevisionHistoryLimit entries by deleting the oldest revisions.
// sortedRevisions must be sorted oldest first.
//
// The revisions recorded in status as current and updated are "live". They
// are never deleted and are always part of the returned list, because the
// caller looks the current revision up by name in that list. If live
// revisions occupy slots, the history may stay above the limit.
//
// A nil RevisionHistoryLimit is treated as the documented default of 10.
// A revision already gone (NotFound) counts as deleted. On any other delete
// error, the untouched input and the error are returned.
func (r *ReconcileUnitedDeployment) cleanExpiredRevision(ud *appsalphav1.UnitedDeployment, sortedRevisions *[]*apps.ControllerRevision) (*[]*apps.ControllerRevision, error) {
	limit := 10
	if ud.Spec.RevisionHistoryLimit != nil {
		limit = int(*ud.Spec.RevisionHistoryLimit)
	}
	exceedNum := len(*sortedRevisions) - limit
	if exceedNum <= 0 {
		return sortedRevisions, nil
	}

	live := map[string]bool{ud.Status.CurrentRevision: true}
	if ud.Status.UpdateStatus != nil {
		live[ud.Status.UpdateStatus.UpdatedRevision] = true
	}

	kept := make([]*apps.ControllerRevision, 0, len(*sortedRevisions))
	for _, revision := range *sortedRevisions {
		// Delete oldest-first, skipping live revisions, until enough are gone.
		if exceedNum > 0 && !live[revision.Name] {
			if err := r.Client.Delete(context.TODO(), revision); err != nil && !errors.IsNotFound(err) {
				return sortedRevisions, err
			}
			exceedNum--
			continue
		}
		kept = append(kept, revision)
	}
	return &kept, nil
}
// createControllerRevision persists revision under a name derived from
// parent's name and a hash of the revision salted with *collisionCount.
//
// If an object with that name already exists and carries the same data, that
// existing object is returned. If its data differs (a hash collision),
// *collisionCount is incremented and the next name is tried. collisionCount
// must not be nil.
func (r *ReconcileUnitedDeployment) createControllerRevision(parent metav1.Object, revision *apps.ControllerRevision, collisionCount *int32) (*apps.ControllerRevision, error) {
	if collisionCount == nil {
		return nil, fmt.Errorf("collisionCount should not be nil")
	}

	candidate := revision.DeepCopy()
	for {
		// The name changes whenever collisionCount is bumped.
		hash := history.HashControllerRevision(revision, collisionCount)
		candidate.Name = history.ControllerRevisionName(parent.GetName(), hash)

		createErr := r.Client.Create(context.TODO(), candidate)
		if !errors.IsAlreadyExists(createErr) {
			return candidate, createErr
		}

		existing := &apps.ControllerRevision{}
		key := client.ObjectKey{Namespace: parent.GetNamespace(), Name: candidate.Name}
		if getErr := r.Client.Get(context.TODO(), key, existing); getErr != nil {
			return nil, getErr
		}
		if bytes.Equal(existing.Data.Raw, candidate.Data.Raw) {
			return existing, nil
		}
		*collisionCount++
	}
}
// newRevision builds (without persisting) a ControllerRevision for ud.
//
// Its data is a patch that replaces spec.template (see
// getUnitedDeploymentPatch), and its revision number is revision. The
// revision is labeled with the labels of whichever workload template is set.
// Those labels are what lets controlledHistories, which lists by ud's
// selector, find the revision again later.
func (r *ReconcileUnitedDeployment) newRevision(ud *appsalphav1.UnitedDeployment, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
	patch, err := getUnitedDeploymentPatch(ud)
	if err != nil {
		return nil, err
	}
	gvk, err := apiutil.GVKForObject(ud, r.scheme)
	if err != nil {
		return nil, err
	}

	// Every supported template kind must be handled here. A revision with no
	// labels would not match ud's selector and would never be found again.
	var selectedLabels map[string]string
	switch {
	case ud.Spec.Template.StatefulSetTemplate != nil:
		selectedLabels = ud.Spec.Template.StatefulSetTemplate.Labels
	case ud.Spec.Template.AdvancedStatefulSetTemplate != nil:
		selectedLabels = ud.Spec.Template.AdvancedStatefulSetTemplate.Labels
	case ud.Spec.Template.CloneSetTemplate != nil:
		selectedLabels = ud.Spec.Template.CloneSetTemplate.Labels
	}

	cr, err := history.NewControllerRevision(ud,
		gvk,
		selectedLabels,
		runtime.RawExtension{Raw: patch},
		revision,
		collisionCount)
	if err != nil {
		return nil, err
	}
	cr.Namespace = ud.Namespace
	return cr, nil
}
// nextRevision returns the revision number to use for a new revision: one
// more than the last (newest) entry of revisions, or 1 when revisions is empty.
// revisions is expected to be sorted oldest first.
func nextRevision(revisions []*apps.ControllerRevision) int64 {
	if n := len(revisions); n > 0 {
		return revisions[n-1].Revision + 1
	}
	return 1
}
// getUnitedDeploymentPatch returns a JSON patch of the form
// {"spec":{"template":{...,"$patch":"replace"}}}, which captures ud's
// spec.template and marks it to replace the whole template when applied.
// This is the payload stored in a ControllerRevision.
func getUnitedDeploymentPatch(ud *appsalphav1.UnitedDeployment) ([]byte, error) {
	encoded, err := json.Marshal(ud)
	if err != nil {
		return nil, err
	}
	var obj map[string]interface{}
	if err := json.Unmarshal(encoded, &obj); err != nil {
		return nil, err
	}

	template := obj["spec"].(map[string]interface{})["template"].(map[string]interface{})
	template["$patch"] = "replace"

	return json.Marshal(map[string]interface{}{
		"spec": map[string]interface{}{
			"template": template,
		},
	})
}
4、subset工具函数
UnitedDeployment 侧的 subset 工具函数:获取全部 subset、按 subset 名称分组并删除重复的 subset
// getNameToSubset returns instance's subsets keyed by subset name.
//
// It lists and claims all subsets through control (expectedRevision is
// forwarded to it), groups them by subset name, and deletes duplicates so that
// exactly one subset remains per name. Errors are wrapped with %w so callers
// can still inspect the underlying error with errors.Is / errors.As.
func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) {
	subSets, err := control.GetAllSubsets(instance, expectedRevision)
	if err != nil {
		return nil, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %w", instance.Namespace, instance.Name, err)
	}

	nameToSubsets := r.classifySubsetBySubsetName(instance, subSets)

	nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control)
	if err != nil {
		return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %w", instance.Namespace, instance.Name, err)
	}
	return nameToSubset, nil
}
// classifySubsetBySubsetName groups subsets by their subset name. A name can
// map to several subsets, e.g. when a duplicate could not be deleted earlier.
// Subsets whose name cannot be determined are skipped.
func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(_ *appsv1alpha1.UnitedDeployment, subsets []*Subset) map[string][]*Subset {
	grouped := make(map[string][]*Subset)
	for _, candidate := range subsets {
		name, err := getSubsetNameFrom(candidate)
		if err != nil {
			continue
		}
		grouped[name] = append(grouped[name], candidate)
	}
	return grouped
}
// deleteDupSubset keeps the first subset of every name group and deletes the
// rest. Subsets that are already gone (NotFound) are ignored. On any other
// delete error, the partially built map and the error are returned.
func (r *ReconcileUnitedDeployment) deleteDupSubset(_ *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) {
	kept := map[string]*Subset{}
	for name, group := range nameToSubsets {
		if len(group) == 0 {
			continue
		}
		for _, extra := range group[1:] {
			err := control.DeleteSubset(extra)
			if err != nil && !errors.IsNotFound(err) {
				return &kept, err
			}
		}
		kept[name] = group[0]
	}
	return &kept, nil
}
SubsetControl:subset 控制接口(ControlInterface)的实现
// GetAllSubsets lists the workloads of this control's kind that match ud's
// selector in ud's namespace and claims them through the ref manager. Each
// claimed workload is converted into a Subset. updatedRevision is forwarded to
// convertToSubset.
func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) {
	selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// Restrict the list to ud's namespace. Owner references cannot cross
	// namespaces, so same-labeled workloads elsewhere must never be claimed.
	setList := m.adapter.NewResourceListObject()
	err = m.Client.List(context.TODO(), setList, &client.ListOptions{
		Namespace:     ud.Namespace,
		LabelSelector: selector,
	})
	if err != nil {
		return nil, err
	}

	manager, err := refmanager.New(m.Client, ud.Spec.Selector, ud, m.scheme)
	if err != nil {
		return nil, err
	}

	// The concrete list type is only known to the adapter, so its Items are
	// read through reflection.
	items := reflect.ValueOf(setList).Elem().FieldByName("Items")
	selected := make([]metav1.Object, items.Len())
	for i := range selected {
		selected[i] = items.Index(i).Addr().Interface().(metav1.Object)
	}
	claimedSets, err := manager.ClaimOwnedObjects(selected)
	if err != nil {
		return nil, err
	}

	for _, claimedSet := range claimedSets {
		subSet, err := m.convertToSubset(claimedSet, updatedRevision)
		if err != nil {
			return nil, err
		}
		subSets = append(subSets, subSet)
	}
	return subSets, nil
}
// CreateSubset creates the workload for subset subsetName of ud, rendered from
// the UnitedDeployment template with the given revision, replicas and partition.
func (m *SubsetControl) CreateSubset(ud *alpha1.UnitedDeployment, subsetName string, revision string, replicas, partition int32) error {
	workload := m.adapter.NewResourceObject()
	err := m.adapter.ApplySubsetTemplate(ud, subsetName, revision, replicas, partition, workload)
	if err != nil {
		return err
	}
	return m.Create(context.TODO(), workload)
}
// UpdateSubset re-renders subset's workload from ud's template with the given
// revision, replicas and partition, and writes it back.
//
// Each attempt re-reads the latest object first. The update is retried up to
// updateRetries times; read or render errors abort immediately. After a
// successful update, the adapter's PostUpdate hook runs.
func (m *SubsetControl) UpdateSubset(subset *Subset, ud *alpha1.UnitedDeployment, revision string, replicas, partition int32) error {
	workload := m.adapter.NewResourceObject()
	key := m.objectKey(&subset.ObjectMeta)

	var updateError error
	for attempt := 0; attempt < updateRetries; attempt++ {
		if err := m.Client.Get(context.TODO(), key, workload); err != nil {
			return err
		}
		if err := m.adapter.ApplySubsetTemplate(ud, subset.Spec.SubsetName, revision, replicas, partition, workload); err != nil {
			return err
		}
		if updateError = m.Client.Update(context.TODO(), workload); updateError == nil {
			break
		}
	}
	if updateError != nil {
		return updateError
	}
	return m.adapter.PostUpdate(ud, workload, revision, partition)
}
// DeleteSubset deletes the workload backing subSet (the first entry of
// Spec.SubsetRef.Resources) with background propagation.
func (m *SubsetControl) DeleteSubset(subSet *Subset) error {
	workload := subSet.Spec.SubsetRef.Resources[0].(runtime.Object)
	background := client.PropagationPolicy(metav1.DeletePropagationBackground)
	return m.Delete(context.TODO(), workload, background)
}

// GetSubsetFailure returns the adapter-reported failure message, if any.
// The subset argument is not used.
func (m *SubsetControl) GetSubsetFailure(subset *Subset) *string {
	failure := m.adapter.GetSubsetFailure()
	return failure
}

// IsExpected delegates to the adapter for the workload backing subSet.
// NOTE: the adapter implementations return true when the workload is NOT yet
// at revision, and manageSubsets treats true as "needs update".
func (m *SubsetControl) IsExpected(subSet *Subset, revision string) bool {
	workload := subSet.Spec.SubsetRef.Resources[0]
	return m.adapter.IsExpected(workload, revision)
}
// convertToSubset wraps a claimed workload object into a Subset.
//
// It copies the object's metadata, reads the subset name from the object, and
// fills spec replicas/partition and status counters from the adapter.
// updatedRevision is forwarded to the adapter. The object itself is stored as
// the first entry of Spec.SubsetRef.Resources. If GetReplicaDetails fails,
// the partially filled Subset is returned together with the error.
func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision string) (*Subset, error) {
	name, err := getSubsetNameFrom(set)
	if err != nil {
		return nil, err
	}

	meta := metav1.ObjectMeta{
		Name:                       set.GetName(),
		GenerateName:               set.GetGenerateName(),
		Namespace:                  set.GetNamespace(),
		SelfLink:                   set.GetSelfLink(),
		UID:                        set.GetUID(),
		ResourceVersion:            set.GetResourceVersion(),
		Generation:                 set.GetGeneration(),
		CreationTimestamp:          set.GetCreationTimestamp(),
		DeletionTimestamp:          set.GetDeletionTimestamp(),
		DeletionGracePeriodSeconds: set.GetDeletionGracePeriodSeconds(),
		Labels:                     set.GetLabels(),
		Annotations:                set.GetAnnotations(),
		OwnerReferences:            set.GetOwnerReferences(),
		Finalizers:                 set.GetFinalizers(),
		ClusterName:                set.GetClusterName(),
	}
	subset := &Subset{ObjectMeta: meta}
	subset.Spec.SubsetName = name

	specReplicas, specPartition, replicas, ready, updated, updatedReady, err := m.adapter.GetReplicaDetails(set, updatedRevision)
	if err != nil {
		return subset, err
	}
	if specReplicas != nil {
		subset.Spec.Replicas = *specReplicas
	}
	if specPartition != nil {
		subset.Spec.UpdateStrategy.Partition = *specPartition
	}

	status := &subset.Status
	status.ObservedGeneration = m.adapter.GetStatusObservedGeneration(set)
	status.Replicas = replicas
	status.ReadyReplicas = ready
	status.UpdatedReplicas = updated
	status.UpdatedReadyReplicas = updatedReady

	subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set)
	return subset, nil
}
// objectKey returns the namespace/name key of objMeta.
func (m *SubsetControl) objectKey(objMeta *metav1.ObjectMeta) client.ObjectKey {
	var key types.NamespacedName
	key.Namespace, key.Name = objMeta.Namespace, objMeta.Name
	return key
}
5、adapter工具函数(此处以CloneSet为例)
// CloneSetAdapter lets the generic subset control manage subsets backed by
// Kruise CloneSets: it creates, renders and inspects CloneSet objects on
// behalf of a UnitedDeployment.
type CloneSetAdapter struct {
// Client is used for all API reads and writes.
client.Client
// Scheme is used to set the controller reference on rendered CloneSets and
// by the ref manager when claiming pods.
Scheme *runtime.Scheme
}
// NewResourceObject returns an empty CloneSet.
func (a *CloneSetAdapter) NewResourceObject() runtime.Object {
	return new(alpha1.CloneSet)
}

// NewResourceListObject returns an empty CloneSetList.
func (a *CloneSetAdapter) NewResourceListObject() runtime.Object {
	return new(alpha1.CloneSetList)
}

// GetObjectMeta returns the ObjectMeta of obj, which must be a *CloneSet.
func (a *CloneSetAdapter) GetObjectMeta(obj metav1.Object) *metav1.ObjectMeta {
	cs := obj.(*alpha1.CloneSet)
	return &cs.ObjectMeta
}

// GetStatusObservedGeneration returns status.observedGeneration of obj, which
// must be a *CloneSet.
func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 {
	cs := obj.(*alpha1.CloneSet)
	return cs.Status.ObservedGeneration
}
// GetReplicaDetails reports the replica numbers of the CloneSet obj:
// spec.replicas and spec.updateStrategy.partition (both may be nil),
// status.replicas and status.readyReplicas, and the number of claimed pods
// (and ready pods among them) that are at updatedRevision. If the pods cannot
// be fetched, all values are zero and the error is returned.
func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) {
	cs := obj.(*alpha1.CloneSet)

	pods, err := a.getCloneSetPods(cs)
	if err != nil {
		return nil, nil, 0, 0, 0, 0, err
	}

	updated, updatedReady := calculateUpdatedReplicas(pods, updatedRevision)
	return cs.Spec.Replicas, cs.Spec.UpdateStrategy.Partition,
		cs.Status.Replicas, cs.Status.ReadyReplicas,
		updated, updatedReady, nil
}

// GetSubsetFailure reports no failure: CloneSet subsets never expose one here.
func (a *CloneSetAdapter) GetSubsetFailure() *string {
	var noFailure *string
	return noFailure
}
// ApplySubsetTemplate renders ud's CloneSet template for subset subsetName
// into obj, which must be a *CloneSet.
//
// It:
//   - merges template labels, selector match labels, the revision label and
//     the subset-name label into the object,
//   - sets GenerateName to the subset prefix,
//   - narrows the selector to this subset,
//   - sets the controller reference, replicas, partition and pod template,
//   - attaches the subset's node affinity and tolerations.
//
// It returns an error if subsetName is not declared in ud's topology.
func (a *CloneSetAdapter) ApplySubsetTemplate(ud *alpha1.UnitedDeployment, subsetName, revision string, replicas, partition int32, obj runtime.Object) error {
	set := obj.(*alpha1.CloneSet)

	var subSetConfig *alpha1.Subset
	for i := range ud.Spec.Topology.Subsets {
		if ud.Spec.Topology.Subsets[i].Name == subsetName {
			// Take a copy rather than the address of the loop variable.
			cfg := ud.Spec.Topology.Subsets[i]
			subSetConfig = &cfg
			break
		}
	}
	if subSetConfig == nil {
		return fmt.Errorf("fail to find subset config %s", subsetName)
	}

	set.Namespace = ud.Namespace

	if set.Labels == nil {
		set.Labels = map[string]string{}
	}
	for k, v := range ud.Spec.Template.CloneSetTemplate.Labels {
		set.Labels[k] = v
	}
	for k, v := range ud.Spec.Selector.MatchLabels {
		set.Labels[k] = v
	}
	set.Labels[alpha1.ControllerRevisionHashLabelKey] = revision
	// Record the subset name as a label.
	set.Labels[alpha1.SubSetNameLabelKey] = subsetName

	if set.Annotations == nil {
		set.Annotations = map[string]string{}
	}
	for k, v := range ud.Spec.Template.CloneSetTemplate.Annotations {
		set.Annotations[k] = v
	}

	set.GenerateName = getSubsetPrefix(ud.Name, subsetName)

	// The subset's selector is ud's selector narrowed to this subset. A
	// selector built only from MatchExpressions has a nil MatchLabels map,
	// which must be allocated before it can be written to.
	selectors := ud.Spec.Selector.DeepCopy()
	if selectors.MatchLabels == nil {
		selectors.MatchLabels = map[string]string{}
	}
	selectors.MatchLabels[alpha1.SubSetNameLabelKey] = subsetName

	if err := controllerutil.SetControllerReference(ud, set, a.Scheme); err != nil {
		return err
	}

	set.Spec.Selector = selectors
	set.Spec.Replicas = &replicas
	set.Spec.UpdateStrategy = ud.Spec.Template.CloneSetTemplate.Spec.UpdateStrategy
	set.Spec.UpdateStrategy.Partition = &partition

	set.Spec.Template = *ud.Spec.Template.CloneSetTemplate.Spec.Template.DeepCopy()
	if set.Spec.Template.Labels == nil {
		set.Spec.Template.Labels = map[string]string{}
	}
	set.Spec.Template.Labels[alpha1.SubSetNameLabelKey] = subsetName
	set.Spec.Template.Labels[alpha1.ControllerRevisionHashLabelKey] = revision

	set.Spec.RevisionHistoryLimit = ud.Spec.Template.CloneSetTemplate.Spec.RevisionHistoryLimit
	set.Spec.VolumeClaimTemplates = ud.Spec.Template.CloneSetTemplate.Spec.VolumeClaimTemplates

	attachNodeAffinity(&set.Spec.Template.Spec, subSetConfig)
	attachTolerations(&set.Spec.Template.Spec, subSetConfig)
	return nil
}
// PostUpdate is the hook run after a subset update; CloneSets need no extra work.
func (a *CloneSetAdapter) PostUpdate(ud *alpha1.UnitedDeployment, obj runtime.Object, revision string, partition int32) error {
	return nil
}

// IsExpected compares the CloneSet's revision label with revision.
// NOTE: despite its name, it returns true when the CloneSet is NOT yet at
// revision. manageSubsets treats true as "needs update"; the name is kept for
// interface compatibility.
//
// The label must be read with the same key ApplySubsetTemplate writes
// (alpha1.ControllerRevisionHashLabelKey). Reading a different key would make
// every CloneSet look stale and trigger an update on every reconcile.
func (a *CloneSetAdapter) IsExpected(obj metav1.Object, revision string) bool {
	return obj.GetLabels()[alpha1.ControllerRevisionHashLabelKey] != revision
}
// getCloneSetPods returns the pods in set's namespace that match set's
// selector and are claimed for set by the ref manager.
func (a *CloneSetAdapter) getCloneSetPods(set *alpha1.CloneSet) ([]*corev1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// Only pods in the CloneSet's namespace can belong to it. Listing
	// cluster-wide would let the ref manager see, and possibly adopt,
	// same-labeled pods in other namespaces.
	podList := &corev1.PodList{}
	err = a.Client.List(context.TODO(), podList, &client.ListOptions{
		Namespace:     set.Namespace,
		LabelSelector: selector,
	})
	if err != nil {
		return nil, err
	}

	manager, err := refmanager.New(a.Client, set.Spec.Selector, set, a.Scheme)
	if err != nil {
		return nil, err
	}

	selected := make([]metav1.Object, len(podList.Items))
	for i := range podList.Items {
		selected[i] = podList.Items[i].DeepCopy()
	}
	claimed, err := manager.ClaimOwnedObjects(selected)
	if err != nil {
		return nil, err
	}

	claimedPods := make([]*corev1.Pod, len(claimed))
	for i, pod := range claimed {
		claimedPods[i] = pod.(*corev1.Pod)
	}
	return claimedPods, nil
}
6、allocator工具函数
根据拓扑(Topology)给每个 subset 分配副本数量:先满足显式指定了副本数的 subset,剩余副本再平均分配给未指定副本数的 subset。
// GetAllocatedReplicas computes how many replicas every subset of ud should
// run next.
//
// Subsets are ordered by their current replicas. Those with explicit
// Topology replicas get them; the rest of Spec.Replicas is spread evenly over
// the others. An error is returned when the specified replicas are
// inconsistent with Spec.Replicas.
func GetAllocatedReplicas(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) (*map[string]int32, error) {
	infos := getSubsetInfos(nameToSubset, ud)
	specified := getSpecifiedSubsetReplicas(ud)
	allocator := infos.SortToAllocator()
	return allocator.AllocateReplicas(*ud.Spec.Replicas, specified)
}
// nameToReplicas is the allocator's per-subset record: the subset name, its
// replica count (current replicas before allocation, assigned replicas after),
// and whether the count was explicitly specified in the topology.
type nameToReplicas struct {
	SubsetName string
	Replicas   int32
	Specified  bool
}

// subsetInfos is a list of subsets. It sorts by ascending Replicas, breaking
// ties by SubsetName, so the order is deterministic.
type subsetInfos []*nameToReplicas

// Get returns the i-th subset.
func (n subsetInfos) Get(i int) *nameToReplicas {
	return n[i]
}

// Len implements sort.Interface.
func (n subsetInfos) Len() int {
	return len(n)
}

// Less implements sort.Interface: fewer replicas first, then by name.
func (n subsetInfos) Less(i, j int) bool {
	if n[i].Replicas != n[j].Replicas {
		return n[i].Replicas < n[j].Replicas
	}
	return strings.Compare(n[i].SubsetName, n[j].SubsetName) < 0
}

// Swap implements sort.Interface.
func (n subsetInfos) Swap(i, j int) {
	n[i], n[j] = n[j], n[i]
}

// SortToAllocator sorts n in place and returns an allocator over it.
func (n subsetInfos) SortToAllocator() *replicasAllocator {
	sort.Sort(n)
	return &replicasAllocator{subsets: &n}
}

// replicasAllocator distributes a UnitedDeployment's replicas over its subsets.
type replicasAllocator struct {
	subsets *subsetInfos
}

// AllocateReplicas validates the explicitly specified per-subset replicas
// against the total and then assigns every subset its replica count.
// specifiedSubsetReplicas may be nil, meaning no subset is specified.
// It returns a map from subset name to allocated replicas.
func (s *replicasAllocator) AllocateReplicas(replicas int32, specifiedSubsetReplicas *map[string]int32) (
	*map[string]int32, error) {
	if err := s.validateReplicas(replicas, specifiedSubsetReplicas); err != nil {
		return nil, err
	}
	return s.normalAllocate(replicas, specifiedSubsetReplicas), nil
}

// validateReplicas checks the specified per-subset replicas. Their sum must
// not exceed replicas. If it is smaller, at least one subset must be left
// unspecified to absorb the difference. A nil map is always valid.
func (s *replicasAllocator) validateReplicas(replicas int32, subsetReplicasLimits *map[string]int32) error {
	if subsetReplicasLimits == nil {
		return nil
	}

	var specifiedReplicas int32
	for _, replicas := range *subsetReplicasLimits {
		specifiedReplicas += replicas
	}

	if specifiedReplicas > replicas {
		return fmt.Errorf("specified subsets' replica (%d) is greater than UnitedDeployment replica (%d)",
			specifiedReplicas, replicas)
	} else if specifiedReplicas < replicas {
		specifiedCount := 0
		for _, subset := range *s.subsets {
			if _, exist := (*subsetReplicasLimits)[subset.SubsetName]; exist {
				specifiedCount++
			}
		}

		if specifiedCount == len(*s.subsets) {
			return fmt.Errorf("specified subsets' replica (%d) is less than UnitedDeployment replica (%d)",
				specifiedReplicas, replicas)
		}
	}
	return nil
}

// normalAllocate assigns replicas in two steps. Specified subsets get exactly
// their specified count. The rest of expectedReplicas is split evenly over
// the unspecified subsets. The remainder goes one-by-one to the unspecified
// subsets nearest the end of the sorted list, i.e. those that already had the
// most replicas. A nil specifiedSubsetReplicas means nothing is specified.
func (s *replicasAllocator) normalAllocate(expectedReplicas int32, specifiedSubsetReplicas *map[string]int32) *map[string]int32 {
	// validateReplicas accepts a nil pointer, so it must not panic here.
	// Looking keys up in a nil map is safe.
	var specified map[string]int32
	if specifiedSubsetReplicas != nil {
		specified = *specifiedSubsetReplicas
	}

	var specifiedReplicas int32
	specifiedSubsetCount := 0
	// Step 1: apply replicas to specified subsets, and mark them as specified = true.
	for _, subset := range *s.subsets {
		if replicas, exist := specified[subset.SubsetName]; exist {
			specifiedReplicas += replicas
			subset.Replicas = replicas
			subset.Specified = true
			specifiedSubsetCount++
		}
	}

	// Step 2: evenly allocate the remaining replicas to the unspecified subsets.
	leftSubsetCount := len(*s.subsets) - specifiedSubsetCount
	if leftSubsetCount != 0 {
		allocatableReplicas := expectedReplicas - specifiedReplicas
		average := int(allocatableReplicas) / leftSubsetCount
		remainder := int(allocatableReplicas) % leftSubsetCount

		for i := len(*s.subsets) - 1; i >= 0; i-- {
			subset := (*s.subsets)[i]
			if subset.Specified {
				continue
			}

			if remainder > 0 {
				subset.Replicas = int32(average + 1)
				remainder--
			} else {
				subset.Replicas = int32(average)
			}

			leftSubsetCount--
			if leftSubsetCount == 0 {
				break
			}
		}
	}

	return s.toSubsetReplicaMap()
}

// toSubsetReplicaMap returns the current per-subset replicas as a map.
func (s *replicasAllocator) toSubsetReplicaMap() *map[string]int32 {
	allocatedReplicas := map[string]int32{}
	for _, subset := range *s.subsets {
		allocatedReplicas[subset.SubsetName] = subset.Replicas
	}
	return &allocatedReplicas
}

// String re-sorts the subsets and renders them as " name -> replicas;" pairs.
func (s *replicasAllocator) String() string {
	result := ""
	sort.Sort(s.subsets)
	for _, subset := range *s.subsets {
		result = fmt.Sprintf("%s %s -> %d;", result, subset.SubsetName, subset.Replicas)
	}
	return result
}
// getSpecifiedSubsetReplicas returns the replicas explicitly requested per
// subset in ud's topology. Percentages are resolved against Spec.Replicas by
// ParseSubsetReplicas. Subsets without Replicas, or whose value cannot be
// parsed, are left out.
func getSpecifiedSubsetReplicas(ud *appsv1alpha1.UnitedDeployment) *map[string]int32 {
	limits := make(map[string]int32)
	for _, def := range ud.Spec.Topology.Subsets {
		if def.Replicas == nil {
			continue
		}
		specified, err := ParseSubsetReplicas(*ud.Spec.Replicas, *def.Replicas)
		if err != nil {
			continue
		}
		limits[def.Name] = specified
	}
	return &limits
}
// getSubsetInfos builds one allocator record per subset declared in ud's
// topology, in declaration order. Each record carries the subset's current
// spec replicas, or 0 when the subset does not exist yet.
func getSubsetInfos(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) *subsetInfos {
	defs := ud.Spec.Topology.Subsets
	infos := make(subsetInfos, 0, len(defs))
	for _, def := range defs {
		info := &nameToReplicas{SubsetName: def.Name}
		if existing, ok := (*nameToSubset)[def.Name]; ok {
			info.Replicas = existing.Spec.Replicas
		}
		infos = append(infos, info)
	}
	return &infos
}
7、subset 管理函数(创建/更新/删除 subset,并设置对应的状态条件)
// manageSubsets brings ud's subsets in line with the desired state and
// returns a copy of ud's status with the SubsetProvisioned / SubsetUpdated
// conditions set.
//
// Missing subsets are created and undeclared ones are deleted (via
// manageSubsetProvision). A provisioning error aborts immediately. Then every
// pre-existing subset whose revision, replicas or partition differs from the
// target is updated in slow-start batches. The update error, if any, is
// returned and recorded in the SubsetUpdated condition.
func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) {
	newStatus = ud.Status.DeepCopy()

	exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, subsetType)
	if err != nil {
		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error()))
		return newStatus, fmt.Errorf("fail to manage Subset provision: %s", err)
	}
	if provisioned {
		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionTrue, "", ""))
	}

	target := currentRevision
	if updatedRevision != nil {
		target = updatedRevision
	}
	control := r.subSetControls[subsetType]

	var needUpdate []string
	for _, name := range exists.List() {
		subset := (*nameToSubset)[name]
		// IsExpected returns true when the subset is NOT at the target revision.
		stale := control.IsExpected(subset, target.Name)
		if stale ||
			subset.Spec.Replicas != (*nextReplicas)[name] ||
			subset.Spec.UpdateStrategy.Partition != (*nextPartitions)[name] {
			needUpdate = append(needUpdate, name)
		}
	}

	if len(needUpdate) > 0 {
		_, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error {
			name := needUpdate[index]
			return control.UpdateSubset((*nameToSubset)[name], ud, target.Name, (*nextReplicas)[name], (*nextPartitions)[name])
		})
	}

	if updateErr != nil {
		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
	} else {
		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))
	}
	return newStatus, updateErr
}
// manageSubsetProvision makes the set of existing subsets match ud's topology:
//   - subsets declared in the topology but missing are created,
//   - existing subsets no longer declared are deleted,
//   - every subset of the other workload kinds is deleted (presumably left
//     over after Spec.Template switched workload kind).
//
// It returns:
//   - the names that both existed and are declared (the subsets to update in place),
//   - whether anything was created, deleted or cleaned,
//   - an aggregate of all errors (nil when none occurred).
func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextReplicas, nextPartitions *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (sets.String, bool, error) {
	expectedSubsets := sets.String{}
	for _, subset := range ud.Spec.Topology.Subsets {
		expectedSubsets.Insert(subset.Name)
	}
	gotSubsets := sets.String{}
	for subsetName := range *nameToSubset {
		gotSubsets.Insert(subsetName)
	}

	// Both lists are sorted.
	creates := expectedSubsets.Difference(gotSubsets).List()
	deletes := gotSubsets.Difference(expectedSubsets).List()

	revision := currentRevision.Name
	if updatedRevision != nil {
		revision = updatedRevision.Name
	}
	ownControl := r.subSetControls[subsetType]

	var errs []error

	// Create missing subsets.
	if len(creates) > 0 {
		_, createErr := util.SlowStartBatch(len(creates), slowStartInitialBatchSize, func(idx int) error {
			subsetName := creates[idx]
			replicas := (*nextReplicas)[subsetName]
			partition := (*nextPartitions)[subsetName]
			err := ownControl.CreateSubset(ud, subsetName, revision, replicas, partition)
			// Timeouts are deliberately not reported as failures.
			if err != nil && !errors.IsTimeout(err) {
				return fmt.Errorf("fail to create Subset (%s) %s: %s", subsetType, subsetName, err.Error())
			}
			return nil
		})
		// Record only real failures. A nil entry in errs would rely on
		// NewAggregate filtering nils, which older apimachinery versions did not do.
		if createErr != nil {
			errs = append(errs, createErr)
		}
	}

	// Delete subsets that are no longer declared.
	for _, subsetName := range deletes {
		subset := (*nameToSubset)[subsetName]
		if err := ownControl.DeleteSubset(subset); err != nil {
			errs = append(errs, fmt.Errorf("fail to delete Subset (%s) %s/%s for %s: %s", subsetType, subset.Namespace, subset.Name, subsetName, err))
		}
	}

	// Clean up subsets of the other workload kinds.
	cleaned := false
	for t, control := range r.subSetControls {
		if t == subsetType {
			continue
		}

		subsets, err := control.GetAllSubsets(ud, revision)
		if err != nil {
			errs = append(errs, fmt.Errorf("fail to list Subset of other type %s for UnitedDeployment %s/%s: %s", t, ud.Namespace, ud.Name, err))
			continue
		}

		for _, subset := range subsets {
			cleaned = true
			if err := control.DeleteSubset(subset); err != nil {
				errs = append(errs, fmt.Errorf("fail to delete Subset %s of other type %s for UnitedDeployment %s/%s: %s", subset.Name, t, ud.Namespace, ud.Name, err))
			}
		}
	}

	return expectedSubsets.Intersection(gotSubsets), len(creates) > 0 || len(deletes) > 0 || cleaned, utilerrors.NewAggregate(errs)
}