Kubernetes系统使用Etcd作为Kubernetes集群的唯一存储,Etcd在生产环境中一般以集群形式部署(称为Etcd集群)。Etcd集群是分布式K/V存储集群,提供了可靠的强一致性服务发现。
Etcd集群存储Kubernetes系统的集群状态和元数据,其中包括所有Kubernetes资源对象信息、资源对象状态、集群节点信息等。Kubernetes将所有数据存储至Etcd集群前缀为/registry的目录下。
Etcd存储的架构设计
其中:
- UnderlyingStorage:底层存储,是真正和Etcd交互的资源对象,也叫后端存储(BackendStorage)
- CacherStorage:带有缓存功能的资源存储对象,它是Storage.Interface通用存储接口的实现。CacherStorage缓存层的设计有利于Etcd集群中的数据能够获得快速的响应,并与Etcd集群数据保持一致。
- Storage.Interface:通用存储接口,该接口定义了资源的操作方法(即Create、Delete、Watch、WatchList、Get、GetToList、List、GuaranteedUpdate、Count、Versioner等方法)。
- RegistryStorage:实现了资源存储的通用操作,例如,在存储资源对象之前执行某个函数(即BeforeFunc),在存储资源对象之后执行某个函数(即AfterFunc)。
- RESTStorage:实现了RESTful风格的对外资源存储服务的API接口。
RESTStorage
Kubernetes的每种资源(包括子资源)都提供了RESTful风格的对外资源存储服务API接口(即RESTStorage接口),所有通过RESTful API对外暴露的资源都必须实现RESTStorage接口。RESTStorage接口代码示例如下(源码:staging\src\k8s.io\apiserver\pkg\registry\rest\rest.go):
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
}
Kubernetes的每种资源实现的RESTStorage接口一般定义在pkg/registry/<资源组>/<资源>/storage/storage.go中,它们通过NewStorage函数或NewREST函数实例化。以Deployment资源为例,代码示例如下:
type DeploymentStorage struct {
Deployment *REST
Status *StatusREST
Scale *ScaleREST
Rollback *RollbackREST
}
// REST implements a RESTStorage for Deployments.
type REST struct {
*genericregistry.Store
categories []string
}
// StatusREST implements the REST endpoint for changing the status of a deployment
type StatusREST struct {
store *genericregistry.Store
}
在以上代码中,Deployment资源定义了REST数据结构与StatusREST数据结构,其中REST数据结构用于实现deployment资源的RESTStorage接口,而StatusREST数据结构用于实现deployment/status子资源的RESTStorage接口。每一个RESTStorage接口都对RegistryStore操作进行了封装,例如,对deployment/status子资源进行Get操作时,实际执行的是RegistryStore操作,代码示例如下:
// Get retrieves the object from the storage. It is required to support Patch.
func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
return r.store.Get(ctx, name, options)
}
RegistryStorage
RegistryStorage实现了资源存储的通用操作,它定义了如下两种函数:
- BeforeFunc:预处理,也称strategy。主要是在创建资源对象之前做一些预处理工作
- AfterFunc:主要是在创建资源对象之后做一些收尾工作
每种资源的特殊化存储需求可以定义在Before Func和After Func中,RegistryStorage的定义如下(源码:staging\src\k8s.io\apiserver\pkg\registry\generic\registry\store.go)
type Store struct {
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
DefaultQualifiedResource schema.GroupResource
KeyRootFunc func(ctx context.Context) string
KeyFunc func(ctx context.Context, name string) (string, error)
ObjectNameFunc func(obj runtime.Object) (string, error)
TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate
EnableGarbageCollection bool
DeleteCollectionWorkers int
Decorator ObjectFunc
CreateStrategy rest.RESTCreateStrategy
AfterCreate ObjectFunc
UpdateStrategy rest.RESTUpdateStrategy
AfterUpdate ObjectFunc
DeleteStrategy rest.RESTDeleteStrategy
AfterDelete ObjectFunc
ReturnDeletedObject bool
ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool
ExportStrategy rest.RESTExportStrategy
TableConvertor rest.TableConvertor
Storage DryRunnableStorage
StorageVersioner runtime.GroupVersioner
DestroyFunc func()
}
其中预处理方法是:
- CreateStrategy
- UpdateStrategy
- DeleteStrategy
- ExportStrategy
处理后的方法是:
- AfterCreate
- AfterUpdate
- AfterDelete
最后,Storage字段是RegistryStore对Storage.Interface通用存储接口进行的封装,实现了对Etcd集群的读/写操作。以RegistryStore的Create方法(创建资源对象的方法)为例,代码示例如下:
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
......
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
......
}
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, err
}
}
......
return out, nil
}
Create方法创建资源对象的过程主要有3步:
- 通过
rest.BeforeCreate
进行预处理操作 - 通过
e.Storage.Create
创建资源对象 - 通过
e.AfterCreate
执行收尾操作
Storage.Interface
Storage.Interface通用存储接口定义了资源的操作方法,代码如下(源码:staging\src\k8s.io\apiserver\pkg\storage\interfaces.go):
type Interface interface {
Versioner() Versioner
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
Count(key string) (int64, error)
}
其中:
- Versioner:资源版本管理器,用于管理Etcd集群中的数据版本对象。
- Create:创建资源对象的方法。
- Delete:删除资源对象的方法。
- Watch:通过Watch机制监控资源对象变化方法,只应用于单个key。
- WatchList:通过Watch机制监控资源对象变化方法,应用于多个key(当前目录及目录下所有的key)。
- Get:获取资源对象的方法。
- GetToList:获取资源对象的方法,以列表(List)的形式返回。
- List:获取资源对象的方法,以列表(List)的形式返回。
- GuaranteedUpdate:保证传入的tryUpdate函数运行成功。
- Count:获取指定key下的条目数量
Storage.Interface是通用接口,实现通用的接口的分别是CacherStorage和UnderlyingStorage。
- CacherStorage:带缓存功能的资源存储对象
- UnderlyingStorage:底层存储对象,真正和Etcd交互的存储对象
CacherStorage
CacherStorage缓存层的设计有利于快速响应请求并返回所需的数据,这样可以减少Etcd集群的连接数,返回的数据也与Etcd集群中的数据保持一致。
CacherStoraeg缓存层并非会为所有操作都缓存数据。对于某些操作,为保证数据一致性,没有必要在其上再封装一层缓存层,例如Create、Delete、Count等操作,通过UnderlyingStorage直接向Etcd集群发出请求即可。只有Get、GetToList、List、GuaranteedUpdate、Watch、WatchList等操作是基于缓存设计的。
其中:
- cacheWatcher:是Watcher观察者管理
- Cacher:用于分发给目前已连接的观察者,分发过程通过非阻塞机制实现
- watchCache:通过Reflector框架与UnderlyingStorage底层存储对象交互,UnderlyingStorage与Etcd集群进行交互,并将回调事件分别存储至w.onEvent、w.cache、cache.Store中。
cacheWatcher
每一个发送Watch请求的客户端都会分配一个cacheWatcher,用于客户端接收Watch事件,代码示例如下(源码:staging\src\k8s.io\apiserver\pkg\storage\cacher\cacher.go):
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
c.ready.wait()
triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
for _, field := range pred.IndexFields {
if field == c.indexedTrigger.indexName {
if value, ok := pred.Field.RequiresExactMatch(field); ok {
triggerValue, triggerSupported = value, true
}
}
}
}
chanSize := 10
if c.indexedTrigger != nil && !triggerSupported {
chanSize = 1000
}
deadline, _ := ctx.Deadline()
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
return newErrWatcher(err), nil
}
if len(initEvents) > 0 {
watchRV = initEvents[len(initEvents)-1].ResourceVersion
}
func() {
c.Lock()
defer c.Unlock()
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.process(ctx, initEvents, watchRV)
return watcher, nil
}
当客户端发起Watch请求时,通过newCacheWatcher()
实例化一个cacheWatcher,并为其分配一个id,该id是唯一的,从0开始计数,每次有新的客户端发送Watch请求时,该id会自增1,但在Kubernetes API Server重启时其会被清零。最后,将该对象添加到c.watchers中进行统一管理,例如执行Add(添加)、Delete(删除)、Terminate(终止)等操作。