https://docs.nginx.com/nginx-ingress-controller/configuration/global-configuration/command-line-arguments/

-enable-leader-election

Enables Leader election to avoid multiple replicas of the controller reporting the status of Ingress, VirtualServer and VirtualServerRoute resources – only one replica will report status. (default true) See -report-ingress-status flag.
启用领导者选举,以避免控制器的多个副本同时报告 Ingress、VirtualServer 和 VirtualServerRoute 资源的状态——只有一个副本会报告状态。(默认为 true)
请参阅 -report-ingress-status 标志。

-report-ingress-status

Update the address field in the status of Ingresses resources. Requires the -external-service flag or the external-status-address key in the ConfigMap.
报告状态,更新 Ingress 资源 status 中的 address 字段。需要指定 -external-service 标志,或在 ConfigMap 中设置 external-status-address 键。

image.png
这个参数控制status那块的。
image.png
image.png

image.png

leader作用

leader 会在 metric 指标中加入一个标识自己是 leader 的指标,还会启动 worker 去更新 ingress status,代码如下:
image.png

publish-service和publish-status-address

指定了 --publish-status-address,就会直接用这个地址更新到 ingress status 里。

image.png
指定了 --publish-service,就会获取这个 service 对象,根据不同的 service 类型获取到 addr:

  1. func statusAddressFromService(service string, kubeClient clientset.Interface) ([]string, error) {
  2. ns, name, _ := k8s.ParseNameNS(service)
  3. svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
  4. if err != nil {
  5. return nil, err
  6. }
  7. switch svc.Spec.Type {
  8. case apiv1.ServiceTypeExternalName:
  9. return []string{svc.Spec.ExternalName}, nil
  10. case apiv1.ServiceTypeClusterIP:
  11. return []string{svc.Spec.ClusterIP}, nil
  12. case apiv1.ServiceTypeNodePort:
  13. addresses := []string{}
  14. if svc.Spec.ExternalIPs != nil {
  15. addresses = append(addresses, svc.Spec.ExternalIPs...)
  16. } else {
  17. addresses = append(addresses, svc.Spec.ClusterIP)
  18. }
  19. return addresses, nil
  20. case apiv1.ServiceTypeLoadBalancer:
  21. addresses := []string{}
  22. for _, ip := range svc.Status.LoadBalancer.Ingress {
  23. if ip.IP == "" {
  24. addresses = append(addresses, ip.Hostname)
  25. } else {
  26. addresses = append(addresses, ip.IP)
  27. }
  28. }
  29. addresses = append(addresses, svc.Spec.ExternalIPs...)
  30. return addresses, nil
  31. }
  32. return nil, fmt.Errorf("unable to extract IP address/es from service %v", service)
  33. }

如下用到的service:
image.png
image.png

这两个参数都没指定,就会获取到一组ingress-nginx pod的IP,更新到ingress status中。

不同的apiVersion怎么更新?

比如我创建的是如下的apiVersion:extensions/v1beta1

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"annotations":{"nginx.ingress.kubernetes.io/rewrite-target":"/$2"},"creationTimestamp":"2019-12-13T07:01:27Z","generation":3,"name":"es-head","namespace":"default","resourceVersion":"752529","selfLink":"/apis/extensions/v1beta1/namespaces/default/ingresses/es-head","uid":"7829f59c-f506-42ee-ab94-c0e00222dfca"},"spec":{"rules":[{"host":"ca.test1.liabio.cn","http":{"paths":[{"backend":{"serviceName":"es-head","servicePort":9100},"path":"/es-head(/|$)(.*)","pathType":"ImplementationSpecific"}]}}]},"status":{"loadBalancer":{}}}
    nginx.ingress.kubernetes.io/rewrite-target: /$2
  creationTimestamp: "2020-10-16T08:01:53Z"
  generation: 1
  managedFields:
  - apiVersion: extensions/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
          f:nginx.ingress.kubernetes.io/rewrite-target: {}
      f:spec:
        f:rules: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2020-10-16T08:01:53Z"
  - apiVersion: networking.k8s.io/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:loadBalancer:
          f:ingress: {}
    manager: nginx-ingress-controller
    operation: Update
    time: "2020-10-16T08:19:51Z"
  name: es-head
  namespace: default
  resourceVersion: "13664071"
  selfLink: /apis/extensions/v1beta1/namespaces/default/ingresses/es-head
  uid: 1f37c2f6-3dcd-457a-ba2b-c704a912fe71
