-enable-leader-election
Enables Leader election to avoid multiple replicas of the controller reporting the status of Ingress, VirtualServer and VirtualServerRoute resources – only one replica will report status. (default true) See -report-ingress-status flag.
-report-ingress-status
Update the address field in the status of Ingresses resources. Requires the -external-service flag or the external-status-address key in the ConfigMap.
This is the flag that controls the status-reporting behavior.
What the leader does
The leader exposes a metric marking itself as the current leader, and it also starts the worker that updates the Ingress status.
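The controller's own leader-election wiring is not reproduced here (its details, such as the lock resource it uses, differ). Below is a minimal sketch of the same idea using client-go's leaderelection package: every replica competes for one lock, and only the winner (OnStartedLeading) starts the status-update worker. The lock name, namespace, and POD_NAME environment variable are illustrative assumptions, not the controller's actual values.
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("error building in-cluster config: %v", err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The Lease object that all replicas compete for; only the holder reports status.
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "ingress-controller-leader", Namespace: "ingress-nginx"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: os.Getenv("POD_NAME")},
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 30 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// This is where the real controller starts its status-sync
				// queue (s.syncQueue.Run(...)).
				klog.Info("became leader, starting status sync")
			},
			OnStoppedLeading: func() {
				klog.Info("lost leadership, stopping status sync")
			},
			OnNewLeader: func(id string) {
				klog.Infof("current leader: %s", id)
			},
		},
	})
}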
publish-service and publish-status-address
If --publish-status-address is specified, that address is written directly into the Ingress status.
If --publish-service is specified, the controller fetches that Service object and derives the address(es) from it according to the Service type:
func statusAddressFromService(service string, kubeClient clientset.Interface) ([]string, error) {
	ns, name, _ := k8s.ParseNameNS(service)
	svc, err := kubeClient.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	switch svc.Spec.Type {
	case apiv1.ServiceTypeExternalName:
		return []string{svc.Spec.ExternalName}, nil
	case apiv1.ServiceTypeClusterIP:
		return []string{svc.Spec.ClusterIP}, nil
	case apiv1.ServiceTypeNodePort:
		addresses := []string{}
		if svc.Spec.ExternalIPs != nil {
			addresses = append(addresses, svc.Spec.ExternalIPs...)
		} else {
			addresses = append(addresses, svc.Spec.ClusterIP)
		}
		return addresses, nil
	case apiv1.ServiceTypeLoadBalancer:
		addresses := []string{}
		for _, ip := range svc.Status.LoadBalancer.Ingress {
			if ip.IP == "" {
				addresses = append(addresses, ip.Hostname)
			} else {
				addresses = append(addresses, ip.IP)
			}
		}
		addresses = append(addresses, svc.Spec.ExternalIPs...)
		return addresses, nil
	}

	return nil, fmt.Errorf("unable to extract IP address/es from service %v", service)
}
If neither of these two flags is specified, the controller falls back to the addresses of the nodes where the ingress-nginx pods run and writes those into the Ingress status.
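As a rough sketch of that fallback (this is not the controller's actual runningAddresses code; the namespace and label selector are assumptions, and the client-go call signatures match the pre-context style used elsewhere in this article):
// fallbackAddresses lists the controller's own pods and collects the IPs of the
// nodes they run on, which is roughly what the controller does when neither
// publish flag is set.
func fallbackAddresses(kubeClient clientset.Interface) ([]string, error) {
	pods, err := kubeClient.CoreV1().Pods("ingress-nginx").List(metav1.ListOptions{
		LabelSelector: "app.kubernetes.io/name=ingress-nginx", // assumed labels, adjust to your deployment
	})
	if err != nil {
		return nil, err
	}

	addrs := []string{}
	for _, pod := range pods.Items {
		// Status.HostIP is the IP of the node the pod is scheduled on.
		if pod.Status.HostIP != "" {
			addrs = append(addrs, pod.Status.HostIP)
		}
	}
	return addrs, nil
}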
How does the update work for different apiVersions?
For example, the Ingress I created uses apiVersion extensions/v1beta1:
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"annotations":{"nginx.ingress.kubernetes.io/rewrite-target":"/$2"},"creationTimestamp":"2019-12-13T07:01:27Z","generation":3,"name":"es-head","namespace":"default","resourceVersion":"752529","selfLink":"/apis/extensions/v1beta1/namespaces/default/ingresses/es-head","uid":"7829f59c-f506-42ee-ab94-c0e00222dfca"},"spec":{"rules":[{"host":"ca.test1.liabio.cn","http":{"paths":[{"backend":{"serviceName":"es-head","servicePort":9100},"path":"/es-head(/|$)(.*)","pathType":"ImplementationSpecific"}]}}]},"status":{"loadBalancer":{}}}
    nginx.ingress.kubernetes.io/rewrite-target: /$2
  creationTimestamp: "2020-10-16T08:01:53Z"
  generation: 1
  managedFields:
  - apiVersion: extensions/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
          f:nginx.ingress.kubernetes.io/rewrite-target: {}
      f:spec:
        f:rules: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2020-10-16T08:01:53Z"
  - apiVersion: networking.k8s.io/v1beta1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:loadBalancer:
          f:ingress: {}
    manager: nginx-ingress-controller
    operation: Update
    time: "2020-10-16T08:19:51Z"
  name: es-head
  namespace: default
  resourceVersion: "13664071"
  selfLink: /apis/extensions/v1beta1/namespaces/default/ingresses/es-head
  uid: 1f37c2f6-3dcd-457a-ba2b-c704a912fe71
spec:
  rules:
  - host: ca.test1.liabio.cn
    http:
      paths:
      - backend:
          serviceName: es-head
          servicePort: 9100
        path: /es-head(/|$)(.*)
        pathType: ImplementationSpecific
status:
  loadBalancer:
    ingress:
    - ip: 10.0.9.52
The update logic looks like this:
func runUpdate(ing *ingress.Ingress, status []apiv1.LoadBalancerIngress,
	client clientset.Interface) pool.WorkFunc {
	return func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}

		if k8s.IsNetworkingIngressAvailable {
			ingClient := client.NetworkingV1beta1().Ingresses(ing.Namespace)
			currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
			if err != nil {
				return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
			}

			klog.Infof("updating Ingress %v/%v status from %v to %v", currIng.Namespace, currIng.Name, currIng.Status.LoadBalancer.Ingress, status)
			currIng.Status.LoadBalancer.Ingress = status
			_, err = ingClient.UpdateStatus(currIng)
			if err != nil {
				klog.Warningf("error updating ingress rule: %v", err)
			}
		} else {
			ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)
			currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
			if err != nil {
				return nil, errors.Wrap(err, fmt.Sprintf("unexpected error searching Ingress %v/%v", ing.Namespace, ing.Name))
			}

			klog.Infof("updating Ingress %v/%v status from %v to %v", currIng.Namespace, currIng.Name, currIng.Status.LoadBalancer.Ingress, status)
			currIng.Status.LoadBalancer.Ingress = status
			_, err = ingClient.UpdateStatus(currIng)
			if err != nil {
				klog.Warningf("error updating ingress rule: %v", err)
			}
		}

		return true, nil
	}
}
The log output I observed came from the if branch, which means IsNetworkingIngressAvailable is true. That variable is set by the following function:
// NetworkingIngressAvailable checks if the package "k8s.io/api/networking/v1beta1" is available or not
func NetworkingIngressAvailable(client clientset.Interface) bool {
	// check kubernetes version to use new ingress package or not
	version114, err := version.ParseGeneric("v1.14.0")
	if err != nil {
		klog.Errorf("unexpected error parsing version: %v", err)
		return false
	}

	serverVersion, err := client.Discovery().ServerVersion()
	if err != nil {
		klog.Errorf("unexpected error parsing Kubernetes version: %v", err)
		return false
	}

	runningVersion, err := version.ParseGeneric(serverVersion.String())
	if err != nil {
		klog.Errorf("unexpected error parsing running Kubernetes version: %v", err)
		return false
	}

	return runningVersion.AtLeast(version114)
}
So how can it update my extensions/v1beta1 Ingress through the networking.k8s.io/v1beta1 apiVersion? The reason is that the API server stores a single Ingress object and serves it under both API groups, converting between the two representations, so a status update made via networking.k8s.io/v1beta1 is also visible through extensions/v1beta1 (as the managedFields above show).
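A quick way to convince yourself of this is to fetch the same Ingress through both API groups and compare UIDs. A small sketch, assuming a cluster that still serves both groups and using the namespace/name from the manifest above:
// checkSameObject fetches the example Ingress via both API groups. The UIDs are
// identical because the API server stores one object and converts it between the
// two representations on the fly.
func checkSameObject(client clientset.Interface) {
	extIng, err := client.ExtensionsV1beta1().Ingresses("default").Get("es-head", metav1.GetOptions{})
	if err != nil {
		klog.Errorf("get via extensions/v1beta1: %v", err)
		return
	}

	netIng, err := client.NetworkingV1beta1().Ingresses("default").Get("es-head", metav1.GetOptions{})
	if err != nil {
		klog.Errorf("get via networking.k8s.io/v1beta1: %v", err)
		return
	}

	klog.Infof("same object: %v (uid %s / %s)", extIng.UID == netIng.UID, extIng.UID, netIng.UID)
}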
Looking at the logs, what was printed was the "skipping ... sync" message in the worker below.
lastSync is the Unix timestamp (in nanoseconds) of the last completed sync.
// worker processes work in the queue through sync.
func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		if quit {
			if !isClosed(t.workerDone) {
				close(t.workerDone)
			}
			return
		}
		ts := time.Now().UnixNano()

		item := key.(Element)
		if t.lastSync > item.Timestamp {
			klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}

		klog.V(3).Infof("syncing %v", item.Key)
		if err := t.sync(key); err != nil {
			klog.Warningf("requeuing %v, err %v", item.Key, err)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: time.Now().UnixNano(),
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}

		t.queue.Done(key)
	}
}
The corresponding code in 0.30.0:
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
	if t.IsShuttingDown() {
		klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)
		return
	}

	ts := time.Now().UnixNano()
	if !skippable {
		// make sure the timestamp is bigger than lastSync
		ts = time.Now().Add(24 * time.Hour).UnixNano()
	}
	klog.V(3).Infof("queuing item %v", obj)
	key, err := t.fn(obj)
	if err != nil {
		klog.Errorf("%v", err)
		return
	}
	t.queue.Add(Element{
		Key:       key,
		Timestamp: ts,
	})
}

func (t *Queue) defaultKeyFunc(obj interface{}) (interface{}, error) {
	key, err := keyFunc(obj)
	if err != nil {
		return "", fmt.Errorf("could not get key for object %+v: %v", obj, err)
	}

	return key, nil
}

// worker processes work in the queue through sync.
func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		if quit {
			if !isClosed(t.workerDone) {
				close(t.workerDone)
			}
			return
		}
		ts := time.Now().UnixNano()

		item := key.(Element)
		if t.lastSync > item.Timestamp {
			klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}

		klog.V(3).Infof("syncing %v", item.Key)
		if err := t.sync(key); err != nil {
			klog.Warningf("requeuing %v, err %v", item.Key, err)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: time.Now().UnixNano(),
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}

		t.queue.Done(key)
	}
}
For status updates, the leader pod runs this periodically:
// Start starts the loop to keep the status in sync
func (s statusSync) Run(stopCh chan struct{}) {
	go s.syncQueue.Run(time.Second, stopCh)

	// trigger initial sync
	s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))

	// when this instance is the leader we need to enqueue
	// an item to trigger the update of the Ingress status.
	wait.PollUntil(updateInterval, func() (bool, error) {
		s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
		return false, nil
	}, stopCh)
}
The updateInterval here is 60s, so once a minute GetDummyObject is called to put the "sync status" ObjectMeta onto the queue. The go s.syncQueue.Run(time.Second, stopCh) above drives the worker loop with a one-second period:
func (t *Queue) worker() {
	for {
		klog.Infof("-----queue.fn is nil == %v", t.fn == nil)
		key, quit := t.queue.Get()
		if quit {
			if !isClosed(t.workerDone) {
				close(t.workerDone)
			}
			return
		}
		ts := time.Now().UnixNano()

		item := key.(Element)
		if t.lastSync > item.Timestamp {
			klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}

		klog.V(3).Infof("syncing %v", item.Key)
		if err := t.sync(key); err != nil {
			klog.Warningf("requeuing %v, err %v", item.Key, err)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: time.Now().UnixNano(),
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}

		t.queue.Done(key)
	}
}
This in turn executes the sync method in status.go:
func (s *statusSync) sync(key interface{}) error {
	klog.Infof("-----status sync start key: %v, ", key)
	if s.syncQueue.IsShuttingDown() {
		klog.V(2).Infof("skipping Ingress status update (shutting down in progress)")
		return nil
	}

	addrs, err := s.runningAddresses()
	if err != nil {
		return err
	}
	s.updateStatus(sliceToStatus(addrs))

	return nil
}
The key here is the ObjectMeta struct enqueued by GetDummyObject.
runningAddresses collects the addrs; if both the publish-service and publish-status-address flags are empty, it calls GetNodeIPOrName to obtain them:
// GetNodeIPOrName returns the IP address or the name of a node in the cluster
func GetNodeIPOrName(kubeClient clientset.Interface, name string, useInternalIP bool) string {
	node, err := kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
	if err != nil {
		klog.Errorf("Error getting node %v: %v", name, err)
		return ""
	}

	if useInternalIP {
		for _, address := range node.Status.Addresses {
			if address.Type == apiv1.NodeInternalIP {
				if address.Address != "" {
					return address.Address
				}
			}
		}
	}

	for _, address := range node.Status.Addresses {
		if address.Type == apiv1.NodeExternalIP {
			if address.Address != "" {
				return address.Address
			}
		}
	}

	return ""
}
The useInternalIP argument here comes from the --report-node-internal-ip-address startup flag, which defaults to false:
useNodeInternalIP = flags.Bool("report-node-internal-ip-address", false,
	`Set the load-balancer status of Ingress objects to internal Node addresses instead of external.
Requires the update-status parameter.`)
So with the default it looks for an ExternalIP under node.status.addresses; if none is found it returns an empty string, and addrs ends up empty. On my nodes, status.addresses contains no ExternalIP at all, only an InternalIP and a Hostname:
status:
  addresses:
  - address: 10.0.9.52
    type: InternalIP
  - address: liabio
    type: Hostname
Here is the GetNodeIPOrName source in 0.30.0:
// GetNodeIPOrName returns the IP address or the name of a node in the cluster
func GetNodeIPOrName(kubeClient clientset.Interface, name string, useInternalIP bool) string {
	node, err := kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
	if err != nil {
		klog.Errorf("Error getting node %v: %v", name, err)
		return ""
	}

	defaultOrInternalIP := ""
	for _, address := range node.Status.Addresses {
		if address.Type == apiv1.NodeInternalIP {
			if address.Address != "" {
				defaultOrInternalIP = address.Address
				break
			}
		}
	}

	if useInternalIP {
		return defaultOrInternalIP
	}

	for _, address := range node.Status.Addresses {
		if address.Type == apiv1.NodeExternalIP {
			if address.Address != "" {
				return address.Address
			}
		}
	}

	return defaultOrInternalIP
}
Unlike the 0.24.1 source, when no ExternalIP can be found it still falls back to returning the InternalIP.
The updateStatus method first lists only the Ingress objects whose ingress-class matches the current controller and then updates each of them:
// updateStatus changes the status information of Ingress rules
func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
	// only Ingress objects whose ingress-class matches the current controller are returned here
	ings := s.IngressLister.ListIngresses()

	p := pool.NewLimited(10)
	defer p.Close()

	batch := p.Batch()
	sort.SliceStable(newIngressPoint, lessLoadBalancerIngress(newIngressPoint))

	for _, ing := range ings {
		curIPs := ing.Status.LoadBalancer.Ingress
		sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
		if ingressSliceEqual(curIPs, newIngressPoint) {
			klog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
			continue
		}

		batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
	}

	batch.QueueComplete()
	batch.WaitAll()
}
What is the difference between a node's ExternalIP and InternalIP, and when does a node get an ExternalIP?
The official documentation explains:
https://kubernetes.io/docs/concepts/architecture/nodes/
- ExternalIP: typically the IP address of the node that is externally routable (reachable from outside the cluster).
- InternalIP: typically the IP address of the node that is routable only within the cluster.
References
This issue discusses a similar problem:
https://github.com/kubernetes/ingress-nginx/issues/1467