概述

在 Kubernetes 中,大部分功能都是通过声明式 API 来描述的。
而 Kubernetes 为了能够更加灵活,还支持了 CRD(自定义资源)类型,而针对自定义的资源对象的操作,则需要通过 Kubernetes Operator 来进行管理。
在本文中,我们将会详细介绍如何从零开始开发一个 K8s Operator。

StepByStep

项目初始化

在本文中,我们希望实现如下一个 CRD 文件的处理,文件名称为 network.yaml,内容如下图所示:

  1. apiVersion: apiextensions.k8s.io/v1beta1
  2. kind: CustomResourceDefinition
  3. metadata:
  4. name: networks.samplecrd.k8s.io
  5. spec:
  6. group: samplecrd.k8s.io
  7. version: v1
  8. names:
  9. kind: Network
  10. plural: networks
  11. scope: Namespaced

可以看到,在这个 CRD 中,我指定了“group: samplecrd.k8s.io”“version: v1”这样的 API 信息,也指定了这个 CR 的资源类型叫作 Network,复数(plural)是 networks。然后,我还声明了它的 scope 是 Namespaced,即:我们定义的这个 Network 是一个属于 Namespace 的对象,类似于 Pod。
这就是一个 Network API 资源类型的 API 部分的宏观定义了。接下来,我还需要让 Kubernetes“认识”这种 YAML 文件里描述的“网络”部分,比如“cidr”(网段),“gateway”(网关)这些字段的含义。这些内容就需要涉及到一定的代码开发了。
我们需要创建一个如下目录结构的项目:

  1. $ tree github.com/<your-name>/k8s-controller-custom-resource
  2. .
  3. ├── controller.go
  4. ├── crd
  5. └── network.yaml
  6. ├── example
  7. └── example-network.yaml
  8. ├── main.go
  9. └── pkg
  10. └── apis
  11. └── samplecrd
  12. ├── register.go
  13. └── v1
  14. ├── doc.go
  15. ├── register.go
  16. └── types.go

其中,pkg/apis/samplecrd 就是 API 组的名字,v1 是版本,而 v1 下面的 types.go 文件里,则定义了 Network 对象的完整描述。
然后,我在 pkg/apis/samplecrd 目录下创建了一个 register.go 文件,用来放置后面要用到的全局变量。这个文件的内容如下所示:

  1. package samplecrd
  2. const (
  3. GroupName = "samplecrd.k8s.io"
  4. Version = "v1"
  5. )

接着,我需要在 pkg/apis/samplecrd 目录下添加一个 doc.go 文件(Golang 的文档源文件)。这个文件里的内容如下所示:

  1. // +k8s:deepcopy-gen=package
  2. // +groupName=samplecrd.k8s.io
  3. package v1

在这个文件中,你会看到 +[=value]格式的注释,这就是 Kubernetes 进行代码生成要用的 Annotation 风格的注释。其中,+k8s:deepcopy-gen=package 意思是,请为整个 v1 包里的所有类型定义自动生成 DeepCopy 方法;而+groupName=samplecrd.k8s.io,则定义了这个包对应的 API 组的名字。
可以看到,这些定义在 doc.go 文件的注释,起到的是全局的代码生成控制的作用,所以也被称为 Global Tags。
接下来,我需要添加 types.go 文件。顾名思义,它的作用就是定义一个 Network 类型到底有哪些字段(比如,spec 字段里的内容)。这个文件的主要内容如下所示:

  1. package v1
  2. ...
  3. // +genclient
  4. // +genclient:noStatus
  5. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
  6. // Network describes a Network resource
  7. type Network struct {
  8. // TypeMeta is the metadata for the resource, like kind and apiversion
  9. metav1.TypeMeta `json:",inline"`
  10. // ObjectMeta contains the metadata for the particular object, including
  11. // things like...
  12. // - name
  13. // - namespace
  14. // - self link
  15. // - labels
  16. // - ... etc ...
  17. metav1.ObjectMeta `json:"metadata,omitempty"`
  18. Spec networkspec `json:"spec"`
  19. }
  20. // networkspec is the spec for a Network resource
  21. type networkspec struct {
  22. Cidr string `json:"cidr"`
  23. Gateway string `json:"gateway"`
  24. }
  25. // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
  26. // NetworkList is a list of Network resources
  27. type NetworkList struct {
  28. metav1.TypeMeta `json:",inline"`
  29. metav1.ListMeta `json:"metadata"`
  30. Items []Network `json:"items"`
  31. }