spec:
  rules:
  - host: ca.test1.liabio.cn
    http:
      paths:
      - backend:
          serviceName: es-head
          servicePort: 9100
        path: /es-head(/|$)(.*)
        pathType: ImplementationSpecific
status:
  loadBalancer:
    ingress:
    - ip: 10.0.9.52

更新逻辑代码如下:


// runUpdate returns a pool.WorkFunc that writes the given LoadBalancer
// status into the status subresource of the given Ingress.
//
// When the networking.k8s.io API group is available (server >= 1.14) the
// networking/v1beta1 client is used, otherwise the extensions/v1beta1
// client; the two branches are otherwise identical. Presumably the API
// server serves the same Ingress object under both groups — which is why
// an extensions/v1beta1 object can be updated via networking/v1beta1.
func runUpdate(ing *ingress.Ingress, status []apiv1.LoadBalancerIngress,
    client clientset.Interface) pool.WorkFunc {
    return func(wu pool.WorkUnit) (interface{}, error) {
        // The batch may have been cancelled while this unit was queued.
        if wu.IsCancelled() {
            return nil, nil
        }

        if k8s.IsNetworkingIngressAvailable {
            ingClient := client.NetworkingV1beta1().Ingresses(ing.Namespace)
            // Re-read the live object so the update applies on top of the
            // latest resourceVersion.
            currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
            if err != nil {
                return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
            }

            klog.Infof("updating Ingress %v/%v status from %v to %v", currIng.Namespace, currIng.Name, currIng.Status.LoadBalancer.Ingress, status)
            currIng.Status.LoadBalancer.Ingress = status
            // Update failures are only logged, not returned: the periodic
            // status sync will retry on its next tick.
            _, err = ingClient.UpdateStatus(currIng)
            if err != nil {
                klog.Warningf("error updating ingress rule: %v", err)
            }
        } else {
            ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)
            currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
            if err != nil {
                return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
            }

            klog.Infof("updating Ingress %v/%v status from %v to %v", currIng.Namespace, currIng.Name, currIng.Status.LoadBalancer.Ingress, status)
            currIng.Status.LoadBalancer.Ingress = status
            _, err = ingClient.UpdateStatus(currIng)
            if err != nil {
                klog.Warningf("error updating ingress rule: %v", err)
            }
        }

        return true, nil
    }
}

下面的日志是if中打印的。
image.png

也就是IsNetworkingIngressAvailable为true,该变量是被下面函数赋值的:

// NetworkingIngressAvailable checks if the package "k8s.io/api/networking/v1beta1" is available or not
func NetworkingIngressAvailable(client clientset.Interface) bool {
    // Ingress joined networking.k8s.io/v1beta1 in Kubernetes 1.14, so the
    // discovered server version is compared against that threshold.
    minVersion, err := version.ParseGeneric("v1.14.0")
    if err != nil {
        klog.Errorf("unexpected error parsing version: %v", err)
        return false
    }

    serverInfo, err := client.Discovery().ServerVersion()
    if err != nil {
        klog.Errorf("unexpected error parsing Kubernetes version: %v", err)
        return false
    }

    current, err := version.ParseGeneric(serverInfo.String())
    if err != nil {
        klog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
        return false
    }

    return current.AtLeast(minVersion)
}

那它是怎么用 networking.k8s.io/v1beta1 这个 apiVersion 去更新我的 extensions/v1beta1 Ingress 的呢?
image.png

image.png
看日志发现:以下代码的15行打印了日志:

lastSync 是上一次成功执行 sync 时的 Unix 纳秒时间戳;

// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        key, quit := t.queue.Get()
        if quit {
            // Queue shut down: signal completion exactly once.
            if !isClosed(t.workerDone) {
                close(t.workerDone)
            }
            return
        }
        // Taken before sync runs; becomes lastSync on success.
        ts := time.Now().UnixNano()

        item := key.(Element)
        // Anything enqueued before the last successful sync is already
        // covered by that sync and can be dropped.
        if t.lastSync > item.Timestamp {
            klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
            t.queue.Forget(key)
            t.queue.Done(key)
            continue
        }

        klog.V(3).Infof("syncing %v", item.Key)
        if err := t.sync(key); err != nil {
            // Sync failed: requeue with a fresh timestamp, rate limited.
            klog.Warningf("requeuing %v, err %v", item.Key, err)
            t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: time.Now().UnixNano(),
            })
        } else {
            t.queue.Forget(key)
            t.lastSync = ts
        }

        t.queue.Done(key)
    }
}

