0、架构图
1、存储相关配置
1.1、资源与版本
1.1.1、资源与版本的启用
// ResourceConfig记录了每个Group启用了哪个版本以及哪个具体的GVK是否启用// 用gv/gvk map来维护信息// eg. 若当前ResourceConfig记录信息如下// GroupVersionConfigs:// testGroup01/v1 true// testGroup01/v1beta1 false// testGroup02/v1 true//// testGroup01/v1/type01 false//// 则版本testGroup01/v1 testGroup02/v1处于启用状态// testGroup01/v1beta1处于禁用状态(禁用状态不会生成相应的rest api)// 由于testGroup01/v1/type01为false,则testGroup01/v1虽然启用,但是其资源type01// 处于禁用状态,则不会生成testGroup01/v1/type01的rest apitype ResourceConfig struct {GroupVersionConfigs map[schema.GroupVersion]boolResourceConfigs map[schema.GroupVersionResource]bool}func NewResourceConfig() *ResourceConfig {return &ResourceConfig{GroupVersionConfigs: map[schema.GroupVersion]bool{}, ResourceConfigs: map[schema.GroupVersionResource]bool{}}}func (o *ResourceConfig) DisableAll() {for k := range o.GroupVersionConfigs {o.GroupVersionConfigs[k] = false}}func (o *ResourceConfig) EnableAll() {for k := range o.GroupVersionConfigs {o.GroupVersionConfigs[k] = true}}func (o *ResourceConfig) DisableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {for k := range o.GroupVersionConfigs {if matcher(k) {o.GroupVersionConfigs[k] = false}}}func (o *ResourceConfig) EnableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {for k := range o.GroupVersionConfigs {if matcher(k) {o.GroupVersionConfigs[k] = true}}}func (o *ResourceConfig) DisableVersions(versions ...schema.GroupVersion) {for _, version := range versions {o.GroupVersionConfigs[version] = false}}func (o *ResourceConfig) EnableVersions(versions ...schema.GroupVersion) {for _, version := range versions {o.GroupVersionConfigs[version] = true}}func (o *ResourceConfig) VersionEnabled(version schema.GroupVersion) bool {enabled, _ := o.GroupVersionConfigs[version]if enabled {return true}return false}func (o *ResourceConfig) DisableResources(resources ...schema.GroupVersionResource) {for _, resource := range resources {o.ResourceConfigs[resource] = false}}func (o *ResourceConfig) EnableResources(resources 
...schema.GroupVersionResource) {for _, resource := range resources {o.ResourceConfigs[resource] = true}}func (o *ResourceConfig) ResourceEnabled(resource schema.GroupVersionResource) bool {if !o.VersionEnabled(resource.GroupVersion()) {return false}resourceEnabled, explicitlySet := o.ResourceConfigs[resource]if !explicitlySet {return true}return resourceEnabled}func (o *ResourceConfig) AnyVersionForGroupEnabled(group string) bool {for version := range o.GroupVersionConfigs {if version.Group == group {if o.VersionEnabled(version) {return true}}}return false}
1.1.2、资源的存储版本和内存版本
// DefaultResourceEncodingConfig记录了每个gv的存储版本和内存版本,// 用以生成相应的存储序列化器和反序列化器// 此处的配置叫OverridingResourceEncoding。即覆盖配置。因为若无配置,则默认系统会用// v1作为存储版本,__internal用作内存版本(因为一般自定义资源注册版本优先级的时候v1版本优先级最高)// 若用户有自定义,则用OverridingResourceEncoding覆盖默认配置type DefaultResourceEncodingConfig struct {resources map[schema.GroupResource]*OverridingResourceEncodingscheme *runtime.Scheme}type OverridingResourceEncoding struct {ExternalResourceEncoding schema.GroupVersionInternalResourceEncoding schema.GroupVersion}func NewDefaultResourceEncodingConfig(scheme *runtime.Scheme) *DefaultResourceEncodingConfig {return &DefaultResourceEncodingConfig{resources: map[schema.GroupResource]*OverridingResourceEncoding{}, scheme: scheme}}func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored schema.GroupResource, externalEncodingVersion, internalVersion schema.GroupVersion) {o.resources[resourceBeingStored] = &OverridingResourceEncoding{ExternalResourceEncoding: externalEncodingVersion,InternalResourceEncoding: internalVersion,}}func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {if !o.scheme.IsGroupRegistered(resource.Group) {return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)}resourceOverride, resourceExists := o.resources[resource]if resourceExists {return resourceOverride.ExternalResourceEncoding, nil}return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil}func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {if !o.scheme.IsGroupRegistered(resource.Group) {return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)}resourceOverride, resourceExists := o.resources[resource]if resourceExists {return resourceOverride.InternalResourceEncoding, nil}return schema.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil}
1.2、存储序列化器/反序列化器
type StorageCodecConfig struct {StorageMediaType string // 存储类型,默认jsonStorageSerializer runtime.StorageSerializer // 全局序列化器StorageVersion schema.GroupVersion // 存储版本MemoryVersion schema.GroupVersion // 内存版本Config storagebackend.Config // 后端存储配置EncoderDecoratorFn func(runtime.Encoder) runtime.EncoderDecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder}func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, runtime.GroupVersioner, error) {mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)if err != nil {return nil, nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)}// 获取给定类型的解码器serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)if !ok {return nil, nil, fmt.Errorf("unable to find serializer for %q", mediaType)}s := serializer.Serializer// wrappervar encoder runtime.Encoder = sif opts.EncoderDecoratorFn != nil {encoder = opts.EncoderDecoratorFn(encoder)}decoders := []runtime.Decoder{s,opts.StorageSerializer.UniversalDeserializer(),runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()),}if opts.DecoderDecoratorFn != nil {decoders = opts.DecoderDecoratorFn(decoders)}// encodeVersioner可以认为等同于返回存储版本的gvkencodeVersioner := runtime.NewMultiGroupVersioner(opts.StorageVersion,schema.GroupKind{Group: opts.StorageVersion.Group},schema.GroupKind{Group: opts.MemoryVersion.Group},)encoder = opts.StorageSerializer.EncoderForVersion(encoder,encodeVersioner,)// 由于不确定etcd存储的是什么格式的数据,因此解码器是所有解码器的合集// 解码要先解析格式再进行解码// decodeVersioner一般也默认为内部版本decoder := opts.StorageSerializer.DecoderToVersion(recognizer.NewDecoder(decoders...),runtime.NewCoercingMultiGroupVersioner(opts.MemoryVersion,schema.GroupKind{Group: opts.MemoryVersion.Group},schema.GroupKind{Group: opts.StorageVersion.Group},),)return runtime.NewCodec(encoder, decoder), encodeVersioner, nil}
1.3、存储生成工厂
// AllResources is the wildcard resource name used to key overrides that apply
// to every resource of a group (see getAllResourcesAlias).
const AllResources = "*"

// StorageFactory produces per-resource storage backend configuration.
type StorageFactory interface {
	// NewConfig builds the backend configuration (including the codec) for the
	// given GroupResource. Per the original note, the storage itself is then
	// created from that config by
	// kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go.
	NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
	// ResourcePrefix returns the storage key prefix for the given GroupResource.
	ResourcePrefix(groupResource schema.GroupResource) string
	// Backends returns the list of all etcd backends.
	Backends() []Backend
}

// Backend describes one etcd endpoint; it is used to build an etcd client.
type Backend struct {
	// Server has the form https://etcd.domain:2379 or http://etcd.domain:2379.
	Server    string
	TLSConfig *tls.Config
}

// DefaultStorageFactory is the StorageFactory implementation in this section.
// It starts from one shared backend config and applies group-level and
// resource-level overrides on top of it.
type DefaultStorageFactory struct {
	// StorageConfig holds the settings common to all etcd access.
	StorageConfig storagebackend.Config
	// Overrides holds per-GroupResource overrides (etcd location, prefixes,
	// serializer, and so on).
	Overrides map[schema.GroupResource]groupResourceOverrides
	// DefaultResourcePrefixes holds the default storage prefix per GroupResource.
	DefaultResourcePrefixes map[schema.GroupResource]string
	// DefaultMediaType is the default storage media type.
	DefaultMediaType string
	// DefaultSerializer is the serializer used unless overridden.
	DefaultSerializer runtime.StorageSerializer
	// ResourceEncodingConfig supplies storage/in-memory versions; see section 1.1.2.
	ResourceEncodingConfig ResourceEncodingConfig
	// APIResourceConfigSource reports which versions/resources are enabled;
	// see section 1.1.1.
	APIResourceConfigSource APIResourceConfigSource
	// newStorageCodecFn builds the codec; it is NewStorageCodec from section 1.2.
	newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}

// NewDefaultStorageFactory builds a DefaultStorageFactory with no overrides.
// Paging on the shared config follows the APIListChunking feature gate, and an
// empty defaultMediaType falls back to runtime.ContentTypeJSON.
func NewDefaultStorageFactory(
	config storagebackend.Config,
	defaultMediaType string,
	defaultSerializer runtime.StorageSerializer,
	resourceEncodingConfig ResourceEncodingConfig,
	resourceConfig APIResourceConfigSource,
	specialDefaultResourcePrefixes map[schema.GroupResource]string,
) *DefaultStorageFactory {
	config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	if len(defaultMediaType) == 0 {
		defaultMediaType = runtime.ContentTypeJSON
	}
	return &DefaultStorageFactory{
		StorageConfig:           config,
		Overrides:               map[schema.GroupResource]groupResourceOverrides{},
		DefaultMediaType:        defaultMediaType,
		DefaultSerializer:       defaultSerializer,
		ResourceEncodingConfig:  resourceEncodingConfig,
		APIResourceConfigSource: resourceConfig,
		DefaultResourcePrefixes: specialDefaultResourcePrefixes,
		newStorageCodecFn:       NewStorageCodec,
	}
}

// NewConfig returns the storage backend configuration for the given
// GroupResource: a copy of the shared config with overrides applied and the
// codec / encode versioner filled in.
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
	chosenStorageResource := s.getStorageGroupResource(groupResource)
	// Work on a copy so the factory's shared config is not modified.
	storageConfig := s.StorageConfig
	codecConfig := StorageCodecConfig{
		StorageMediaType:  s.DefaultMediaType,
		StorageSerializer: s.DefaultSerializer,
	}
	// Apply group-level ("group/*") overrides first ...
	if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}
	// ... then the exact GroupResource overrides, so the more specific ones win.
	if override, ok := s.Overrides[chosenStorageResource]; ok {
		override.Apply(&storageConfig, &codecConfig)
	}
	var err error
	codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
	if err != nil {
		return nil, err
	}
	// Note: the in-memory version is looked up for the requested resource, not
	// for the chosen (possibly cohabitating) storage resource.
	codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
	if err != nil {
		return nil, err
	}
	codecConfig.Config = storageConfig
	storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
	if err != nil {
		return nil, err
	}
	klog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
	return &storageConfig, nil
}

// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend {
	// Collect the de-duplicated set of servers from the shared config and
	// from every override.
	servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
	for _, overrides := range s.Overrides {
		servers.Insert(overrides.etcdLocation...)
	}
	// Verification stays skipped unless a trusted CA file is loaded below.
	tlsConfig := &tls.Config{
		InsecureSkipVerify: true,
	}
	if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
		cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
		if err != nil {
			// Best effort: the failure is logged and no client certificate is set.
			klog.Errorf("failed to load key pair while getting backends: %s", err)
		} else {
			tlsConfig.Certificates = []tls.Certificate{cert}
		}
	}
	if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
		if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
			// Best effort: the failure is logged and verification stays skipped.
			klog.Errorf("failed to read ca file while getting backends: %s", err)
		} else {
			caPool := x509.NewCertPool()
			caPool.AppendCertsFromPEM(caCert)
			tlsConfig.RootCAs = caPool
			tlsConfig.InsecureSkipVerify = false
		}
	}
	backends := []Backend{}
	for server := range servers {
		backends = append(backends, Backend{
			Server: server,
			// We can't share TLSConfig across different backends to avoid races.
			// For more details see: http://pr.k8s.io/59338
			TLSConfig: tlsConfig.Clone(),
		})
	}
	return backends
}

// ResourcePrefix returns the storage prefix for the given GroupResource.
// Precedence, lowest to highest: DefaultResourcePrefixes, the group-level
// override, the exact-resource override. If none yields a prefix, the
// lower-cased resource name is used.
func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
	chosenStorageResource := s.getStorageGroupResource(groupResource)
	groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
	exactResourceOverride := s.Overrides[chosenStorageResource]
	etcdResourcePrefix := s.DefaultResourcePrefixes[chosenStorageResource]
	if len(groupOverride.etcdResourcePrefix) > 0 {
		etcdResourcePrefix = groupOverride.etcdResourcePrefix
	}
	if len(exactResourceOverride.etcdResourcePrefix) > 0 {
		etcdResourcePrefix = exactResourceOverride.etcdResourcePrefix
	}
	if len(etcdResourcePrefix) == 0 {
		etcdResourcePrefix = strings.ToLower(chosenStorageResource.Resource)
	}
	return etcdResourcePrefix
}

// The methods below are the customization hooks DefaultStorageFactory exposes
// to users. Each one reads the override entry for the GroupResource, changes
// one aspect of it, and writes the entry back (map values are copies).

// SetEtcdLocation sets the etcd servers used for the given GroupResource.
func (s *DefaultStorageFactory) SetEtcdLocation(groupResource schema.GroupResource, location []string) {
	overrides := s.Overrides[groupResource]
	overrides.etcdLocation = location
	s.Overrides[groupResource] = overrides
}

// SetEtcdPrefix sets the etcd prefix for the given GroupResource.
func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource schema.GroupResource, prefix string) {
	overrides := s.Overrides[groupResource]
	overrides.etcdPrefix = prefix
	s.Overrides[groupResource] = overrides
}

// SetDisableAPIListChunking disables paging for the given GroupResource.
func (s *DefaultStorageFactory) SetDisableAPIListChunking(groupResource schema.GroupResource) {
	overrides := s.Overrides[groupResource]
	overrides.disablePaging = true
	s.Overrides[groupResource] = overrides
}

// SetResourceEtcdPrefix sets the resource-specific prefix for the given
// GroupResource. Per the original note, the full etcd prefix is then
// etcdPrefix/etcdResourcePrefix.
func (s *DefaultStorageFactory) SetResourceEtcdPrefix(groupResource schema.GroupResource, prefix string) {
	overrides := s.Overrides[groupResource]
	overrides.etcdResourcePrefix = prefix
	s.Overrides[groupResource] = overrides
}

// SetSerializer sets the media type and serializer used for the given
// GroupResource.
func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource, mediaType string, serializer runtime.StorageSerializer) {
	overrides := s.Overrides[groupResource]
	overrides.mediaType = mediaType
	overrides.serializer = serializer
	s.Overrides[groupResource] = overrides
}

// SetTransformer sets the value transformer for the given GroupResource.
func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) {
	overrides := s.Overrides[groupResource]
	overrides.transformer = transformer
	s.Overrides[groupResource] = overrides
}

// AddCohabitatingResources links the given GroupResources together: each one
// gets the full list (in the given order) as its cohabitating resources.
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) {
	for _, groupResource := range groupResources {
		overrides := s.Overrides[groupResource]
		overrides.cohabitatingResources = groupResources
		s.Overrides[groupResource] = overrides
	}
}