在上面这部分代码里,你可以看到 Network 类型定义方法跟标准的 Kubernetes 对象一样,都包括了 TypeMeta(API 元数据)和 ObjectMeta(对象元数据)字段。
而其中的 Spec 字段,就是需要我们自己定义的部分。所以,在 networkspec 里,我定义了 Cidr 和 Gateway 两个字段。其中,每个字段最后面的部分比如json:”cidr”,指的就是这个字段被转换成 JSON 格式之后的名字,也就是 YAML 文件里的字段名字。
此外,除了定义 Network 类型,你还需要定义一个 NetworkList 类型,用来描述一组 Network 对象应该包括哪些字段。之所以需要这样一个类型,是因为在 Kubernetes 中,获取所有 X 对象的 List() 方法,返回值都是List 类型,而不是 X 类型的数组。这是不一样的。
同样地,在 Network 和 NetworkList 类型上,也有代码生成注释。
其中,+genclient 的意思是:请为下面这个 API 资源类型生成对应的 Client 代码(这个 Client,我马上会讲到)。而 +genclient:noStatus 的意思是:这个 API 资源类型定义里,没有 Status 字段。否则,生成的 Client 就会自动带上 UpdateStatus 方法。
如果你的类型定义包括了 Status 字段的话,就不需要这句 +genclient:noStatus 注释了。比如下面这个例子:

  1. // +genclient
  2. // Network is a specification for a Network resource
  3. type Network struct {
  4. metav1.TypeMeta `json:",inline"`
  5. metav1.ObjectMeta `json:"metadata,omitempty"`
  6. Spec NetworkSpec `json:"spec"`
  7. Status NetworkStatus `json:"status"`
  8. }

需要注意的是,+genclient 只需要写在 Network 类型上,而不用写在 NetworkList 上。因为 NetworkList 只是一个返回值类型,Network 才是“主类型”。

而由于我在 Global Tags 里已经定义了为所有类型生成 DeepCopy 方法,所以这里就不需要再显式地加上 +k8s:deepcopy-gen=true 了。当然,这也就意味着你可以用 +k8s:deepcopy-gen=false 来阻止为某些类型生成 DeepCopy。

你可能已经注意到,在这两个类型上面还有一句+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object的注释。它的意思是,请在生成 DeepCopy 的时候,实现 Kubernetes 提供的 runtime.Object 接口。否则,在某些版本的 Kubernetes 里,你的这个类型定义会出现编译错误。这是一个固定的操作,记住即可

最后,我需要再编写一个 pkg/apis/samplecrd/v1/register.go 文件。
在前面对 APIServer 工作原理的讲解中,我已经提到,“registry”的作用就是注册一个类型(Type)给 APIServer。其中,Network 资源类型在服务器端注册的工作,APIServer 会自动帮我们完成。但与之对应的,我们还需要让客户端也能“知道”Network 资源类型的定义。这就需要我们在项目里添加一个 register.go 文件。它最主要的功能,就是定义了如下所示的 addKnownTypes() 方法:

  1. package v1
  2. ...
  3. // addKnownTypes adds our types to the API scheme by registering
  4. // Network and NetworkList
  5. func addKnownTypes(scheme *runtime.Scheme) error {
  6. scheme.AddKnownTypes(
  7. SchemeGroupVersion,
  8. &Network{},
  9. &NetworkList{},
  10. )
  11. // register the type in the scheme
  12. metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
  13. return nil
  14. }

有了这个方法,Kubernetes 就能够在后面生成客户端的时候,“知道”Network 以及 NetworkList 类型的定义了。
像上面这种 register.go 文件里的内容其实是非常固定的,你以后可以直接使用我提供的这部分代码做模板,然后把其中的资源类型、GroupName 和 Version 替换成你自己的定义即可。
这样,Network 对象的定义工作就全部完成了。

接下来,我就要使用 Kubernetes 提供的代码生成工具,为上面定义的 Network 资源类型自动生成 clientset、informer 和 lister

