0、架构图
1、存储相关配置
1.1、资源与版本
1.1.1、资源与版本的启用
// ResourceConfig记录了每个Group启用了哪个版本以及哪个具体的GVK是否启用
// 用gv/gvk map来维护信息
// eg. 若当前ResourceConfig记录信息如下
// GroupVersionConfigs:
// testGroup01/v1 true
// testGroup01/v1beta1 false
// testGroup02/v1 true
//
// testGroup01/v1/type01 false
//
// 则版本testGroup01/v1 testGroup02/v1处于启用状态
// testGroup01/v1beta1处于禁用状态(禁用状态不会生成相应的rest api)
// 由于testGroup01/v1/type01为false,则testGroup01/v1虽然启用,但是其资源type01
// 处于禁用状态,则不会生成testGroup01/v1/type01的rest api
type ResourceConfig struct {
GroupVersionConfigs map[schema.GroupVersion]bool
ResourceConfigs map[schema.GroupVersionResource]bool
}
func NewResourceConfig() *ResourceConfig {
return &ResourceConfig{GroupVersionConfigs: map[schema.GroupVersion]bool{}, ResourceConfigs: map[schema.GroupVersionResource]bool{}}
}
func (o *ResourceConfig) DisableAll() {
for k := range o.GroupVersionConfigs {
o.GroupVersionConfigs[k] = false
}
}
func (o *ResourceConfig) EnableAll() {
for k := range o.GroupVersionConfigs {
o.GroupVersionConfigs[k] = true
}
}
func (o *ResourceConfig) DisableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {
for k := range o.GroupVersionConfigs {
if matcher(k) {
o.GroupVersionConfigs[k] = false
}
}
}
func (o *ResourceConfig) EnableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {
for k := range o.GroupVersionConfigs {
if matcher(k) {
o.GroupVersionConfigs[k] = true
}
}
}
func (o *ResourceConfig) DisableVersions(versions ...schema.GroupVersion) {
for _, version := range versions {
o.GroupVersionConfigs[version] = false
}
}
func (o *ResourceConfig) EnableVersions(versions ...schema.GroupVersion) {
for _, version := range versions {
o.GroupVersionConfigs[version] = true
}
}
func (o *ResourceConfig) VersionEnabled(version schema.GroupVersion) bool {
enabled, _ := o.GroupVersionConfigs[version]
if enabled {
return true
}
return false
}
func (o *ResourceConfig) DisableResources(resources ...schema.GroupVersionResource) {
for _, resource := range resources {
o.ResourceConfigs[resource] = false
}
}
func (o *ResourceConfig) EnableResources(resources ...schema.GroupVersionResource) {
for _, resource := range resources {
o.ResourceConfigs[resource] = true
}
}
func (o *ResourceConfig) ResourceEnabled(resource schema.GroupVersionResource) bool {
if !o.VersionEnabled(resource.GroupVersion()) {
return false
}
resourceEnabled, explicitlySet := o.ResourceConfigs[resource]
if !explicitlySet {
return true
}
return resourceEnabled
}
func (o *ResourceConfig) AnyVersionForGroupEnabled(group string) bool {
for version := range o.GroupVersionConfigs {
if version.Group == group {
if o.VersionEnabled(version) {
return true
}
}
}
return false
}
1.1.2、资源的存储版本和内存版本
// DefaultResourceEncodingConfig记录了每个gv的存储版本和内存版本,
// 用以生成相应的存储序列化器和反序列化器
// 此处的配置叫OverridingResourceEncoding。即覆盖配置。因为若无配置,则默认系统会用
// v1作为存储版本,__internal用作内存版本(因为一般自定义资源注册版本优先级的时候v1版本优先级最高)
// 若用户有自定义,则用OverridingResourceEncoding覆盖默认配置
type DefaultResourceEncodingConfig struct {
resources map[schema.GroupResource]*OverridingResourceEncoding
scheme *runtime.Scheme
}
type OverridingResourceEncoding struct {
ExternalResourceEncoding schema.GroupVersion
InternalResourceEncoding schema.GroupVersion
}
func NewDefaultResourceEncodingConfig(scheme *runtime.Scheme) *DefaultResourceEncodingConfig {
return &DefaultResourceEncodingConfig{resources: map[schema.GroupResource]*OverridingResourceEncoding{}, scheme: scheme}
}
func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored schema.GroupResource, externalEncodingVersion, internalVersion schema.GroupVersion) {
o.resources[resourceBeingStored] = &OverridingResourceEncoding{
ExternalResourceEncoding: externalEncodingVersion,
InternalResourceEncoding: internalVersion,
}
}
func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
if !o.scheme.IsGroupRegistered(resource.Group) {
return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
}
resourceOverride, resourceExists := o.resources[resource]
if resourceExists {
return resourceOverride.ExternalResourceEncoding, nil
}
return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil
}
func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
if !o.scheme.IsGroupRegistered(resource.Group) {
return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
}
resourceOverride, resourceExists := o.resources[resource]
if resourceExists {
return resourceOverride.InternalResourceEncoding, nil
}
return schema.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil
}
1.2、存储序列化器/反序列化器
type StorageCodecConfig struct {
StorageMediaType string // 存储类型,默认json
StorageSerializer runtime.StorageSerializer // 全局序列化器
StorageVersion schema.GroupVersion // 存储版本
MemoryVersion schema.GroupVersion // 内存版本
Config storagebackend.Config // 后端存储配置
EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder
DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}
func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, runtime.GroupVersioner, error) {
mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
if err != nil {
return nil, nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
}
// 获取给定类型的解码器
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok {
return nil, nil, fmt.Errorf("unable to find serializer for %q", mediaType)
}
s := serializer.Serializer
// wrapper
var encoder runtime.Encoder = s
if opts.EncoderDecoratorFn != nil {
encoder = opts.EncoderDecoratorFn(encoder)
}
decoders := []runtime.Decoder{
s,
opts.StorageSerializer.UniversalDeserializer(),
runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()),
}
if opts.DecoderDecoratorFn != nil {
decoders = opts.DecoderDecoratorFn(decoders)
}
// encodeVersioner可以认为等同于返回存储版本的gvk
encodeVersioner := runtime.NewMultiGroupVersioner(
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
)
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
encodeVersioner,
)
// 由于不确定etcd存储的是什么格式的数据,因此解码器是所有解码器的合集
// 解码要先解析格式再进行解码
// decodeVersioner一般也默认为内部版本
decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...),
runtime.NewCoercingMultiGroupVersioner(
opts.MemoryVersion,
schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: opts.StorageVersion.Group},
),
)
return runtime.NewCodec(encoder, decoder), encodeVersioner, nil
}
1.3、存储生成工厂
const AllResources = "*"
type StorageFactory interface {
// 根据给定gr生成对应的后端配置文件(包含了对应的序列化器和反序列化器)
// 然后根据利用kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go生成存储
NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
ResourcePrefix(groupResource schema.GroupResource) string
// 返回所有的etcd后端节点列表
Backends() []Backend
}
// etcd配置,用以生成etcd客户端
type Backend struct {
// 形如: https://etcd.domain:2379、http://etcd.domain:2379
Server string
TLSConfig *tls.Config
}
type DefaultStorageFactory struct {
// 保存了etcd需要的通用信息
StorageConfig storagebackend.Config
// 资源覆盖(包含存储版本等需要覆盖的信息)
Overrides map[schema.GroupResource]groupResourceOverrides
// 默认的gv存储前缀信息
DefaultResourcePrefixes map[schema.GroupResource]string
// 默认的存储类型
DefaultMediaType string
// 全局序列化器
DefaultSerializer runtime.StorageSerializer
// 参见1.1.1
ResourceEncodingConfig ResourceEncodingConfig
// 参见1.1.2
APIResourceConfigSource APIResourceConfigSource
// 为1.2里面的NewStorageCodec
newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}
func NewDefaultStorageFactory(
config storagebackend.Config,
defaultMediaType string,
defaultSerializer runtime.StorageSerializer,
resourceEncodingConfig ResourceEncodingConfig,
resourceConfig APIResourceConfigSource,
specialDefaultResourcePrefixes map[schema.GroupResource]string,
) *DefaultStorageFactory {
config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if len(defaultMediaType) == 0 {
defaultMediaType = runtime.ContentTypeJSON
}
return &DefaultStorageFactory{
StorageConfig: config,
Overrides: map[schema.GroupResource]groupResourceOverrides{},
DefaultMediaType: defaultMediaType,
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
APIResourceConfigSource: resourceConfig,
DefaultResourcePrefixes: specialDefaultResourcePrefixes,
newStorageCodecFn: NewStorageCodec,
}
}
// 根据给定的gr返回对应的后端存储配置文件
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)
storageConfig := s.StorageConfig
codecConfig := StorageCodecConfig{
StorageMediaType: s.DefaultMediaType,
StorageSerializer: s.DefaultSerializer,
}
// 先覆盖group级别的参数
if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
// 再覆盖gr级别的参数
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
codecConfig.Config = storageConfig
storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
if err != nil {
return nil, err
}
klog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
return &storageConfig, nil
}
// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend {
servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
for _, overrides := range s.Overrides {
servers.Insert(overrides.etcdLocation...)
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
if err != nil {
klog.Errorf("failed to load key pair while getting backends: %s", err)
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
klog.Errorf("failed to read ca file while getting backends: %s", err)
} else {
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caPool
tlsConfig.InsecureSkipVerify = false
}
}
backends := []Backend{}
for server := range servers {
backends = append(backends, Backend{
Server: server,
// We can't share TLSConfig across different backends to avoid races.
// For more details see: http://pr.k8s.io/59338
TLSConfig: tlsConfig.Clone(),
})
}
return backends
}
func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
chosenStorageResource := s.getStorageGroupResource(groupResource)
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
exactResourceOverride := s.Overrides[chosenStorageResource]
etcdResourcePrefix := s.DefaultResourcePrefixes[chosenStorageResource]
if len(groupOverride.etcdResourcePrefix) > 0 {
etcdResourcePrefix = groupOverride.etcdResourcePrefix
}
if len(exactResourceOverride.etcdResourcePrefix) > 0 {
etcdResourcePrefix = exactResourceOverride.etcdResourcePrefix
}
if len(etcdResourcePrefix) == 0 {
etcdResourcePrefix = strings.ToLower(chosenStorageResource.Resource)
}
return etcdResourcePrefix
}
// 以下接口为DefaultStorageFactory开放给用户自定义的func
// 设置etcd地址(给指定gv指定etcd机器)
func (s *DefaultStorageFactory) SetEtcdLocation(groupResource schema.GroupResource, location []string) {
overrides := s.Overrides[groupResource]
overrides.etcdLocation = location
s.Overrides[groupResource] = overrides
}
// 设置etcd的前缀
func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource schema.GroupResource, prefix string) {
overrides := s.Overrides[groupResource]
overrides.etcdPrefix = prefix
s.Overrides[groupResource] = overrides
}
// 设置disablePaging
func (s *DefaultStorageFactory) SetDisableAPIListChunking(groupResource schema.GroupResource) {
overrides := s.Overrides[groupResource]
overrides.disablePaging = true
s.Overrides[groupResource] = overrides
}
// 设置指定资源的前缀
// 则整体etcd前缀为etcdPrefix/etcdResourcePrefix
func (s *DefaultStorageFactory) SetResourceEtcdPrefix(groupResource schema.GroupResource, prefix string) {
overrides := s.Overrides[groupResource]
overrides.etcdResourcePrefix = prefix
s.Overrides[groupResource] = overrides
}
// 设置全局序列化器/序列化类型
func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource, mediaType string, serializer runtime.StorageSerializer) {
overrides := s.Overrides[groupResource]
overrides.mediaType = mediaType
overrides.serializer = serializer
s.Overrides[groupResource] = overrides
}
// 设置Transformer
func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) {
overrides := s.Overrides[groupResource]
overrides.transformer = transformer
s.Overrides[groupResource] = overrides
}
// 增加关联关系资源
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.cohabitatingResources = groupResources
s.Overrides[groupResource] = overrides
}
}
// 设置序列化/反序列化器wrapper
func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.encoderDecoratorFn = encoderDecoratorFn
overrides.decoderDecoratorFn = decoderDecoratorFn
s.Overrides[groupResource] = overrides
}
}
func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.GroupResource) schema.GroupResource {
for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources {
if s.APIResourceConfigSource.AnyVersionForGroupEnabled(potentialStorageResource.Group) {
return potentialStorageResource
}
}
return groupResource
}
type groupResourceOverrides struct {
etcdLocation []string // etcd机器地址
etcdPrefix string // etcd前缀
etcdResourcePrefix string // etcd前缀
mediaType string // 序列化/反序列化类型
serializer runtime.StorageSerializer // 全局序列化器
// 资源关联信息。比如deployment有apps组,extentions组。
// 若apps没有启用,则会用extentions的覆盖信息进行覆盖
// 内置的一些关联关系参加1.3.1.1 pic1
cohabitatingResources []schema.GroupResource
encoderDecoratorFn func(runtime.Encoder) runtime.Encoder // 序列化器wrapper
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder // 反序列化器wrapper
transformer value.Transformer // transformer
disablePaging bool // disablePaging
}
// 覆盖配置
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
if len(o.etcdLocation) > 0 {
config.Transport.ServerList = o.etcdLocation
}
if len(o.etcdPrefix) > 0 {
config.Prefix = o.etcdPrefix
}
if len(o.mediaType) > 0 {
options.StorageMediaType = o.mediaType
}
if o.serializer != nil {
options.StorageSerializer = o.serializer
}
if o.encoderDecoratorFn != nil {
options.EncoderDecoratorFn = o.encoderDecoratorFn
}
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
if o.transformer != nil {
config.Transformer = o.transformer
}
if o.disablePaging {
config.Paging = false
}
}
func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
return schema.GroupResource{Group: resource.Group, Resource: AllResources}
}
1.3.1、pic
1.3.1.1、pic1
2、generic registry
存储的实现分为以下两层
1、kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
封装了用户定义的一些存储之前的逻辑,比如创建策略等
2、kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
真正的存储,封装了etcd客户端以及序列化与反序列化等操作
2.1、interface
2.1.1、Create
2.2、implement
// ObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. An ObjectFunc may transform the provided
// object.
type ObjectFunc func(obj runtime.Object) error
// GenericStore interface can be used for type assertions when we need to access the underlying strategies.
type GenericStore interface {
GetCreateStrategy() rest.RESTCreateStrategy
GetUpdateStrategy() rest.RESTUpdateStrategy
GetDeleteStrategy() rest.RESTDeleteStrategy
GetExportStrategy() rest.RESTExportStrategy
}
// Store implements pkg/api/rest.StandardStorage. It's intended to be
// embeddable and allows the consumer to implement any non-generic functions
// that are required. This object is intended to be copyable so that it can be
// used in different ways but share the same underlying behavior.
//
// All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
type Store struct {
// NewFunc returns a new instance of the type this registry returns for a
// GET of a single object, e.g.:
//
// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
NewFunc func() runtime.Object
// NewListFunc returns a new list of the type this registry; it is the
// type returned when the resource is listed, e.g.:
//
// curl GET /apis/group/version/namespaces/my-ns/myresource
NewListFunc func() runtime.Object
// DefaultQualifiedResource is the pluralized name of the resource.
// This field is used if there is no request info present in the context.
// See qualifiedResourceFromContext for details.
DefaultQualifiedResource schema.GroupResource
// KeyRootFunc returns the root etcd key for this resource; should not
// include trailing "/". This is used for operations that work on the
// entire collection (listing and watching).
//
// KeyRootFunc and KeyFunc must be supplied together or not at all.
KeyRootFunc func(ctx context.Context) string
// KeyFunc returns the key for a specific object in the collection.
// KeyFunc is called for Create/Update/Get/Delete. Note that 'namespace'
// can be gotten from ctx.
//
// KeyFunc and KeyRootFunc must be supplied together or not at all.
KeyFunc func(ctx context.Context, name string) (string, error)
// ObjectNameFunc returns the name of an object or an error.
ObjectNameFunc func(obj runtime.Object) (string, error)
// TTLFunc returns the TTL (time to live) that objects should be persisted
// with. The existing parameter is the current TTL or the default for this
// operation. The update parameter indicates whether this is an operation
// against an existing object.
//
// Objects that are persisted with a TTL are evicted once the TTL expires.
TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
// PredicateFunc returns a matcher corresponding to the provided labels
// and fields. The SelectionPredicate returned should return true if the
// object matches the given field and label selectors.
PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate
// EnableGarbageCollection affects the handling of Update and Delete
// requests. Enabling garbage collection allows finalizers to do work to
// finalize this object before the store deletes it.
//
// If any store has garbage collection enabled, it must also be enabled in
// the kube-controller-manager.
EnableGarbageCollection bool
// DeleteCollectionWorkers is the maximum number of workers in a single
// DeleteCollection call. Delete requests for the items in a collection
// are issued in parallel.
DeleteCollectionWorkers int
// Decorator is an optional exit hook on an object returned from the
// underlying storage. The returned object could be an individual object
// (e.g. Pod) or a list type (e.g. PodList). Decorator is intended for
// integrations that are above storage and should only be used for
// specific cases where storage of the value is not appropriate, since
// they cannot be watched.
Decorator ObjectFunc
// CreateStrategy implements resource-specific behavior during creation.
CreateStrategy rest.RESTCreateStrategy
// AfterCreate implements a further operation to run after a resource is
// created and before it is decorated, optional.
AfterCreate ObjectFunc
// UpdateStrategy implements resource-specific behavior during updates.
UpdateStrategy rest.RESTUpdateStrategy
// AfterUpdate implements a further operation to run after a resource is
// updated and before it is decorated, optional.
AfterUpdate ObjectFunc
// DeleteStrategy implements resource-specific behavior during deletion.
DeleteStrategy rest.RESTDeleteStrategy
// AfterDelete implements a further operation to run after a resource is
// deleted and before it is decorated, optional.
AfterDelete ObjectFunc
// ReturnDeletedObject determines whether the Store returns the object
// that was deleted. Otherwise, return a generic success status response.
ReturnDeletedObject bool
// ShouldDeleteDuringUpdate is an optional function to determine whether
// an update from existing to obj should result in a delete.
// If specified, this is checked in addition to standard finalizer,
// deletionTimestamp, and deletionGracePeriodSeconds checks.
ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool
// ExportStrategy implements resource-specific behavior during export,
// optional. Exported objects are not decorated.
ExportStrategy rest.RESTExportStrategy
// TableConvertor is an optional interface for transforming items or lists
// of items into tabular output. If unset, the default will be used.
TableConvertor rest.TableConvertor
// Storage is the interface for the underlying storage for the
// resource. It is wrapped into a "DryRunnableStorage" that will
// either pass-through or simply dry-run.
Storage DryRunnableStorage
// StorageVersioner outputs the <group/version/kind> an object will be
// converted to before persisted in etcd, given a list of possible
// kinds of the object.
// If the StorageVersioner is nil, apiserver will leave the
// storageVersionHash as empty in the discovery document.
StorageVersioner runtime.GroupVersioner
// Called to cleanup clients used by the underlying Storage; optional.
DestroyFunc func()
}
// Note: the rest.StandardStorage interface aggregates the common REST verbs
var _ rest.StandardStorage = &Store{}
var _ rest.Exporter = &Store{}
var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{}
const (
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
resourceCountPollPeriodJitter = 1.2
)
// NamespaceKeyRootFunc is the default function for constructing storage paths
// to resource directories enforcing namespace rules.
func NamespaceKeyRootFunc(ctx context.Context, prefix string) string {
key := prefix
ns, ok := genericapirequest.NamespaceFrom(ctx)
if ok && len(ns) > 0 {
key = key + "/" + ns
}
return key
}
// NamespaceKeyFunc is the default function for constructing storage paths to
// a resource relative to the given prefix enforcing namespace rules. If the
// context does not contain a namespace, it errors.
func NamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
key := NamespaceKeyRootFunc(ctx, prefix)
ns, ok := genericapirequest.NamespaceFrom(ctx)
if !ok || len(ns) == 0 {
return "", apierrors.NewBadRequest("Namespace parameter required.")
}
if len(name) == 0 {
return "", apierrors.NewBadRequest("Name parameter required.")
}
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
}
key = key + "/" + name
return key, nil
}
// NoNamespaceKeyFunc is the default function for constructing storage paths
// to a resource relative to the given prefix without a namespace.
func NoNamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
if len(name) == 0 {
return "", apierrors.NewBadRequest("Name parameter required.")
}
if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
}
key := prefix + "/" + name
return key, nil
}
// New implements RESTStorage.New.
func (e *Store) New() runtime.Object {
return e.NewFunc()
}
// NewList implements rest.Lister.
func (e *Store) NewList() runtime.Object {
return e.NewListFunc()
}
// NamespaceScoped indicates whether the resource is namespaced
func (e *Store) NamespaceScoped() bool {
if e.CreateStrategy != nil {
return e.CreateStrategy.NamespaceScoped()
}
if e.UpdateStrategy != nil {
return e.UpdateStrategy.NamespaceScoped()
}
panic("programmer error: no CRUD for resource, you're crazy, override NamespaceScoped too")
}
// GetCreateStrategy implements GenericStore.
func (e *Store) GetCreateStrategy() rest.RESTCreateStrategy {
return e.CreateStrategy
}
// GetUpdateStrategy implements GenericStore.
func (e *Store) GetUpdateStrategy() rest.RESTUpdateStrategy {
return e.UpdateStrategy
}
// GetDeleteStrategy implements GenericStore.
func (e *Store) GetDeleteStrategy() rest.RESTDeleteStrategy {
return e.DeleteStrategy
}
// GetExportStrategy implements GenericStore.
func (e *Store) GetExportStrategy() rest.RESTExportStrategy {
return e.ExportStrategy
}
// List returns a list of items matching labels and field according to the
// store's PredicateFunc.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
if err != nil {
return nil, err
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, err
}
}
return out, nil
}
// ListPredicate returns a list of all the items matching the given
// SelectionPredicate.
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
if options == nil {
// By default we should serve the request from etcd.
options = &metainternalversion.ListOptions{ResourceVersion: ""}
}
p.Limit = options.Limit
p.Continue = options.Continue
list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}
// Create inserts a new item according to the unique key from the object.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
out := e.NewFunc()
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
if !apierrors.IsAlreadyExists(err) {
return nil, err
}
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
return nil, err
}
accessor, errGetAcc := meta.Accessor(out)
if errGetAcc != nil {
return nil, err
}
if accessor.GetDeletionTimestamp() != nil {
msg := &err.(*apierrors.StatusError).ErrStatus.Message
*msg = fmt.Sprintf("object is being deleted: %s", *msg)
}
return nil, err
}
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, err
}
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, err
}
}
return out, nil
}
// ShouldDeleteDuringUpdate is the default function for
// checking if an object should be deleted during an update.
// It checks if the new object has no finalizers,
// the existing object's deletionTimestamp is set, and
// the existing object's deletionGracePeriodSeconds is 0 or nil
func ShouldDeleteDuringUpdate(ctx context.Context, key string, obj, existing runtime.Object) bool {
newMeta, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(err)
return false
}
oldMeta, err := meta.Accessor(existing)
if err != nil {
utilruntime.HandleError(err)
return false
}
if len(newMeta.GetFinalizers()) > 0 {
// don't delete with finalizers remaining in the new object
return false
}
if oldMeta.GetDeletionTimestamp() == nil {
// don't delete if the existing object hasn't had a delete request made
return false
}
// delete if the existing object has no grace period or a grace period of 0
return oldMeta.GetDeletionGracePeriodSeconds() == nil || *oldMeta.GetDeletionGracePeriodSeconds() == 0
}
// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list.
// Used for objects that are either been finalized or have never initialized.
func (e *Store) deleteWithoutFinalizers(ctx context.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions, dryRun bool) (runtime.Object, bool, error) {
out := e.NewFunc()
klog.V(6).Infof("going to delete %s from registry, triggered by update", name)
// Using the rest.ValidateAllObjectFunc because the request is an UPDATE request and has already passed the admission for the UPDATE verb.
if err := e.Storage.Delete(ctx, key, out, preconditions, rest.ValidateAllObjectFunc, dryRun); err != nil {
// Deletion is racy, i.e., there could be multiple update
// requests to remove all finalizers from the object, so we
// ignore the NotFound error.
if storage.IsNotFound(err) {
_, err := e.finalizeDelete(ctx, obj, true)
// clients are expecting an updated object if a PUT succeeded,
// but finalizeDelete returns a metav1.Status, so return
// the object in the request instead.
return obj, false, err
}
return nil, false, storeerr.InterpretDeleteError(err, e.qualifiedResourceFromContext(ctx), name)
}
_, err := e.finalizeDelete(ctx, out, true)
// clients are expecting an updated object if a PUT succeeded, but
// finalizeDelete returns a metav1.Status, so return the object in
// the request instead.
return obj, false, err
}
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, false, err
}
var (
creatingObj runtime.Object
creating = false
)
qualifiedResource := e.qualifiedResourceFromContext(ctx)
storagePreconditions := &storage.Preconditions{}
if preconditions := objInfo.Preconditions(); preconditions != nil {
storagePreconditions.UID = preconditions.UID
storagePreconditions.ResourceVersion = preconditions.ResourceVersion
}
out := e.NewFunc()
// deleteObj is only used in case a deletion is carried out
var deleteObj runtime.Object
err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
// Given the existing object, get the new object
obj, err := objInfo.UpdatedObject(ctx, existing)
if err != nil {
return nil, nil, err
}
// If AllowUnconditionalUpdate() is true and the object specified by
// the user does not have a resource version, then we populate it with
// the latest version. Else, we check that the version specified by
// the user matches the version of latest storage object.
resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
if err != nil {
return nil, nil, err
}
doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()
version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
if err != nil {
return nil, nil, err
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
return nil, nil, apierrors.NewNotFound(qualifiedResource, name)
}
creating = true
creatingObj = obj
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, nil, err
}
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, nil, err
}
return obj, &ttl, nil
}
creating = false
creatingObj = nil
if doUnconditionalUpdate {
// Update the object's resource version to match the latest
// storage object's resource version.
err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
if err != nil {
return nil, nil, err
}
} else {
// Check if the object's resource version matches the latest
// resource version.
if resourceVersion == 0 {
// TODO: The Invalid error should have a field for Resource.
// After that field is added, we should fill the Resource and
// leave the Kind field empty. See the discussion in #18526.
qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}
return nil, nil, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)
}
if resourceVersion != version {
return nil, nil, apierrors.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
}
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if updateValidation != nil {
if err := updateValidation(ctx, obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
return nil, nil, err
}
}
// Check the default delete-during-update conditions, and store-specific conditions if provided
if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
(e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
deleteObj = obj
return nil, nil, errEmptiedFinalizers
}
ttl, err := e.calculateTTL(obj, res.TTL, true)
if err != nil {
return nil, nil, err
}
if int64(ttl) != res.TTL {
return obj, &ttl, nil
}
return obj, nil, nil
}, dryrun.IsDryRun(options.DryRun))
if err != nil {
// delete the object
if err == errEmptiedFinalizers {
return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions, dryrun.IsDryRun(options.DryRun))
}
if creating {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, creatingObj)
} else {
err = storeerr.InterpretUpdateError(err, qualifiedResource, name)
}
return nil, false, err
}
if creating {
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, false, err
}
}
} else {
if e.AfterUpdate != nil {
if err := e.AfterUpdate(out); err != nil {
return nil, false, err
}
}
}
if e.Decorator != nil {
if err := e.Decorator(out); err != nil {
return nil, false, err
}
}
return out, creating, nil
}
// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
obj := e.NewFunc()
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
// qualifiedResourceFromContext attempts to retrieve a GroupResource from the context's request info.
// If the context has no request info, DefaultQualifiedResource is used.
func (e *Store) qualifiedResourceFromContext(ctx context.Context) schema.GroupResource {
if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
return schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
}
// some implementations access storage directly and thus the context has no RequestInfo
return e.DefaultQualifiedResource
}
var (
errAlreadyDeleting = fmt.Errorf("abort delete")
errDeleteNow = fmt.Errorf("delete now")
errEmptiedFinalizers = fmt.Errorf("emptied finalizers")
)
// shouldOrphanDependents returns true if the finalizer for orphaning should be set
// updated for FinalizerOrphanDependents. In the order of highest to lowest
// priority, there are three factors affect whether to add/remove the
// FinalizerOrphanDependents: options, existing finalizers of the object,
// and e.DeleteStrategy.DefaultGarbageCollectionPolicy.
func shouldOrphanDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {
// Get default GC policy from this REST object type
gcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy)
var defaultGCPolicy rest.GarbageCollectionPolicy
if ok {
defaultGCPolicy = gcStrategy.DefaultGarbageCollectionPolicy(ctx)
}
if defaultGCPolicy == rest.Unsupported {
// return false to indicate that we should NOT orphan
return false
}
// An explicit policy was set at deletion time, that overrides everything
if options != nil && options.OrphanDependents != nil {
return *options.OrphanDependents
}
if options != nil && options.PropagationPolicy != nil {
switch *options.PropagationPolicy {
case metav1.DeletePropagationOrphan:
return true
case metav1.DeletePropagationBackground, metav1.DeletePropagationForeground:
return false
}
}
// If a finalizer is set in the object, it overrides the default
// validation should make sure the two cases won't be true at the same time.
finalizers := accessor.GetFinalizers()
for _, f := range finalizers {
switch f {
case metav1.FinalizerOrphanDependents:
return true
case metav1.FinalizerDeleteDependents:
return false
}
}
// Get default orphan policy from this REST object type if it exists
if defaultGCPolicy == rest.OrphanDependents {
return true
}
return false
}
// shouldDeleteDependents returns true if the finalizer for foreground deletion should be set
// updated for FinalizerDeleteDependents. In the order of highest to lowest
// priority, there are three factors affect whether to add/remove the
// FinalizerDeleteDependents: options, existing finalizers of the object, and
// e.DeleteStrategy.DefaultGarbageCollectionPolicy.
func shouldDeleteDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {
// Get default GC policy from this REST object type
if gcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy); ok && gcStrategy.DefaultGarbageCollectionPolicy(ctx) == rest.Unsupported {
// return false to indicate that we should NOT delete in foreground
return false
}
// If an explicit policy was set at deletion time, that overrides both
if options != nil && options.OrphanDependents != nil {
return false
}
if options != nil && options.PropagationPolicy != nil {
switch *options.PropagationPolicy {
case metav1.DeletePropagationForeground:
return true
case metav1.DeletePropagationBackground, metav1.DeletePropagationOrphan:
return false
}
}
// If a finalizer is set in the object, it overrides the default
// validation has made sure the two cases won't be true at the same time.
finalizers := accessor.GetFinalizers()
for _, f := range finalizers {
switch f {
case metav1.FinalizerDeleteDependents:
return true
case metav1.FinalizerOrphanDependents:
return false
}
}
return false
}
// deletionFinalizersForGarbageCollection analyzes the object and delete options
// to determine whether the object is in need of finalization by the garbage
// collector. If so, returns the set of deletion finalizers to apply and a bool
// indicating whether the finalizer list has changed and is in need of updating.
//
// The finalizers returned are intended to be handled by the garbage collector.
// If garbage collection is disabled for the store, this function returns false
// to ensure finalizers aren't set which will never be cleared.
func deletionFinalizersForGarbageCollection(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) (bool, []string) {
if !e.EnableGarbageCollection {
return false, []string{}
}
shouldOrphan := shouldOrphanDependents(ctx, e, accessor, options)
shouldDeleteDependentInForeground := shouldDeleteDependents(ctx, e, accessor, options)
newFinalizers := []string{}
// first remove both finalizers, add them back if needed.
for _, f := range accessor.GetFinalizers() {
if f == metav1.FinalizerOrphanDependents || f == metav1.FinalizerDeleteDependents {
continue
}
newFinalizers = append(newFinalizers, f)
}
if shouldOrphan {
newFinalizers = append(newFinalizers, metav1.FinalizerOrphanDependents)
}
if shouldDeleteDependentInForeground {
newFinalizers = append(newFinalizers, metav1.FinalizerDeleteDependents)
}
oldFinalizerSet := sets.NewString(accessor.GetFinalizers()...)
newFinalizersSet := sets.NewString(newFinalizers...)
if oldFinalizerSet.Equal(newFinalizersSet) {
return false, accessor.GetFinalizers()
}
return true, newFinalizers
}
// markAsDeleting sets the obj's DeletionGracePeriodSeconds to 0, and sets the
// DeletionTimestamp to "now" if there is no existing deletionTimestamp or if the existing
// deletionTimestamp is further in future. Finalizers are watching for such updates and will
// finalize the object if their IDs are present in the object's Finalizers list.
func markAsDeleting(obj runtime.Object, now time.Time) (err error) {
objectMeta, kerr := meta.Accessor(obj)
if kerr != nil {
return kerr
}
// This handles Generation bump for resources that don't support graceful
// deletion. For resources that support graceful deletion is handle in
// pkg/api/rest/delete.go
if objectMeta.GetDeletionTimestamp() == nil && objectMeta.GetGeneration() > 0 {
objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)
}
existingDeletionTimestamp := objectMeta.GetDeletionTimestamp()
if existingDeletionTimestamp == nil || existingDeletionTimestamp.After(now) {
metaNow := metav1.NewTime(now)
objectMeta.SetDeletionTimestamp(&metaNow)
}
var zero int64 = 0
objectMeta.SetDeletionGracePeriodSeconds(&zero)
return nil
}
// updateForGracefulDeletionAndFinalizers updates the given object for
// graceful deletion and finalization by setting the deletion timestamp and
// grace period seconds (graceful deletion) and updating the list of
// finalizers (finalization); it returns:
//
// 1. an error
// 2. a boolean indicating that the object was not found, but it should be
// ignored
// 3. a boolean indicating that the object's grace period is exhausted and it
// should be deleted immediately
// 4. a new output object with the state that was updated
// 5. a copy of the last existing state of the object
func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, deleteValidation rest.ValidateObjectFunc, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
lastGraceful := int64(0)
var pendingFinalizers bool
out = e.NewFunc()
err = e.Storage.GuaranteedUpdate(
ctx,
key,
out,
false, /* ignoreNotFound */
&preconditions,
storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
if err := deleteValidation(ctx, existing); err != nil {
return nil, err
}
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
if err != nil {
return nil, err
}
if pendingGraceful {
return nil, errAlreadyDeleting
}
// Add/remove the orphan finalizer as the options dictates.
// Note that this occurs after checking pendingGraceufl, so
// finalizers cannot be updated via DeleteOptions if deletion has
// started.
existingAccessor, err := meta.Accessor(existing)
if err != nil {
return nil, err
}
needsUpdate, newFinalizers := deletionFinalizersForGarbageCollection(ctx, e, existingAccessor, options)
if needsUpdate {
existingAccessor.SetFinalizers(newFinalizers)
}
pendingFinalizers = len(existingAccessor.GetFinalizers()) != 0
if !graceful {
// set the DeleteGracePeriods to 0 if the object has pendingFinalizers but not supporting graceful deletion
if pendingFinalizers {
klog.V(6).Infof("update the DeletionTimestamp to \"now\" and GracePeriodSeconds to 0 for object %s, because it has pending finalizers", name)
err = markAsDeleting(existing, time.Now())
if err != nil {
return nil, err
}
return existing, nil
}
return nil, errDeleteNow
}
lastGraceful = *options.GracePeriodSeconds
lastExisting = existing
return existing, nil
}),
dryrun.IsDryRun(options.DryRun),
)
switch err {
case nil:
// If there are pending finalizers, we never delete the object immediately.
if pendingFinalizers {
return nil, false, false, out, lastExisting
}
if lastGraceful > 0 {
return nil, false, false, out, lastExisting
}
// If we are here, the registry supports grace period mechanism and
// we are intentionally delete gracelessly. In this case, we may
// enter a race with other k8s components. If other component wins
// the race, the object will not be found, and we should tolerate
// the NotFound error. See
// https://github.com/kubernetes/kubernetes/issues/19403 for
// details.
return nil, true, true, out, lastExisting
case errDeleteNow:
// we've updated the object to have a zero grace period, or it's already at 0, so
// we should fall through and truly delete the object.
return nil, false, true, out, lastExisting
case errAlreadyDeleting:
out, err = e.finalizeDelete(ctx, in, true)
return err, false, false, out, lastExisting
default:
return storeerr.InterpretUpdateError(err, e.qualifiedResourceFromContext(ctx), name), false, false, out, lastExisting
}
}
// Delete removes the item from storage.
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, false, err
}
obj := e.NewFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
if err = e.Storage.Get(ctx, key, "", obj, false); err != nil {
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
}
// support older consumers of delete by treating "nil" as delete immediately
if options == nil {
options = metav1.NewDeleteOptions(0)
}
var preconditions storage.Preconditions
if options.Preconditions != nil {
preconditions.UID = options.Preconditions.UID
preconditions.ResourceVersion = options.Preconditions.ResourceVersion
}
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
if err != nil {
return nil, false, err
}
// this means finalizers cannot be updated via DeleteOptions if a deletion is already pending
if pendingGraceful {
out, err := e.finalizeDelete(ctx, obj, false)
return out, false, err
}
// check if obj has pending finalizers
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, false, apierrors.NewInternalError(err)
}
pendingFinalizers := len(accessor.GetFinalizers()) != 0
var ignoreNotFound bool
var deleteImmediately bool = true
var lastExisting, out runtime.Object
// Handle combinations of graceful deletion and finalization by issuing
// the correct updates.
shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options)
// TODO: remove the check, because we support no-op updates now.
if graceful || pendingFinalizers || shouldUpdateFinalizers {
err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
// Update the preconditions.ResourceVersion if set since we updated the object.
if err == nil && deleteImmediately && preconditions.ResourceVersion != nil {
accessor, err = meta.Accessor(out)
if err != nil {
return out, false, apierrors.NewInternalError(err)
}
resourceVersion := accessor.GetResourceVersion()
preconditions.ResourceVersion = &resourceVersion
}
}
// !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
if !deleteImmediately || err != nil {
return out, false, err
}
// Going further in this function is not useful when we are
// performing a dry-run request. Worse, it will actually
// override "out" with the version of the object in database
// that doesn't have the finalizer and deletiontimestamp set
// (because the update above was dry-run too). If we already
// have that version available, let's just return it now,
// otherwise, we can call dry-run delete that will get us the
// latest version of the object.
if dryrun.IsDryRun(options.DryRun) && out != nil {
return out, true, nil
}
// delete immediately, or no graceful deletion supported
klog.V(6).Infof("going to delete %s from registry: ", name)
out = e.NewFunc()
if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun)); err != nil {
// Please refer to the place where we set ignoreNotFound for the reason
// why we ignore the NotFound error .
if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
// The lastExisting object may not be the last state of the object
// before its deletion, but it's the best approximation.
out, err := e.finalizeDelete(ctx, lastExisting, true)
return out, true, err
}
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
}
out, err = e.finalizeDelete(ctx, out, true)
return out, true, err
}
// DeleteReturnsDeletedObject implements the rest.MayReturnFullObjectDeleter interface
func (e *Store) DeleteReturnsDeletedObject() bool {
return e.ReturnDeletedObject
}
// DeleteCollection removes all items returned by List with a given ListOptions from storage.
//
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
// will be deleted from storage, and then an error will be returned.
// In case of success, the list of deleted objects will be returned.
//
// TODO: Currently, there is no easy way to remove 'directory' entry from storage (if we
// are removing all objects of a given type) with the current API (it's technically
// possibly with storage API, but watch is not delivered correctly then).
// It will be possible to fix it with v3 etcd API.
func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if listOptions == nil {
listOptions = &metainternalversion.ListOptions{}
} else {
listOptions = listOptions.DeepCopy()
}
listObj, err := e.List(ctx, listOptions)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(listObj)
if err != nil {
return nil, err
}
// Spawn a number of goroutines, so that we can issue requests to storage
// in parallel to speed up deletion.
// TODO: Make this proportional to the number of items to delete, up to
// DeleteCollectionWorkers (it doesn't make much sense to spawn 16
// workers to delete 10 items).
workersNumber := e.DeleteCollectionWorkers
if workersNumber < 1 {
workersNumber = 1
}
wg := sync.WaitGroup{}
toProcess := make(chan int, 2*workersNumber)
errs := make(chan error, workersNumber+1)
go func() {
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)
})
for i := 0; i < len(items); i++ {
toProcess <- i
}
close(toProcess)
}()
wg.Add(workersNumber)
for i := 0; i < workersNumber; i++ {
go func() {
// panics don't cross goroutine boundaries
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection goroutine panicked: %v", panicReason)
})
defer wg.Done()
for index := range toProcess {
accessor, err := meta.Accessor(items[index])
if err != nil {
errs <- err
return
}
if _, _, err := e.Delete(ctx, accessor.GetName(), deleteValidation, options); err != nil && !apierrors.IsNotFound(err) {
klog.V(4).Infof("Delete %s in DeleteCollection failed: %v", accessor.GetName(), err)
errs <- err
return
}
}
}()
}
wg.Wait()
select {
case err := <-errs:
return nil, err
default:
return listObj, nil
}
}
// finalizeDelete runs the Store's AfterDelete hook if runHooks is set and
// returns the decorated deleted object if appropriate.
func (e *Store) finalizeDelete(ctx context.Context, obj runtime.Object, runHooks bool) (runtime.Object, error) {
if runHooks && e.AfterDelete != nil {
if err := e.AfterDelete(obj); err != nil {
return nil, err
}
}
if e.ReturnDeletedObject {
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
// Return information about the deleted object, which enables clients to
// verify that the object was actually deleted and not waiting for finalizers.
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
qualifiedResource := e.qualifiedResourceFromContext(ctx)
details := &metav1.StatusDetails{
Name: accessor.GetName(),
Group: qualifiedResource.Group,
Kind: qualifiedResource.Resource, // Yes we set Kind field to resource.
UID: accessor.GetUID(),
}
status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
return status, nil
}
// Watch makes a matcher for the given label and field, and calls
// WatchPredicate. If possible, you should customize PredicateFunc to produce
// a matcher that matches by key. SelectionPredicate does this for you
// automatically.
func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
predicate := e.PredicateFunc(label, field)
resourceVersion := ""
if options != nil {
resourceVersion = options.ResourceVersion
predicate.AllowWatchBookmarks = options.AllowWatchBookmarks
}
return e.WatchPredicate(ctx, predicate, resourceVersion)
}
// WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
// if we cannot extract a key based on the current context, the
// optimization is skipped
}
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
if err != nil {
return nil, err
}
if e.Decorator != nil {
return newDecoratedWatcher(w, e.Decorator), nil
}
return w, nil
}
// calculateTTL is a helper for retrieving the updated TTL for an object or
// returning an error if the TTL cannot be calculated. The defaultTTL is
// changed to 1 if less than zero. Zero means no TTL, not expire immediately.
func (e *Store) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
// TODO: validate this is assertion is still valid.
// etcd may return a negative TTL for a node if the expiration has not
// occurred due to server lag - we will ensure that the value is at least
// set.
if defaultTTL < 0 {
defaultTTL = 1
}
ttl = uint64(defaultTTL)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, ttl, update)
}
return ttl, err
}
// exportObjectMeta unsets the fields on the given object that should not be
// present when the object is exported.
func exportObjectMeta(accessor metav1.Object, exact bool) {
accessor.SetUID("")
if !exact {
accessor.SetNamespace("")
}
accessor.SetCreationTimestamp(metav1.Time{})
accessor.SetDeletionTimestamp(nil)
accessor.SetResourceVersion("")
accessor.SetSelfLink("")
if len(accessor.GetGenerateName()) > 0 && !exact {
accessor.SetName("")
}
}
// Export implements the rest.Exporter interface
func (e *Store) Export(ctx context.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
obj, err := e.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, err
}
if accessor, err := meta.Accessor(obj); err == nil {
exportObjectMeta(accessor, opts.Exact)
} else {
klog.V(4).Infof("Object of type %v does not have ObjectMeta: %v", reflect.TypeOf(obj), err)
}
if e.ExportStrategy != nil {
if err = e.ExportStrategy.Export(ctx, obj, opts.Exact); err != nil {
return nil, err
}
} else {
e.CreateStrategy.PrepareForCreate(ctx, obj)
}
return obj, nil
}
// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
if e.DefaultQualifiedResource.Empty() {
return fmt.Errorf("store %#v must have a non-empty qualified resource", e)
}
if e.NewFunc == nil {
return fmt.Errorf("store for %s must have NewFunc set", e.DefaultQualifiedResource.String())
}
if e.NewListFunc == nil {
return fmt.Errorf("store for %s must have NewListFunc set", e.DefaultQualifiedResource.String())
}
if (e.KeyRootFunc == nil) != (e.KeyFunc == nil) {
return fmt.Errorf("store for %s must set both KeyRootFunc and KeyFunc or neither", e.DefaultQualifiedResource.String())
}
var isNamespaced bool
switch {
case e.CreateStrategy != nil:
isNamespaced = e.CreateStrategy.NamespaceScoped()
case e.UpdateStrategy != nil:
isNamespaced = e.UpdateStrategy.NamespaceScoped()
default:
return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
}
if e.DeleteStrategy == nil {
return fmt.Errorf("store for %s must have DeleteStrategy set", e.DefaultQualifiedResource.String())
}
if options.RESTOptions == nil {
return fmt.Errorf("options for %s must have RESTOptions set", e.DefaultQualifiedResource.String())
}
attrFunc := options.AttrFunc
if attrFunc == nil {
if isNamespaced {
attrFunc = storage.DefaultNamespaceScopedAttr
} else {
attrFunc = storage.DefaultClusterScopedAttr
}
}
if e.PredicateFunc == nil {
e.PredicateFunc = func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: attrFunc,
}
}
}
err := validateIndexers(options.Indexers)
if err != nil {
return err
}
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
// ResourcePrefix must come from the underlying factory
prefix := opts.ResourcePrefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}
if prefix == "/" {
return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
}
// Set the default behavior for storage key generation
if e.KeyRootFunc == nil && e.KeyFunc == nil {
if isNamespaced {
e.KeyRootFunc = func(ctx context.Context) string {
return NamespaceKeyRootFunc(ctx, prefix)
}
e.KeyFunc = func(ctx context.Context, name string) (string, error) {
return NamespaceKeyFunc(ctx, prefix, name)
}
} else {
e.KeyRootFunc = func(ctx context.Context) string {
return prefix
}
e.KeyFunc = func(ctx context.Context, name string) (string, error) {
return NoNamespaceKeyFunc(ctx, prefix, name)
}
}
}
// We adapt the store's keyFunc so that we can use it with the StorageDecorator
// without making any assumptions about where objects are stored in etcd
keyFunc := func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}
if isNamespaced {
return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
}
return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
}
if e.DeleteCollectionWorkers == 0 {
e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
}
e.EnableGarbageCollection = opts.EnableGarbageCollection
if e.ObjectNameFunc == nil {
e.ObjectNameFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
}
return accessor.GetName(), nil
}
}
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
var err error
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
if err != nil {
return err
}
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
}
}
}
return nil
}
// startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
func (e *Store) startObservingCount(period time.Duration) func() {
prefix := e.KeyRootFunc(genericapirequest.NewContext())
resourceName := e.DefaultQualifiedResource.String()
klog.V(2).Infof("Monitoring %v count at <storage-prefix>/%v", resourceName, prefix)
stopCh := make(chan struct{})
go wait.JitterUntil(func() {
count, err := e.Storage.Count(prefix)
if err != nil {
klog.V(5).Infof("Failed to update storage count metric: %v", err)
metrics.UpdateObjectCount(resourceName, -1)
} else {
metrics.UpdateObjectCount(resourceName, count)
}
}, period, resourceCountPollPeriodJitter, true, stopCh)
return func() { close(stopCh) }
}
func (e *Store) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
if e.TableConvertor != nil {
return e.TableConvertor.ConvertToTable(ctx, object, tableOptions)
}
return rest.NewDefaultTableConvertor(e.qualifiedResourceFromContext(ctx)).ConvertToTable(ctx, object, tableOptions)
}
func (e *Store) StorageVersion() runtime.GroupVersioner {
return e.StorageVersioner
}
// validateIndexers will check the prefix of indexers.
func validateIndexers(indexers *cache.Indexers) error {
if indexers == nil {
return nil
}
for indexName := range *indexers {
if len(indexName) <= 2 || (indexName[:2] != "l:" && indexName[:2] != "f:") {
return fmt.Errorf("index must prefix with \"l:\" or \"f:\"")
}
}
return nil
}