// AddSerializationChains sets the encoder/decoder wrapper functions for each
// of the given GroupResources.
func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
	for _, groupResource := range groupResources {
		overrides := s.Overrides[groupResource]
		overrides.encoderDecoratorFn = encoderDecoratorFn
		overrides.decoderDecoratorFn = decoderDecoratorFn
		s.Overrides[groupResource] = overrides
	}
}

// getStorageGroupResource picks the GroupResource actually used for storage:
// the first cohabitating resource whose group has any version enabled, or the
// given GroupResource itself when there is none.
func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.GroupResource) schema.GroupResource {
	for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources {
		if s.APIResourceConfigSource.AnyVersionForGroupEnabled(potentialStorageResource.Group) {
			return potentialStorageResource
		}
	}
	return groupResource
}

// groupResourceOverrides is the set of per-GroupResource settings that can
// replace the factory defaults. Zero-valued fields mean "no override".
type groupResourceOverrides struct {
	etcdLocation       []string                  // etcd server addresses
	etcdPrefix         string                    // etcd prefix
	etcdResourcePrefix string                    // resource-specific etcd prefix
	mediaType          string                    // serialization media type
	serializer         runtime.StorageSerializer // serializer
	// cohabitatingResources lists resources that share storage. For example,
	// deployments exist in both the apps and extensions groups; per the
	// original note, if apps is not enabled the extensions entry is used
	// instead. Built-in relationships are shown in 1.3.1.1 pic1.
	cohabitatingResources []schema.GroupResource
	encoderDecoratorFn    func(runtime.Encoder) runtime.Encoder     // encoder wrapper
	decoderDecoratorFn    func([]runtime.Decoder) []runtime.Decoder // decoder wrapper
	transformer           value.Transformer                         // value transformer
	disablePaging         bool                                      // turns paging off when true
}

// Apply copies every override that is set onto config and options; unset
// (zero-valued) overrides leave the existing values untouched.
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
	if len(o.etcdLocation) > 0 {
		config.Transport.ServerList = o.etcdLocation
	}
	if len(o.etcdPrefix) > 0 {
		config.Prefix = o.etcdPrefix
	}
	if len(o.mediaType) > 0 {
		options.StorageMediaType = o.mediaType
	}
	if o.serializer != nil {
		options.StorageSerializer = o.serializer
	}
	if o.encoderDecoratorFn != nil {
		options.EncoderDecoratorFn = o.encoderDecoratorFn
	}
	if o.decoderDecoratorFn != nil {
		options.DecoderDecoratorFn = o.decoderDecoratorFn
	}
	if o.transformer != nil {
		config.Transformer = o.transformer
	}
	// Paging can only be switched off here, never on.
	if o.disablePaging {
		config.Paging = false
	}
}

// getAllResourcesAlias returns the "group/*" key under which group-level
// overrides for resource's group are stored.
func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
	return schema.GroupResource{Group: resource.Group, Resource: AllResources}
}
1.3.1、pic
1.3.1.1、pic1
2、generic registry
存储的实现分为以下两层
1、kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
封装了用户定义的一些存储之前的逻辑,比如创建策略等
2、kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
真正的存储,封装了etcd客户端以及序列化与反序列化等操作
2.1、interface
2.1.1、Create
2.2、implement
// ObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. An ObjectFunc may transform the provided
// object.
type ObjectFunc func(obj runtime.Object) error

// GenericStore interface can be used for type assertions when we need to access the underlying strategies.
type GenericStore interface {
	GetCreateStrategy() rest.RESTCreateStrategy
	GetUpdateStrategy() rest.RESTUpdateStrategy
	GetDeleteStrategy() rest.RESTDeleteStrategy
	GetExportStrategy() rest.RESTExportStrategy
}

// Store implements pkg/api/rest.StandardStorage. It's intended to be
// embeddable and allows the consumer to implement any non-generic functions
// that are required. This object is intended to be copyable so that it can be
// used in different ways but share the same underlying behavior.
//
// All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
type Store struct {
	// NewFunc returns a new instance of the type this registry returns for a
	// GET of a single object, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object

	// NewListFunc returns a new list of the type this registry; it is the
	// type returned when the resource is listed, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource
	NewListFunc func() runtime.Object

	// DefaultQualifiedResource is the pluralized name of the resource.
	// This field is used if there is no request info present in the context.
	// See qualifiedResourceFromContext for details.
	DefaultQualifiedResource schema.GroupResource

	// KeyRootFunc returns the root etcd key for this resource; should not
	// include trailing "/". This is used for operations that work on the
	// entire collection (listing and watching).
	//
	// KeyRootFunc and KeyFunc must be supplied together or not at all.
	KeyRootFunc func(ctx context.Context) string

	// KeyFunc returns the key for a specific object in the collection.
	// KeyFunc is called for Create/Update/Get/Delete. Note that 'namespace'
	// can be gotten from ctx.
	//
	// KeyFunc and KeyRootFunc must be supplied together or not at all.
	KeyFunc func(ctx context.Context, name string) (string, error)

	// ObjectNameFunc returns the name of an object or an error.
	ObjectNameFunc func(obj runtime.Object) (string, error)

	// TTLFunc returns the TTL (time to live) that objects should be persisted
	// with. The existing parameter is the current TTL or the default for this
	// operation. The update parameter indicates whether this is an operation
	// against an existing object.
	//
	// Objects that are persisted with a TTL are evicted once the TTL expires.
	TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)

	// PredicateFunc returns a matcher corresponding to the provided labels
	// and fields. The SelectionPredicate returned should return true if the
	// object matches the given field and label selectors.
	PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate

	// EnableGarbageCollection affects the handling of Update and Delete
	// requests. Enabling garbage collection allows finalizers to do work to
	// finalize this object before the store deletes it.
	//
	// If any store has garbage collection enabled, it must also be enabled in
	// the kube-controller-manager.
	EnableGarbageCollection bool

	// DeleteCollectionWorkers is the maximum number of workers in a single
	// DeleteCollection call. Delete requests for the items in a collection
	// are issued in parallel.
	DeleteCollectionWorkers int

	// Decorator is an optional exit hook on an object returned from the
	// underlying storage. The returned object could be an individual object
	// (e.g. Pod) or a list type (e.g. PodList). Decorator is intended for
	// integrations that are above storage and should only be used for
	// specific cases where storage of the value is not appropriate, since
	// they cannot be watched.
	Decorator ObjectFunc

	// CreateStrategy implements resource-specific behavior during creation.
	CreateStrategy rest.RESTCreateStrategy
	// AfterCreate implements a further operation to run after a resource is
	// created and before it is decorated, optional.
	AfterCreate ObjectFunc

	// UpdateStrategy implements resource-specific behavior during updates.
	UpdateStrategy rest.RESTUpdateStrategy
	// AfterUpdate implements a further operation to run after a resource is
	// updated and before it is decorated, optional.
	AfterUpdate ObjectFunc

	// DeleteStrategy implements resource-specific behavior during deletion.
	DeleteStrategy rest.RESTDeleteStrategy
	// AfterDelete implements a further operation to run after a resource is
	// deleted and before it is decorated, optional.
	AfterDelete ObjectFunc
	// ReturnDeletedObject determines whether the Store returns the object
	// that was deleted. Otherwise, return a generic success status response.
	ReturnDeletedObject bool
	// ShouldDeleteDuringUpdate is an optional function to determine whether
	// an update from existing to obj should result in a delete.
	// If specified, this is checked in addition to standard finalizer,
	// deletionTimestamp, and deletionGracePeriodSeconds checks.
	ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool
	// ExportStrategy implements resource-specific behavior during export,
	// optional. Exported objects are not decorated.
	ExportStrategy rest.RESTExportStrategy
	// TableConvertor is an optional interface for transforming items or lists
	// of items into tabular output. If unset, the default will be used.
	TableConvertor rest.TableConvertor

	// Storage is the interface for the underlying storage for the
	// resource. It is wrapped into a "DryRunnableStorage" that will
	// either pass-through or simply dry-run.
	Storage DryRunnableStorage
	// StorageVersioner outputs the <group/version/kind> an object will be
	// converted to before persisted in etcd, given a list of possible
	// kinds of the object.
	// If the StorageVersioner is nil, apiserver will leave the
	// storageVersionHash as empty in the discovery document.
	StorageVersioner runtime.GroupVersioner
	// Called to cleanup clients used by the underlying Storage; optional.
	DestroyFunc func()
}