这个代码生成工具名为 k8s.io/code-generator,使用方法如下所示:

  1. # 代码生成的工作目录,也就是我们的项目路径
  2. ROOT_PACKAGE="github.com/resouer/k8s-controller-custom-resource"
  3. # API Group
  4. CUSTOM_RESOURCE_NAME="samplecrd"
  5. # API Version
  6. CUSTOM_RESOURCE_VERSION="v1"
  7. # 安装k8s.io/code-generator
  8. go get -u k8s.io/code-generator/...
  9. cd $GOPATH/src/k8s.io/code-generator
  10. # 执行代码自动生成,其中pkg/client是生成目标目录,pkg/apis是类型定义目录
  11. ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"

代码生成工作完成之后,我们再查看一下这个项目的目录结构:

  1. $ tree
  2. .
  3. ├── controller.go
  4. ├── crd
  5. └── network.yaml
  6. ├── example
  7. └── example-network.yaml
  8. ├── main.go
  9. └── pkg
  10. ├── apis
  11. └── samplecrd
  12. ├── constants.go
  13. └── v1
  14. ├── doc.go
  15. ├── register.go
  16. ├── types.go
  17. └── zz_generated.deepcopy.go
  18. └── client
  19. ├── clientset
  20. ├── informers
  21. └── listers

其中,pkg/apis/samplecrd/v1 下面的 zz_generated.deepcopy.go 文件,就是自动生成的 DeepCopy 代码文件。而整个 client 目录,以及下面的三个包(clientset、informers、 listers),都是 Kubernetes 为 Network 类型生成的客户端库,这些库会在后面编写自定义控制器的时候用到。

什么是自定义控制器?

“声明式 API”并不像“命令式 API”那样有着明显的执行逻辑。这就使得基于声明式 API 的业务功能实现,往往需要通过控制器模式来“监视”API 对象的变化(比如,创建或者删除 Network),然后以此来决定实际要执行的具体工作。
下面,我们来看看在 K8s 中自定义控制器的工作原理。
在 Kubernetes 项目中,一个自定义控制器的工作原理,可以用下面这样一幅流程图来表示:
image.png
我们从左开始看起。
首先,自定义控制器要做的第一件事就是从 Kubernetes 的 APIServer 里获取它所关心的对象,也就是我们自定义的资源对象。这个操作,依靠的是一个叫作 Informer(可以翻译为:通知器)的代码库完成的。Informer 与 API 对象是一一对应的。例如对于一个名为 Network 的 CRD而言,所以我们传递给自定义控制器的就是一个 Network 对象的 Informer(Network Informer)。

对于一个 Informer 对象而言,真正负责与 K8s APIServer 建立和维护连接的是 Informer 中所使用的 Reflector 包。更具体地说,Reflector 使用的是一种叫作 ListAndWatch 的方法,来“获取”并“监听”这些 Network 对象实例的变化。
在 ListAndWatch 机制下,一旦 APIServer 端有新的 Network 实例被创建、删除或者更新,Reflector 都会收到“事件通知”。这时,该事件及它对应的 API 对象这个组合,就被称为增量(Delta),它会被放进一个 Delta FIFO Queue(即:增量先进先出队列)中。
而另一方面,Informe 会不断地从这个 Delta FIFO Queue 里读取(Pop)增量。每拿到一个增量,Informer 就会判断这个增量里的事件类型,然后创建或者更新本地对象的缓存。这个缓存,在 Kubernetes 里一般被叫作 Store。比如,如果事件类型是 Added(添加对象),那么 Informer 就会通过一个叫作 Indexer 的库把这个增量里的 API 对象保存在本地缓存中,并为它创建索引。相反,如果增量的事件类型是 Deleted(删除对象),那么 Informer 就会从本地缓存中删除这个对象。

这个同步本地缓存的工作,是 Informer 的第一个职责,也是它最重要的职责

而 Informer 的第二个职责,则是根据这些事件的类型,触发事先注册好的 ResourceEventHandler。这些 Handler,需要在创建控制器的时候注册给它对应的 Informer

自定义控制器开发说明

编写自定义控制器代码的过程包括:编写 main 函数、编写自定义控制器的定义,以及编写控制器里的业务逻辑三个部分。

main 函数开发

main 函数的主要工作就是,定义并初始化一个自定义控制器(Custom Controller),然后启动它。
这部分代码的主要内容如下所示:

  1. func main() {
  2. ...
  3. cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
  4. ...
  5. kubeClient, err := kubernetes.NewForConfig(cfg)
  6. ...
  7. networkClient, err := clientset.NewForConfig(cfg)
  8. ...
  9. networkInformerFactory := informers.NewSharedInformerFactory(networkClient, ...)
  10. controller := NewController(
  11. kubeClient, networkClient,
  12. networkInformerFactory.Samplecrd().V1().Networks()
  13. )
  14. go networkInformerFactory.Start(stopCh)
  15. if err = controller.Run(2, stopCh); err != nil {
  16. glog.Fatalf("Error running controller: %s", err.Error())
  17. }
  18. }

