0、架构图

1、存储相关配置

1.1、资源与版本

1.1.1、资源与版本的启用

  1. // ResourceConfig记录了每个Group启用了哪个版本以及哪个具体的GVK是否启用
  2. // 用gv/gvk map来维护信息
  3. // eg. 若当前ResourceConfig记录信息如下
  4. // GroupVersionConfigs:
  5. // testGroup01/v1 true
  6. // testGroup01/v1beta1 false
  7. // testGroup02/v1 true
  8. //
  9. // testGroup01/v1/type01 false
  10. //
  11. // 则版本testGroup01/v1 testGroup02/v1处于启用状态
  12. // testGroup01/v1beta1处于禁用状态(禁用状态不会生成相应的rest api)
  13. // 由于testGroup01/v1/type01为false,则testGroup01/v1虽然启用,但是其资源type01
  14. // 处于禁用状态,则不会生成testGroup01/v1/type01的rest api
  15. type ResourceConfig struct {
  16. GroupVersionConfigs map[schema.GroupVersion]bool
  17. ResourceConfigs map[schema.GroupVersionResource]bool
  18. }
  19. func NewResourceConfig() *ResourceConfig {
  20. return &ResourceConfig{GroupVersionConfigs: map[schema.GroupVersion]bool{}, ResourceConfigs: map[schema.GroupVersionResource]bool{}}
  21. }
  22. func (o *ResourceConfig) DisableAll() {
  23. for k := range o.GroupVersionConfigs {
  24. o.GroupVersionConfigs[k] = false
  25. }
  26. }
  27. func (o *ResourceConfig) EnableAll() {
  28. for k := range o.GroupVersionConfigs {
  29. o.GroupVersionConfigs[k] = true
  30. }
  31. }
  32. func (o *ResourceConfig) DisableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {
  33. for k := range o.GroupVersionConfigs {
  34. if matcher(k) {
  35. o.GroupVersionConfigs[k] = false
  36. }
  37. }
  38. }
  39. func (o *ResourceConfig) EnableMatchingVersions(matcher func(gv schema.GroupVersion) bool) {
  40. for k := range o.GroupVersionConfigs {
  41. if matcher(k) {
  42. o.GroupVersionConfigs[k] = true
  43. }
  44. }
  45. }
  46. func (o *ResourceConfig) DisableVersions(versions ...schema.GroupVersion) {
  47. for _, version := range versions {
  48. o.GroupVersionConfigs[version] = false
  49. }
  50. }
  51. func (o *ResourceConfig) EnableVersions(versions ...schema.GroupVersion) {
  52. for _, version := range versions {
  53. o.GroupVersionConfigs[version] = true
  54. }
  55. }
  56. func (o *ResourceConfig) VersionEnabled(version schema.GroupVersion) bool {
  57. enabled, _ := o.GroupVersionConfigs[version]
  58. if enabled {
  59. return true
  60. }
  61. return false
  62. }
  63. func (o *ResourceConfig) DisableResources(resources ...schema.GroupVersionResource) {
  64. for _, resource := range resources {
  65. o.ResourceConfigs[resource] = false
  66. }
  67. }
  68. func (o *ResourceConfig) EnableResources(resources ...schema.GroupVersionResource) {
  69. for _, resource := range resources {
  70. o.ResourceConfigs[resource] = true
  71. }
  72. }
  73. func (o *ResourceConfig) ResourceEnabled(resource schema.GroupVersionResource) bool {
  74. if !o.VersionEnabled(resource.GroupVersion()) {
  75. return false
  76. }
  77. resourceEnabled, explicitlySet := o.ResourceConfigs[resource]
  78. if !explicitlySet {
  79. return true
  80. }
  81. return resourceEnabled
  82. }
  83. func (o *ResourceConfig) AnyVersionForGroupEnabled(group string) bool {
  84. for version := range o.GroupVersionConfigs {
  85. if version.Group == group {
  86. if o.VersionEnabled(version) {
  87. return true
  88. }
  89. }
  90. }
  91. return false
  92. }

1.1.2、资源的存储版本和内存版本

  1. // DefaultResourceEncodingConfig记录了每个gv的存储版本和内存版本,
  2. // 用以生成相应的存储序列化器和反序列化器
  3. // 此处的配置叫OverridingResourceEncoding。即覆盖配置。因为若无配置,则默认系统会用
  4. // v1作为存储版本,__internal用作内存版本(因为一般自定义资源注册版本优先级的时候v1版本优先级最高)
  5. // 若用户有自定义,则用OverridingResourceEncoding覆盖默认配置
  6. type DefaultResourceEncodingConfig struct {
  7. resources map[schema.GroupResource]*OverridingResourceEncoding
  8. scheme *runtime.Scheme
  9. }
  10. type OverridingResourceEncoding struct {
  11. ExternalResourceEncoding schema.GroupVersion
  12. InternalResourceEncoding schema.GroupVersion
  13. }
  14. func NewDefaultResourceEncodingConfig(scheme *runtime.Scheme) *DefaultResourceEncodingConfig {
  15. return &DefaultResourceEncodingConfig{resources: map[schema.GroupResource]*OverridingResourceEncoding{}, scheme: scheme}
  16. }
  17. func (o *DefaultResourceEncodingConfig) SetResourceEncoding(resourceBeingStored schema.GroupResource, externalEncodingVersion, internalVersion schema.GroupVersion) {
  18. o.resources[resourceBeingStored] = &OverridingResourceEncoding{
  19. ExternalResourceEncoding: externalEncodingVersion,
  20. InternalResourceEncoding: internalVersion,
  21. }
  22. }
  23. func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
  24. if !o.scheme.IsGroupRegistered(resource.Group) {
  25. return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
  26. }
  27. resourceOverride, resourceExists := o.resources[resource]
  28. if resourceExists {
  29. return resourceOverride.ExternalResourceEncoding, nil
  30. }
  31. return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil
  32. }
  33. func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
  34. if !o.scheme.IsGroupRegistered(resource.Group) {
  35. return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
  36. }
  37. resourceOverride, resourceExists := o.resources[resource]
  38. if resourceExists {
  39. return resourceOverride.InternalResourceEncoding, nil
  40. }
  41. return schema.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil
  42. }

1.2、存储序列化器/反序列化器

  1. type StorageCodecConfig struct {
  2. StorageMediaType string // 存储类型,默认json
  3. StorageSerializer runtime.StorageSerializer // 全局序列化器
  4. StorageVersion schema.GroupVersion // 存储版本
  5. MemoryVersion schema.GroupVersion // 内存版本
  6. Config storagebackend.Config // 后端存储配置
  7. EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder
  8. DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
  9. }
  10. func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, runtime.GroupVersioner, error) {
  11. mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
  12. if err != nil {
  13. return nil, nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
  14. }
  15. // 获取给定类型的解码器
  16. serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
  17. if !ok {
  18. return nil, nil, fmt.Errorf("unable to find serializer for %q", mediaType)
  19. }
  20. s := serializer.Serializer
  21. // wrapper
  22. var encoder runtime.Encoder = s
  23. if opts.EncoderDecoratorFn != nil {
  24. encoder = opts.EncoderDecoratorFn(encoder)
  25. }
  26. decoders := []runtime.Decoder{
  27. s,
  28. opts.StorageSerializer.UniversalDeserializer(),
  29. runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()),
  30. }
  31. if opts.DecoderDecoratorFn != nil {
  32. decoders = opts.DecoderDecoratorFn(decoders)
  33. }
  34. // encodeVersioner可以认为等同于返回存储版本的gvk
  35. encodeVersioner := runtime.NewMultiGroupVersioner(
  36. opts.StorageVersion,
  37. schema.GroupKind{Group: opts.StorageVersion.Group},
  38. schema.GroupKind{Group: opts.MemoryVersion.Group},
  39. )
  40. encoder = opts.StorageSerializer.EncoderForVersion(
  41. encoder,
  42. encodeVersioner,
  43. )
  44. // 由于不确定etcd存储的是什么格式的数据,因此解码器是所有解码器的合集
  45. // 解码要先解析格式再进行解码
  46. // decodeVersioner一般也默认为内部版本
  47. decoder := opts.StorageSerializer.DecoderToVersion(
  48. recognizer.NewDecoder(decoders...),
  49. runtime.NewCoercingMultiGroupVersioner(
  50. opts.MemoryVersion,
  51. schema.GroupKind{Group: opts.MemoryVersion.Group},
  52. schema.GroupKind{Group: opts.StorageVersion.Group},
  53. ),
  54. )
  55. return runtime.NewCodec(encoder, decoder), encodeVersioner, nil
  56. }