// Note: the rest.StandardStorage interface aggregates the common REST verbs
var _ rest.StandardStorage = &Store{}
var _ rest.Exporter = &Store{}
var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{}

const (
	// OptimisticLockErrorMsg is the message carried by the conflict error
	// returned when an update's resourceVersion does not match the stored one.
	OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
	// resourceCountPollPeriodJitter is not used in the visible part of this
	// file; NOTE(review): presumably a jitter factor for a polling period.
	resourceCountPollPeriodJitter = 1.2
)

// NamespaceKeyRootFunc is the default function for constructing storage paths
// to resource directories enforcing namespace rules.
func NamespaceKeyRootFunc(ctx context.Context, prefix string) string {
	key := prefix
	ns, ok := genericapirequest.NamespaceFrom(ctx)
	// Only a non-empty namespace in the context is appended to the prefix.
	if ok && len(ns) > 0 {
		key = key + "/" + ns
	}
	return key
}

// NamespaceKeyFunc is the default function for constructing storage paths to
// a resource relative to the given prefix enforcing namespace rules. If the
// context does not contain a namespace, it errors.
func NamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
	key := NamespaceKeyRootFunc(ctx, prefix)
	ns, ok := genericapirequest.NamespaceFrom(ctx)
	if !ok || len(ns) == 0 {
		return "", apierrors.NewBadRequest("Namespace parameter required.")
	}
	if len(name) == 0 {
		return "", apierrors.NewBadRequest("Name parameter required.")
	}
	// The name becomes a single path segment of the key, so it is validated as one.
	if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
		return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
	}
	key = key + "/" + name
	return key, nil
}

// NoNamespaceKeyFunc is the default function for constructing storage paths
// to a resource relative to the given prefix without a namespace.
func NoNamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
	if len(name) == 0 {
		return "", apierrors.NewBadRequest("Name parameter required.")
	}
	if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
		return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
	}
	key := prefix + "/" + name
	return key, nil
}

// New implements RESTStorage.New.
func (e *Store) New() runtime.Object {
	return e.NewFunc()
}

// NewList implements rest.Lister.
func (e *Store) NewList() runtime.Object {
	return e.NewListFunc()
}

// NamespaceScoped indicates whether the resource is namespaced
func (e *Store) NamespaceScoped() bool {
	// The answer is delegated to the create strategy, falling back to the
	// update strategy; with neither set there is nothing to ask, so it panics.
	if e.CreateStrategy != nil {
		return e.CreateStrategy.NamespaceScoped()
	}
	if e.UpdateStrategy != nil {
		return e.UpdateStrategy.NamespaceScoped()
	}
	panic("programmer error: no CRUD for resource, you're crazy, override NamespaceScoped too")
}

// GetCreateStrategy implements GenericStore.
func (e *Store) GetCreateStrategy() rest.RESTCreateStrategy {
	return e.CreateStrategy
}

// GetUpdateStrategy implements GenericStore.
func (e *Store) GetUpdateStrategy() rest.RESTUpdateStrategy {
	return e.UpdateStrategy
}

// GetDeleteStrategy implements GenericStore.
func (e *Store) GetDeleteStrategy() rest.RESTDeleteStrategy {
	return e.DeleteStrategy
}

// GetExportStrategy implements GenericStore.
func (e *Store) GetExportStrategy() rest.RESTExportStrategy {
	return e.ExportStrategy
}

// List returns a list of items matching labels and field according to the
// store's PredicateFunc.
func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	// Missing options or selectors mean "match everything".
	label := labels.Everything()
	if options != nil && options.LabelSelector != nil {
		label = options.LabelSelector
	}
	field := fields.Everything()
	if options != nil && options.FieldSelector != nil {
		field = options.FieldSelector
	}
	out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
	if err != nil {
		return nil, err
	}
	if e.Decorator != nil {
		if err := e.Decorator(out); err != nil {
			return nil, err
		}
	}
	return out, nil
}

// ListPredicate returns a list of all the items matching the given
// SelectionPredicate.
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
	if options == nil {
		// By default we should serve the request from etcd.
		options = &metainternalversion.ListOptions{ResourceVersion: ""}
	}
	p.Limit = options.Limit
	p.Continue = options.Continue
	list := e.NewListFunc()
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	// Optimization: when the predicate can only match one named object, fetch
	// that single key into the list instead of listing the whole collection.
	if name, ok := p.MatchesSingle(); ok {
		if key, err := e.KeyFunc(ctx, name); err == nil {
			err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
			return list, storeerr.InterpretListError(err, qualifiedResource)
		}
		// if we cannot extract a key based on the current context, the optimization is skipped
	}
	err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
	return list, storeerr.InterpretListError(err, qualifiedResource)
}

// Create inserts a new item according to the unique key from the object.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
		return nil, err
	}
	// at this point we have a fully formed object. It is time to call the validators that the apiserver
	// handling chain wants to enforce.
	if createValidation != nil {
		// The validator gets a deep copy, not the object that will be stored.
		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
			return nil, err
		}
	}
	name, err := e.ObjectNameFunc(obj)
	if err != nil {
		return nil, err
	}
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	ttl, err := e.calculateTTL(obj, 0, false)
	if err != nil {
		return nil, err
	}
	out := e.NewFunc()
	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
		err = storeerr.InterpretCreateError(err, qualifiedResource, name)
		err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
		if !apierrors.IsAlreadyExists(err) {
			return nil, err
		}
		// AlreadyExists: look at the stored object to refine the message. Any
		// failure while doing so returns the original AlreadyExists error.
		if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
			return nil, err
		}
		accessor, errGetAcc := meta.Accessor(out)
		if errGetAcc != nil {
			return nil, err
		}
		// If the stored object has a deletion timestamp, prefix the error
		// message in place to say so.
		if accessor.GetDeletionTimestamp() != nil {
			msg := &err.(*apierrors.StatusError).ErrStatus.Message
			*msg = fmt.Sprintf("object is being deleted: %s", *msg)
		}
		return nil, err
	}
	if e.AfterCreate != nil {
		if err := e.AfterCreate(out); err != nil {
			return nil, err
		}
	}
	if e.Decorator != nil {
		if err := e.Decorator(out); err != nil {
			return nil, err
		}
	}
	return out, nil
}

// ShouldDeleteDuringUpdate is the default function for
// checking if an object should be deleted during an update.
// It checks if the new object has no finalizers,
// the existing object's deletionTimestamp is set, and
// the existing object's deletionGracePeriodSeconds is 0 or nil
func ShouldDeleteDuringUpdate(ctx context.Context, key string, obj, existing runtime.Object) bool {
	newMeta, err := meta.Accessor(obj)
	if err != nil {
		// Metadata that cannot be read is reported and treated as "do not delete".
		utilruntime.HandleError(err)
		return false
	}
	oldMeta, err := meta.Accessor(existing)
	if err != nil {
		utilruntime.HandleError(err)
		return false
	}
	if len(newMeta.GetFinalizers()) > 0 {
		// don't delete with finalizers remaining in the new object
		return false
	}
	if oldMeta.GetDeletionTimestamp() == nil {
		// don't delete if the existing object hasn't had a delete request made
		return false
	}
	// delete if the existing object has no grace period or a grace period of 0
	return oldMeta.GetDeletionGracePeriodSeconds() == nil || *oldMeta.GetDeletionGracePeriodSeconds() == 0
}

