PS:下文贴出的源码已删去部分日志、监控等代码,特此说明
0、架构图
1、replicaset成员变量
// ReplicaSetController holds the state used by the ReplicaSet control loop:
// API access, listers, the expectations tracker and the work queue.
type ReplicaSetController struct {
// Resource type handled by this controller, identified by its GroupVersionKind.
// Being embedded, its Kind field is what the methods read as rsc.Kind.
schema.GroupVersionKind
// Kubernetes API client.
kubeClient clientset.Interface
// Pod controller; used in this file to create and delete pods and handed to
// the controller-ref manager built in claimPods.
podControl controller.PodControlInterface
// Upper bound on the number of pods created or deleted by a single
// manageReplicas call.
burstReplicas int
// The function that performs the actual sync for a key; invoked by
// processNextWorkItem.
syncHandler func(rsKey string) error
// Tracks expected pod creations/deletions per ReplicaSet key and is consulted
// to decide whether a ReplicaSet needs a sync (the original note describes it
// as a second-level cache for ReplicaSets).
expectations *controller.UIDTrackingControllerExpectations
// Read interface for ReplicaSets.
rsLister appslisters.ReplicaSetLister
// Reports whether the ReplicaSet cache has synced at least once; Run waits on it.
rsListerSynced cache.InformerSynced
// Read interface for pods.
podLister corelisters.PodLister
// Reports whether the pod cache has synced at least once; Run waits on it.
podListerSynced cache.InformerSynced
// Work queue of ReplicaSet keys waiting to be synced.
queue workqueue.RateLimitingInterface
}
2、主循环
// Run is the controller entry point. It waits for the pod and ReplicaSet
// caches to sync, starts `workers` goroutines that keep draining the work
// queue, and then blocks until stopCh is closed. The queue is shut down when
// Run returns.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	cachesSynced := cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced)
	if !cachesSynced {
		return
	}

	// Each worker is restarted by wait.Until with a one second period until
	// stopCh is closed.
	for started := 0; started < workers; started++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}