这个 main 函数主要通过三步完成了初始化并启动一个自定义控制器的工作:

  1. 根据我提供的 Master 配置(APIServer 的地址端口和 kubeconfig 的路径),创建一个 Kubernetes 的 client(kubeClient)和 Network 对象的 client(networkClient)。
  2. main 函数为 Network 对象创建一个叫作 InformerFactory(即:networkInformerFactory)的工厂,并使用它生成一个 Network 对象的 Informer,传递给控制器。
  3. main 函数启动上述的 Informer,然后执行 controller.Run,启动自定义控制器。

自定义控制器定义

接下来,我们就来编写这个控制器的定义,它的主要内容如下所示:

  1. func NewController(
  2. kubeclientset kubernetes.Interface,
  3. networkclientset clientset.Interface,
  4. networkInformer informers.NetworkInformer) *Controller {
  5. ...
  6. controller := &Controller{
  7. kubeclientset: kubeclientset,
  8. networkclientset: networkclientset,
  9. networksLister: networkInformer.Lister(),
  10. networksSynced: networkInformer.Informer().HasSynced,
  11. workqueue: workqueue.NewNamedRateLimitingQueue(..., "Networks"),
  12. ...
  13. }
  14. networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  15. AddFunc: controller.enqueueNetwork,
  16. UpdateFunc: func(old, new interface{}) {
  17. oldNetwork := old.(*samplecrdv1.Network)
  18. newNetwork := new.(*samplecrdv1.Network)
  19. if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
  20. return
  21. }
  22. controller.enqueueNetwork(new)
  23. },
  24. DeleteFunc: controller.enqueueNetworkForDelete,
  25. return controller
  26. }

我们前面在 main 函数里创建了两个 client(kubeclientset 和 networkclientset),然后在这段代码里,使用这两个 client 和前面创建的 Informer,初始化了自定义控制器
值得注意的是,在这个自定义控制器里,我还设置了一个工作队列(work queue),它正是处于示意图中间位置的 WorkQueue。这个工作队列的作用是,负责同步 Informer 和控制循环之间的数据。

然后,我们为 networkInformer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc),分别对应 API 对象的“添加”“更新”和“删除”事件。而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中。需要注意的是,实际入队的并不是 API 对象本身,而是它们的 Key,即:该 API 对象的namespace/name。而我们后面即将编写的控制循环,则会不断地从这个工作队列里拿到这些 Key,然后开始执行真正的控制逻辑。

综合上面的讲述,你现在应该就能明白,所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client。它是自定义控制器跟 APIServer 进行数据同步的重要组件

更具体地说,Informer 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。其中,ListAndWatch 方法的含义是:首先,通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化。而通过监听到的事件变化,Informer 就可以实时地更新本地缓存,并且调用这些事件对应的 EventHandler 了。此外,在这个过程中,每经过 resyncPeriod 指定的时间,Informer 维护的本地缓存,都会使用最近一次 LIST 返回的结果强制更新一次,从而保证缓存的有效性。在 Kubernetes 中,这个缓存强制更新的操作就叫作:resync。需要注意的是,这个定时 resync 操作,也会触发 Informer 注册的“更新”事件。但此时,这个“更新”事件对应的 Network 对象实际上并没有发生变化,即:新、旧两个 Network 对象的 ResourceVersion 是一样的。在这种情况下,Informer 就不需要对这个更新事件再做进一步的处理了

控制器业务逻辑开发

接下来,我们就来到了示意图中最后面的控制循环(Control Loop)部分,也正是我在 main 函数最后调用 controller.Run() 启动的“控制循环”。它的主要内容如下所示:

  1. func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
  2. ...
  3. if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
  4. return fmt.Errorf("failed to wait for caches to sync")
  5. }
  6. ...
  7. for i := 0; i < threadiness; i++ {
  8. go wait.Until(c.runWorker, time.Second, stopCh)
  9. }
  10. ...
  11. return nil
  12. }

可以看到,启动控制循环的逻辑非常简单:

  1. 首先,等待 Informer 完成一次本地缓存的数据同步操作;
  2. 然后,直接通过 goroutine 启动一个(或者并发启动多个)“无限循环”的任务。