// deleteWithoutFinalizers handles deleting an object ignoring its finalizer list.
// Used for objects that are either been finalized or have never initialized.
func (e *Store) deleteWithoutFinalizers(ctx context.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions, dryRun bool) (runtime.Object, bool, error) {
	out := e.NewFunc()
	klog.V(6).Infof("going to delete %s from registry, triggered by update", name)
	// Using the rest.ValidateAllObjectFunc because the request is an UPDATE request and has already passed the admission for the UPDATE verb.
	if err := e.Storage.Delete(ctx, key, out, preconditions, rest.ValidateAllObjectFunc, dryRun); err != nil {
		// Deletion is racy, i.e., there could be multiple update
		// requests to remove all finalizers from the object, so we
		// ignore the NotFound error.
		if storage.IsNotFound(err) {
			_, err := e.finalizeDelete(ctx, obj, true)
			// clients are expecting an updated object if a PUT succeeded,
			// but finalizeDelete returns a metav1.Status, so return
			// the object in the request instead.
			return obj, false, err
		}
		return nil, false, storeerr.InterpretDeleteError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	_, err := e.finalizeDelete(ctx, out, true)
	// clients are expecting an updated object if a PUT succeeded, but
	// finalizeDelete returns a metav1.Status, so return the object in
	// the request instead.
	return obj, false, err
}

// Update performs an atomic update and set of the object. Returns the result of the update
// or an error.
If the registry allows create-on-update, the create flow will be executed.// A bool is returned along with the object and any errors, to indicate object creation.func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {key, err := e.KeyFunc(ctx, name)if err != nil {return nil, false, err}var (creatingObj runtime.Objectcreating = false)qualifiedResource := e.qualifiedResourceFromContext(ctx)storagePreconditions := &storage.Preconditions{}if preconditions := objInfo.Preconditions(); preconditions != nil {storagePreconditions.UID = preconditions.UIDstoragePreconditions.ResourceVersion = preconditions.ResourceVersion}out := e.NewFunc()// deleteObj is only used in case a deletion is carried outvar deleteObj runtime.Objecterr = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {// Given the existing object, get the new objectobj, err := objInfo.UpdatedObject(ctx, existing)if err != nil {return nil, nil, err}// If AllowUnconditionalUpdate() is true and the object specified by// the user does not have a resource version, then we populate it with// the latest version. 
Else, we check that the version specified by// the user matches the version of latest storage object.resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)if err != nil {return nil, nil, err}doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()version, err := e.Storage.Versioner().ObjectResourceVersion(existing)if err != nil {return nil, nil, err}if version == 0 {if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {return nil, nil, apierrors.NewNotFound(qualifiedResource, name)}creating = truecreatingObj = objif err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {return nil, nil, err}// at this point we have a fully formed object. It is time to call the validators that the apiserver// handling chain wants to enforce.if createValidation != nil {if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {return nil, nil, err}}ttl, err := e.calculateTTL(obj, 0, false)if err != nil {return nil, nil, err}return obj, &ttl, nil}creating = falsecreatingObj = nilif doUnconditionalUpdate {// Update the object's resource version to match the latest// storage object's resource version.err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)if err != nil {return nil, nil, err}} else {// Check if the object's resource version matches the latest// resource version.if resourceVersion == 0 {// TODO: The Invalid error should have a field for Resource.// After that field is added, we should fill the Resource and// leave the Kind field empty. 
See the discussion in #18526.qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}return nil, nil, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)}if resourceVersion != version {return nil, nil, apierrors.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))}}if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {return nil, nil, err}// at this point we have a fully formed object. It is time to call the validators that the apiserver// handling chain wants to enforce.if updateValidation != nil {if err := updateValidation(ctx, obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {return nil, nil, err}}// Check the default delete-during-update conditions, and store-specific conditions if providedif ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&(e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {deleteObj = objreturn nil, nil, errEmptiedFinalizers}ttl, err := e.calculateTTL(obj, res.TTL, true)if err != nil {return nil, nil, err}if int64(ttl) != res.TTL {return obj, &ttl, nil}return obj, nil, nil}, dryrun.IsDryRun(options.DryRun))if err != nil {// delete the objectif err == errEmptiedFinalizers {return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions, dryrun.IsDryRun(options.DryRun))}if creating {err = storeerr.InterpretCreateError(err, qualifiedResource, name)err = rest.CheckGeneratedNameError(e.CreateStrategy, err, creatingObj)} else {err = storeerr.InterpretUpdateError(err, qualifiedResource, name)}return nil, false, err}if creating {if e.AfterCreate != nil {if err := e.AfterCreate(out); err != nil {return nil, false, err}}} else {if e.AfterUpdate != nil {if err := e.AfterUpdate(out); err != nil {return nil, false, err}}}if e.Decorator != nil {if 
err := e.Decorator(out); err != nil {return nil, false, err}}return out, creating, nil}// Get retrieves the item from storage.func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {obj := e.NewFunc()key, err := e.KeyFunc(ctx, name)if err != nil {return nil, err}if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)}if e.Decorator != nil {if err := e.Decorator(obj); err != nil {return nil, err}}return obj, nil}// qualifiedResourceFromContext attempts to retrieve a GroupResource from the context's request info.// If the context has no request info, DefaultQualifiedResource is used.func (e *Store) qualifiedResourceFromContext(ctx context.Context) schema.GroupResource {if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {return schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}}// some implementations access storage directly and thus the context has no RequestInforeturn e.DefaultQualifiedResource}var (errAlreadyDeleting = fmt.Errorf("abort delete")errDeleteNow = fmt.Errorf("delete now")errEmptiedFinalizers = fmt.Errorf("emptied finalizers"))// shouldOrphanDependents returns true if the finalizer for orphaning should be set// updated for FinalizerOrphanDependents. 
In the order of highest to lowest// priority, there are three factors affect whether to add/remove the// FinalizerOrphanDependents: options, existing finalizers of the object,// and e.DeleteStrategy.DefaultGarbageCollectionPolicy.func shouldOrphanDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {// Get default GC policy from this REST object typegcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy)var defaultGCPolicy rest.GarbageCollectionPolicyif ok {defaultGCPolicy = gcStrategy.DefaultGarbageCollectionPolicy(ctx)}if defaultGCPolicy == rest.Unsupported {// return false to indicate that we should NOT orphanreturn false}// An explicit policy was set at deletion time, that overrides everythingif options != nil && options.OrphanDependents != nil {return *options.OrphanDependents}if options != nil && options.PropagationPolicy != nil {switch *options.PropagationPolicy {case metav1.DeletePropagationOrphan:return truecase metav1.DeletePropagationBackground, metav1.DeletePropagationForeground:return false}}// If a finalizer is set in the object, it overrides the default// validation should make sure the two cases won't be true at the same time.finalizers := accessor.GetFinalizers()for _, f := range finalizers {switch f {case metav1.FinalizerOrphanDependents:return truecase metav1.FinalizerDeleteDependents:return false}}// Get default orphan policy from this REST object type if it existsif defaultGCPolicy == rest.OrphanDependents {return true}return false}// shouldDeleteDependents returns true if the finalizer for foreground deletion should be set// updated for FinalizerDeleteDependents. 
In the order of highest to lowest// priority, there are three factors affect whether to add/remove the// FinalizerDeleteDependents: options, existing finalizers of the object, and// e.DeleteStrategy.DefaultGarbageCollectionPolicy.func shouldDeleteDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {// Get default GC policy from this REST object typeif gcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy); ok && gcStrategy.DefaultGarbageCollectionPolicy(ctx) == rest.Unsupported {// return false to indicate that we should NOT delete in foregroundreturn false}// If an explicit policy was set at deletion time, that overrides bothif options != nil && options.OrphanDependents != nil {return false}if options != nil && options.PropagationPolicy != nil {switch *options.PropagationPolicy {case metav1.DeletePropagationForeground:return truecase metav1.DeletePropagationBackground, metav1.DeletePropagationOrphan:return false}}// If a finalizer is set in the object, it overrides the default// validation has made sure the two cases won't be true at the same time.finalizers := accessor.GetFinalizers()for _, f := range finalizers {switch f {case metav1.FinalizerDeleteDependents:return truecase metav1.FinalizerOrphanDependents:return false}}return false}// deletionFinalizersForGarbageCollection analyzes the object and delete options// to determine whether the object is in need of finalization by the garbage// collector. 
If so, returns the set of deletion finalizers to apply and a bool// indicating whether the finalizer list has changed and is in need of updating.//// The finalizers returned are intended to be handled by the garbage collector.// If garbage collection is disabled for the store, this function returns false// to ensure finalizers aren't set which will never be cleared.func deletionFinalizersForGarbageCollection(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) (bool, []string) {if !e.EnableGarbageCollection {return false, []string{}}shouldOrphan := shouldOrphanDependents(ctx, e, accessor, options)shouldDeleteDependentInForeground := shouldDeleteDependents(ctx, e, accessor, options)newFinalizers := []string{}// first remove both finalizers, add them back if needed.for _, f := range accessor.GetFinalizers() {if f == metav1.FinalizerOrphanDependents || f == metav1.FinalizerDeleteDependents {continue}newFinalizers = append(newFinalizers, f)}if shouldOrphan {newFinalizers = append(newFinalizers, metav1.FinalizerOrphanDependents)}if shouldDeleteDependentInForeground {newFinalizers = append(newFinalizers, metav1.FinalizerDeleteDependents)}oldFinalizerSet := sets.NewString(accessor.GetFinalizers()...)newFinalizersSet := sets.NewString(newFinalizers...)if oldFinalizerSet.Equal(newFinalizersSet) {return false, accessor.GetFinalizers()}return true, newFinalizers}// markAsDeleting sets the obj's DeletionGracePeriodSeconds to 0, and sets the// DeletionTimestamp to "now" if there is no existing deletionTimestamp or if the existing// deletionTimestamp is further in future. Finalizers are watching for such updates and will// finalize the object if their IDs are present in the object's Finalizers list.func markAsDeleting(obj runtime.Object, now time.Time) (err error) {objectMeta, kerr := meta.Accessor(obj)if kerr != nil {return kerr}// This handles Generation bump for resources that don't support graceful// deletion. 
For resources that support graceful deletion is handle in// pkg/api/rest/delete.goif objectMeta.GetDeletionTimestamp() == nil && objectMeta.GetGeneration() > 0 {objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)}existingDeletionTimestamp := objectMeta.GetDeletionTimestamp()if existingDeletionTimestamp == nil || existingDeletionTimestamp.After(now) {metaNow := metav1.NewTime(now)objectMeta.SetDeletionTimestamp(&metaNow)}var zero int64 = 0objectMeta.SetDeletionGracePeriodSeconds(&zero)return nil}// updateForGracefulDeletionAndFinalizers updates the given object for// graceful deletion and finalization by setting the deletion timestamp and// grace period seconds (graceful deletion) and updating the list of// finalizers (finalization); it returns://// 1. an error// 2. a boolean indicating that the object was not found, but it should be// ignored// 3. a boolean indicating that the object's grace period is exhausted and it// should be deleted immediately// 4. a new output object with the state that was updated// 5. 
// a copy of the last existing state of the object
func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, deleteValidation rest.ValidateObjectFunc, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
	// Both variables are written by the update callback below and read after
	// GuaranteedUpdate returns.
	lastGraceful := int64(0)
	var pendingFinalizers bool
	out = e.NewFunc()
	err = e.Storage.GuaranteedUpdate(
		ctx,
		key,
		out,
		false, /* ignoreNotFound */
		&preconditions,
		storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
			if err := deleteValidation(ctx, existing); err != nil {
				return nil, err
			}
			graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
			if err != nil {
				return nil, err
			}
			if pendingGraceful {
				return nil, errAlreadyDeleting
			}

			// Add/remove the orphan finalizer as the options dictates.
			// Note that this occurs after checking pendingGraceful, so
			// finalizers cannot be updated via DeleteOptions if deletion has
			// started.
			existingAccessor, err := meta.Accessor(existing)
			if err != nil {
				return nil, err
			}
			needsUpdate, newFinalizers := deletionFinalizersForGarbageCollection(ctx, e, existingAccessor, options)
			if needsUpdate {
				existingAccessor.SetFinalizers(newFinalizers)
			}

			pendingFinalizers = len(existingAccessor.GetFinalizers()) != 0
			if !graceful {
				// set the DeleteGracePeriods to 0 if the object has pendingFinalizers but not supporting graceful deletion
				if pendingFinalizers {
					klog.V(6).Infof("update the DeletionTimestamp to \"now\" and GracePeriodSeconds to 0 for object %s, because it has pending finalizers", name)
					err = markAsDeleting(existing, time.Now())
					if err != nil {
						return nil, err
					}
					return existing, nil
				}
				// No grace period and no finalizers: abort the update and let
				// the caller delete the object outright.
				return nil, errDeleteNow
			}
			lastGraceful = *options.GracePeriodSeconds
			lastExisting = existing
			return existing, nil
		}),
		dryrun.IsDryRun(options.DryRun),
	)
	// Translate the callback's outcome (including the sentinel errors) into
	// the five return values.
	switch err {
	case nil:
		// If there are pending finalizers, we never delete the object immediately.
		if pendingFinalizers {
			return nil, false, false, out, lastExisting
		}
		if lastGraceful > 0 {
			return nil, false, false, out, lastExisting
		}
		// If we are here, the registry supports grace period mechanism and
		// we are intentionally deleting gracelessly. In this case, we may
		// enter a race with other k8s components. If other component wins
		// the race, the object will not be found, and we should tolerate
		// the NotFound error. See
		// https://github.com/kubernetes/kubernetes/issues/19403 for
		// details.
		return nil, true, true, out, lastExisting
	case errDeleteNow:
		// we've updated the object to have a zero grace period, or it's already at 0, so
		// we should fall through and truly delete the object.
		return nil, false, true, out, lastExisting
	case errAlreadyDeleting:
		out, err = e.finalizeDelete(ctx, in, true)
		return err, false, false, out, lastExisting
	default:
		return storeerr.InterpretUpdateError(err, e.qualifiedResourceFromContext(ctx), name), false, false, out, lastExisting
	}
}