1.3、存储生成工厂

  1. const AllResources = "*"
  2. type StorageFactory interface {
  3. // 根据给定gr生成对应的后端配置文件(包含了对应的序列化器和反序列化器)
  4. // 然后根据利用kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go生成存储
  5. NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error)
  6. ResourcePrefix(groupResource schema.GroupResource) string
  7. // 返回所有的etcd后端节点列表
  8. Backends() []Backend
  9. }
  10. // etcd配置,用以生成etcd客户端
  11. type Backend struct {
  12. // 形如: https://etcd.domain:2379、http://etcd.domain:2379
  13. Server string
  14. TLSConfig *tls.Config
  15. }
  16. type DefaultStorageFactory struct {
  17. // 保存了etcd需要的通用信息
  18. StorageConfig storagebackend.Config
  19. // 资源覆盖(包含存储版本等需要覆盖的信息)
  20. Overrides map[schema.GroupResource]groupResourceOverrides
  21. // 默认的gv存储前缀信息
  22. DefaultResourcePrefixes map[schema.GroupResource]string
  23. // 默认的存储类型
  24. DefaultMediaType string
  25. // 全局序列化器
  26. DefaultSerializer runtime.StorageSerializer
  27. // 参见1.1.1
  28. ResourceEncodingConfig ResourceEncodingConfig
  29. // 参见1.1.2
  30. APIResourceConfigSource APIResourceConfigSource
  31. // 为1.2里面的NewStorageCodec
  32. newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
  33. }
  34. func NewDefaultStorageFactory(
  35. config storagebackend.Config,
  36. defaultMediaType string,
  37. defaultSerializer runtime.StorageSerializer,
  38. resourceEncodingConfig ResourceEncodingConfig,
  39. resourceConfig APIResourceConfigSource,
  40. specialDefaultResourcePrefixes map[schema.GroupResource]string,
  41. ) *DefaultStorageFactory {
  42. config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
  43. if len(defaultMediaType) == 0 {
  44. defaultMediaType = runtime.ContentTypeJSON
  45. }
  46. return &DefaultStorageFactory{
  47. StorageConfig: config,
  48. Overrides: map[schema.GroupResource]groupResourceOverrides{},
  49. DefaultMediaType: defaultMediaType,
  50. DefaultSerializer: defaultSerializer,
  51. ResourceEncodingConfig: resourceEncodingConfig,
  52. APIResourceConfigSource: resourceConfig,
  53. DefaultResourcePrefixes: specialDefaultResourcePrefixes,
  54. newStorageCodecFn: NewStorageCodec,
  55. }
  56. }
  57. // 根据给定的gr返回对应的后端存储配置文件
  58. func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
  59. chosenStorageResource := s.getStorageGroupResource(groupResource)
  60. storageConfig := s.StorageConfig
  61. codecConfig := StorageCodecConfig{
  62. StorageMediaType: s.DefaultMediaType,
  63. StorageSerializer: s.DefaultSerializer,
  64. }
  65. // 先覆盖group级别的参数
  66. if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
  67. override.Apply(&storageConfig, &codecConfig)
  68. }
  69. // 再覆盖gr级别的参数
  70. if override, ok := s.Overrides[chosenStorageResource]; ok {
  71. override.Apply(&storageConfig, &codecConfig)
  72. }
  73. var err error
  74. codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
  75. if err != nil {
  76. return nil, err
  77. }
  78. codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
  79. if err != nil {
  80. return nil, err
  81. }
  82. codecConfig.Config = storageConfig
  83. storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
  84. if err != nil {
  85. return nil, err
  86. }
  87. klog.V(3).Infof("storing %v in %v, reading as %v from %#v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
  88. return &storageConfig, nil
  89. }
  90. // Backends returns all backends for all registered storage destinations.
  91. // Used for getting all instances for health validations.
  92. func (s *DefaultStorageFactory) Backends() []Backend {
  93. servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
  94. for _, overrides := range s.Overrides {
  95. servers.Insert(overrides.etcdLocation...)
  96. }
  97. tlsConfig := &tls.Config{
  98. InsecureSkipVerify: true,
  99. }
  100. if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
  101. cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
  102. if err != nil {
  103. klog.Errorf("failed to load key pair while getting backends: %s", err)
  104. } else {
  105. tlsConfig.Certificates = []tls.Certificate{cert}
  106. }
  107. }
  108. if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
  109. if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
  110. klog.Errorf("failed to read ca file while getting backends: %s", err)
  111. } else {
  112. caPool := x509.NewCertPool()
  113. caPool.AppendCertsFromPEM(caCert)
  114. tlsConfig.RootCAs = caPool
  115. tlsConfig.InsecureSkipVerify = false
  116. }
  117. }
  118. backends := []Backend{}
  119. for server := range servers {
  120. backends = append(backends, Backend{
  121. Server: server,
  122. // We can't share TLSConfig across different backends to avoid races.
  123. // For more details see: http://pr.k8s.io/59338
  124. TLSConfig: tlsConfig.Clone(),
  125. })
  126. }
  127. return backends
  128. }
  129. func (s *DefaultStorageFactory) ResourcePrefix(groupResource schema.GroupResource) string {
  130. chosenStorageResource := s.getStorageGroupResource(groupResource)
  131. groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
  132. exactResourceOverride := s.Overrides[chosenStorageResource]
  133. etcdResourcePrefix := s.DefaultResourcePrefixes[chosenStorageResource]
  134. if len(groupOverride.etcdResourcePrefix) > 0 {
  135. etcdResourcePrefix = groupOverride.etcdResourcePrefix
  136. }
  137. if len(exactResourceOverride.etcdResourcePrefix) > 0 {
  138. etcdResourcePrefix = exactResourceOverride.etcdResourcePrefix
  139. }
  140. if len(etcdResourcePrefix) == 0 {
  141. etcdResourcePrefix = strings.ToLower(chosenStorageResource.Resource)
  142. }
  143. return etcdResourcePrefix
  144. }
  145. // 以下接口为DefaultStorageFactory开放给用户自定义的func
  146. // 设置etcd地址(给指定gv指定etcd机器)
  147. func (s *DefaultStorageFactory) SetEtcdLocation(groupResource schema.GroupResource, location []string) {
  148. overrides := s.Overrides[groupResource]
  149. overrides.etcdLocation = location
  150. s.Overrides[groupResource] = overrides
  151. }
  152. // 设置etcd的前缀
  153. func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource schema.GroupResource, prefix string) {
  154. overrides := s.Overrides[groupResource]
  155. overrides.etcdPrefix = prefix
  156. s.Overrides[groupResource] = overrides
  157. }
  158. // 设置disablePaging
  159. func (s *DefaultStorageFactory) SetDisableAPIListChunking(groupResource schema.GroupResource) {
  160. overrides := s.Overrides[groupResource]
  161. overrides.disablePaging = true
  162. s.Overrides[groupResource] = overrides
  163. }
  164. // 设置指定资源的前缀
  165. // 则整体etcd前缀为etcdPrefix/etcdResourcePrefix
  166. func (s *DefaultStorageFactory) SetResourceEtcdPrefix(groupResource schema.GroupResource, prefix string) {
  167. overrides := s.Overrides[groupResource]
  168. overrides.etcdResourcePrefix = prefix
  169. s.Overrides[groupResource] = overrides
  170. }
  171. // 设置全局序列化器/序列化类型
  172. func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource, mediaType string, serializer runtime.StorageSerializer) {
  173. overrides := s.Overrides[groupResource]
  174. overrides.mediaType = mediaType
  175. overrides.serializer = serializer
  176. s.Overrides[groupResource] = overrides
  177. }
  178. // 设置Transformer
  179. func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) {
  180. overrides := s.Overrides[groupResource]
  181. overrides.transformer = transformer
  182. s.Overrides[groupResource] = overrides
  183. }
  184. // 增加关联关系资源
  185. func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) {
  186. for _, groupResource := range groupResources {
  187. overrides := s.Overrides[groupResource]
  188. overrides.cohabitatingResources = groupResources
  189. s.Overrides[groupResource] = overrides
  190. }
  191. }
  192. // 设置序列化/反序列化器wrapper
  193. func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
  194. for _, groupResource := range groupResources {
  195. overrides := s.Overrides[groupResource]
  196. overrides.encoderDecoratorFn = encoderDecoratorFn
  197. overrides.decoderDecoratorFn = decoderDecoratorFn
  198. s.Overrides[groupResource] = overrides
  199. }
  200. }
  201. func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.GroupResource) schema.GroupResource {
  202. for _, potentialStorageResource := range s.Overrides[groupResource].cohabitatingResources {
  203. if s.APIResourceConfigSource.AnyVersionForGroupEnabled(potentialStorageResource.Group) {
  204. return potentialStorageResource
  205. }
  206. }
  207. return groupResource
  208. }
  209. type groupResourceOverrides struct {
  210. etcdLocation []string // etcd机器地址
  211. etcdPrefix string // etcd前缀
  212. etcdResourcePrefix string // etcd前缀
  213. mediaType string // 序列化/反序列化类型
  214. serializer runtime.StorageSerializer // 全局序列化器
  215. // 资源关联信息。比如deployment有apps组,extentions组。
  216. // 若apps没有启用,则会用extentions的覆盖信息进行覆盖
  217. // 内置的一些关联关系参加1.3.1.1 pic1
  218. cohabitatingResources []schema.GroupResource
  219. encoderDecoratorFn func(runtime.Encoder) runtime.Encoder // 序列化器wrapper
  220. decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder // 反序列化器wrapper
  221. transformer value.Transformer // transformer
  222. disablePaging bool // disablePaging
  223. }
  224. // 覆盖配置
  225. func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
  226. if len(o.etcdLocation) > 0 {
  227. config.Transport.ServerList = o.etcdLocation
  228. }
  229. if len(o.etcdPrefix) > 0 {
  230. config.Prefix = o.etcdPrefix
  231. }
  232. if len(o.mediaType) > 0 {
  233. options.StorageMediaType = o.mediaType
  234. }
  235. if o.serializer != nil {
  236. options.StorageSerializer = o.serializer
  237. }
  238. if o.encoderDecoratorFn != nil {
  239. options.EncoderDecoratorFn = o.encoderDecoratorFn
  240. }
  241. if o.decoderDecoratorFn != nil {
  242. options.DecoderDecoratorFn = o.decoderDecoratorFn
  243. }
  244. if o.transformer != nil {
  245. config.Transformer = o.transformer
  246. }
  247. if o.disablePaging {
  248. config.Paging = false
  249. }
  250. }
  251. func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
  252. return schema.GroupResource{Group: resource.Group, Resource: AllResources}
  253. }

