Kubernetes系统使用Etcd作为Kubernetes集群的唯一存储,Etcd在生产环境中一般以集群形式部署(称为Etcd集群)。Etcd集群是分布式K/V存储集群,提供了可靠的强一致性服务发现。

Etcd集群存储Kubernetes系统的集群状态和元数据,其中包括所有Kubernetes资源对象信息、资源对象状态、集群节点信息等。Kubernetes将所有数据存储至Etcd集群前缀为/registry的目录下。

Etcd存储的架构设计

image.png
其中:

  • UnderlyingStorage:底层存储,是真正和Etcd交互的资源对象,也叫后端存储(BackendStorage)
  • CacherStorage:带有缓存功能的资源存储对象,它是Storage.Interface通用存储接口的实现。CacherStorage缓存层的设计有利于Etcd集群中的数据能够获得快速的响应,并与Etcd集群数据保持一致。
  • Storage.Interface:通用存储接口,该接口定义了资源的操作方法(即Create、Delete、Watch、WatchList、Get、GetToList、List、GuaranteedUpdate、Count、Versioner等方法)。
  • RegistryStorage:实现了资源存储的通用操作,例如,在存储资源对象之前执行某个函数(即BeforeFunc),在存储资源对象之后执行某个函数(即AfterFunc)。
  • RESTStorage:实现了RESTful风格的对外资源存储服务的API接口。

RESTStorage

Kubernetes的每种资源(包括子资源)都提供了RESTful风格的对外资源存储服务API接口(即RESTStorage接口),所有通过RESTful API对外暴露的资源都必须实现RESTStorage接口。RESTStorage接口代码示例如下(源码:staging\src\k8s.io\apiserver\pkg\registry\rest\rest.go):

  1. type Storage interface {
  2. // New returns an empty object that can be used with Create and Update after request data has been put into it.
  3. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
  4. New() runtime.Object
  5. }

Kubernetes的每种资源实现的RESTStorage接口一般定义在pkg/registry/<资源组>/<资源>/storage/storage.go中,它们通过NewStorage函数或NewREST函数实例化。以Deployment资源为例,代码示例如下:

  1. type DeploymentStorage struct {
  2. Deployment *REST
  3. Status *StatusREST
  4. Scale *ScaleREST
  5. Rollback *RollbackREST
  6. }
  7. // REST implements a RESTStorage for Deployments.
  8. type REST struct {
  9. *genericregistry.Store
  10. categories []string
  11. }
  12. // StatusREST implements the REST endpoint for changing the status of a deployment
  13. type StatusREST struct {
  14. store *genericregistry.Store
  15. }

在以上代码中,Deployment资源定义了REST数据结构与StatusREST数据结构,其中REST数据结构用于实现deployment资源的RESTStorage接口,而StatusREST数据结构用于实现deployment/status子资源的RESTStorage接口。每一个RESTStorage接口都对RegistryStore操作进行了封装,例如,对deployment/status子资源进行Get操作时,实际执行的是RegistryStore操作,代码示例如下:

  1. // Get retrieves the object from the storage. It is required to support Patch.
  2. func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
  3. return r.store.Get(ctx, name, options)
  4. }

RegistryStorage

RegistryStorage实现了资源存储的通用操作,它定义了如下两种函数:

  • BeforeFunc:预处理,也称strategy。主要是在创建资源对象之前做一些预处理工作
  • AfterFunc:主要是在创建资源对象之后做一些收尾工作

每种资源的特殊化存储需求可以定义在Before Func和After Func中,RegistryStorage的定义如下(源码:staging\src\k8s.io\apiserver\pkg\registry\generic\registry\store.go)

  1. type Store struct {
  2. NewFunc func() runtime.Object
  3. NewListFunc func() runtime.Object
  4. DefaultQualifiedResource schema.GroupResource
  5. KeyRootFunc func(ctx context.Context) string
  6. KeyFunc func(ctx context.Context, name string) (string, error)
  7. ObjectNameFunc func(obj runtime.Object) (string, error)
  8. TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
  9. PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate
  10. EnableGarbageCollection bool
  11. DeleteCollectionWorkers int
  12. Decorator ObjectFunc
  13. CreateStrategy rest.RESTCreateStrategy
  14. AfterCreate ObjectFunc
  15. UpdateStrategy rest.RESTUpdateStrategy
  16. AfterUpdate ObjectFunc
  17. DeleteStrategy rest.RESTDeleteStrategy
  18. AfterDelete ObjectFunc
  19. ReturnDeletedObject bool
  20. ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool
  21. ExportStrategy rest.RESTExportStrategy
  22. TableConvertor rest.TableConvertor
  23. Storage DryRunnableStorage
  24. StorageVersioner runtime.GroupVersioner
  25. DestroyFunc func()
  26. }

其中预处理方法是:

  • CreateStrategy
  • UpdateStrategy
  • DeleteStrategy
  • ExportStrategy

处理后的方法是:

  • AfterCreate
  • AfterUpdate
  • AfterDelete