// Delete removes the item from storage.
//
// The returned bool is true only when the object was actually removed from
// storage (or the request was a dry run that reached that point); it is false
// when the object was merely updated to start graceful deletion or to wait
// for finalizers. A nil options is treated as "delete immediately".
func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, false, err
	}
	obj := e.NewFunc()
	qualifiedResource := e.qualifiedResourceFromContext(ctx)
	if err = e.Storage.Get(ctx, key, "", obj, false); err != nil {
		return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
	}

	// support older consumers of delete by treating "nil" as delete immediately
	if options == nil {
		options = metav1.NewDeleteOptions(0)
	}
	var preconditions storage.Preconditions
	if options.Preconditions != nil {
		preconditions.UID = options.Preconditions.UID
		preconditions.ResourceVersion = options.Preconditions.ResourceVersion
	}
	graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
	if err != nil {
		return nil, false, err
	}
	// this means finalizers cannot be updated via DeleteOptions if a deletion is already pending
	if pendingGraceful {
		out, err := e.finalizeDelete(ctx, obj, false)
		return out, false, err
	}
	// check if obj has pending finalizers
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return nil, false, apierrors.NewInternalError(err)
	}
	pendingFinalizers := len(accessor.GetFinalizers()) != 0
	var ignoreNotFound bool
	var deleteImmediately bool = true
	var lastExisting, out runtime.Object

	// Handle combinations of graceful deletion and finalization by issuing
	// the correct updates.
	shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options)
	// TODO: remove the check, because we support no-op updates now.
	if graceful || pendingFinalizers || shouldUpdateFinalizers {
		err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
		// Update the preconditions.ResourceVersion if set since we updated the object.
		if err == nil && deleteImmediately && preconditions.ResourceVersion != nil {
			accessor, err = meta.Accessor(out)
			if err != nil {
				return out, false, apierrors.NewInternalError(err)
			}
			resourceVersion := accessor.GetResourceVersion()
			preconditions.ResourceVersion = &resourceVersion
		}
	}

	// !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
	if !deleteImmediately || err != nil {
		return out, false, err
	}

	// Going further in this function is not useful when we are
	// performing a dry-run request. Worse, it will actually
	// override "out" with the version of the object in database
	// that doesn't have the finalizer and deletiontimestamp set
	// (because the update above was dry-run too).
	// If we already
	// have that version available, let's just return it now,
	// otherwise, we can call dry-run delete that will get us the
	// latest version of the object.
	if dryrun.IsDryRun(options.DryRun) && out != nil {
		return out, true, nil
	}

	// delete immediately, or no graceful deletion supported
	klog.V(6).Infof("going to delete %s from registry: ", name)
	out = e.NewFunc()
	if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun)); err != nil {
		// Please refer to the place where we set ignoreNotFound for the reason
		// why we ignore the NotFound error.
		if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
			// The lastExisting object may not be the last state of the object
			// before its deletion, but it's the best approximation.
			out, err := e.finalizeDelete(ctx, lastExisting, true)
			return out, true, err
		}
		return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
	}
	out, err = e.finalizeDelete(ctx, out, true)
	return out, true, err
}