0.30.0的代码:


// enqueue enqueues ns/name of the given api object in the task queue,
// stamped with the current time. Non-skippable items are stamped a full
// day ahead so the worker's lastSync check can never discard them.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
    if t.IsShuttingDown() {
        klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
        return
    }

    var stamp int64
    if skippable {
        stamp = time.Now().UnixNano()
    } else {
        // make sure the timestamp is bigger than lastSync
        stamp = time.Now().Add(24 * time.Hour).UnixNano()
    }

    klog.V(3).Infof("queuing item %v", obj)
    key, err := t.fn(obj)
    if err != nil {
        klog.Errorf("%v", err)
        return
    }
    t.queue.Add(Element{Key: key, Timestamp: stamp})
}

// defaultKeyFunc derives the ns/name key for obj, wrapping any failure of
// the underlying keyFunc together with the offending object.
func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) {
    key, err := keyFunc(obj)
    if err == nil {
        return key, nil
    }
    return "", fmt.Errorf("could not get key for object %+v: %v", obj, err)
}

// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        key, quit := t.queue.Get()
        if quit {
            // On shutdown, close workerDone once to signal completion.
            if !isClosed(t.workerDone) {
                close(t.workerDone)
            }
            return
        }
        // Snapshot now; recorded as lastSync only if sync succeeds.
        ts := time.Now().UnixNano()

        item := key.(Element)
        // Items stamped earlier than the last completed sync were already
        // handled by that sync, so skip them.
        if t.lastSync > item.Timestamp {
            klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
            t.queue.Forget(key)
            t.queue.Done(key)
            continue
        }

        klog.V(3).Infof("syncing %v", item.Key)
        if err := t.sync(key); err != nil {
            // On failure, requeue with a new timestamp via the rate limiter.
            klog.Warningf("requeuing %v, err %v", item.Key, err)
            t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: time.Now().UnixNano(),
            })
        } else {
            t.queue.Forget(key)
            t.lastSync = ts
        }

        t.queue.Done(key)
    }
}

状态更新,leader pod会定时执行:

// Start starts the loop to keep the status in sync
func (s statusSync) Run(stopCh chan struct{}) {
    // Consume the sync queue in the background, polling every second.
    go s.syncQueue.Run(time.Second, stopCh)

    // trigger initial sync
    s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))

    // when this instance is the leader we need to enqueue
    // an item to trigger the update of the Ingress status.
    // Blocks until stopCh closes, enqueuing a dummy task every
    // updateInterval; returning false keeps the poll loop running.
    wait.PollUntil(updateInterval, func() (bool, error) {
        s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
        return false, nil
    }, stopCh)
}

这里的updateInterval是60s,即1分钟会调用GetDummyObject函数向queue中放入sync status这个ObjectMeta结构体。上面的go s.syncQueue.Run(time.Second, stopCh)每1s执行一次worker:

// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        // Debug instrumentation added by the author to confirm t.fn is set.
        klog.Infof("-----queue.fn is nil == %v", t.fn == nil)
        key, quit := t.queue.Get()
        if quit {
            // Queue shut down: close workerDone exactly once and exit.
            if !isClosed(t.workerDone) {
                close(t.workerDone)
            }
            return
        }
        // Captured before sync; saved as lastSync only on success.
        ts := time.Now().UnixNano()

        item := key.(Element)
        // Skip items stamped before the last successful sync — that sync
        // already reflected their state.
        if t.lastSync > item.Timestamp {
            klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
            t.queue.Forget(key)
            t.queue.Done(key)
            continue
        }

        klog.V(3).Infof("syncing %v", item.Key)
        if err := t.sync(key); err != nil {
            // Requeue failed items with a fresh timestamp, rate limited.
            klog.Warningf("requeuing %v, err %v", item.Key, err)
            t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: time.Now().UnixNano(),
            })
        } else {
            t.queue.Forget(key)
            t.lastSync = ts
        }

        t.queue.Done(key)
    }
}

这里去执行status.go中的sync方法:

// sync recomputes the set of addresses to publish and pushes them into the
// status of all matching Ingresses. The key is only logged; the work is
// global rather than per-object.
func (s *statusSync) sync(key interface{}) error {
    klog.Infof("-----status sync start key: %v, ", key)
    if s.syncQueue.IsShuttingDown() {
        klog.V(2).Infof("skipping Ingress status update (shutting down in progress)")
        return nil
    }

    addresses, err := s.runningAddresses()
    if err != nil {
        return err
    }
    s.updateStatus(sliceToStatus(addresses))
    return nil
}