最后,Storage字段是RegistryStore对Storage.Interface通用存储接口进行的封装,实现了对Etcd集群的读/写操作。以RegistryStore的Create方法(创建资源对象的方法)为例,代码示例如下:

  1. func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
  2. if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
  3. return nil, err
  4. }
  5. ......
  6. if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
  7. ......
  8. }
  9. if e.AfterCreate != nil {
  10. if err := e.AfterCreate(out); err != nil {
  11. return nil, err
  12. }
  13. }
  14. ......
  15. return out, nil
  16. }

Create方法创建资源对象的过程主要有3步:

  • 通过rest.BeforeCreate进行预处理操作
  • 通过e.Storage.Create创建资源对象
  • 通过 e.AfterCreate执行收尾操作

Storage.Interface

Storage.Interface通用存储接口定义了资源的操作方法,代码如下(源码:staging\src\k8s.io\apiserver\pkg\storage\interfaces.go):

  1. type Interface interface {
  2. Versioner() Versioner
  3. Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
  4. Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
  5. Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
  6. WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
  7. Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
  8. GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
  9. List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
  10. GuaranteedUpdate(
  11. ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
  12. precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
  13. Count(key string) (int64, error)
  14. }

其中:

  • Versioner:资源版本管理器,用于管理Etcd集群中的数据版本对象。
  • Create:创建资源对象的方法。
  • Delete:删除资源对象的方法。
  • Watch:通过Watch机制监控资源对象变化方法,只应用于单个key。
  • WatchList:通过Watch机制监控资源对象变化方法,应用于多个key(当前目录及目录下所有的key)。
  • Get:获取资源对象的方法。
  • GetToList:获取资源对象的方法,以列表(List)的形式返回。
  • List:获取资源对象的方法,以列表(List)的形式返回。
  • GuaranteedUpdate:保证传入的tryUpdate函数运行成功。
  • Count:获取指定key下的条目数量

Storage.Interface是通用接口,实现通用的接口的分别是CacherStorage和UnderlyingStorage。

  • CacherStorage:带缓存功能的资源存储对象
  • UnderlyingStorage:底层存储对象,真正和Etcd交互的存储对象

CacherStorage

CacherStorage缓存层的设计有利于快速响应请求并返回所需的数据,这样可以减少Etcd集群的连接数,返回的数据也与Etcd集群中的数据保持一致。

CacherStoraeg缓存层并非会为所有操作都缓存数据。对于某些操作,为保证数据一致性,没有必要在其上再封装一层缓存层,例如Create、Delete、Count等操作,通过UnderlyingStorage直接向Etcd集群发出请求即可。只有Get、GetToList、List、GuaranteedUpdate、Watch、WatchList等操作是基于缓存设计的。
image.png
其中:

  • cacheWatcher:是Watcher观察者管理
  • Cacher:用于分发给目前已连接的观察者,分发过程通过非阻塞机制实现
  • watchCache:通过Reflector框架与UnderlyingStorage底层存储对象交互,UnderlyingStorage与Etcd集群进行交互,并将回调事件分别存储至w.onEvent、w.cache、cache.Store中。

cacheWatcher

每一个发送Watch请求的客户端都会分配一个cacheWatcher,用于客户端接收Watch事件,代码示例如下(源码:staging\src\k8s.io\apiserver\pkg\storage\cacher\cacher.go):

  1. func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
  2. pred := opts.Predicate
  3. watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
  4. if err != nil {
  5. return nil, err
  6. }
  7. c.ready.wait()
  8. triggerValue, triggerSupported := "", false
  9. if c.indexedTrigger != nil {
  10. for _, field := range pred.IndexFields {
  11. if field == c.indexedTrigger.indexName {
  12. if value, ok := pred.Field.RequiresExactMatch(field); ok {
  13. triggerValue, triggerSupported = value, true
  14. }
  15. }
  16. }
  17. }
  18. chanSize := 10
  19. if c.indexedTrigger != nil && !triggerSupported {
  20. chanSize = 1000
  21. }
  22. deadline, _ := ctx.Deadline()
  23. watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
  24. c.watchCache.RLock()
  25. defer c.watchCache.RUnlock()
  26. initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
  27. if err != nil {
  28. return newErrWatcher(err), nil
  29. }
  30. if len(initEvents) > 0 {
  31. watchRV = initEvents[len(initEvents)-1].ResourceVersion
  32. }
  33. func() {
  34. c.Lock()
  35. defer c.Unlock()
  36. watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
  37. c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
  38. if watcher.allowWatchBookmarks {
  39. c.bookmarkWatchers.addWatcher(watcher)
  40. }
  41. c.watcherIdx++
  42. }()
  43. go watcher.process(ctx, initEvents, watchRV)
  44. return watcher, nil
  45. }

当客户端发起Watch请求时,通过newCacheWatcher()实例化一个cacheWatcher,并为其分配一个id,该id是唯一的,从0开始计数,每次有新的客户端发送Watch请求时,该id会自增1,但在Kubernetes API Server重启时其会被清零。最后,将该对象添加到c.watchers中进行统一管理,例如执行Add(添加)、Delete(删除)、Terminate(终止)等操作。

UnderlyingStorage