// DeleteReturnsDeletedObject implements the rest.MayReturnFullObjectDeleter interface
// by reporting the store's ReturnDeletedObject setting.
func (e *Store) DeleteReturnsDeletedObject() bool {
	return e.ReturnDeletedObject
}

// DeleteCollection removes all items returned by List with a given ListOptions from storage.
//
// DeleteCollection is currently NOT atomic.
It can happen that only subset of objects// will be deleted from storage, and then an error will be returned.// In case of success, the list of deleted objects will be returned.//// TODO: Currently, there is no easy way to remove 'directory' entry from storage (if we// are removing all objects of a given type) with the current API (it's technically// possibly with storage API, but watch is not delivered correctly then).// It will be possible to fix it with v3 etcd API.func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {if listOptions == nil {listOptions = &metainternalversion.ListOptions{}} else {listOptions = listOptions.DeepCopy()}listObj, err := e.List(ctx, listOptions)if err != nil {return nil, err}items, err := meta.ExtractList(listObj)if err != nil {return nil, err}// Spawn a number of goroutines, so that we can issue requests to storage// in parallel to speed up deletion.// TODO: Make this proportional to the number of items to delete, up to// DeleteCollectionWorkers (it doesn't make much sense to spawn 16// workers to delete 10 items).workersNumber := e.DeleteCollectionWorkersif workersNumber < 1 {workersNumber = 1}wg := sync.WaitGroup{}toProcess := make(chan int, 2*workersNumber)errs := make(chan error, workersNumber+1)go func() {defer utilruntime.HandleCrash(func(panicReason interface{}) {errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)})for i := 0; i < len(items); i++ {toProcess <- i}close(toProcess)}()wg.Add(workersNumber)for i := 0; i < workersNumber; i++ {go func() {// panics don't cross goroutine boundariesdefer utilruntime.HandleCrash(func(panicReason interface{}) {errs <- fmt.Errorf("DeleteCollection goroutine panicked: %v", panicReason)})defer wg.Done()for index := range toProcess {accessor, err := meta.Accessor(items[index])if err != nil {errs <- errreturn}if _, _, err := 
e.Delete(ctx, accessor.GetName(), deleteValidation, options); err != nil && !apierrors.IsNotFound(err) {klog.V(4).Infof("Delete %s in DeleteCollection failed: %v", accessor.GetName(), err)errs <- errreturn}}}()}wg.Wait()select {case err := <-errs:return nil, errdefault:return listObj, nil}}// finalizeDelete runs the Store's AfterDelete hook if runHooks is set and// returns the decorated deleted object if appropriate.func (e *Store) finalizeDelete(ctx context.Context, obj runtime.Object, runHooks bool) (runtime.Object, error) {if runHooks && e.AfterDelete != nil {if err := e.AfterDelete(obj); err != nil {return nil, err}}if e.ReturnDeletedObject {if e.Decorator != nil {if err := e.Decorator(obj); err != nil {return nil, err}}return obj, nil}// Return information about the deleted object, which enables clients to// verify that the object was actually deleted and not waiting for finalizers.accessor, err := meta.Accessor(obj)if err != nil {return nil, err}qualifiedResource := e.qualifiedResourceFromContext(ctx)details := &metav1.StatusDetails{Name: accessor.GetName(),Group: qualifiedResource.Group,Kind: qualifiedResource.Resource, // Yes we set Kind field to resource.UID: accessor.GetUID(),}status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}return status, nil}// Watch makes a matcher for the given label and field, and calls// WatchPredicate. If possible, you should customize PredicateFunc to produce// a matcher that matches by key. 
SelectionPredicate does this for you// automatically.func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {label := labels.Everything()if options != nil && options.LabelSelector != nil {label = options.LabelSelector}field := fields.Everything()if options != nil && options.FieldSelector != nil {field = options.FieldSelector}predicate := e.PredicateFunc(label, field)resourceVersion := ""if options != nil {resourceVersion = options.ResourceVersionpredicate.AllowWatchBookmarks = options.AllowWatchBookmarks}return e.WatchPredicate(ctx, predicate, resourceVersion)}// WatchPredicate starts a watch for the items that matches.func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {if name, ok := p.MatchesSingle(); ok {if key, err := e.KeyFunc(ctx, name); err == nil {w, err := e.Storage.Watch(ctx, key, resourceVersion, p)if err != nil {return nil, err}if e.Decorator != nil {return newDecoratedWatcher(w, e.Decorator), nil}return w, nil}// if we cannot extract a key based on the current context, the// optimization is skipped}w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)if err != nil {return nil, err}if e.Decorator != nil {return newDecoratedWatcher(w, e.Decorator), nil}return w, nil}// calculateTTL is a helper for retrieving the updated TTL for an object or// returning an error if the TTL cannot be calculated. The defaultTTL is// changed to 1 if less than zero. 
// Zero means no TTL, not expire immediately.
//
// When TTLFunc is set it receives the (possibly adjusted) default TTL and the
// update flag, and its result is returned as-is.
func (e *Store) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
	// TODO: validate that this assertion is still valid.

	// etcd may return a negative TTL for a node if the expiration has not
	// occurred due to server lag - we will ensure that the value is at least
	// set.
	if defaultTTL < 0 {
		defaultTTL = 1
	}
	ttl = uint64(defaultTTL)
	if e.TTLFunc != nil {
		ttl, err = e.TTLFunc(obj, ttl, update)
	}
	return ttl, err
}

