PS:下文贴出的源码删去了一部分日志、监控等代码,请知悉。

0、架构图

图片 1.png

1、replicaset成员变量

  1. type ReplicaSetController struct { // State shared by the ReplicaSet controller's event handlers and its sync loop.
  2. // Kind of resource this controller manages, identified by its GroupVersionKind.
  3. schema.GroupVersionKind
  4. // Kubernetes API client.
  5. kubeClient clientset.Interface
  6. // Pod control interface; used in this file to create and delete pods.
  7. podControl controller.PodControlInterface
  8. // Upper bound on how many pods a single manageReplicas call creates or deletes.
  9. burstReplicas int
  10. // The function that performs the actual sync for a ReplicaSet key.
  11. syncHandler func(rsKey string) error
  12. // Per-ReplicaSet record of expected pod creations/deletions; consulted to decide whether a sync should manage replicas (the original note likens it to a second-level cache for the rs).
  13. expectations *controller.UIDTrackingControllerExpectations
  14. // Read interface for ReplicaSets.
  15. rsLister appslisters.ReplicaSetLister
  16. // Reports whether the ReplicaSet cache has completed its initial sync.
  17. rsListerSynced cache.InformerSynced
  18. // Read interface for pods.
  19. podLister corelisters.PodLister
  20. // Reports whether the pod cache has completed its initial sync.
  21. podListerSynced cache.InformerSynced
  22. // Work queue of ReplicaSet keys awaiting a sync.
  23. queue workqueue.RateLimitingInterface
  24. }

2、主循环

  1. // controller启动函数
  2. // 同时开启workers个协程进行处理,不停的从workqueue读取发生变化的key,
  3. // 然后进行相应的处理
  4. func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
  5. defer utilruntime.HandleCrash()
  6. defer rsc.queue.ShutDown()
  7. if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
  8. return
  9. }
  10. for i := 0; i < workers; i++ {
  11. go wait.Until(rsc.worker, time.Second, stopCh)
  12. }
  13. <-stopCh
  14. }
  15. func (rsc *ReplicaSetController) worker() {
  16. for rsc.processNextWorkItem() {
  17. }
  18. }
  19. func (rsc *ReplicaSetController) processNextWorkItem() bool {
  20. key, quit := rsc.queue.Get()
  21. if quit {
  22. return false
  23. }
  24. defer rsc.queue.Done(key)
  25. err := rsc.syncHandler(key.(string))
  26. if err == nil {
  27. rsc.queue.Forget(key)
  28. return true
  29. }
  30. utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
  31. rsc.queue.AddRateLimited(key)
  32. return true
  33. }

3、replicaset事件回调函数

1、addRS

  1. // 监听到rs的增加事件,直接将对应的key放入到workqueue即可
  2. func (rsc *ReplicaSetController) addRS(obj interface{}) {
  3. rs := obj.(*apps.ReplicaSet)
  4. rsc.enqueueRS(rs)
  5. }

2、updateRS

  1. func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
  2. oldRS := old.(*apps.ReplicaSet)
  3. curRS := cur.(*apps.ReplicaSet)
  4. // oldRs删除事件丢失
  5. if curRS.UID != oldRS.UID {
  6. key, err := controller.KeyFunc(oldRS)
  7. if err != nil {
  8. utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
  9. return
  10. }
  11. rsc.deleteRS(cache.DeletedFinalStateUnknown{
  12. Key: key,
  13. Obj: oldRS,
  14. })
  15. }
  16. rsc.enqueueRS(curRS)
  17. }

3、deleteRS

  1. func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
  2. rs, ok := obj.(*apps.ReplicaSet)
  3. if !ok {
  4. // 若rs的删除时间丢失,会有updateRS封装oldObj为DeletedFinalStateUnknown并调用deleteRS
  5. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  6. if !ok {
  7. utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
  8. return
  9. }
  10. rs, ok = tombstone.Obj.(*apps.ReplicaSet)
  11. if !ok {
  12. utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
  13. return
  14. }
  15. }
  16. key, err := controller.KeyFunc(rs)
  17. if err != nil {
  18. utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
  19. return
  20. }
  21. rsc.expectations.DeleteExpectations(key)
  22. rsc.queue.Add(key)
  23. }

4、addPod

  1. func (rsc *ReplicaSetController) addPod(obj interface{}) {
  2. pod := obj.(*v1.Pod)
  3. if pod.DeletionTimestamp != nil {
  4. // 不太清楚为什么会走有这种情况
  5. rsc.deletePod(pod)
  6. return
  7. }
  8. // 获取pod关联的rs
  9. if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
  10. rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
  11. if rs == nil {
  12. return
  13. }
  14. rsKey, err := controller.KeyFunc(rs)
  15. if err != nil {
  16. return
  17. }
  18. rsc.expectations.CreationObserved(rsKey)
  19. rsc.queue.Add(rsKey)
  20. return
  21. }
  22. rss := rsc.getPodReplicaSets(pod)
  23. if len(rss) == 0 {
  24. return
  25. }
  26. for _, rs := range rss {
  27. rsc.enqueueRS(rs)
  28. }
  29. }

5、updatePod

  1. func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
  2. curPod := cur.(*v1.Pod)
  3. oldPod := old.(*v1.Pod)
  4. // 此处是resync触发的onUpdate事件,若版本号一致,则无须变更
  5. // rs无须担心pod update丢失的事件,因为在后续的同步中,rs会认领pod
  6. if curPod.ResourceVersion == oldPod.ResourceVersion {
  7. return
  8. }
  9. labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
  10. // 用户手动设置删除时间戳,未调用api进行删除,出现此种情况
  11. if curPod.DeletionTimestamp != nil {
  12. rsc.deletePod(curPod)
  13. // lable发生变更,需要删除旧版本pod,因为label变更会导致匹配到不同的rs,需要将rs都通知到
  14. // 此处不会触发两次DeletionObserved,DeletionObserved函数会保证只删除一次
  15. if labelChanged {
  16. rsc.deletePod(oldPod)
  17. }
  18. return
  19. }
  20. curControllerRef := metav1.GetControllerOf(curPod)
  21. oldControllerRef := metav1.GetControllerOf(oldPod)
  22. controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  23. // 到此处说明新pod的变更非删除,倘若发生了ownerRef变更,且旧pod ownerRef不为空
  24. // 需要将旧pod绑定的rs入队
  25. if controllerRefChanged && oldControllerRef != nil {
  26. if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
  27. rsc.enqueueRS(rs)
  28. }
  29. }
  30. // 进入如下条件,需要如下情况
  31. // 1、controllerRefChanged为true,oldControllerRef == nil
  32. // 2、controllerRefChanged为false,oldControllerRef != nil
  33. // 3、controllerRefChanged为true,oldControllerRef != nil并且curControllerRef != nil
  34. if curControllerRef != nil {
  35. rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
  36. if rs == nil {
  37. // 若新pod未找到对应的rs,直接return
  38. return
  39. }
  40. rsc.enqueueRS(rs)
  41. // 因为pod从ready到available状态需要时间
  42. if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
  43. rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
  44. }
  45. return
  46. }
  47. // 逻辑走到这里需要如下情况
  48. // 1、controllerRefChanged为false,oldControllerRef == nil,则newControllerRef == nil即为只是label发生了变化
  49. // 2、controllerRefChanged为true,oldControllerRef != nil并且curControllerRef == nil即为controllerRefChanged
  50. // 由于384行已经将旧pod对应rs入队,则此处无须入队,只需要将新pod对应rd入队
  51. if labelChanged || controllerRefChanged {
  52. rss := rsc.getPodReplicaSets(curPod)
  53. if len(rss) == 0 {
  54. return
  55. }
  56. for _, rs := range rss {
  57. rsc.enqueueRS(rs)
  58. }
  59. }
  60. }

6、deletePod

  1. func (rsc *ReplicaSetController) deletePod(obj interface{}) {
  2. pod, ok := obj.(*v1.Pod)
  3. if !ok {
  4. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  5. if !ok {
  6. utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
  7. return
  8. }
  9. pod, ok = tombstone.Obj.(*v1.Pod)
  10. if !ok {
  11. utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
  12. return
  13. }
  14. }
  15. // 孤儿Pod不会进行Expectation处理
  16. controllerRef := metav1.GetControllerOf(pod)
  17. if controllerRef == nil {
  18. return
  19. }
  20. rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
  21. if rs == nil {
  22. return
  23. }
  24. rsKey, err := controller.KeyFunc(rs)
  25. if err != nil {
  26. utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
  27. return
  28. }
  29. rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
  30. rsc.queue.Add(rsKey)
  31. }

4、replicaset同步函数

1、切割key获取namespace、name
2、通过informer(rsLister)查询相应的rs对象。若没有找到,说明该rs已经被删除,此时清除expectations中该rs对应的记录并直接返回
3、根据expectations判断是否需要进行同步并转换selector
以下条件会进行sync
a. key未找到
b. key达到fulfilled状态
c. key已经过期
4、获取rs所在namespace下的所有pod,并过滤掉非活跃的pod(即已经成功结束、已经失败或正处在删除状态的pod)
5、认领pod/释放相应的pod
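6、若需要同步并且rs不处于删除状态,则调用manageReplicas增删pod;随后计算并更新rs的status,若pod已全部ready但尚未全部available(且MinReadySeconds大于0),则延迟MinReadySeconds后将key重新入队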

  1. func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
  2. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  3. if err != nil {
  4. return err
  5. }
  6. rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
  7. if errors.IsNotFound(err) {
  8. rsc.expectations.DeleteExpectations(key)
  9. return nil
  10. }
  11. if err != nil {
  12. return err
  13. }
  14. rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  15. selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
  16. if err != nil {
  17. utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
  18. return nil
  19. }
  20. allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
  21. if err != nil {
  22. return err
  23. }
  24. filteredPods := controller.FilterActivePods(allPods)
  25. filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
  26. if err != nil {
  27. return err
  28. }
  29. var manageReplicasErr error
  30. if rsNeedsSync && rs.DeletionTimestamp == nil {
  31. manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
  32. }
  33. rs = rs.DeepCopy()
  34. newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
  35. updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
  36. if err != nil {
  37. return err
  38. }
  39. if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
  40. updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
  41. updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
  42. rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
  43. }
  44. return manageReplicasErr
  45. }

5、replicaset工具函数

  1. func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
  2. diff := len(filteredPods) - int(*(rs.Spec.Replicas))
  3. rsKey, err := controller.KeyFunc(rs)
  4. if err != nil {
  5. utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
  6. return nil
  7. }
  8. if diff < 0 {
  9. diff *= -1
  10. if diff > rsc.burstReplicas {
  11. diff = rsc.burstReplicas
  12. }
  13. rsc.expectations.ExpectCreations(rsKey, diff)
  14. successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
  15. err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
  16. if err != nil {
  17. if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  18. return nil
  19. }
  20. }
  21. return err
  22. })
  23. if skippedPods := diff - successfulCreations; skippedPods > 0 {
  24. for i := 0; i < skippedPods; i++ {
  25. rsc.expectations.CreationObserved(rsKey)
  26. }
  27. }
  28. return err
  29. } else if diff > 0 {
  30. if diff > rsc.burstReplicas {
  31. diff = rsc.burstReplicas
  32. }
  33. relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
  34. utilruntime.HandleError(err)
  35. podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
  36. rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
  37. errCh := make(chan error, diff)
  38. var wg sync.WaitGroup
  39. wg.Add(diff)
  40. for _, pod := range podsToDelete {
  41. go func(targetPod *v1.Pod) {
  42. defer wg.Done()
  43. if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
  44. podKey := controller.PodKey(targetPod)
  45. rsc.expectations.DeletionObserved(rsKey, podKey)
  46. errCh <- err
  47. }
  48. }(pod)
  49. }
  50. wg.Wait()
  51. select {
  52. case err := <-errCh:
  53. if err != nil {
  54. return err
  55. }
  56. default:
  57. }
  58. }
  59. return nil
  60. }
  61. func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
  62. canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
  63. fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
  64. if err != nil {
  65. return nil, err
  66. }
  67. if fresh.UID != rs.UID {
  68. return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
  69. }
  70. return fresh, nil
  71. })
  72. cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
  73. return cm.ClaimPods(filteredPods)
  74. }
  75. // 获取与给定rs有相同owner的rs集合
  76. func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet {
  77. controllerRef := metav1.GetControllerOf(rs)
  78. if controllerRef == nil {
  79. utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
  80. return nil
  81. }
  82. allRSs, err := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything())
  83. if err != nil {
  84. utilruntime.HandleError(err)
  85. return nil
  86. }
  87. var relatedRSs []*apps.ReplicaSet
  88. for _, r := range allRSs {
  89. if ref := metav1.GetControllerOf(r); ref != nil && ref.UID == controllerRef.UID {
  90. relatedRSs = append(relatedRSs, r)
  91. }
  92. }
  93. return relatedRSs
  94. }
  95. // 获取关联给定pod的所有rs(通过label)
  96. func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
  97. rss, err := rsc.rsLister.GetPodReplicaSets(pod)
  98. if err != nil {
  99. return nil
  100. }
  101. if len(rss) > 1 {
  102. utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
  103. }
  104. return rss
  105. }
  106. // 通过给定的引用信息找到rs
  107. func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
  108. if controllerRef.Kind != rsc.Kind {
  109. return nil
  110. }
  111. rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
  112. if err != nil {
  113. return nil
  114. }
  115. // 若UID不相同,则rs已经被重建,但是pod还未被删除
  116. if rs.UID != controllerRef.UID {
  117. return nil
  118. }
  119. return rs
  120. }
  121. func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
  122. var relatedPods []*v1.Pod
  123. seen := make(map[types.UID]*apps.ReplicaSet)
  124. for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {
  125. selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
  126. if err != nil {
  127. return nil, err
  128. }
  129. pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
  130. if err != nil {
  131. return nil, err
  132. }
  133. for _, pod := range pods {
  134. if otherRS, found := seen[pod.UID]; found {
  135. klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)
  136. continue
  137. }
  138. seen[pod.UID] = relatedRS
  139. relatedPods = append(relatedPods, pod)
  140. }
  141. }
  142. return relatedPods, nil
  143. }
  144. // 指数级创建pod
  145. func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
  146. remaining := count
  147. successes := 0
  148. for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
  149. errCh := make(chan error, batchSize)
  150. var wg sync.WaitGroup
  151. wg.Add(batchSize)
  152. for i := 0; i < batchSize; i++ {
  153. go func() {
  154. defer wg.Done()
  155. if err := fn(); err != nil {
  156. errCh <- err
  157. }
  158. }()
  159. }
  160. wg.Wait()
  161. curSuccesses := batchSize - len(errCh)
  162. successes += curSuccesses
  163. if len(errCh) > 0 {
  164. return successes, <-errCh
  165. }
  166. remaining -= batchSize
  167. }
  168. return successes, nil
  169. }
  170. func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
  171. if diff < len(filteredPods) {
  172. podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
  173. sort.Sort(podsWithRanks)
  174. }
  175. return filteredPods[:diff]
  176. }
  177. func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
  178. podsOnNode := make(map[string]int)
  179. for _, pod := range relatedPods {
  180. if controller.IsPodActive(pod) {
  181. podsOnNode[pod.Spec.NodeName]++
  182. }
  183. }
  184. ranks := make([]int, len(podsToRank))
  185. for i, pod := range podsToRank {
  186. ranks[i] = podsOnNode[pod.Spec.NodeName]
  187. }
  188. return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
  189. }
  190. func getPodKeys(pods []*v1.Pod) []string {
  191. podKeys := make([]string, 0, len(pods))
  192. for _, pod := range pods {
  193. podKeys = append(podKeys, controller.PodKey(pod))
  194. }
  195. return podKeys
  196. }