Tracing how the ApiServer handles PATCH requests

The ApiServer call stack for handling a PATCH request is as follows:

    restfulPatchResource
        // Parameters
        r rest.Patcher // registry.Store
        scope handlers.RequestScope
        admit admission.Interface
        supportedTypes []string // PATCH types supported by the resource
        // Returns
        restful.RouteFunction // go-restful route handler callback
    handlers.PatchResource
        // Creates the handlers.patcher object
        // Parameters
        r rest.Patcher // registry.Store
        scope *RequestScope
        admit admission.Interface
        patchTypes []string // PATCH types supported by the resource
        // Returns
        http.HandlerFunc
    k8s.io/apiserver/pkg/endpoints/handlers.(*patcher).patchResource
        // Parameters
        ctx context.Context // http.Request.Context
        scope *RequestScope
        // Returns
        runtime.Object // the updated resource object
        bool // whether the object was created (updating a nonexistent object may create it)
        error
    k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update
        // Parameters
        ctx context.Context
        name string // name of the resource object, e.g. nginx (the storage key is derived from it via KeyFunc)
        objInfo rest.UpdatedObjectInfo // information needed to update the object
        createValidation rest.ValidateObjectFunc
        updateValidation rest.ValidateObjectUpdateFunc
        forceAllowCreate bool
        options *metav1.UpdateOptions
        // Returns
        runtime.Object // the updated resource object
        bool // whether the object was created (updating a nonexistent object may create it)
        error
    k8s.io/apiserver/pkg/registry/generic/registry.(*DryRunnableStorage).GuaranteedUpdate
        // Implements dry-run updates of the resource object:
        // if dryRun == true, the update is computed but nothing is persisted afterwards
        // Parameters
        ctx context.Context
        key string // storage key of the object, e.g. /deployments/default/nginx
        ptrToType runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
        ignoreNotFound bool // if true, a missing key is treated as an empty object instead of a NotFound error (Store.Update passes true, which enables create-on-update)
        preconditions *storage.Preconditions // checked before the update: UID and ResourceVersion
        tryUpdate storage.UpdateFunc // closure defined in registry.Store.Update that computes the updated object
        dryRun bool // dry run: validate and try the update in memory, but do not persist it
        suggestion ...runtime.Object
        // Returns
        error
    k8s.io/apiserver/pkg/storage/cacher.(*Cacher).GuaranteedUpdate
        // Looks up key in the in-memory cache cacher.Cacher.watchCache; if found, passes it down as suggestion
        // Parameters
        ctx context.Context
        key string // storage key of the object, e.g. /deployments/default/nginx
        ptrToType runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
        ignoreNotFound bool
        preconditions *storage.Preconditions // checked before the update: UID and ResourceVersion
        tryUpdate storage.UpdateFunc // closure defined in registry.Store.Update that computes the updated object
        _ ...runtime.Object
        // Returns
        error
    k8s.io/apiserver/pkg/storage/etcd3.(*store).GuaranteedUpdate
        // Calls tryUpdate to update the object and persists the result to Etcd with a transaction
        // Parameters
        ctx context.Context
        key string // storage key of the object, e.g. /deployments/default/nginx
        out runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
        ignoreNotFound bool
        preconditions *storage.Preconditions // checked before the update: UID and ResourceVersion
        tryUpdate storage.UpdateFunc // closure defined in registry.Store.Update that computes the updated object
        suggestion ...runtime.Object // if the object is found in the in-memory cache, a direct Etcd read can be avoided
        // Returns
        error

Several points in this call stack deserve a closer look:

storage.Preconditions

The check performed before tryUpdate runs.

    // Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
    type Preconditions struct {
        // Specifies the target UID.
        // +optional
        UID *types.UID `json:"uid,omitempty"`
        // Specifies the target ResourceVersion
        // +optional
        ResourceVersion *string `json:"resourceVersion,omitempty"`
    }

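storage.Preconditions has only one method, func (p *Preconditions) Check(key string, obj runtime.Object) error, which checks whether the UID and ResourceVersion of obj match those in the Preconditions; a field left nil is not checked.
In PATCH request handling, the Preconditions are built from rest.UpdatedObjectInfo.Preconditions().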
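A minimal sketch of the Check semantics described above, assuming (as the later analysis relies on) that a nil field is simply skipped. The `preconditions` and `objMeta` types and the `check` method are hypothetical stand-ins for storage.Preconditions and object metadata; the error text is illustrative only.

```go
package main

import "fmt"

// preconditions is a hypothetical stand-in for storage.Preconditions.
type preconditions struct {
	UID             *string
	ResourceVersion *string
}

// objMeta holds the two metadata fields that are compared.
type objMeta struct {
	UID             string
	ResourceVersion string
}

// check returns an error only for a field that is set (non-nil) and differs from obj.
func (p *preconditions) check(key string, obj objMeta) error {
	if p == nil {
		return nil
	}
	if p.UID != nil && *p.UID != obj.UID {
		return fmt.Errorf("precondition failed for %s: UID %q != %q", key, *p.UID, obj.UID)
	}
	if p.ResourceVersion != nil && *p.ResourceVersion != obj.ResourceVersion {
		return fmt.Errorf("precondition failed for %s: ResourceVersion %q != %q", key, *p.ResourceVersion, obj.ResourceVersion)
	}
	return nil
}

func main() {
	obj := objMeta{UID: "u-1", ResourceVersion: "744603"}
	// Zero-value preconditions, as produced for a PATCH: nothing is compared.
	fmt.Println((&preconditions{}).check("/deployments/default/nginx", obj) == nil) // true
	staleRV := "744602"
	fmt.Println((&preconditions{ResourceVersion: &staleRV}).check("/deployments/default/nginx", obj) != nil) // true
}
```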

rest.UpdatedObjectInfo

Inside the tryUpdate closure, rest.UpdatedObjectInfo.UpdatedObject() is called to compute the updated resource object.

    // UpdatedObjectInfo provides information about an updated object to an Updater.
    // It requires access to the old object in order to return the newly updated object.
    type UpdatedObjectInfo interface {
        // Returns preconditions built from the updated object, if applicable.
        // May return nil, or a preconditions object containing nil fields,
        // if no preconditions can be determined from the updated object.
        Preconditions() *metav1.Preconditions
        // UpdatedObject returns the updated object, given a context and old object.
        // The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
        // (Modifies the object according to the content of the submitted request.)
        UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error)
    }


In PATCH request handling, the implementation of rest.UpdatedObjectInfo is rest.defaultUpdatedObjectInfo:

    // defaultUpdatedObjectInfo implements UpdatedObjectInfo
    type defaultUpdatedObjectInfo struct {
        // obj is the updated object
        obj runtime.Object
        // transformers is an optional list of transforming functions that modify or
        // replace obj using information from the context, old object, or other sources.
        transformers []TransformFunc
    }

    // DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.
    func DefaultUpdatedObjectInfo(obj runtime.Object, transformers ...TransformFunc) UpdatedObjectInfo {
        return &defaultUpdatedObjectInfo{obj, transformers}
    }

    // Preconditions satisfies the UpdatedObjectInfo interface.
    // Note: only the UID is extracted here.
    func (i *defaultUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
        // Attempt to get the UID out of the object
        accessor, err := meta.Accessor(i.obj)
        if err != nil {
            // If no UID can be read, no preconditions are possible
            return nil
        }
        // If empty, no preconditions needed
        uid := accessor.GetUID()
        if len(uid) == 0 {
            return nil
        }
        return &metav1.Preconditions{UID: &uid}
    }

    // UpdatedObject satisfies the UpdatedObjectInfo interface.
    // It returns a copy of the held obj, passed through any configured transformers.
    func (i *defaultUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
        var err error
        // Start with the configured object
        newObj := i.obj
        // If the original is non-nil (might be nil if the first transformer builds the object from the oldObj), make a copy,
        // so we don't return the original. BeforeUpdate can mutate the returned object, doing things like clearing ResourceVersion.
        // If we're re-called, we need to be able to return the pristine version.
        if newObj != nil {
            newObj = newObj.DeepCopyObject()
        }
        // Allow any configured transformers to update the new object
        for _, transformer := range i.transformers {
            // e.g. patcher.applyPatch updates the object here
            newObj, err = transformer(ctx, newObj, oldObj) // if http.method == patch: patcher.applyPatch, patcher.applyAdmission
            if err != nil {
                return nil, err
            }
        }
        return newObj, nil
    }

For a PATCH request, defaultUpdatedObjectInfo is created with obj set to nil, so defaultUpdatedObjectInfo.Preconditions() returns nil and no Preconditions check takes effect:

    // patchResource divides PatchResource for easier unit testing
    func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runtime.Object, bool, error) {
        ......
        p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
        ......
    }

The update inside the tryUpdate closure
    // Update performs an atomic update and set of the object. Returns the result of the update
    // or an error. If the registry allows create-on-update, the create flow will be executed.
    // A bool is returned along with the object and any errors, to indicate object creation.
    func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
        ......
        err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
            // Given the existing object, get the new object
            // (compute the updated object for this resource)
            obj, err := objInfo.UpdatedObject(ctx, existing) // defaultUpdatedObjectInfo.UpdatedObject
            if err != nil {
                return nil, nil, err
            }

            // If AllowUnconditionalUpdate() is true and the object specified by
            // the user does not have a resource version, then we populate it with
            // the latest version. Else, we check that the version specified by
            // the user matches the version of latest storage object.
            resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
            if err != nil {
                return nil, nil, err
            }
            doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()

            version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
            if err != nil {
                return nil, nil, err
            }
            // version == 0 means no resource object exists for key
            if version == 0 {
                if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
                    return nil, nil, kubeerr.NewNotFound(qualifiedResource, name)
                }
                creating = true
                creatingObj = obj
                if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
                    return nil, nil, err
                }
                // at this point we have a fully formed object. It is time to call the validators that the apiserver
                // handling chain wants to enforce.
                if createValidation != nil {
                    if err := createValidation(obj.DeepCopyObject()); err != nil {
                        return nil, nil, err
                    }
                }
                ttl, err := e.calculateTTL(obj, 0, false)
                if err != nil {
                    return nil, nil, err
                }
                return obj, &ttl, nil
            }

            creating = false
            creatingObj = nil
            if doUnconditionalUpdate {
                // Update the object's resource version to match the latest
                // storage object's resource version.
                // (unconditional update)
                err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
                if err != nil {
                    return nil, nil, err
                }
            } else {
                // Check if the object's resource version matches the latest
                // resource version.
                if resourceVersion == 0 {
                    // TODO: The Invalid error should have a field for Resource.
                    // After that field is added, we should fill the Resource and
                    // leave the Kind field empty. See the discussion in #18526.
                    qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
                    fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}
                    return nil, nil, kubeerr.NewInvalid(qualifiedKind, name, fieldErrList)
                }
                // This is the key check: if the resource version of the updated object differs
                // from the stored one, a concurrent update conflict has occurred.
                // For PATCH, resourceVersion always equals version
                // (unless the patch body itself sets metadata.resourceVersion).
                // For PUT, resourceVersion comes from the request body, so it can differ from version.
                if resourceVersion != version {
                    return nil, nil, kubeerr.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
                }
            }
            if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
                return nil, nil, err
            }
            // at this point we have a fully formed object. It is time to call the validators that the apiserver
            // handling chain wants to enforce.
            if updateValidation != nil {
                if err := updateValidation(obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
                    return nil, nil, err
                }
            }
            // Check the default delete-during-update conditions, and store-specific conditions if provided
            if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
                (e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
                deleteObj = obj
                return nil, nil, errEmptiedFinalizers
            }
            ttl, err := e.calculateTTL(obj, res.TTL, true)
            if err != nil {
                return nil, nil, err
            }
            if int64(ttl) != res.TTL {
                return obj, &ttl, nil
            }
            return obj, nil, nil
        }, dryrun.IsDryRun(options.DryRun))
        ......
    }
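The order of the version checks in the tryUpdate closure can be condensed into a small pure function. This is a sketch, not apiserver code: `decide` and the `outcome` labels are hypothetical, and the three boolean flags stand in for AllowUnconditionalUpdate(), AllowCreateOnUpdate() and forceAllowCreate.

```go
package main

import "fmt"

// outcome names the branch taken by the tryUpdate version checks (hypothetical labels).
type outcome string

const (
	outcomeNotFound      outcome = "NotFound"
	outcomeCreate        outcome = "Create"
	outcomeUnconditional outcome = "UnconditionalUpdate"
	outcomeInvalid       outcome = "Invalid"
	outcomeConflict      outcome = "Conflict"
	outcomeUpdate        outcome = "Update"
)

// decide follows the same order of checks as the closure:
// newRV is ObjectResourceVersion(obj), existingRV is ObjectResourceVersion(existing).
func decide(newRV, existingRV uint64, allowUnconditional, allowCreateOnUpdate, forceAllowCreate bool) outcome {
	doUnconditionalUpdate := newRV == 0 && allowUnconditional
	if existingRV == 0 { // nothing is stored under key
		if !allowCreateOnUpdate && !forceAllowCreate {
			return outcomeNotFound
		}
		return outcomeCreate
	}
	if doUnconditionalUpdate {
		return outcomeUnconditional // RV is filled in from the stored object
	}
	if newRV == 0 {
		return outcomeInvalid // "must be specified for an update"
	}
	if newRV != existingRV {
		return outcomeConflict
	}
	return outcomeUpdate
}

func main() {
	fmt.Println(decide(744603, 744603, true, false, false)) // Update (PATCH: RV inherited from existing)
	fmt.Println(decide(744603, 744610, true, false, false)) // Conflict (PUT carrying a stale RV)
	fmt.Println(decide(0, 744610, true, false, false))      // UnconditionalUpdate (PUT without RV)
}
```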

Persisting to Etcd
    // GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
    func (s *store) GuaranteedUpdate(
        ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
        preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
        trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))
        defer trace.LogIfLong(500 * time.Millisecond)

        v, err := conversion.EnforcePtr(out) // out must be a non-nil pointer
        if err != nil {
            panic("unable to convert output object to pointer")
        }
        key = path.Join(s.pathPrefix, key)

        // Reads the current state of key from Etcd
        getCurrentState := func() (*objState, error) {
            startTime := time.Now()
            getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
            metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
            if err != nil {
                return nil, err
            }
            return s.getState(getResp, key, v, ignoreNotFound)
        }

        var origState *objState
        var mustCheckData bool // true means origState may be stale
        // If the caller supplied the value of key (obtained from
        // k8s.io/apiserver/pkg/storage/cacher.Cacher.watchCache.GetByKey(key)),
        // there is no need to read from Etcd
        if len(suggestion) == 1 && suggestion[0] != nil {
            origState, err = s.getStateFromObject(suggestion[0])
            if err != nil {
                return err
            }
            mustCheckData = true
        } else {
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
        }
        trace.Step("initial value restored")

        transformContext := authenticatedDataString(key)
        for {
            // Check that origState.obj's UID and ResourceVersion match preconditions.
            // For PATCH and PUT the preconditions fields are nil, so this check always passes.
            if err := preconditions.Check(key, origState.obj); err != nil {
                // If our data is already up to date, return the error
                if !mustCheckData {
                    return err
                }
                // origState may be stale: fetch the latest state from Etcd
                // It's possible we were working with stale data
                // Actually fetch
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                // Retry
                continue
            }

            // Modify on top of origState.obj by calling tryUpdate;
            // this is where, for example, a PATCH is actually applied.
            ret, ttl, err := s.updateState(origState, tryUpdate)
            if err != nil {
                // If our data is already up to date, return the error
                if !mustCheckData {
                    return err
                }
                // origState may be stale: fetch the latest state from Etcd and try once more
                // It's possible we were working with stale data
                // Actually fetch
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                // Retry
                continue
            }

            data, err := runtime.Encode(s.codec, ret)
            if err != nil {
                return err
            }
            // Check whether the updated object actually changed.
            // origState.stale is normally false; it is set only when the storage
            // transformer reports the stored data as stale.
            if !origState.stale && bytes.Equal(data, origState.data) {
                // if we skipped the original Get in this loop, we must refresh from
                // etcd in order to be sure the data in the store is equivalent to
                // our desired serialization
                // (mustCheckData == true means origState may be stale, so confirm once more)
                if mustCheckData {
                    origState, err = getCurrentState()
                    if err != nil {
                        return err
                    }
                    mustCheckData = false
                    if !bytes.Equal(data, origState.data) {
                        // original data changed, restart loop
                        continue
                    }
                }
                // recheck that the data from etcd is not stale before short-circuiting a write
                if !origState.stale {
                    return decode(s.codec, s.versioner, origState.data, out, origState.rev)
                }
            }

            // Transform the serialized object into the form stored in Etcd
            newData, err := s.transformer.TransformToStorage(data, transformContext)
            if err != nil {
                return storage.NewInternalError(err.Error())
            }

            // Build the TTL (expiry) options for key
            opts, err := s.ttlOpts(ctx, int64(ttl))
            if err != nil {
                return err
            }
            trace.Step("Transaction prepared")

            // Etcd transaction:
            // compare the object's ModRevision in Etcd with origState.rev;
            // if they are equal, write the new data,
            // otherwise the update fails and the latest object is read back
            startTime := time.Now()
            txnResp, err := s.client.KV.Txn(ctx).If(
                clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
            ).Then(
                clientv3.OpPut(key, string(newData), opts...),
            ).Else(
                clientv3.OpGet(key),
            ).Commit()
            metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
            if err != nil {
                return err
            }
            trace.Step("Transaction committed")
            if !txnResp.Succeeded {
                // The transaction failed: rebuild the latest state from the Else response and retry
                getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
                klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
                origState, err = s.getState(getResp, key, v, ignoreNotFound)
                if err != nil {
                    return err
                }
                trace.Step("Retry value restored")
                mustCheckData = false
                continue
            }
            // The transaction succeeded
            putResp := txnResp.Responses[0].GetResponsePut()
            return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
        }
    }

Summary

How are concurrent updates to Etcd kept from being lost?

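This is guaranteed by the transaction used when the updated object is persisted to Etcd: whenever the transaction fails, GuaranteedUpdate rebuilds its state from the latest stored object and retries. In pseudocode, the transaction logic is: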

    // oldObj = FromMemCache(key) or EtcdGet(key)
    if ModRevision(EtcdGet(key)) == oldObj.rev:
        EtcdSet(key) = newObj
        transaction = success
    else:
        oldObj = EtcdGet(key)   // rerun tryUpdate on this latest object, then retry
        transaction = fail

How are conflicts between concurrent updates detected?

The analysis above shows two places where a conflict can be detected:

  1. Preconditions
  2. The resourceVersion != version check in tryUpdate

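For kubectl apply and kubectl edit (both send PATCH requests), the Preconditions created by default are zero-valued, so no conflict detection happens through Preconditions. In tryUpdate, the newObj returned by objInfo.UpdatedObject(ctx, existing) is produced by applying the patch to existing, so newObj's resourceVersion always equals existing's (unless the patch body itself sets metadata.resourceVersion), and the resourceVersion != version check does not trigger either.
So when does conflict detection actually happen? kubectl also has a replace command. Capturing its traffic shows that replace sends a PUT request, and the request carries a resourceVersion: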

    PUT /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1
    Host: localhost:8080
    User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$Format
    Content-Length: 866
    Accept: application/json
    Content-Type: application/json
    Uber-Trace-Id: 6e685772cc06fc16:2514dc54a474fe88:4f488c05a7cef9c8:1
    Accept-Encoding: gzip

    {"apiVersion":"extensions/v1beta1","kind":"Deployment","metadata":{"labels":{"app":"nginx"},"name":"nginx","namespace":"default","resourceVersion":"744603"},"spec":{"progressDeadlineSeconds":600,"replicas":1,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl replace"}],"image":"nginx","imagePullPolicy":"IfNotPresent","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}}}

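The ApiServer handles a PUT request much like a PATCH request: both call k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update. For PUT, the rest.UpdatedObjectInfo is created with rest.DefaultUpdatedObjectInfo(obj, transformers…); note that an obj value (decoded from the request body) is passed here instead of nil.
In PUT handling the Preconditions are also zero-valued (as long as the body carries no metadata.uid, as in the request above), so no check is made through Preconditions either. In tryUpdate, however, the resourceVersion returned by e.Storage.Versioner().ObjectResourceVersion(obj) is the value from the request body, rather than coming from existing as it does for a PATCH request (defaultUpdatedObjectInfo.UpdatedObject starts from a copy of i.obj, not from oldObj).
Therefore, in PUT handling, a concurrent write conflict is detected by the resourceVersion != version check in tryUpdate.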