而这个“无限循环”任务的每一个循环周期,执行的正是我们真正关心的业务逻辑。

所以接下来,我们就来编写这个自定义控制器的业务逻辑,它的主要内容如下所示:

  1. func (c *Controller) runWorker() {
  2. for c.processNextWorkItem() {
  3. }
  4. }
  5. func (c *Controller) processNextWorkItem() bool {
  6. obj, shutdown := c.workqueue.Get()
  7. ...
  8. err := func(obj interface{}) error {
  9. ...
  10. if err := c.syncHandler(key); err != nil {
  11. return fmt.Errorf("error syncing '%s': %s", key, err.Error())
  12. }
  13. c.workqueue.Forget(obj)
  14. ...
  15. return nil
  16. }(obj)
  17. ...
  18. return true
  19. }
  20. func (c *Controller) syncHandler(key string) error {
  21. namespace, name, err := cache.SplitMetaNamespaceKey(key)
  22. ...
  23. network, err := c.networksLister.Networks(namespace).Get(name)
  24. if err != nil {
  25. if errors.IsNotFound(err) {
  26. glog.Warningf("Network does not exist in local cache: %s/%s, will delete it from Neutron ...",
  27. namespace, name)
  28. glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
  29. namespace, name)
  30. // FIX ME: call Neutron API to delete this network by name.
  31. //
  32. // neutron.Delete(namespace, name)
  33. return nil
  34. }
  35. ...
  36. return err
  37. }
  38. glog.Infof("[Neutron] Try to process network: %#v ...", network)
  39. // FIX ME: Do diff().
  40. //
  41. // actualNetwork, exists := neutron.Get(namespace, name)
  42. //
  43. // if !exists {
  44. // neutron.Create(namespace, name)
  45. // } else if !reflect.DeepEqual(actualNetwork, network) {
  46. // neutron.Update(namespace, name)
  47. // }
  48. return nil
  49. }

可以看到,在这个执行周期里(processNextWorkItem),我们:

  1. 首先从工作队列里出队(workqueue.Get)了一个成员,也就是一个 Key(Network 对象的:namespace/name)。
  2. 然后,在 syncHandler 方法中,我使用这个 Key,尝试从 Informer 维护的缓存中拿到了它所对应的 Network 对象。可以看到,在这里,我使用了 networksLister 来尝试获取这个 Key 对应的 Network 对象。这个操作,其实就是在访问本地缓存的索引。实际上,在 Kubernetes 的源码中,你会经常看到控制器从各种 Lister 里获取对象,比如:podLister、nodeLister 等等,它们使用的都是 Informer 和缓存机制。而如果控制循环从缓存中拿不到这个对象(即:networkLister 返回了 IsNotFound 错误),那就意味着这个 Network 对象的 Key 是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个 Key,但是对应的 Network 对象已经被删除了。这时候,我就需要调用 Neutron 的 API,把这个 Key 对应的 Neutron 网络从真实的集群里删除掉。而如果能够获取到对应的 Network 对象,我就可以执行控制器模式里的对比“期望状态”和“实际状态”的逻辑了。其中,自定义控制器“千辛万苦”拿到的这个 Network 对象,正是 APIServer 里保存的“期望状态”,即:用户通过 YAML 文件提交到 APIServer 里的信息。当然,在我们的例子里,它已经被 Informer 缓存在了本地。
  3. 那么,“实际状态”又从哪里来呢?当然是来自于实际的集群了。所以,我们的控制循环需要通过 Neutron API 来查询实际的网络情况。比如,我可以先通过 Neutron 来查询这个 Network 对象对应的真实网络是否存在。
    1. 如果不存在,这就是一个典型的“期望状态”与“实际状态”不一致的情形。这时,我就需要使用这个 Network 对象里的信息(比如:CIDR 和 Gateway),调用 Neutron API 来创建真实的网络。
    2. 如果存在,那么,我就要读取这个真实网络的信息,判断它是否跟 Network 对象里的信息一致,从而决定我是否要通过 Neutron 来更新这个已经存在的真实网络。
  4. 这样,我就通过对比“期望状态”和“实际状态”的差异,完成了一次调协(Reconcile)的过程。

至此,一个完整的自定义 API 对象和它所对应的自定义控制器,就编写完毕了。

实战运行