PS:为便于阅读,下文引用的源码已删去部分日志、监控等无关代码,特此说明。
0、架构图
1、replicaset成员变量
type ReplicaSetController struct {// 此controller控制的资源类型(通过gvk标识)schema.GroupVersionKind// k8s api客户端kubeClient clientset.Interface// pod控制器(负责pod的增删改)podControl controller.PodControlInterface// 单次最大创建的pod个数burstReplicas int// 真正的同步函数syncHandler func(rsKey string) error// 控制rs是否需要sync,相当于rs的二级缓存expectations *controller.UIDTrackingControllerExpectations// rs查询接口rsLister appslisters.ReplicaSetLister// 是否rs已经同步过一次rsListerSynced cache.InformerSynced// pod的查询接口podLister corelisters.PodLister// 是否pod已经同步过一次podListerSynced cache.InformerSynced// 工作队列queue workqueue.RateLimitingInterface}
2、主循环
// controller启动函数// 同时开启workers个协程进行处理,不停的从workqueue读取发生变化的key,// 然后进行相应的处理func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer rsc.queue.ShutDown()if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {return}for i := 0; i < workers; i++ {go wait.Until(rsc.worker, time.Second, stopCh)}<-stopCh}func (rsc *ReplicaSetController) worker() {for rsc.processNextWorkItem() {}}func (rsc *ReplicaSetController) processNextWorkItem() bool {key, quit := rsc.queue.Get()if quit {return false}defer rsc.queue.Done(key)err := rsc.syncHandler(key.(string))if err == nil {rsc.queue.Forget(key)return true}utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))rsc.queue.AddRateLimited(key)return true}
3、replicaset事件回调函数
1、addRS
// 监听到rs的增加事件,直接将对应的key放入到workqueue即可func (rsc *ReplicaSetController) addRS(obj interface{}) {rs := obj.(*apps.ReplicaSet)rsc.enqueueRS(rs)}
2、updateRS
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {oldRS := old.(*apps.ReplicaSet)curRS := cur.(*apps.ReplicaSet)// oldRs删除事件丢失if curRS.UID != oldRS.UID {key, err := controller.KeyFunc(oldRS)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))return}rsc.deleteRS(cache.DeletedFinalStateUnknown{Key: key,Obj: oldRS,})}rsc.enqueueRS(curRS)}
3、deleteRS
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {rs, ok := obj.(*apps.ReplicaSet)if !ok {// 若rs的删除时间丢失,会有updateRS封装oldObj为DeletedFinalStateUnknown并调用deleteRStombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))return}rs, ok = tombstone.Obj.(*apps.ReplicaSet)if !ok {utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))return}}key, err := controller.KeyFunc(rs)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))return}rsc.expectations.DeleteExpectations(key)rsc.queue.Add(key)}
4、addPod
func (rsc *ReplicaSetController) addPod(obj interface{}) {pod := obj.(*v1.Pod)if pod.DeletionTimestamp != nil {// 不太清楚为什么会走有这种情况rsc.deletePod(pod)return}// 获取pod关联的rsif controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)if rs == nil {return}rsKey, err := controller.KeyFunc(rs)if err != nil {return}rsc.expectations.CreationObserved(rsKey)rsc.queue.Add(rsKey)return}rss := rsc.getPodReplicaSets(pod)if len(rss) == 0 {return}for _, rs := range rss {rsc.enqueueRS(rs)}}
5、updatePod
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {curPod := cur.(*v1.Pod)oldPod := old.(*v1.Pod)// 此处是resync触发的onUpdate事件,若版本号一致,则无须变更// rs无须担心pod update丢失的事件,因为在后续的同步中,rs会认领podif curPod.ResourceVersion == oldPod.ResourceVersion {return}labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)// 用户手动设置删除时间戳,未调用api进行删除,出现此种情况if curPod.DeletionTimestamp != nil {rsc.deletePod(curPod)// lable发生变更,需要删除旧版本pod,因为label变更会导致匹配到不同的rs,需要将rs都通知到// 此处不会触发两次DeletionObserved,DeletionObserved函数会保证只删除一次if labelChanged {rsc.deletePod(oldPod)}return}curControllerRef := metav1.GetControllerOf(curPod)oldControllerRef := metav1.GetControllerOf(oldPod)controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)// 到此处说明新pod的变更非删除,倘若发生了ownerRef变更,且旧pod ownerRef不为空// 需要将旧pod绑定的rs入队if controllerRefChanged && oldControllerRef != nil {if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {rsc.enqueueRS(rs)}}// 进入如下条件,需要如下情况// 1、controllerRefChanged为true,oldControllerRef == nil// 2、controllerRefChanged为false,oldControllerRef != nil// 3、controllerRefChanged为true,oldControllerRef != nil并且curControllerRef != nilif curControllerRef != nil {rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)if rs == nil {// 若新pod未找到对应的rs,直接returnreturn}rsc.enqueueRS(rs)// 因为pod从ready到available状态需要时间if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)}return}// 逻辑走到这里需要如下情况// 1、controllerRefChanged为false,oldControllerRef == nil,则newControllerRef == nil即为只是label发生了变化// 2、controllerRefChanged为true,oldControllerRef != nil并且curControllerRef == nil即为controllerRefChanged// 由于384行已经将旧pod对应rs入队,则此处无须入队,只需要将新pod对应rd入队if labelChanged || controllerRefChanged {rss := rsc.getPodReplicaSets(curPod)if len(rss) == 0 {return}for _, rs := range rss {rsc.enqueueRS(rs)}}}
6、deletePod
func (rsc *ReplicaSetController) deletePod(obj interface{}) {pod, ok := obj.(*v1.Pod)if !ok {tombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))return}pod, ok = tombstone.Obj.(*v1.Pod)if !ok {utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))return}}// 孤儿Pod不会进行Expectation处理controllerRef := metav1.GetControllerOf(pod)if controllerRef == nil {return}rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)if rs == nil {return}rsKey, err := controller.KeyFunc(rs)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))return}rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))rsc.queue.Add(rsKey)}
4、replicaset同步函数
1、切割key获取namespace、name
2、查询informer获取相应的rs对象。若没有找到,则已经被删除,删除expectations里面的rs
3、根据expectations判断是否需要进行同步并转换selector
以下条件会进行sync
a. key未找到
b. key达到fulfilled状态
c. key已经过期
4、获取rs所在namespace下的所有pod,并过滤掉非活跃的pod(非活跃pod指已经成功结束、已经失败或处于删除状态的pod)
5、认领pod/释放相应的pod
6、若需要同步并且rs不处于删除状态,则调用manageReplicas增删pod;随后计算并更新rs的status,必要时在MinReadySeconds之后将key重新入队
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {namespace, name, err := cache.SplitMetaNamespaceKey(key)if err != nil {return err}rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)if errors.IsNotFound(err) {rsc.expectations.DeleteExpectations(key)return nil}if err != nil {return err}rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)if err != nil {utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))return nil}allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())if err != nil {return err}filteredPods := controller.FilterActivePods(allPods)filteredPods, err = rsc.claimPods(rs, selector, filteredPods)if err != nil {return err}var manageReplicasErr errorif rsNeedsSync && rs.DeletionTimestamp == nil {manageReplicasErr = rsc.manageReplicas(filteredPods, rs)}rs = rs.DeepCopy()newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)if err != nil {return err}if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)}return manageReplicasErr}
5、replicaset工具函数
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {diff := len(filteredPods) - int(*(rs.Spec.Replicas))rsKey, err := controller.KeyFunc(rs)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))return nil}if diff < 0 {diff *= -1if diff > rsc.burstReplicas {diff = rsc.burstReplicas}rsc.expectations.ExpectCreations(rsKey, diff)successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))if err != nil {if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {return nil}}return err})if skippedPods := diff - successfulCreations; skippedPods > 0 {for i := 0; i < skippedPods; i++ {rsc.expectations.CreationObserved(rsKey)}}return err} else if diff > 0 {if diff > rsc.burstReplicas {diff = rsc.burstReplicas}relatedPods, err := rsc.getIndirectlyRelatedPods(rs)utilruntime.HandleError(err)podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))errCh := make(chan error, diff)var wg sync.WaitGroupwg.Add(diff)for _, pod := range podsToDelete {go func(targetPod *v1.Pod) {defer wg.Done()if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {podKey := controller.PodKey(targetPod)rsc.expectations.DeletionObserved(rsKey, podKey)errCh <- err}}(pod)}wg.Wait()select {case err := <-errCh:if err != nil {return err}default:}}return nil}func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})if err != nil {return nil, err}if fresh.UID != rs.UID {return 
nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)}return fresh, nil})cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)return cm.ClaimPods(filteredPods)}// 获取与给定rs有相同owner的rs集合func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet {controllerRef := metav1.GetControllerOf(rs)if controllerRef == nil {utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))return nil}allRSs, err := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything())if err != nil {utilruntime.HandleError(err)return nil}var relatedRSs []*apps.ReplicaSetfor _, r := range allRSs {if ref := metav1.GetControllerOf(r); ref != nil && ref.UID == controllerRef.UID {relatedRSs = append(relatedRSs, r)}}return relatedRSs}// 获取关联给定pod的所有rs(通过label)func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {rss, err := rsc.rsLister.GetPodReplicaSets(pod)if err != nil {return nil}if len(rss) > 1 {utilruntime.HandleError(fmt.Errorf("user error! 
more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))}return rss}// 通过给定的引用信息找到rsfunc (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {if controllerRef.Kind != rsc.Kind {return nil}rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)if err != nil {return nil}// 若UID不相同,则rs已经被重建,但是pod还未被删除if rs.UID != controllerRef.UID {return nil}return rs}func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {var relatedPods []*v1.Podseen := make(map[types.UID]*apps.ReplicaSet)for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)if err != nil {return nil, err}pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)if err != nil {return nil, err}for _, pod := range pods {if otherRS, found := seen[pod.UID]; found {klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)continue}seen[pod.UID] = relatedRSrelatedPods = append(relatedPods, pod)}}return relatedPods, nil}// 指数级创建podfunc slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {remaining := countsuccesses := 0for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {errCh := make(chan error, batchSize)var wg sync.WaitGroupwg.Add(batchSize)for i := 0; i < batchSize; i++ {go func() {defer wg.Done()if err := fn(); err != nil {errCh <- err}}()}wg.Wait()curSuccesses := batchSize - len(errCh)successes += curSuccessesif len(errCh) > 0 {return successes, <-errCh}remaining -= batchSize}return successes, nil}func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {if diff < len(filteredPods) {podsWithRanks := 
getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)sort.Sort(podsWithRanks)}return filteredPods[:diff]}func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {podsOnNode := make(map[string]int)for _, pod := range relatedPods {if controller.IsPodActive(pod) {podsOnNode[pod.Spec.NodeName]++}}ranks := make([]int, len(podsToRank))for i, pod := range podsToRank {ranks[i] = podsOnNode[pod.Spec.NodeName]}return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}}func getPodKeys(pods []*v1.Pod) []string {podKeys := make([]string, 0, len(pods))for _, pod := range pods {podKeys = append(podKeys, controller.PodKey(pod))}return podKeys}
