跟踪 ApiServer 对 PATCH 请求的处理
ApiServer 处理 PATCH 请求的调用栈如下:
restfulPatchResource// 参数r rest.Patcher // registry.Storescope handlers.RequestScopeadmit admission.InterfacesupportedTypes []string // 支持的资源对象 PATCH 类型// 返回值restful.RouteFunction // go-restful 的路由处理回调handlers.PatchResource// 创建 handlers.patcher 对象// 参数r rest.Patcher // registry.Storescope *RequestScopeadmit admission.InterfacepatchTypes []string // 支持的资源对象 PATCH 类型// 返回值http.HandlerFunck8s.io/apiserver/pkg/endpoints/handlers.(*patcher).patchResource// 参数ctx context.Context // http.Request.Contextscope *RequestScope// 返回值runtime.Object // 更新后的资源对象bool // 是否创建了资源对象(当更新一个不存在的资源对象时,可以直接创建)errork8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update// 参数ctx context.Contextname string // 资源对象的名称,例如:/deployments/default/nginxobjInfo rest.UpdatedObjectInfo // 更新资源对象所需要的信息createValidation rest.ValidateObjectFuncupdateValidation rest.ValidateObjectUpdateFuncforceAllowCreate booloptions *metav1.UpdateOptions// 返回值runtime.Object // 更新后的资源对象bool // 是否创建了资源对象(当更新一个不存在的资源对象时,可以直接创建)errork8s.io/apiserver/pkg/registry/generic/registry.(*DryRunnableStorage).GuaranteedUpdate// 实现对资源对象进行模拟更新的逻辑// 如果 dryRun = true,则执行完后不会再调用持久化操作// 参数ctx context.Contextkey string // 资源对象的名称,例如:/deployments/default/nginxptrToType runtime.Object, // 资源对象的指针类型,由 registry.Store.NewFunc 创建ignoreNotFound bool, // ???preconditions *storage.Preconditions, // 执行更新前的检查,检查 UID 和 ResourceVersiontryUpdate storage.UpdateFunc, // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作dryRun bool, // 是否空跑(即执行输入检查,在内存中尝试更新,但不持久化)suggestion ...runtime.Object// 返回值errork8s.io/apiserver/pkg/storage/cacher.(*Cacher).GuaranteedUpdate// 从内存缓存 cacher.Cacher.watchCache 中查询 key 是否存在,若存在,作为 suggestion 传递下去// 参数ctx context.Contextkey string // 资源对象的名称,例如:/deployments/default/nginxptrToType runtime.Object // 资源对象的指针类型,由 registry.Store.NewFunc 创建ignoreNotFound boolpreconditions *storage.Preconditions // 执行更新前的检查,检查 UID 和 ResourceVersiontryUpdate storage.UpdateFunc // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作_ ...runtime.Object// 返回值errork8s.io/apiserver/pkg/storage/etcd3.(*store).GuaranteedUpdate// 调用 tryUpdate 更新资源对象,并通过 Etcd 的事务更新操作,持久化到 Etcd// 参数ctx context.Contextkey string // 资源对象的名称,例如:/deployments/default/nginxout runtime.Object // 资源对象的指针类型,由 registry.Store.NewFunc 创建ignoreNotFound boolpreconditions *storage.Preconditions // 执行更新前的检查,检查 UID 和 ResourceVersiontryUpdate storage.UpdateFunc // registry.Store.Update 方法中的闭包函数,执行资源对象的更新操作suggestion ...runtime.Object // 如果在内存缓存中能找到,则可以避免直接访问 Etcd// 返回值error
storage.Preconditions
执行 tryUpdate 前的检查。
// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.type Preconditions struct {// Specifies the target UID.// +optionalUID *types.UID `json:"uid,omitempty"`// Specifies the target ResourceVersion// +optionalResourceVersion *string `json:"resourceVersion,omitempty"`}
storage.Preconditions 只有一个方法:func (p *Preconditions) Check(key string, obj runtime.Object) error,检查 obj 的 UID 和 ResourceVersion 是否与 Preconditions 一致。
在 PATCH 请求处理中,Preconditions 从 rest.UpdatedObjectInfo.Preconditions() 获取。
rest.UpdatedObjectInfo
tryUpdate 闭包中会调用 rest.UpdatedObjectInfo.UpdatedObject() 对资源对象执行更新操作。
// UpdatedObjectInfo provides information about an updated object to an Updater.// It requires access to the old object in order to return the newly updated object.type UpdatedObjectInfo interface {// Returns preconditions built from the updated object, if applicable.// May return nil, or a preconditions object containing nil fields,// if no preconditions can be determined from the updated object.Preconditions() *metav1.Preconditions// UpdatedObject returns the updated object, given a context and old object.// The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).// 根据提交的请求内容,修改objectUpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error)}
在 PATCH 请求处理中,rest.UpdatedObjectInfo 的实现为 rest.defaultUpdatedObjectInfo:
// defaultUpdatedObjectInfo implements UpdatedObjectInfotype defaultUpdatedObjectInfo struct {// obj is the updated objectobj runtime.Object// transformers is an optional list of transforming functions that modify or// replace obj using information from the context, old object, or other sources.transformers []TransformFunc}// DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.func DefaultUpdatedObjectInfo(obj runtime.Object, transformers ...TransformFunc) UpdatedObjectInfo {return &defaultUpdatedObjectInfo{obj, transformers}}// Preconditions satisfies the UpdatedObjectInfo interface.// 注意这里只获取了 UIDfunc (i *defaultUpdatedObjectInfo) Preconditions() *metav1.Preconditions {// Attempt to get the UID out of the objectaccessor, err := meta.Accessor(i.obj)if err != nil {// If no UID can be read, no preconditions are possiblereturn nil}// If empty, no preconditions neededuid := accessor.GetUID()if len(uid) == 0 {return nil}return &metav1.Preconditions{UID: &uid}}// UpdatedObject satisfies the UpdatedObjectInfo interface.// It returns a copy of the held obj, passed through any configured transformers.func (i *defaultUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {var err error// Start with the configured objectnewObj := i.obj// If the original is non-nil (might be nil if the first transformer builds the object from the oldObj), make a copy,// so we don't return the original. BeforeUpdate can mutate the returned object, doing things like clearing ResourceVersion.// If we're re-called, we need to be able to return the pristine version.if newObj != nil {newObj = newObj.DeepCopyObject()}// Allow any configured transformers to update the new objectfor _, transformer := range i.transformers {// 例如使用patcher.applyPatch 更新objectnewObj, err = transformer(ctx, newObj, oldObj) // if http.method == patch: patcher.applyPatch, patcher.applyAdmissionif err != nil {return nil, err}}return newObj, nil}
创建时 defaultUpdatedObjectInfo.obj 为 nil,则 defaultUpdatedObjectInfo.Preconditions() 返回 nil,即不会进行 Preconditions 检查。
// patchResource divides PatchResource for easier unit testingfunc (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runtime.Object, bool, error) {......p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)......}
tryUpdate 闭包更新
// Update performs an atomic update and set of the object. Returns the result of the update// or an error. If the registry allows create-on-update, the create flow will be executed.// A bool is returned along with the object and any errors, to indicate object creation.func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {......err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {// Given the existing object, get the new object// 更新资源对应objobj, err := objInfo.UpdatedObject(ctx, existing) // defaultUpdatedObjectInfo.UpdatedObjectif err != nil {return nil, nil, err}// If AllowUnconditionalUpdate() is true and the object specified by// the user does not have a resource version, then we populate it with// the latest version. Else, we check that the version specified by// the user matches the version of latest storage object.resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)if err != nil {return nil, nil, err}doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()version, err := e.Storage.Versioner().ObjectResourceVersion(existing)if err != nil {return nil, nil, err}// version == 0 说明 key 对应的资源对象不存在if version == 0 {if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {return nil, nil, kubeerr.NewNotFound(qualifiedResource, name)}creating = truecreatingObj = objif err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {return nil, nil, err}// at this point we have a fully formed object. It is time to call the validators that the apiserver// handling chain wants to enforce.if createValidation != nil {if err := createValidation(obj.DeepCopyObject()); err != nil {return nil, nil, err}}ttl, err := e.calculateTTL(obj, 0, false)if err != nil {return nil, nil, err}return obj, &ttl, nil}creating = falsecreatingObj = nilif doUnconditionalUpdate {// Update the object's resource version to match the latest// storage object's resource version.// 无条件更新err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)if err != nil {return nil, nil, err}} else {// Check if the object's resource version matches the latest// resource version.if resourceVersion == 0 {// TODO: The Invalid error should have a field for Resource.// After that field is added, we should fill the Resource and// leave the Kind field empty. See the discussion in #18526.qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}return nil, nil, kubeerr.NewInvalid(qualifiedKind, name, fieldErrList)}// 这里是关键,更新前后的资源对象版本不一致,说明出现了并发更新冲突// PATCH 请求中 resourceVersion 始终是等于 version 的!!!// PUT 请求中才可能会出现 resourceVersion != versionif resourceVersion != version {return nil, nil, kubeerr.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))}}if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {return nil, nil, err}// at this point we have a fully formed object. It is time to call the validators that the apiserver// handling chain wants to enforce.if updateValidation != nil {if err := updateValidation(obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {return nil, nil, err}}// Check the default delete-during-update conditions, and store-specific conditions if providedif ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&(e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {deleteObj = objreturn nil, nil, errEmptiedFinalizers}ttl, err := e.calculateTTL(obj, res.TTL, true)if err != nil {return nil, nil, err}if int64(ttl) != res.TTL {return obj, &ttl, nil}return obj, nil, nil}, dryrun.IsDryRun(options.DryRun))......}
持久化到 Etcd
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))defer trace.LogIfLong(500 * time.Millisecond)v, err := conversion.EnforcePtr(out) // out 必须是非 nil 值的指针类型if err != nil {panic("unable to convert output object to pointer")}key = path.Join(s.pathPrefix, key)// 从 Etcd 存储中获取 key 对象的状态getCurrentState := func() (*objState, error) {startTime := time.Now()getResp, err := s.client.KV.Get(ctx, key, s.getOps...)metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)if err != nil {return nil, err}return s.getState(getResp, key, v, ignoreNotFound)}var origState *objStatevar mustCheckData bool // = true 说明 origState 有可能不是最新的// 如果上层调用提供了 key 对象的值(从 k8s.io/apiserver/pkg/storage/cacher.Cacher.watchCache.GetByKey(key) 获取)// 则不需要访问 Etcdif len(suggestion) == 1 && suggestion[0] != nil {span.LogFields(log.Object("suggestion[0]", suggestion[0]))origState, err = s.getStateFromObject(suggestion[0])span.LogFields(log.Object("origState", origState))if err != nil {return err}mustCheckData = true} else {origState, err = getCurrentState()if err != nil {return err}}trace.Step("initial value restored")transformContext := authenticatedDataString(key)for {// 检查 origState.obj 的 UID、 ResourceVersion 是否与 preconditions 一致// PATCH 和 PUT 请求都不做检查!!!if err := preconditions.Check(key, origState.obj); err != nil {// If our data is already up to date, return the error// 如果 origState 已经是最新的状态了,则返回错误if !mustCheckData {return err}// 可能 origState 不是最新的状态,从 Etcd 获取最新的状态// It's possible we were working with stale data// Actually fetchorigState, err = getCurrentState()if err != nil {return err}mustCheckData = false// Retrycontinue}// 在 origState.obj 的基础上进行修改.// 比如patch的apply,是在这里执行的。调用tryUpdate。ret, ttl, err := s.updateState(origState, tryUpdate)if err != nil {// origState 可能不是最新的状态,会从 Etcd 中获取最新的状态,再尝试更新一次// If our data is already up to date, return the errorif !mustCheckData {return err}// It's possible we were working with stale data// Actually fetchorigState, err = getCurrentState()if err != nil {return err}mustCheckData = false// Retrycontinue}data, err := runtime.Encode(s.codec, ret)if err != nil {return err}// 目前 origState.stale 始终为 false// 判断修改后的资源对象是否有变化if !origState.stale && bytes.Equal(data, origState.data) {// if we skipped the original Get in this loop, we must refresh from// etcd in order to be sure the data in the store is equivalent to// our desired serialization// mustCheckData = true 说明 origState 有可能不是最新的// 必须再确认一遍if mustCheckData {origState, err = getCurrentState()if err != nil {return err}mustCheckData = falseif !bytes.Equal(data, origState.data) {// original data changed, restart loopcontinue}}// recheck that the data from etcd is not stale before short-circuiting a writeif !origState.stale {return decode(s.codec, s.versioner, origState.data, out, origState.rev)}}// 将对象序列化为二进制数据存储到 EtcdnewData, err := s.transformer.TransformToStorage(data, transformContext)if err != nil {return storage.NewInternalError(err.Error())}// 设置 key 的过期时间opts, err := s.ttlOpts(ctx, int64(ttl))if err != nil {return err}trace.Step("Transaction prepared")span.LogFields(log.Uint64("ttl", ttl), log.Int64("origState.rev", origState.rev))// Etcd 事务// 注意这里会比较 Etcd 中的资源对象版本跟 origState.rev 是否一致// 如果一致,则更新// 否则,更新失败并获取当前最新的资源对象startTime := time.Now()txnResp, err := s.client.KV.Txn(ctx).If(clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),).Then(clientv3.OpPut(key, string(newData), opts...),).Else(clientv3.OpGet(key),).Commit()metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)if err != nil {return err}trace.Step("Transaction committed")if !txnResp.Succeeded {// 事务执行失败getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)// 获取最新的状态,并重试origState, err = s.getState(getResp, key, v, ignoreNotFound)if err != nil {return err}trace.Step("Retry value restored")mustCheckData = falsecontinue}// 事务执行成功putResp := txnResp.Responses[0].GetResponsePut()return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)}}
分析总结
etcd中并发更新是如何确保更新不丢失的?
在将更新后的对象持久化到 Etcd 中时,通过事务保证的,当事务执行失败时会不断重试,事务的伪代码逻辑如下:
// oldObj = FromMemCache(key) or EtcdGet(key)if inEtcd(key).rev == inMemory(oldObj).rev:EtcdSet(key) = newObjtransaction = successelse:EtcdGet(key)transaction = fail
并发更新是如何做冲突检测的?
在上面的分析中,可以看到有两处冲突检测的判断:
- Preconditions
- tryUpdate 中的 resourceVersion != version
对于 kubectl apply 和 edit (发送的都是 PATCH 请求),默认创建的 Preconditions 是零值,所以不会通过 Preconditions 进行冲突检测,而在 tryUpdate 中调用 objInfo.UpdatedObject(ctx, existing) 得到的 newObj.rv 始终是等于 existing.rv 的,所以也不会进行冲突检测。
那什么时候会进行冲突检测?其实 kubectl 还有个 replace 的命令,通过抓包发现 replace 命令发送的是 PUT 请求,并且请求中会带有 resourceVersion:
PUT /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1Host: localhost:8080User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$FormatContent-Length: 866Accept: application/jsonContent-Type: application/jsonUber-Trace-Id: 6e685772cc06fc16:2514dc54a474fe88:4f488c05a7cef9c8:1Accept-Encoding: gzip{"apiVersion":"extensions/v1beta1","kind":"Deployment","metadata":{"labels":{"app":"nginx"},"name":"nginx","namespace":"default","resourceVersion":"744603"},"spec":{"progressDeadlineSeconds":600,"replicas":1,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl replace"}],"image":"nginx","imagePullPolicy":"IfNotPresent","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}}}
PUT 请求在 ApiServer 中的处理与 PATCH 请求类似,都是调用 k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update,创建的 rest.UpdatedObjectInfo 为 rest.DefaultUpdatedObjectInfo(obj, transformers…),注意这里传了 obj 参数值(通过 decode 请求 body 得到),而不是 nil。
在 PUT 请求处理中,创建的 Preconditions 也是零值,不会通过 Preconditions 做检查。但是在 tryUpdate 中,resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj) 得到的 resourceVersion 是请求 body 中的值,而不像 PATCH 请求一样,来自 existing(看下 defaultUpdatedObjectInfo.UpdatedObject 方法就明白了)。
所以在 PUT 请求处理中,会通过 tryUpdate 中的 resourceVersion != version,检测是否发生了并发写冲突。