// worker processes queue items one after another and returns once
// processNextWorkItem reports that no further work should be done.
func (rsc *ReplicaSetController) worker() {
	for {
		if !rsc.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem takes one key from the work queue and runs syncHandler
// on it. On success the key is forgotten by the queue; on failure the error
// is reported and the key is re-added with rate limiting. It returns false
// only when the queue signals that it is shutting down.
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	item, shuttingDown := rsc.queue.Get()
	if shuttingDown {
		return false
	}
	// Always mark the item as done, whatever the outcome of the sync.
	defer rsc.queue.Done(item)

	if syncErr := rsc.syncHandler(item.(string)); syncErr != nil {
		utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", item, syncErr))
		rsc.queue.AddRateLimited(item)
	} else {
		rsc.queue.Forget(item)
	}
	return true
}
3、replicaset事件回调函数
1、addRS
// addRS handles a ReplicaSet add event by handing the new ReplicaSet to
// enqueueRS. obj must be a *apps.ReplicaSet; any other type panics on the
// type assertion.
func (rsc *ReplicaSetController) addRS(obj interface{}) {
	rsc.enqueueRS(obj.(*apps.ReplicaSet))
}
2、updateRS
// updateRS handles a ReplicaSet update event and enqueues the current
// ReplicaSet. When the old and current objects have different UIDs, the
// delete event for the old object was missed, so the old object is first
// passed to deleteRS wrapped in a cache.DeletedFinalStateUnknown.
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
	previous, current := old.(*apps.ReplicaSet), cur.(*apps.ReplicaSet)

	if previous.UID != current.UID {
		previousKey, keyErr := controller.KeyFunc(previous)
		if keyErr != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", previous, keyErr))
			return
		}
		tombstone := cache.DeletedFinalStateUnknown{Key: previousKey, Obj: previous}
		rsc.deleteRS(tombstone)
	}

	rsc.enqueueRS(current)
}
3、deleteRS
// deleteRS handles a ReplicaSet delete event. obj is either a
// *apps.ReplicaSet or a cache.DeletedFinalStateUnknown wrapping one (updateRS
// builds such a wrapper when it detects a missed delete). The expectations
// stored for the ReplicaSet key are removed and the key is added to the queue.
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
	var rs *apps.ReplicaSet
	switch typed := obj.(type) {
	case *apps.ReplicaSet:
		rs = typed
	case cache.DeletedFinalStateUnknown:
		wrapped, isRS := typed.Obj.(*apps.ReplicaSet)
		if !isRS {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
		rs = wrapped
	default:
		utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
		return
	}

	rsKey, keyErr := controller.KeyFunc(rs)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, keyErr))
		return
	}

	rsc.expectations.DeleteExpectations(rsKey)
	rsc.queue.Add(rsKey)
}
4、addPod
// addPod handles a pod add event. A pod that already carries a deletion
// timestamp is routed to deletePod. A pod with a controller reference counts
// as an observed creation for the ReplicaSet that reference resolves to, and
// that ReplicaSet is queued. A pod without a controller reference causes every
// ReplicaSet returned by getPodReplicaSets to be enqueued.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)

	// NOTE(review): the original author was unsure when a newly added pod can
	// already be marked for deletion; possibly after a controller restart —
	// confirm.
	if pod.DeletionTimestamp != nil {
		rsc.deletePod(pod)
		return
	}

	ownerRef := metav1.GetControllerOf(pod)
	if ownerRef == nil {
		// Orphan: notify every ReplicaSet that getPodReplicaSets reports for it.
		for _, candidate := range rsc.getPodReplicaSets(pod) {
			rsc.enqueueRS(candidate)
		}
		return
	}

	// Owned pod: look up the owning ReplicaSet.
	owner := rsc.resolveControllerRef(pod.Namespace, ownerRef)
	if owner == nil {
		return
	}
	ownerKey, keyErr := controller.KeyFunc(owner)
	if keyErr != nil {
		return
	}
	rsc.expectations.CreationObserved(ownerKey)
	rsc.queue.Add(ownerKey)
}
5、updatePod
// updatePod handles a pod update event and enqueues every ReplicaSet that may
// be affected: the ReplicaSet the old controller reference resolves to, the one
// the new controller reference resolves to, or, when the new version has no
// controller reference, the ReplicaSets returned by getPodReplicaSets.
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
// Identical resource versions mean nothing changed (the original note attributes
// such events to informer resyncs), so the event is ignored. Missing a genuine
// pod update is acceptable because a later sync claims pods again.
if curPod.ResourceVersion == oldPod.ResourceVersion {
return
}
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
// The new version carries a deletion timestamp: handle the update as a deletion.
// NOTE(review): the original note says this happens when the timestamp is set
// without the pod having been removed through the API yet — confirm.
if curPod.DeletionTimestamp != nil {
rsc.deletePod(curPod)
// When the labels changed as well, the old version is passed to deletePod too.
// NOTE(review): per the original note, DeletionObserved ensures the deletion is
// only counted once — not verifiable from this file.
if labelChanged {
rsc.deletePod(oldPod)
}
return
}
curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
// From here on the pod is not being deleted. If the controller reference changed
// and the old version had one, enqueue the ReplicaSet the old reference resolves to.
if controllerRefChanged && oldControllerRef != nil {
if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
rsc.enqueueRS(rs)
}
}
// The new version has a controller reference. This branch covers:
//  1. controllerRefChanged is true and oldControllerRef == nil
//  2. controllerRefChanged is false and oldControllerRef != nil
//  3. controllerRefChanged is true and both references are non-nil
if curControllerRef != nil {
rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
if rs == nil {
// The new reference does not resolve to a ReplicaSet: nothing to enqueue.
return
}
rsc.enqueueRS(rs)
// The pod just became ready and MinReadySeconds is set: enqueue the ReplicaSet
// again after MinReadySeconds plus one second (the original note explains that
// a pod needs time to go from ready to available).
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
}
return
}
// The new version has no controller reference. This point is reached when:
//  1. controllerRefChanged is false and oldControllerRef == nil, i.e. neither
//     version has a controller reference and at most the labels changed
//  2. controllerRefChanged is true, oldControllerRef != nil and curControllerRef == nil
// The ReplicaSet of the old reference was already enqueued above, so only the
// ReplicaSets returned by getPodReplicaSets for the new version are enqueued here.
if labelChanged || controllerRefChanged {
rss := rsc.getPodReplicaSets(curPod)
if len(rss) == 0 {
return
}
for _, rs := range rss {
rsc.enqueueRS(rs)
}
}
}
6、deletePod
// deletePod handles a pod delete event. obj is either a *v1.Pod or a
// cache.DeletedFinalStateUnknown wrapping one. If the pod has a controller
// reference that resolves to a ReplicaSet, the deletion is recorded against
// that ReplicaSet's expectations and its key is added to the queue.
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	var pod *v1.Pod
	switch typed := obj.(type) {
	case *v1.Pod:
		pod = typed
	case cache.DeletedFinalStateUnknown:
		wrapped, isPod := typed.Obj.(*v1.Pod)
		if !isPod {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
		pod = wrapped
	default:
		utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
		return
	}

	// Pods without a controller reference (orphans) do not take part in
	// expectation bookkeeping.
	ownerRef := metav1.GetControllerOf(pod)
	if ownerRef == nil {
		return
	}
	owner := rsc.resolveControllerRef(pod.Namespace, ownerRef)
	if owner == nil {
		return
	}

	ownerKey, keyErr := controller.KeyFunc(owner)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", owner, keyErr))
		return
	}
	rsc.expectations.DeletionObserved(ownerKey, controller.PodKey(pod))
	rsc.queue.Add(ownerKey)
}
4、replicaset同步函数
1、切割key获取namespace、name
2、查询informer获取相应的rs对象。若没有找到,则已经被删除,删除expectations里面的rs
3、根据expectations判断是否需要进行同步并转换selector
以下条件会进行sync
a. key未找到
b. key达到fulfilled状态
c. key已经过期
4、获取rs所在namespace下的所有pod,并过滤掉非活跃的pod(非活跃pod指已经成功结束/已经失败/处于删除状态的pod)
5、认领pod/释放相应的pod
6、若需要同步并且rs不处于删除状态,调用manageReplicas增删pod
7、计算并更新rs的status;若pod已全部ready但尚未全部available,则在MinReadySeconds之后将key重新入队
// syncReplicaSet reconciles the ReplicaSet identified by key. It looks the
// ReplicaSet up in the lister, claims the active pods of its namespace, runs
// manageReplicas when the expectations allow it and the ReplicaSet is not being
// deleted, and finally writes the recalculated status.
//
// It returns an error when the key cannot be split, a lister call fails,
// claiming pods fails, or the status update fails; otherwise it returns the
// result of manageReplicas (nil if manageReplicas was not run).
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
// The ReplicaSet is no longer in the lister: drop its expectations and stop.
if errors.IsNotFound(err) {
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
// Ask the expectations tracker whether this key is due for a sync.
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
// A selector that cannot be converted is reported but nil is returned, so
// the caller sees a successful sync.
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}
// List every pod in the ReplicaSet's namespace rather than only those matching
// the selector; claimPods decides which ones belong to this ReplicaSet.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
// Keep only the pods that controller.FilterActivePods considers active.
filteredPods := controller.FilterActivePods(allPods)
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
if err != nil {
return err
}
var manageReplicasErr error
// Pods are only created or deleted when expectations are satisfied and the
// ReplicaSet itself is not being deleted.
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
// Continue with a copy of the object.
// NOTE(review): presumably so the object held by the lister is not mutated — confirm.
rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
return err
}
// When pod management succeeded, MinReadySeconds is set, and all desired
// replicas are ready but not all of them are available yet, enqueue the key
// again after MinReadySeconds.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
5、replicaset工具函数
// manageReplicas creates or deletes pods so that the number of pods in
// filteredPods moves toward *rs.Spec.Replicas. A single call creates or deletes
// at most rsc.burstReplicas pods. Expected creations/deletions are registered
// with rsc.expectations before the operations are issued and counted off again
// for operations that did not succeed.
//
// It returns the error produced by the creation batch, or the first error
// available from the deletions; nil otherwise (including when the ReplicaSet
// key cannot be computed, which is only reported).
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
// diff < 0: pods are missing; diff > 0: there are surplus pods.
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
if diff < 0 {
// Turn the shortfall into a positive count and cap it at burstReplicas.
diff *= -1
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
// Register the expected creations before issuing them.
rsc.expectations.ExpectCreations(rsKey, diff)
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil {
// A failure caused by the namespace terminating is not reported as an error.
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return nil
}
}
return err
})
// Pods that were not successfully created are counted off the expectation,
// one CreationObserved call per skipped pod.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
for i := 0; i < skippedPods; i++ {
rsc.expectations.CreationObserved(rsKey)
}
}
return err
} else if diff > 0 {
// Cap the number of deletions at burstReplicas.
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
// Pods of ReplicaSets sharing this ReplicaSet's controller feed the ranking in
// getPodsToDelete; a lookup error is only reported.
relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
utilruntime.HandleError(err)
podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
// Register the expected deletions (by pod key) before issuing them.
rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
// Buffered for diff errors so that no deleting goroutine blocks on send.
errCh := make(chan error, diff)
var wg sync.WaitGroup
wg.Add(diff)
for _, pod := range podsToDelete {
go func(targetPod *v1.Pod) {
defer wg.Done()
if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
// The delete failed: count this pod off the deletion expectation.
podKey := controller.PodKey(targetPod)
rsc.expectations.DeletionObserved(rsKey, podKey)
errCh <- err
}
}(pod)
}
wg.Wait()
// Return one error if any deletion failed; the default case keeps this from
// blocking when there is none.
select {
case err := <-errCh:
if err != nil {
return err
}
default:
}
}
return nil
}
// claimPods runs filteredPods through a pod controller-ref manager built for
// rs and selector and returns the result of its ClaimPods call. Before any
// adoption, the manager's recheck callback fetches the ReplicaSet again
// through the API client and fails if the fetched object's UID differs from
// rs.UID.
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
	// fetchFresh reads the ReplicaSet straight from the API instead of the lister.
	fetchFresh := func() (metav1.Object, error) {
		latest, getErr := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
		switch {
		case getErr != nil:
			return nil, getErr
		case latest.UID != rs.UID:
			return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, latest.UID, rs.UID)
		}
		return latest, nil
	}

	refManager := controller.NewPodControllerRefManager(
		rsc.podControl,
		rs,
		selector,
		rsc.GroupVersionKind,
		controller.RecheckDeletionTimestamp(fetchFresh),
	)
	return refManager.ClaimPods(filteredPods)
}
// getReplicaSetsWithSameController returns the ReplicaSets in rs's namespace
// whose controller reference has the same UID as rs's controller reference.
// It reports an error and returns nil when rs has no controller reference or
// when listing ReplicaSets fails.
func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet {
	ownerRef := metav1.GetControllerOf(rs)
	if ownerRef == nil {
		utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
		return nil
	}

	candidates, listErr := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything())
	if listErr != nil {
		utilruntime.HandleError(listErr)
		return nil
	}

	var siblings []*apps.ReplicaSet
	for idx := range candidates {
		candidateRef := metav1.GetControllerOf(candidates[idx])
		if candidateRef == nil || candidateRef.UID != ownerRef.UID {
			continue
		}
		siblings = append(siblings, candidates[idx])
	}
	return siblings
}
// getPodReplicaSets returns the ReplicaSets that the lister associates with
// the given pod (matched by label, per the original note). It returns nil
// when the lookup fails and reports an error, while still returning the
// result, when more than one ReplicaSet matches.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
	matched, lookupErr := rsc.rsLister.GetPodReplicaSets(pod)
	switch {
	case lookupErr != nil:
		return nil
	case len(matched) > 1:
		utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
	}
	return matched
}
// resolveControllerRef returns the ReplicaSet that controllerRef points to in
// the given namespace, or nil when the reference has a different Kind than
// this controller, the lister lookup fails, or the found ReplicaSet's UID does
// not match the reference.
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
	if rsc.Kind != controllerRef.Kind {
		return nil
	}

	found, getErr := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	// A UID mismatch means the name now belongs to a different ReplicaSet than
	// the one the reference was created for (the original note: the ReplicaSet
	// was recreated while the pod had not been deleted yet).
	if getErr != nil || found.UID != controllerRef.UID {
		return nil
	}
	return found
}
// getIndirectlyRelatedPods collects the pods selected by every ReplicaSet
// returned by getReplicaSetsWithSameController(rs). Each pod appears once in
// the result, keyed by pod UID; a pod matched again by a later ReplicaSet is
// only logged. It returns an error when a selector cannot be converted or a
// pod listing fails.
func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) {
	var collected []*v1.Pod
	firstMatch := make(map[types.UID]*apps.ReplicaSet)

	for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) {
		podSelector, selErr := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
		if selErr != nil {
			return nil, selErr
		}
		matchedPods, listErr := rsc.podLister.Pods(relatedRS.Namespace).List(podSelector)
		if listErr != nil {
			return nil, listErr
		}

		for _, pod := range matchedPods {
			otherRS, duplicate := firstMatch[pod.UID]
			if !duplicate {
				firstMatch[pod.UID] = relatedRS
				collected = append(collected, pod)
				continue
			}
			klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name)
		}
	}
	return collected, nil
}
// slowStartBatch calls fn up to count times in batches that grow
// exponentially: the first batch has min(count, initialBatchSize) calls and
// every following batch is twice as large, capped at the number of calls
// still remaining. The calls of one batch run concurrently. As soon as a batch
// contains a failed call, no further batch is started.
//
// It returns the number of successful calls and, if a batch failed, one of
// the errors from that batch.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	var (
		outstanding = count
		succeeded   = 0
	)

	size := integer.IntMin(outstanding, initialBatchSize)
	for size > 0 {
		// Buffered for the whole batch so that failing calls never block.
		failures := make(chan error, size)
		var batch sync.WaitGroup
		batch.Add(size)
		for launched := 0; launched < size; launched++ {
			go func() {
				defer batch.Done()
				if callErr := fn(); callErr != nil {
					failures <- callErr
				}
			}()
		}
		batch.Wait()

		failed := len(failures)
		succeeded += size - failed
		if failed > 0 {
			return succeeded, <-failures
		}

		outstanding -= size
		size = integer.IntMin(2*size, outstanding)
	}
	return succeeded, nil
}
// getPodsToDelete returns the first diff pods of filteredPods. When not all
// pods are to be deleted, filteredPods is first sorted in place by the ranking
// from getPodsRankedByRelatedPodsOnSameNode; sorting is skipped when every pod
// would be returned anyway. diff must not exceed len(filteredPods).
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	if len(filteredPods) > diff {
		ranked := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(ranked)
	}
	return filteredPods[:diff]
}
// getPodsRankedByRelatedPodsOnSameNode pairs podsToRank with a rank per pod,
// where a pod's rank is the number of active pods from relatedPods that share
// its Spec.NodeName.
func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
	// Count the active related pods per node name.
	activePerNode := make(map[string]int)
	for _, related := range relatedPods {
		if !controller.IsPodActive(related) {
			continue
		}
		activePerNode[related.Spec.NodeName]++
	}

	rankOf := make([]int, 0, len(podsToRank))
	for _, candidate := range podsToRank {
		rankOf = append(rankOf, activePerNode[candidate.Spec.NodeName])
	}
	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: rankOf}
}
// getPodKeys returns controller.PodKey(pod) for every pod, in input order.
func getPodKeys(pods []*v1.Pod) []string {
	keys := make([]string, len(pods))
	for idx := range pods {
		keys[idx] = controller.PodKey(pods[idx])
	}
	return keys
}