这里的key就是ObjectMeta结构体,结构体是下面这样子:
image.png
runningAddresses会获取addrs,如果publish-service和publish-status-address参数都为空,会调用GetNodeIPOrName获取:


// GetNodeIPOrName returns the IP address or the name of a node in the cluster
func GetNodeIPOrName(kubeClient clientset.Interface, name string, useInternalIP bool) string {
    node, err := kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
    if err != nil {
        klog.Errorf("Error getting node %v: %v", name, err)
        return ""
    }

    if useInternalIP {
        // --report-node-internal-ip-address: return the first non-empty
        // InternalIP. Note: if none is found, control falls through to the
        // ExternalIP loop below instead of returning.
        for _, address := range node.Status.Addresses {
            if address.Type == apiv1.NodeInternalIP {
                if address.Address != "" {
                    return address.Address
                }
            }
        }
    }

    // Default path: look for a non-empty ExternalIP.
    for _, address := range node.Status.Addresses {
        if address.Type == apiv1.NodeExternalIP {
            if address.Address != "" {
                return address.Address
            }
        }
    }
    // NOTE(review): nodes without an ExternalIP (common on bare metal,
    // where only InternalIP and Hostname exist) reach this point and yield
    // an empty address; 0.30.0 falls back to the InternalIP instead.
    return ""
}

这里的 useInternalIP 参数由 --report-node-internal-ip-address 启动参数指定,默认值为 false:

useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false,
            `Set the load-balancer status of Ingress objects to internal Node addresses instead of external.
Requires the update-status parameter.`)

所以这里会去获取node.status.Address下的ExternalIP,如果获取不到就会返回空串,最终addrs就是空的。实际node.status.Address是不会存在ExternalIP的,只有InternalIP和Hostname:

  status:
    addresses:
    - address: 10.0.9.52
      type: InternalIP
    - address: liabio
      type: Hostname

以下是0.30.0的GetNodeIPOrName函数源码:


// GetNodeIPOrName returns the IP address or the name of a node in the cluster
func GetNodeIPOrName(kubeClient clientset.Interface, name string, useInternalIP bool) string {
    node, err := kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
    if err != nil {
        klog.Errorf("Error getting node %v: %v", name, err)
        return ""
    }

    // Locate the first non-empty InternalIP; it doubles as the fallback
    // result when no ExternalIP exists.
    internalIP := ""
    for _, addr := range node.Status.Addresses {
        if addr.Type == apiv1.NodeInternalIP && addr.Address != "" {
            internalIP = addr.Address
            break
        }
    }

    if useInternalIP {
        return internalIP
    }

    // Prefer a non-empty ExternalIP when one is present.
    for _, addr := range node.Status.Addresses {
        if addr.Type == apiv1.NodeExternalIP && addr.Address != "" {
            return addr.Address
        }
    }

    return internalIP
}

可以看到和0.24.1源码不同的是,当获取不到ExternalIP时,依旧会返回InternalIP。

updateStatus方法中会先获取和当前controller ingress-class相同的ingress对象去做更新处理:

// updateStatus changes the status information of Ingress rules
func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
    // Only Ingresses matching this controller's ingress-class are listed.
    ings := s.IngressLister.ListIngresses()

    // Bounded worker pool: at most 10 status updates in flight at once.
    p := pool.NewLimited(10)
    defer p.Close()

    batch := p.Batch()
    // Sort both sides so the equality check below is order-insensitive.
    sort.SliceStable(newIngressPoint, lessLoadBalancerIngress(newIngressPoint))

    for _, ing := range ings {
        curIPs := ing.Status.LoadBalancer.Ingress
        sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
        // Skip Ingresses whose published addresses already match.
        if ingressSliceEqual(curIPs, newIngressPoint) {
            klog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
            continue
        }

        batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
    }

    // Wait for every queued update to finish before returning.
    batch.QueueComplete()
    batch.WaitAll()
}

node的externalIP和internalIP有什么区别呢?什么时候才会出现externalIP?

官方文档中说明:
https://kubernetes.io/docs/concepts/architecture/nodes/

  • ExternalIP:通常是节点的可外部路由(从集群外可访问)的 IP 地址。
  • InternalIP:通常是节点的仅可在集群内部路由的 IP 地址。

参考

这个issue有类似的问题:
https://github.com/kubernetes/ingress-nginx/issues/1467