// exportObjectMeta unsets the fields on the given object that should not be
// present when the object is exported.
//
// With exact set, the namespace is kept, and so is the name of an object that
// has a generateName; all other listed fields are cleared either way.
func exportObjectMeta(accessor metav1.Object, exact bool) {
	accessor.SetUID("")
	if !exact {
		accessor.SetNamespace("")
	}
	accessor.SetCreationTimestamp(metav1.Time{})
	accessor.SetDeletionTimestamp(nil)
	accessor.SetResourceVersion("")
	accessor.SetSelfLink("")
	if len(accessor.GetGenerateName()) > 0 && !exact {
		accessor.SetName("")
	}
}

// Export implements the rest.Exporter interface
//
// It fetches the object with Get, strips server-populated metadata via
// exportObjectMeta, and then applies the ExportStrategy if one is set, falling
// back to CreateStrategy.PrepareForCreate otherwise.
func (e *Store) Export(ctx context.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
	obj, err := e.Get(ctx, name, &metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	// An object without ObjectMeta is only logged, not treated as an error.
	if accessor, err := meta.Accessor(obj); err == nil {
		exportObjectMeta(accessor, opts.Exact)
	} else {
		klog.V(4).Infof("Object of type %v does not have ObjectMeta: %v", reflect.TypeOf(obj), err)
	}

	if e.ExportStrategy != nil {
		if err = e.ExportStrategy.Export(ctx, obj, opts.Exact); err != nil {
			return nil, err
		}
	} else {
		e.CreateStrategy.PrepareForCreate(ctx, obj)
	}
	return obj, nil
}

// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
//
// It validates that the mandatory fields are set, then fills in
// PredicateFunc, KeyRootFunc/KeyFunc, DeleteCollectionWorkers, ObjectNameFunc
// and the storage backend when they were left unset. EnableGarbageCollection
// is always overwritten from the REST options.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	if e.DefaultQualifiedResource.Empty() {
		return fmt.Errorf("store %#v must have a non-empty qualified resource", e)
	}
	if e.NewFunc == nil {
		return fmt.Errorf("store for %s must have NewFunc set", e.DefaultQualifiedResource.String())
	}
	if e.NewListFunc == nil {
		return fmt.Errorf("store for %s must have NewListFunc set", e.DefaultQualifiedResource.String())
	}
	// The two key functions must be provided together or not at all.
	if (e.KeyRootFunc == nil) != (e.KeyFunc == nil) {
		return fmt.Errorf("store for %s must set both KeyRootFunc and KeyFunc or neither", e.DefaultQualifiedResource.String())
	}

	// Namespace scoping is taken from the create strategy, or the update
	// strategy when there is no create strategy.
	var isNamespaced bool
	switch {
	case e.CreateStrategy != nil:
		isNamespaced = e.CreateStrategy.NamespaceScoped()
	case e.UpdateStrategy != nil:
		isNamespaced = e.UpdateStrategy.NamespaceScoped()
	default:
		return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
	}

	if e.DeleteStrategy == nil {
		return fmt.Errorf("store for %s must have DeleteStrategy set", e.DefaultQualifiedResource.String())
	}

	if options.RESTOptions == nil {
		return fmt.Errorf("options for %s must have RESTOptions set", e.DefaultQualifiedResource.String())
	}

	// Default the attribute extractor according to the resource scope.
	attrFunc := options.AttrFunc
	if attrFunc == nil {
		if isNamespaced {
			attrFunc = storage.DefaultNamespaceScopedAttr
		} else {
			attrFunc = storage.DefaultClusterScopedAttr
		}
	}
	if e.PredicateFunc == nil {
		e.PredicateFunc = func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
			return storage.SelectionPredicate{
				Label:    label,
				Field:    field,
				GetAttrs: attrFunc,
			}
		}
	}

	err := validateIndexers(options.Indexers)
	if err != nil {
		return err
	}

	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
	if err != nil {
		return err
	}

	// ResourcePrefix must come from the underlying factory
	prefix := opts.ResourcePrefix
	if !strings.HasPrefix(prefix, "/") {
		prefix = "/" + prefix
	}
	// An empty ResourcePrefix normalizes to "/" and is rejected.
	if prefix == "/" {
		return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
	}

	// Set the default behavior for storage key generation
	if e.KeyRootFunc == nil && e.KeyFunc == nil {
		if isNamespaced {
			e.KeyRootFunc = func(ctx context.Context) string {
				return NamespaceKeyRootFunc(ctx, prefix)
			}
			e.KeyFunc = func(ctx context.Context, name string) (string, error) {
				return NamespaceKeyFunc(ctx, prefix, name)
			}
		} else {
			e.KeyRootFunc = func(ctx context.Context) string {
				return prefix
			}
			e.KeyFunc = func(ctx context.Context, name string) (string, error) {
				return NoNamespaceKeyFunc(ctx, prefix, name)
			}
		}
	}

	// We adapt the store's keyFunc so that we can use it with the StorageDecorator
	// without making any assumptions about where objects are stored in etcd
	keyFunc := func(obj runtime.Object) (string, error) {
		accessor, err := meta.Accessor(obj)
		if err != nil {
			return "", err
		}

		if isNamespaced {
			return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
		}

		return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
	}

	if e.DeleteCollectionWorkers == 0 {
		e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
	}

	e.EnableGarbageCollection = opts.EnableGarbageCollection

	if e.ObjectNameFunc == nil {
		e.ObjectNameFunc = func(obj runtime.Object) (string, error) {
			accessor, err := meta.Accessor(obj)
			if err != nil {
				return "", err
			}
			return accessor.GetName(), nil
		}
	}

	// Only build a storage backend when the caller did not inject one.
	if e.Storage.Storage == nil {
		e.Storage.Codec = opts.StorageConfig.Codec
		var err error
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
		if err != nil {
			return err
		}
		e.StorageVersioner = opts.StorageConfig.EncodeVersioner

		if opts.CountMetricPollPeriod > 0 {
			stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
			// Chain the metric poller's stop function in front of the
			// storage's own destroy function.
			previousDestroy := e.DestroyFunc
			e.DestroyFunc = func() {
				stopFunc()
				if previousDestroy != nil {
					previousDestroy()
				}
			}
		}
	}

	return nil
}

// startObservingCount starts monitoring given prefix and periodically updating metrics.
It returns a function to stop collection.func (e *Store) startObservingCount(period time.Duration) func() {prefix := e.KeyRootFunc(genericapirequest.NewContext())resourceName := e.DefaultQualifiedResource.String()klog.V(2).Infof("Monitoring %v count at <storage-prefix>/%v", resourceName, prefix)stopCh := make(chan struct{})go wait.JitterUntil(func() {count, err := e.Storage.Count(prefix)if err != nil {klog.V(5).Infof("Failed to update storage count metric: %v", err)metrics.UpdateObjectCount(resourceName, -1)} else {metrics.UpdateObjectCount(resourceName, count)}}, period, resourceCountPollPeriodJitter, true, stopCh)return func() { close(stopCh) }}func (e *Store) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {if e.TableConvertor != nil {return e.TableConvertor.ConvertToTable(ctx, object, tableOptions)}return rest.NewDefaultTableConvertor(e.qualifiedResourceFromContext(ctx)).ConvertToTable(ctx, object, tableOptions)}func (e *Store) StorageVersion() runtime.GroupVersioner {return e.StorageVersioner}// validateIndexers will check the prefix of indexers.func validateIndexers(indexers *cache.Indexers) error {if indexers == nil {return nil}for indexName := range *indexers {if len(indexName) <= 2 || (indexName[:2] != "l:" && indexName[:2] != "f:") {return fmt.Errorf("index must prefix with \"l:\" or \"f:\"")}}return nil}