1.3.1、pic

1.3.1.1、pic1

image.png

2、generic registry

存储的实现分为以下两层
1、kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
封装了用户定义的一些存储之前的逻辑,比如创建策略等
2、kubernetes/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
真正的存储,封装了etcd客户端以及序列化与反序列化等操作

2.1、interface

2.1.1、Create

2.2、implement

  1. // ObjectFunc is a function to act on a given object. An error may be returned
  2. // if the hook cannot be completed. An ObjectFunc may transform the provided
  3. // object.
  4. type ObjectFunc func(obj runtime.Object) error
  5. // GenericStore interface can be used for type assertions when we need to access the underlying strategies.
  6. type GenericStore interface {
  7. GetCreateStrategy() rest.RESTCreateStrategy
  8. GetUpdateStrategy() rest.RESTUpdateStrategy
  9. GetDeleteStrategy() rest.RESTDeleteStrategy
  10. GetExportStrategy() rest.RESTExportStrategy
  11. }
  12. // Store implements pkg/api/rest.StandardStorage. It's intended to be
  13. // embeddable and allows the consumer to implement any non-generic functions
  14. // that are required. This object is intended to be copyable so that it can be
  15. // used in different ways but share the same underlying behavior.
  16. //
  17. // All fields are required unless specified.
  18. //
  19. // The intended use of this type is embedding within a Kind specific
  20. // RESTStorage implementation. This type provides CRUD semantics on a Kubelike
  21. // resource, handling details like conflict detection with ResourceVersion and
  22. // semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
  23. // RESTDeleteStrategy are generic across all backends, and encapsulate logic
  24. // specific to the API.
  25. //
  26. // TODO: make the default exposed methods exactly match a generic RESTStorage
  27. type Store struct {
  28. // NewFunc returns a new instance of the type this registry returns for a
  29. // GET of a single object, e.g.:
  30. //
  31. // curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
  32. NewFunc func() runtime.Object
  33. // NewListFunc returns a new list of the type this registry; it is the
  34. // type returned when the resource is listed, e.g.:
  35. //
  36. // curl GET /apis/group/version/namespaces/my-ns/myresource
  37. NewListFunc func() runtime.Object
  38. // DefaultQualifiedResource is the pluralized name of the resource.
  39. // This field is used if there is no request info present in the context.
  40. // See qualifiedResourceFromContext for details.
  41. DefaultQualifiedResource schema.GroupResource
  42. // KeyRootFunc returns the root etcd key for this resource; should not
  43. // include trailing "/". This is used for operations that work on the
  44. // entire collection (listing and watching).
  45. //
  46. // KeyRootFunc and KeyFunc must be supplied together or not at all.
  47. KeyRootFunc func(ctx context.Context) string
  48. // KeyFunc returns the key for a specific object in the collection.
  49. // KeyFunc is called for Create/Update/Get/Delete. Note that 'namespace'
  50. // can be gotten from ctx.
  51. //
  52. // KeyFunc and KeyRootFunc must be supplied together or not at all.
  53. KeyFunc func(ctx context.Context, name string) (string, error)
  54. // ObjectNameFunc returns the name of an object or an error.
  55. ObjectNameFunc func(obj runtime.Object) (string, error)
  56. // TTLFunc returns the TTL (time to live) that objects should be persisted
  57. // with. The existing parameter is the current TTL or the default for this
  58. // operation. The update parameter indicates whether this is an operation
  59. // against an existing object.
  60. //
  61. // Objects that are persisted with a TTL are evicted once the TTL expires.
  62. TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
  63. // PredicateFunc returns a matcher corresponding to the provided labels
  64. // and fields. The SelectionPredicate returned should return true if the
  65. // object matches the given field and label selectors.
  66. PredicateFunc func(label labels.Selector, field fields.Selector) storage.SelectionPredicate
  67. // EnableGarbageCollection affects the handling of Update and Delete
  68. // requests. Enabling garbage collection allows finalizers to do work to
  69. // finalize this object before the store deletes it.
  70. //
  71. // If any store has garbage collection enabled, it must also be enabled in
  72. // the kube-controller-manager.
  73. EnableGarbageCollection bool
  74. // DeleteCollectionWorkers is the maximum number of workers in a single
  75. // DeleteCollection call. Delete requests for the items in a collection
  76. // are issued in parallel.
  77. DeleteCollectionWorkers int
  78. // Decorator is an optional exit hook on an object returned from the
  79. // underlying storage. The returned object could be an individual object
  80. // (e.g. Pod) or a list type (e.g. PodList). Decorator is intended for
  81. // integrations that are above storage and should only be used for
  82. // specific cases where storage of the value is not appropriate, since
  83. // they cannot be watched.
  84. Decorator ObjectFunc
  85. // CreateStrategy implements resource-specific behavior during creation.
  86. CreateStrategy rest.RESTCreateStrategy
  87. // AfterCreate implements a further operation to run after a resource is
  88. // created and before it is decorated, optional.
  89. AfterCreate ObjectFunc
  90. // UpdateStrategy implements resource-specific behavior during updates.
  91. UpdateStrategy rest.RESTUpdateStrategy
  92. // AfterUpdate implements a further operation to run after a resource is
  93. // updated and before it is decorated, optional.
  94. AfterUpdate ObjectFunc
  95. // DeleteStrategy implements resource-specific behavior during deletion.
  96. DeleteStrategy rest.RESTDeleteStrategy
  97. // AfterDelete implements a further operation to run after a resource is
  98. // deleted and before it is decorated, optional.
  99. AfterDelete ObjectFunc
  100. // ReturnDeletedObject determines whether the Store returns the object
  101. // that was deleted. Otherwise, return a generic success status response.
  102. ReturnDeletedObject bool
  103. // ShouldDeleteDuringUpdate is an optional function to determine whether
  104. // an update from existing to obj should result in a delete.
  105. // If specified, this is checked in addition to standard finalizer,
  106. // deletionTimestamp, and deletionGracePeriodSeconds checks.
  107. ShouldDeleteDuringUpdate func(ctx context.Context, key string, obj, existing runtime.Object) bool
  108. // ExportStrategy implements resource-specific behavior during export,
  109. // optional. Exported objects are not decorated.
  110. ExportStrategy rest.RESTExportStrategy
  111. // TableConvertor is an optional interface for transforming items or lists
  112. // of items into tabular output. If unset, the default will be used.
  113. TableConvertor rest.TableConvertor
  114. // Storage is the interface for the underlying storage for the
  115. // resource. It is wrapped into a "DryRunnableStorage" that will
  116. // either pass-through or simply dry-run.
  117. Storage DryRunnableStorage
  118. // StorageVersioner outputs the <group/version/kind> an object will be
  119. // converted to before persisted in etcd, given a list of possible
  120. // kinds of the object.
  121. // If the StorageVersioner is nil, apiserver will leave the
  122. // storageVersionHash as empty in the discovery document.
  123. StorageVersioner runtime.GroupVersioner
  124. // Called to cleanup clients used by the underlying Storage; optional.
  125. DestroyFunc func()
  126. }
  127. // Note: the rest.StandardStorage interface aggregates the common REST verbs
  128. var _ rest.StandardStorage = &Store{}
  129. var _ rest.Exporter = &Store{}
  130. var _ rest.TableConvertor = &Store{}
  131. var _ GenericStore = &Store{}
  132. const (
  133. OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
  134. resourceCountPollPeriodJitter = 1.2
  135. )
  136. // NamespaceKeyRootFunc is the default function for constructing storage paths
  137. // to resource directories enforcing namespace rules.
  138. func NamespaceKeyRootFunc(ctx context.Context, prefix string) string {
  139. key := prefix
  140. ns, ok := genericapirequest.NamespaceFrom(ctx)
  141. if ok && len(ns) > 0 {
  142. key = key + "/" + ns
  143. }
  144. return key
  145. }
  146. // NamespaceKeyFunc is the default function for constructing storage paths to
  147. // a resource relative to the given prefix enforcing namespace rules. If the
  148. // context does not contain a namespace, it errors.
  149. func NamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
  150. key := NamespaceKeyRootFunc(ctx, prefix)
  151. ns, ok := genericapirequest.NamespaceFrom(ctx)
  152. if !ok || len(ns) == 0 {
  153. return "", apierrors.NewBadRequest("Namespace parameter required.")
  154. }
  155. if len(name) == 0 {
  156. return "", apierrors.NewBadRequest("Name parameter required.")
  157. }
  158. if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
  159. return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
  160. }
  161. key = key + "/" + name
  162. return key, nil
  163. }
  164. // NoNamespaceKeyFunc is the default function for constructing storage paths
  165. // to a resource relative to the given prefix without a namespace.
  166. func NoNamespaceKeyFunc(ctx context.Context, prefix string, name string) (string, error) {
  167. if len(name) == 0 {
  168. return "", apierrors.NewBadRequest("Name parameter required.")
  169. }
  170. if msgs := path.IsValidPathSegmentName(name); len(msgs) != 0 {
  171. return "", apierrors.NewBadRequest(fmt.Sprintf("Name parameter invalid: %q: %s", name, strings.Join(msgs, ";")))
  172. }
  173. key := prefix + "/" + name
  174. return key, nil
  175. }
  176. // New implements RESTStorage.New.
  177. func (e *Store) New() runtime.Object {
  178. return e.NewFunc()
  179. }
  180. // NewList implements rest.Lister.
  181. func (e *Store) NewList() runtime.Object {
  182. return e.NewListFunc()
  183. }
  184. // NamespaceScoped indicates whether the resource is namespaced
  185. func (e *Store) NamespaceScoped() bool {
  186. if e.CreateStrategy != nil {
  187. return e.CreateStrategy.NamespaceScoped()
  188. }
  189. if e.UpdateStrategy != nil {
  190. return e.UpdateStrategy.NamespaceScoped()
  191. }
  192. panic("programmer error: no CRUD for resource, you're crazy, override NamespaceScoped too")
  193. }
  194. // GetCreateStrategy implements GenericStore.
  195. func (e *Store) GetCreateStrategy() rest.RESTCreateStrategy {
  196. return e.CreateStrategy
  197. }
  198. // GetUpdateStrategy implements GenericStore.
  199. func (e *Store) GetUpdateStrategy() rest.RESTUpdateStrategy {
  200. return e.UpdateStrategy
  201. }
  202. // GetDeleteStrategy implements GenericStore.
  203. func (e *Store) GetDeleteStrategy() rest.RESTDeleteStrategy {
  204. return e.DeleteStrategy
  205. }
  206. // GetExportStrategy implements GenericStore.
  207. func (e *Store) GetExportStrategy() rest.RESTExportStrategy {
  208. return e.ExportStrategy
  209. }
  210. // List returns a list of items matching labels and field according to the
  211. // store's PredicateFunc.
  212. func (e *Store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
  213. label := labels.Everything()
  214. if options != nil && options.LabelSelector != nil {
  215. label = options.LabelSelector
  216. }
  217. field := fields.Everything()
  218. if options != nil && options.FieldSelector != nil {
  219. field = options.FieldSelector
  220. }
  221. out, err := e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
  222. if err != nil {
  223. return nil, err
  224. }
  225. if e.Decorator != nil {
  226. if err := e.Decorator(out); err != nil {
  227. return nil, err
  228. }
  229. }
  230. return out, nil
  231. }
  232. // ListPredicate returns a list of all the items matching the given
  233. // SelectionPredicate.
  234. func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
  235. if options == nil {
  236. // By default we should serve the request from etcd.
  237. options = &metainternalversion.ListOptions{ResourceVersion: ""}
  238. }
  239. p.Limit = options.Limit
  240. p.Continue = options.Continue
  241. list := e.NewListFunc()
  242. qualifiedResource := e.qualifiedResourceFromContext(ctx)
  243. if name, ok := p.MatchesSingle(); ok {
  244. if key, err := e.KeyFunc(ctx, name); err == nil {
  245. err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
  246. return list, storeerr.InterpretListError(err, qualifiedResource)
  247. }
  248. // if we cannot extract a key based on the current context, the optimization is skipped
  249. }
  250. err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
  251. return list, storeerr.InterpretListError(err, qualifiedResource)
  252. }
  253. // Create inserts a new item according to the unique key from the object.
  254. func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
  255. if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
  256. return nil, err
  257. }
  258. // at this point we have a fully formed object. It is time to call the validators that the apiserver
  259. // handling chain wants to enforce.
  260. if createValidation != nil {
  261. if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
  262. return nil, err
  263. }
  264. }
  265. name, err := e.ObjectNameFunc(obj)
  266. if err != nil {
  267. return nil, err
  268. }
  269. key, err := e.KeyFunc(ctx, name)
  270. if err != nil {
  271. return nil, err
  272. }
  273. qualifiedResource := e.qualifiedResourceFromContext(ctx)
  274. ttl, err := e.calculateTTL(obj, 0, false)
  275. if err != nil {
  276. return nil, err
  277. }
  278. out := e.NewFunc()
  279. if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
  280. err = storeerr.InterpretCreateError(err, qualifiedResource, name)
  281. err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
  282. if !apierrors.IsAlreadyExists(err) {
  283. return nil, err
  284. }
  285. if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
  286. return nil, err
  287. }
  288. accessor, errGetAcc := meta.Accessor(out)
  289. if errGetAcc != nil {
  290. return nil, err
  291. }
  292. if accessor.GetDeletionTimestamp() != nil {
  293. msg := &err.(*apierrors.StatusError).ErrStatus.Message
  294. *msg = fmt.Sprintf("object is being deleted: %s", *msg)
  295. }
  296. return nil, err
  297. }
  298. if e.AfterCreate != nil {
  299. if err := e.AfterCreate(out); err != nil {
  300. return nil, err
  301. }
  302. }
  303. if e.Decorator != nil {
  304. if err := e.Decorator(out); err != nil {
  305. return nil, err
  306. }
  307. }
  308. return out, nil
  309. }
  310. // ShouldDeleteDuringUpdate is the default function for
  311. // checking if an object should be deleted during an update.
  312. // It checks if the new object has no finalizers,
  313. // the existing object's deletionTimestamp is set, and
  314. // the existing object's deletionGracePeriodSeconds is 0 or nil
  315. func ShouldDeleteDuringUpdate(ctx context.Context, key string, obj, existing runtime.Object) bool {
  316. newMeta, err := meta.Accessor(obj)
  317. if err != nil {
  318. utilruntime.HandleError(err)
  319. return false
  320. }
  321. oldMeta, err := meta.Accessor(existing)
  322. if err != nil {
  323. utilruntime.HandleError(err)
  324. return false
  325. }
  326. if len(newMeta.GetFinalizers()) > 0 {
  327. // don't delete with finalizers remaining in the new object
  328. return false
  329. }
  330. if oldMeta.GetDeletionTimestamp() == nil {
  331. // don't delete if the existing object hasn't had a delete request made
  332. return false
  333. }
  334. // delete if the existing object has no grace period or a grace period of 0
  335. return oldMeta.GetDeletionGracePeriodSeconds() == nil || *oldMeta.GetDeletionGracePeriodSeconds() == 0
  336. }
  337. // deleteWithoutFinalizers handles deleting an object ignoring its finalizer list.
  338. // Used for objects that are either been finalized or have never initialized.
  339. func (e *Store) deleteWithoutFinalizers(ctx context.Context, name, key string, obj runtime.Object, preconditions *storage.Preconditions, dryRun bool) (runtime.Object, bool, error) {
  340. out := e.NewFunc()
  341. klog.V(6).Infof("going to delete %s from registry, triggered by update", name)
  342. // Using the rest.ValidateAllObjectFunc because the request is an UPDATE request and has already passed the admission for the UPDATE verb.
  343. if err := e.Storage.Delete(ctx, key, out, preconditions, rest.ValidateAllObjectFunc, dryRun); err != nil {
  344. // Deletion is racy, i.e., there could be multiple update
  345. // requests to remove all finalizers from the object, so we
  346. // ignore the NotFound error.
  347. if storage.IsNotFound(err) {
  348. _, err := e.finalizeDelete(ctx, obj, true)
  349. // clients are expecting an updated object if a PUT succeeded,
  350. // but finalizeDelete returns a metav1.Status, so return
  351. // the object in the request instead.
  352. return obj, false, err
  353. }
  354. return nil, false, storeerr.InterpretDeleteError(err, e.qualifiedResourceFromContext(ctx), name)
  355. }
  356. _, err := e.finalizeDelete(ctx, out, true)
  357. // clients are expecting an updated object if a PUT succeeded, but
  358. // finalizeDelete returns a metav1.Status, so return the object in
  359. // the request instead.
  360. return obj, false, err
  361. }
  362. // Update performs an atomic update and set of the object. Returns the result of the update
  363. // or an error. If the registry allows create-on-update, the create flow will be executed.
  364. // A bool is returned along with the object and any errors, to indicate object creation.
  365. func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
  366. key, err := e.KeyFunc(ctx, name)
  367. if err != nil {
  368. return nil, false, err
  369. }
  370. var (
  371. creatingObj runtime.Object
  372. creating = false
  373. )
  374. qualifiedResource := e.qualifiedResourceFromContext(ctx)
  375. storagePreconditions := &storage.Preconditions{}
  376. if preconditions := objInfo.Preconditions(); preconditions != nil {
  377. storagePreconditions.UID = preconditions.UID
  378. storagePreconditions.ResourceVersion = preconditions.ResourceVersion
  379. }
  380. out := e.NewFunc()
  381. // deleteObj is only used in case a deletion is carried out
  382. var deleteObj runtime.Object
  383. err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
  384. // Given the existing object, get the new object
  385. obj, err := objInfo.UpdatedObject(ctx, existing)
  386. if err != nil {
  387. return nil, nil, err
  388. }
  389. // If AllowUnconditionalUpdate() is true and the object specified by
  390. // the user does not have a resource version, then we populate it with
  391. // the latest version. Else, we check that the version specified by
  392. // the user matches the version of latest storage object.
  393. resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
  394. if err != nil {
  395. return nil, nil, err
  396. }
  397. doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()
  398. version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
  399. if err != nil {
  400. return nil, nil, err
  401. }
  402. if version == 0 {
  403. if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
  404. return nil, nil, apierrors.NewNotFound(qualifiedResource, name)
  405. }
  406. creating = true
  407. creatingObj = obj
  408. if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
  409. return nil, nil, err
  410. }
  411. // at this point we have a fully formed object. It is time to call the validators that the apiserver
  412. // handling chain wants to enforce.
  413. if createValidation != nil {
  414. if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
  415. return nil, nil, err
  416. }
  417. }
  418. ttl, err := e.calculateTTL(obj, 0, false)
  419. if err != nil {
  420. return nil, nil, err
  421. }
  422. return obj, &ttl, nil
  423. }
  424. creating = false
  425. creatingObj = nil
  426. if doUnconditionalUpdate {
  427. // Update the object's resource version to match the latest
  428. // storage object's resource version.
  429. err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
  430. if err != nil {
  431. return nil, nil, err
  432. }
  433. } else {
  434. // Check if the object's resource version matches the latest
  435. // resource version.
  436. if resourceVersion == 0 {
  437. // TODO: The Invalid error should have a field for Resource.
  438. // After that field is added, we should fill the Resource and
  439. // leave the Kind field empty. See the discussion in #18526.
  440. qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
  441. fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}
  442. return nil, nil, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)
  443. }
  444. if resourceVersion != version {
  445. return nil, nil, apierrors.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
  446. }
  447. }
  448. if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
  449. return nil, nil, err
  450. }
  451. // at this point we have a fully formed object. It is time to call the validators that the apiserver
  452. // handling chain wants to enforce.
  453. if updateValidation != nil {
  454. if err := updateValidation(ctx, obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
  455. return nil, nil, err
  456. }
  457. }
  458. // Check the default delete-during-update conditions, and store-specific conditions if provided
  459. if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
  460. (e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
  461. deleteObj = obj
  462. return nil, nil, errEmptiedFinalizers
  463. }
  464. ttl, err := e.calculateTTL(obj, res.TTL, true)
  465. if err != nil {
  466. return nil, nil, err
  467. }
  468. if int64(ttl) != res.TTL {
  469. return obj, &ttl, nil
  470. }
  471. return obj, nil, nil
  472. }, dryrun.IsDryRun(options.DryRun))
  473. if err != nil {
  474. // delete the object
  475. if err == errEmptiedFinalizers {
  476. return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions, dryrun.IsDryRun(options.DryRun))
  477. }
  478. if creating {
  479. err = storeerr.InterpretCreateError(err, qualifiedResource, name)
  480. err = rest.CheckGeneratedNameError(e.CreateStrategy, err, creatingObj)
  481. } else {
  482. err = storeerr.InterpretUpdateError(err, qualifiedResource, name)
  483. }
  484. return nil, false, err
  485. }
  486. if creating {
  487. if e.AfterCreate != nil {
  488. if err := e.AfterCreate(out); err != nil {
  489. return nil, false, err
  490. }
  491. }
  492. } else {
  493. if e.AfterUpdate != nil {
  494. if err := e.AfterUpdate(out); err != nil {
  495. return nil, false, err
  496. }
  497. }
  498. }
  499. if e.Decorator != nil {
  500. if err := e.Decorator(out); err != nil {
  501. return nil, false, err
  502. }
  503. }
  504. return out, creating, nil
  505. }
  506. // Get retrieves the item from storage.
  507. func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
  508. obj := e.NewFunc()
  509. key, err := e.KeyFunc(ctx, name)
  510. if err != nil {
  511. return nil, err
  512. }
  513. if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
  514. return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
  515. }
  516. if e.Decorator != nil {
  517. if err := e.Decorator(obj); err != nil {
  518. return nil, err
  519. }
  520. }
  521. return obj, nil
  522. }
  523. // qualifiedResourceFromContext attempts to retrieve a GroupResource from the context's request info.
  524. // If the context has no request info, DefaultQualifiedResource is used.
  525. func (e *Store) qualifiedResourceFromContext(ctx context.Context) schema.GroupResource {
  526. if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
  527. return schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
  528. }
  529. // some implementations access storage directly and thus the context has no RequestInfo
  530. return e.DefaultQualifiedResource
  531. }
  532. var (
  533. errAlreadyDeleting = fmt.Errorf("abort delete")
  534. errDeleteNow = fmt.Errorf("delete now")
  535. errEmptiedFinalizers = fmt.Errorf("emptied finalizers")
  536. )
  537. // shouldOrphanDependents returns true if the finalizer for orphaning should be set
  538. // updated for FinalizerOrphanDependents. In the order of highest to lowest
  539. // priority, there are three factors affect whether to add/remove the
  540. // FinalizerOrphanDependents: options, existing finalizers of the object,
  541. // and e.DeleteStrategy.DefaultGarbageCollectionPolicy.
  542. func shouldOrphanDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {
  543. // Get default GC policy from this REST object type
  544. gcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy)
  545. var defaultGCPolicy rest.GarbageCollectionPolicy
  546. if ok {
  547. defaultGCPolicy = gcStrategy.DefaultGarbageCollectionPolicy(ctx)
  548. }
  549. if defaultGCPolicy == rest.Unsupported {
  550. // return false to indicate that we should NOT orphan
  551. return false
  552. }
  553. // An explicit policy was set at deletion time, that overrides everything
  554. if options != nil && options.OrphanDependents != nil {
  555. return *options.OrphanDependents
  556. }
  557. if options != nil && options.PropagationPolicy != nil {
  558. switch *options.PropagationPolicy {
  559. case metav1.DeletePropagationOrphan:
  560. return true
  561. case metav1.DeletePropagationBackground, metav1.DeletePropagationForeground:
  562. return false
  563. }
  564. }
  565. // If a finalizer is set in the object, it overrides the default
  566. // validation should make sure the two cases won't be true at the same time.
  567. finalizers := accessor.GetFinalizers()
  568. for _, f := range finalizers {
  569. switch f {
  570. case metav1.FinalizerOrphanDependents:
  571. return true
  572. case metav1.FinalizerDeleteDependents:
  573. return false
  574. }
  575. }
  576. // Get default orphan policy from this REST object type if it exists
  577. if defaultGCPolicy == rest.OrphanDependents {
  578. return true
  579. }
  580. return false
  581. }
  582. // shouldDeleteDependents returns true if the finalizer for foreground deletion should be set
  583. // updated for FinalizerDeleteDependents. In the order of highest to lowest
  584. // priority, there are three factors affect whether to add/remove the
  585. // FinalizerDeleteDependents: options, existing finalizers of the object, and
  586. // e.DeleteStrategy.DefaultGarbageCollectionPolicy.
  587. func shouldDeleteDependents(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) bool {
  588. // Get default GC policy from this REST object type
  589. if gcStrategy, ok := e.DeleteStrategy.(rest.GarbageCollectionDeleteStrategy); ok && gcStrategy.DefaultGarbageCollectionPolicy(ctx) == rest.Unsupported {
  590. // return false to indicate that we should NOT delete in foreground
  591. return false
  592. }
  593. // If an explicit policy was set at deletion time, that overrides both
  594. if options != nil && options.OrphanDependents != nil {
  595. return false
  596. }
  597. if options != nil && options.PropagationPolicy != nil {
  598. switch *options.PropagationPolicy {
  599. case metav1.DeletePropagationForeground:
  600. return true
  601. case metav1.DeletePropagationBackground, metav1.DeletePropagationOrphan:
  602. return false
  603. }
  604. }
  605. // If a finalizer is set in the object, it overrides the default
  606. // validation has made sure the two cases won't be true at the same time.
  607. finalizers := accessor.GetFinalizers()
  608. for _, f := range finalizers {
  609. switch f {
  610. case metav1.FinalizerDeleteDependents:
  611. return true
  612. case metav1.FinalizerOrphanDependents:
  613. return false
  614. }
  615. }
  616. return false
  617. }
  618. // deletionFinalizersForGarbageCollection analyzes the object and delete options
  619. // to determine whether the object is in need of finalization by the garbage
  620. // collector. If so, returns the set of deletion finalizers to apply and a bool
  621. // indicating whether the finalizer list has changed and is in need of updating.
  622. //
  623. // The finalizers returned are intended to be handled by the garbage collector.
  624. // If garbage collection is disabled for the store, this function returns false
  625. // to ensure finalizers aren't set which will never be cleared.
  626. func deletionFinalizersForGarbageCollection(ctx context.Context, e *Store, accessor metav1.Object, options *metav1.DeleteOptions) (bool, []string) {
  627. if !e.EnableGarbageCollection {
  628. return false, []string{}
  629. }
  630. shouldOrphan := shouldOrphanDependents(ctx, e, accessor, options)
  631. shouldDeleteDependentInForeground := shouldDeleteDependents(ctx, e, accessor, options)
  632. newFinalizers := []string{}
  633. // first remove both finalizers, add them back if needed.
  634. for _, f := range accessor.GetFinalizers() {
  635. if f == metav1.FinalizerOrphanDependents || f == metav1.FinalizerDeleteDependents {
  636. continue
  637. }
  638. newFinalizers = append(newFinalizers, f)
  639. }
  640. if shouldOrphan {
  641. newFinalizers = append(newFinalizers, metav1.FinalizerOrphanDependents)
  642. }
  643. if shouldDeleteDependentInForeground {
  644. newFinalizers = append(newFinalizers, metav1.FinalizerDeleteDependents)
  645. }
  646. oldFinalizerSet := sets.NewString(accessor.GetFinalizers()...)
  647. newFinalizersSet := sets.NewString(newFinalizers...)
  648. if oldFinalizerSet.Equal(newFinalizersSet) {
  649. return false, accessor.GetFinalizers()
  650. }
  651. return true, newFinalizers
  652. }
  653. // markAsDeleting sets the obj's DeletionGracePeriodSeconds to 0, and sets the
  654. // DeletionTimestamp to "now" if there is no existing deletionTimestamp or if the existing
  655. // deletionTimestamp is further in future. Finalizers are watching for such updates and will
  656. // finalize the object if their IDs are present in the object's Finalizers list.
  657. func markAsDeleting(obj runtime.Object, now time.Time) (err error) {
  658. objectMeta, kerr := meta.Accessor(obj)
  659. if kerr != nil {
  660. return kerr
  661. }
  662. // This handles Generation bump for resources that don't support graceful
  663. // deletion. For resources that support graceful deletion is handle in
  664. // pkg/api/rest/delete.go
  665. if objectMeta.GetDeletionTimestamp() == nil && objectMeta.GetGeneration() > 0 {
  666. objectMeta.SetGeneration(objectMeta.GetGeneration() + 1)
  667. }
  668. existingDeletionTimestamp := objectMeta.GetDeletionTimestamp()
  669. if existingDeletionTimestamp == nil || existingDeletionTimestamp.After(now) {
  670. metaNow := metav1.NewTime(now)
  671. objectMeta.SetDeletionTimestamp(&metaNow)
  672. }
  673. var zero int64 = 0
  674. objectMeta.SetDeletionGracePeriodSeconds(&zero)
  675. return nil
  676. }
  677. // updateForGracefulDeletionAndFinalizers updates the given object for
  678. // graceful deletion and finalization by setting the deletion timestamp and
  679. // grace period seconds (graceful deletion) and updating the list of
  680. // finalizers (finalization); it returns:
  681. //
  682. // 1. an error
  683. // 2. a boolean indicating that the object was not found, but it should be
  684. // ignored
  685. // 3. a boolean indicating that the object's grace period is exhausted and it
  686. // should be deleted immediately
  687. // 4. a new output object with the state that was updated
  688. // 5. a copy of the last existing state of the object
  689. func (e *Store) updateForGracefulDeletionAndFinalizers(ctx context.Context, name, key string, options *metav1.DeleteOptions, preconditions storage.Preconditions, deleteValidation rest.ValidateObjectFunc, in runtime.Object) (err error, ignoreNotFound, deleteImmediately bool, out, lastExisting runtime.Object) {
  690. lastGraceful := int64(0)
  691. var pendingFinalizers bool
  692. out = e.NewFunc()
  693. err = e.Storage.GuaranteedUpdate(
  694. ctx,
  695. key,
  696. out,
  697. false, /* ignoreNotFound */
  698. &preconditions,
  699. storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
  700. if err := deleteValidation(ctx, existing); err != nil {
  701. return nil, err
  702. }
  703. graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
  704. if err != nil {
  705. return nil, err
  706. }
  707. if pendingGraceful {
  708. return nil, errAlreadyDeleting
  709. }
  710. // Add/remove the orphan finalizer as the options dictates.
  711. // Note that this occurs after checking pendingGraceufl, so
  712. // finalizers cannot be updated via DeleteOptions if deletion has
  713. // started.
  714. existingAccessor, err := meta.Accessor(existing)
  715. if err != nil {
  716. return nil, err
  717. }
  718. needsUpdate, newFinalizers := deletionFinalizersForGarbageCollection(ctx, e, existingAccessor, options)
  719. if needsUpdate {
  720. existingAccessor.SetFinalizers(newFinalizers)
  721. }
  722. pendingFinalizers = len(existingAccessor.GetFinalizers()) != 0
  723. if !graceful {
  724. // set the DeleteGracePeriods to 0 if the object has pendingFinalizers but not supporting graceful deletion
  725. if pendingFinalizers {
  726. klog.V(6).Infof("update the DeletionTimestamp to \"now\" and GracePeriodSeconds to 0 for object %s, because it has pending finalizers", name)
  727. err = markAsDeleting(existing, time.Now())
  728. if err != nil {
  729. return nil, err
  730. }
  731. return existing, nil
  732. }
  733. return nil, errDeleteNow
  734. }
  735. lastGraceful = *options.GracePeriodSeconds
  736. lastExisting = existing
  737. return existing, nil
  738. }),
  739. dryrun.IsDryRun(options.DryRun),
  740. )
  741. switch err {
  742. case nil:
  743. // If there are pending finalizers, we never delete the object immediately.
  744. if pendingFinalizers {
  745. return nil, false, false, out, lastExisting
  746. }
  747. if lastGraceful > 0 {
  748. return nil, false, false, out, lastExisting
  749. }
  750. // If we are here, the registry supports grace period mechanism and
  751. // we are intentionally delete gracelessly. In this case, we may
  752. // enter a race with other k8s components. If other component wins
  753. // the race, the object will not be found, and we should tolerate
  754. // the NotFound error. See
  755. // https://github.com/kubernetes/kubernetes/issues/19403 for
  756. // details.
  757. return nil, true, true, out, lastExisting
  758. case errDeleteNow:
  759. // we've updated the object to have a zero grace period, or it's already at 0, so
  760. // we should fall through and truly delete the object.
  761. return nil, false, true, out, lastExisting
  762. case errAlreadyDeleting:
  763. out, err = e.finalizeDelete(ctx, in, true)
  764. return err, false, false, out, lastExisting
  765. default:
  766. return storeerr.InterpretUpdateError(err, e.qualifiedResourceFromContext(ctx), name), false, false, out, lastExisting
  767. }
  768. }
  769. // Delete removes the item from storage.
  770. func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
  771. key, err := e.KeyFunc(ctx, name)
  772. if err != nil {
  773. return nil, false, err
  774. }
  775. obj := e.NewFunc()
  776. qualifiedResource := e.qualifiedResourceFromContext(ctx)
  777. if err = e.Storage.Get(ctx, key, "", obj, false); err != nil {
  778. return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
  779. }
  780. // support older consumers of delete by treating "nil" as delete immediately
  781. if options == nil {
  782. options = metav1.NewDeleteOptions(0)
  783. }
  784. var preconditions storage.Preconditions
  785. if options.Preconditions != nil {
  786. preconditions.UID = options.Preconditions.UID
  787. preconditions.ResourceVersion = options.Preconditions.ResourceVersion
  788. }
  789. graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
  790. if err != nil {
  791. return nil, false, err
  792. }
  793. // this means finalizers cannot be updated via DeleteOptions if a deletion is already pending
  794. if pendingGraceful {
  795. out, err := e.finalizeDelete(ctx, obj, false)
  796. return out, false, err
  797. }
  798. // check if obj has pending finalizers
  799. accessor, err := meta.Accessor(obj)
  800. if err != nil {
  801. return nil, false, apierrors.NewInternalError(err)
  802. }
  803. pendingFinalizers := len(accessor.GetFinalizers()) != 0
  804. var ignoreNotFound bool
  805. var deleteImmediately bool = true
  806. var lastExisting, out runtime.Object
  807. // Handle combinations of graceful deletion and finalization by issuing
  808. // the correct updates.
  809. shouldUpdateFinalizers, _ := deletionFinalizersForGarbageCollection(ctx, e, accessor, options)
  810. // TODO: remove the check, because we support no-op updates now.
  811. if graceful || pendingFinalizers || shouldUpdateFinalizers {
  812. err, ignoreNotFound, deleteImmediately, out, lastExisting = e.updateForGracefulDeletionAndFinalizers(ctx, name, key, options, preconditions, deleteValidation, obj)
  813. // Update the preconditions.ResourceVersion if set since we updated the object.
  814. if err == nil && deleteImmediately && preconditions.ResourceVersion != nil {
  815. accessor, err = meta.Accessor(out)
  816. if err != nil {
  817. return out, false, apierrors.NewInternalError(err)
  818. }
  819. resourceVersion := accessor.GetResourceVersion()
  820. preconditions.ResourceVersion = &resourceVersion
  821. }
  822. }
  823. // !deleteImmediately covers all cases where err != nil. We keep both to be future-proof.
  824. if !deleteImmediately || err != nil {
  825. return out, false, err
  826. }
  827. // Going further in this function is not useful when we are
  828. // performing a dry-run request. Worse, it will actually
  829. // override "out" with the version of the object in database
  830. // that doesn't have the finalizer and deletiontimestamp set
  831. // (because the update above was dry-run too). If we already
  832. // have that version available, let's just return it now,
  833. // otherwise, we can call dry-run delete that will get us the
  834. // latest version of the object.
  835. if dryrun.IsDryRun(options.DryRun) && out != nil {
  836. return out, true, nil
  837. }
  838. // delete immediately, or no graceful deletion supported
  839. klog.V(6).Infof("going to delete %s from registry: ", name)
  840. out = e.NewFunc()
  841. if err := e.Storage.Delete(ctx, key, out, &preconditions, storage.ValidateObjectFunc(deleteValidation), dryrun.IsDryRun(options.DryRun)); err != nil {
  842. // Please refer to the place where we set ignoreNotFound for the reason
  843. // why we ignore the NotFound error .
  844. if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
  845. // The lastExisting object may not be the last state of the object
  846. // before its deletion, but it's the best approximation.
  847. out, err := e.finalizeDelete(ctx, lastExisting, true)
  848. return out, true, err
  849. }
  850. return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
  851. }
  852. out, err = e.finalizeDelete(ctx, out, true)
  853. return out, true, err
  854. }
  855. // DeleteReturnsDeletedObject implements the rest.MayReturnFullObjectDeleter interface
  856. func (e *Store) DeleteReturnsDeletedObject() bool {
  857. return e.ReturnDeletedObject
  858. }
  859. // DeleteCollection removes all items returned by List with a given ListOptions from storage.
  860. //
  861. // DeleteCollection is currently NOT atomic. It can happen that only subset of objects
  862. // will be deleted from storage, and then an error will be returned.
  863. // In case of success, the list of deleted objects will be returned.
  864. //
  865. // TODO: Currently, there is no easy way to remove 'directory' entry from storage (if we
  866. // are removing all objects of a given type) with the current API (it's technically
  867. // possibly with storage API, but watch is not delivered correctly then).
  868. // It will be possible to fix it with v3 etcd API.
  869. func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
  870. if listOptions == nil {
  871. listOptions = &metainternalversion.ListOptions{}
  872. } else {
  873. listOptions = listOptions.DeepCopy()
  874. }
  875. listObj, err := e.List(ctx, listOptions)
  876. if err != nil {
  877. return nil, err
  878. }
  879. items, err := meta.ExtractList(listObj)
  880. if err != nil {
  881. return nil, err
  882. }
  883. // Spawn a number of goroutines, so that we can issue requests to storage
  884. // in parallel to speed up deletion.
  885. // TODO: Make this proportional to the number of items to delete, up to
  886. // DeleteCollectionWorkers (it doesn't make much sense to spawn 16
  887. // workers to delete 10 items).
  888. workersNumber := e.DeleteCollectionWorkers
  889. if workersNumber < 1 {
  890. workersNumber = 1
  891. }
  892. wg := sync.WaitGroup{}
  893. toProcess := make(chan int, 2*workersNumber)
  894. errs := make(chan error, workersNumber+1)
  895. go func() {
  896. defer utilruntime.HandleCrash(func(panicReason interface{}) {
  897. errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)
  898. })
  899. for i := 0; i < len(items); i++ {
  900. toProcess <- i
  901. }
  902. close(toProcess)
  903. }()
  904. wg.Add(workersNumber)
  905. for i := 0; i < workersNumber; i++ {
  906. go func() {
  907. // panics don't cross goroutine boundaries
  908. defer utilruntime.HandleCrash(func(panicReason interface{}) {
  909. errs <- fmt.Errorf("DeleteCollection goroutine panicked: %v", panicReason)
  910. })
  911. defer wg.Done()
  912. for index := range toProcess {
  913. accessor, err := meta.Accessor(items[index])
  914. if err != nil {
  915. errs <- err
  916. return
  917. }
  918. if _, _, err := e.Delete(ctx, accessor.GetName(), deleteValidation, options); err != nil && !apierrors.IsNotFound(err) {
  919. klog.V(4).Infof("Delete %s in DeleteCollection failed: %v", accessor.GetName(), err)
  920. errs <- err
  921. return
  922. }
  923. }
  924. }()
  925. }
  926. wg.Wait()
  927. select {
  928. case err := <-errs:
  929. return nil, err
  930. default:
  931. return listObj, nil
  932. }
  933. }
  934. // finalizeDelete runs the Store's AfterDelete hook if runHooks is set and
  935. // returns the decorated deleted object if appropriate.
  936. func (e *Store) finalizeDelete(ctx context.Context, obj runtime.Object, runHooks bool) (runtime.Object, error) {
  937. if runHooks && e.AfterDelete != nil {
  938. if err := e.AfterDelete(obj); err != nil {
  939. return nil, err
  940. }
  941. }
  942. if e.ReturnDeletedObject {
  943. if e.Decorator != nil {
  944. if err := e.Decorator(obj); err != nil {
  945. return nil, err
  946. }
  947. }
  948. return obj, nil
  949. }
  950. // Return information about the deleted object, which enables clients to
  951. // verify that the object was actually deleted and not waiting for finalizers.
  952. accessor, err := meta.Accessor(obj)
  953. if err != nil {
  954. return nil, err
  955. }
  956. qualifiedResource := e.qualifiedResourceFromContext(ctx)
  957. details := &metav1.StatusDetails{
  958. Name: accessor.GetName(),
  959. Group: qualifiedResource.Group,
  960. Kind: qualifiedResource.Resource, // Yes we set Kind field to resource.
  961. UID: accessor.GetUID(),
  962. }
  963. status := &metav1.Status{Status: metav1.StatusSuccess, Details: details}
  964. return status, nil
  965. }
  966. // Watch makes a matcher for the given label and field, and calls
  967. // WatchPredicate. If possible, you should customize PredicateFunc to produce
  968. // a matcher that matches by key. SelectionPredicate does this for you
  969. // automatically.
  970. func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
  971. label := labels.Everything()
  972. if options != nil && options.LabelSelector != nil {
  973. label = options.LabelSelector
  974. }
  975. field := fields.Everything()
  976. if options != nil && options.FieldSelector != nil {
  977. field = options.FieldSelector
  978. }
  979. predicate := e.PredicateFunc(label, field)
  980. resourceVersion := ""
  981. if options != nil {
  982. resourceVersion = options.ResourceVersion
  983. predicate.AllowWatchBookmarks = options.AllowWatchBookmarks
  984. }
  985. return e.WatchPredicate(ctx, predicate, resourceVersion)
  986. }
  987. // WatchPredicate starts a watch for the items that matches.
  988. func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
  989. if name, ok := p.MatchesSingle(); ok {
  990. if key, err := e.KeyFunc(ctx, name); err == nil {
  991. w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
  992. if err != nil {
  993. return nil, err
  994. }
  995. if e.Decorator != nil {
  996. return newDecoratedWatcher(w, e.Decorator), nil
  997. }
  998. return w, nil
  999. }
  1000. // if we cannot extract a key based on the current context, the
  1001. // optimization is skipped
  1002. }
  1003. w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
  1004. if err != nil {
  1005. return nil, err
  1006. }
  1007. if e.Decorator != nil {
  1008. return newDecoratedWatcher(w, e.Decorator), nil
  1009. }
  1010. return w, nil
  1011. }
  1012. // calculateTTL is a helper for retrieving the updated TTL for an object or
  1013. // returning an error if the TTL cannot be calculated. The defaultTTL is
  1014. // changed to 1 if less than zero. Zero means no TTL, not expire immediately.
  1015. func (e *Store) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
  1016. // TODO: validate this is assertion is still valid.
  1017. // etcd may return a negative TTL for a node if the expiration has not
  1018. // occurred due to server lag - we will ensure that the value is at least
  1019. // set.
  1020. if defaultTTL < 0 {
  1021. defaultTTL = 1
  1022. }
  1023. ttl = uint64(defaultTTL)
  1024. if e.TTLFunc != nil {
  1025. ttl, err = e.TTLFunc(obj, ttl, update)
  1026. }
  1027. return ttl, err
  1028. }
  1029. // exportObjectMeta unsets the fields on the given object that should not be
  1030. // present when the object is exported.
  1031. func exportObjectMeta(accessor metav1.Object, exact bool) {
  1032. accessor.SetUID("")
  1033. if !exact {
  1034. accessor.SetNamespace("")
  1035. }
  1036. accessor.SetCreationTimestamp(metav1.Time{})
  1037. accessor.SetDeletionTimestamp(nil)
  1038. accessor.SetResourceVersion("")
  1039. accessor.SetSelfLink("")
  1040. if len(accessor.GetGenerateName()) > 0 && !exact {
  1041. accessor.SetName("")
  1042. }
  1043. }
  1044. // Export implements the rest.Exporter interface
  1045. func (e *Store) Export(ctx context.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
  1046. obj, err := e.Get(ctx, name, &metav1.GetOptions{})
  1047. if err != nil {
  1048. return nil, err
  1049. }
  1050. if accessor, err := meta.Accessor(obj); err == nil {
  1051. exportObjectMeta(accessor, opts.Exact)
  1052. } else {
  1053. klog.V(4).Infof("Object of type %v does not have ObjectMeta: %v", reflect.TypeOf(obj), err)
  1054. }
  1055. if e.ExportStrategy != nil {
  1056. if err = e.ExportStrategy.Export(ctx, obj, opts.Exact); err != nil {
  1057. return nil, err
  1058. }
  1059. } else {
  1060. e.CreateStrategy.PrepareForCreate(ctx, obj)
  1061. }
  1062. return obj, nil
  1063. }
  1064. // CompleteWithOptions updates the store with the provided options and
  1065. // defaults common fields.
  1066. func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
  1067. if e.DefaultQualifiedResource.Empty() {
  1068. return fmt.Errorf("store %#v must have a non-empty qualified resource", e)
  1069. }
  1070. if e.NewFunc == nil {
  1071. return fmt.Errorf("store for %s must have NewFunc set", e.DefaultQualifiedResource.String())
  1072. }
  1073. if e.NewListFunc == nil {
  1074. return fmt.Errorf("store for %s must have NewListFunc set", e.DefaultQualifiedResource.String())
  1075. }
  1076. if (e.KeyRootFunc == nil) != (e.KeyFunc == nil) {
  1077. return fmt.Errorf("store for %s must set both KeyRootFunc and KeyFunc or neither", e.DefaultQualifiedResource.String())
  1078. }
  1079. var isNamespaced bool
  1080. switch {
  1081. case e.CreateStrategy != nil:
  1082. isNamespaced = e.CreateStrategy.NamespaceScoped()
  1083. case e.UpdateStrategy != nil:
  1084. isNamespaced = e.UpdateStrategy.NamespaceScoped()
  1085. default:
  1086. return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
  1087. }
  1088. if e.DeleteStrategy == nil {
  1089. return fmt.Errorf("store for %s must have DeleteStrategy set", e.DefaultQualifiedResource.String())
  1090. }
  1091. if options.RESTOptions == nil {
  1092. return fmt.Errorf("options for %s must have RESTOptions set", e.DefaultQualifiedResource.String())
  1093. }
  1094. attrFunc := options.AttrFunc
  1095. if attrFunc == nil {
  1096. if isNamespaced {
  1097. attrFunc = storage.DefaultNamespaceScopedAttr
  1098. } else {
  1099. attrFunc = storage.DefaultClusterScopedAttr
  1100. }
  1101. }
  1102. if e.PredicateFunc == nil {
  1103. e.PredicateFunc = func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
  1104. return storage.SelectionPredicate{
  1105. Label: label,
  1106. Field: field,
  1107. GetAttrs: attrFunc,
  1108. }
  1109. }
  1110. }
  1111. err := validateIndexers(options.Indexers)
  1112. if err != nil {
  1113. return err
  1114. }
  1115. opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
  1116. if err != nil {
  1117. return err
  1118. }
  1119. // ResourcePrefix must come from the underlying factory
  1120. prefix := opts.ResourcePrefix
  1121. if !strings.HasPrefix(prefix, "/") {
  1122. prefix = "/" + prefix
  1123. }
  1124. if prefix == "/" {
  1125. return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
  1126. }
  1127. // Set the default behavior for storage key generation
  1128. if e.KeyRootFunc == nil && e.KeyFunc == nil {
  1129. if isNamespaced {
  1130. e.KeyRootFunc = func(ctx context.Context) string {
  1131. return NamespaceKeyRootFunc(ctx, prefix)
  1132. }
  1133. e.KeyFunc = func(ctx context.Context, name string) (string, error) {
  1134. return NamespaceKeyFunc(ctx, prefix, name)
  1135. }
  1136. } else {
  1137. e.KeyRootFunc = func(ctx context.Context) string {
  1138. return prefix
  1139. }
  1140. e.KeyFunc = func(ctx context.Context, name string) (string, error) {
  1141. return NoNamespaceKeyFunc(ctx, prefix, name)
  1142. }
  1143. }
  1144. }
  1145. // We adapt the store's keyFunc so that we can use it with the StorageDecorator
  1146. // without making any assumptions about where objects are stored in etcd
  1147. keyFunc := func(obj runtime.Object) (string, error) {
  1148. accessor, err := meta.Accessor(obj)
  1149. if err != nil {
  1150. return "", err
  1151. }
  1152. if isNamespaced {
  1153. return e.KeyFunc(genericapirequest.WithNamespace(genericapirequest.NewContext(), accessor.GetNamespace()), accessor.GetName())
  1154. }
  1155. return e.KeyFunc(genericapirequest.NewContext(), accessor.GetName())
  1156. }
  1157. if e.DeleteCollectionWorkers == 0 {
  1158. e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
  1159. }
  1160. e.EnableGarbageCollection = opts.EnableGarbageCollection
  1161. if e.ObjectNameFunc == nil {
  1162. e.ObjectNameFunc = func(obj runtime.Object) (string, error) {
  1163. accessor, err := meta.Accessor(obj)
  1164. if err != nil {
  1165. return "", err
  1166. }
  1167. return accessor.GetName(), nil
  1168. }
  1169. }
  1170. if e.Storage.Storage == nil {
  1171. e.Storage.Codec = opts.StorageConfig.Codec
  1172. var err error
  1173. e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
  1174. opts.StorageConfig,
  1175. prefix,
  1176. keyFunc,
  1177. e.NewFunc,
  1178. e.NewListFunc,
  1179. attrFunc,
  1180. options.TriggerFunc,
  1181. options.Indexers,
  1182. )
  1183. if err != nil {
  1184. return err
  1185. }
  1186. e.StorageVersioner = opts.StorageConfig.EncodeVersioner
  1187. if opts.CountMetricPollPeriod > 0 {
  1188. stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
  1189. previousDestroy := e.DestroyFunc
  1190. e.DestroyFunc = func() {
  1191. stopFunc()
  1192. if previousDestroy != nil {
  1193. previousDestroy()
  1194. }
  1195. }
  1196. }
  1197. }
  1198. return nil
  1199. }
  1200. // startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
  1201. func (e *Store) startObservingCount(period time.Duration) func() {
  1202. prefix := e.KeyRootFunc(genericapirequest.NewContext())
  1203. resourceName := e.DefaultQualifiedResource.String()
  1204. klog.V(2).Infof("Monitoring %v count at <storage-prefix>/%v", resourceName, prefix)
  1205. stopCh := make(chan struct{})
  1206. go wait.JitterUntil(func() {
  1207. count, err := e.Storage.Count(prefix)
  1208. if err != nil {
  1209. klog.V(5).Infof("Failed to update storage count metric: %v", err)
  1210. metrics.UpdateObjectCount(resourceName, -1)
  1211. } else {
  1212. metrics.UpdateObjectCount(resourceName, count)
  1213. }
  1214. }, period, resourceCountPollPeriodJitter, true, stopCh)
  1215. return func() { close(stopCh) }
  1216. }
  1217. func (e *Store) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
  1218. if e.TableConvertor != nil {
  1219. return e.TableConvertor.ConvertToTable(ctx, object, tableOptions)
  1220. }
  1221. return rest.NewDefaultTableConvertor(e.qualifiedResourceFromContext(ctx)).ConvertToTable(ctx, object, tableOptions)
  1222. }
  1223. func (e *Store) StorageVersion() runtime.GroupVersioner {
  1224. return e.StorageVersioner
  1225. }
  1226. // validateIndexers will check the prefix of indexers.
  1227. func validateIndexers(indexers *cache.Indexers) error {
  1228. if indexers == nil {
  1229. return nil
  1230. }
  1231. for indexName := range *indexers {
  1232. if len(indexName) <= 2 || (indexName[:2] != "l:" && indexName[:2] != "f:") {
  1233. return fmt.Errorf("index must prefix with \"l:\" or \"f:\"")
  1234. }
  1235. }
  1236. return nil
  1237. }

2.2.1.2、generic implement(with cache)

3、rest provider

3.1、架构

image.png

3.2、apps/deployment 例子说明