0、interfaces
// 一个序列化器的名字type Identifier string// 编码一个objtype Encoder interface { // 如果版本不相容或者版本未定义,需要抛出异常 Encode(obj Object, w io.Writer) error // 为了兼容CacheableObject,定义Encode函数需要形式如下: // func (e *MyEncoder) Encode(obj Object, w io.Writer) error { // if co, ok := obj.(CacheableObject); ok { // return co.CacheEncode(e.Identifier(), e.doEncode, w) // } // return e.doEncode(obj, w) // } Identifier() Identifier}// 解码一个objtype Decoder interface { // 解码器将尝试将data解码为固有的类型或者提供的默认类型 // 它返回一个解码后的obj和其gvk信息 // 如果into非nil,则into将会作为目标类型。解码器将选择使用给定的obj而不用重新生成一个 // 当前into对象不保证会被填充。返回的obi不保证匹配into // 如果提供了默认值,则他们将会被应用到数据里面。否则into的类型将会被用于决策使用何种版本的obj Decode(data []byte, defaults *schema.GroupVersionKind, into Object) (Object, *schema.GroupVersionKind, error)}// 核心接口type Serializer interface { Encoder Decoder}type Codec Serializertype Framer interface { NewFrameReader(r io.ReadCloser) io.ReadCloser NewFrameWriter(w io.Writer) io.Writer}// 对一个序列化器的信息描述type SerializerInfo struct { // 类型 -> application/json MediaType string // application in application/json MediaTypeType string // json in application/json MediaTypeSubType string // 是否可以安全的编码为utf8格式 EncodesAsText bool // 具体的序列化器 Serializer Serializer // 可读性较高的序列化器(用于json格式) PrettySerializer Serializer // 流式序列化器 StreamSerializer *StreamSerializerInfo}// 包含多种格式的序列化器,apiserver的handler持有的就是此接口// 实际就是CodecFactorytype NegotiatedSerializer interface { // 支持的所有类型的序列化器 SupportedMediaTypes() []SerializerInfo // 返回一个编码器并保证将对象写入到给定的gv中 EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder // 返回一个解码器并保证将对象写入到给定的gv中(此处的GroupVersioner一般为给定的__internal版本) DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder}// 判断data是否符合自己的格式要求type RecognizingDecoder interface { runtime.Decoder RecognizesData(peek io.Reader) (ok, unknown bool, err error)}// 通常用于rest客户端type ClientNegotiator interface { Encoder(contentType string, params map[string]string) (Encoder, error) Decoder(contentType string, params map[string]string) (Decoder, error) StreamDecoder(contentType string, params map[string]string) (Decoder, Serializer, Framer, error)}// 存储的序列化器,可用于存储restful objtype StorageSerializer interface { SupportedMediaTypes() []SerializerInfo UniversalDeserializer() Decoder EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder}// ParameterCodec defines methods for serializing and deserializing API objects to url.Values and// performing any necessary conversion. Unlike the normal Codec, query parameters are not self describing// and the desired version must be specified.type ParameterCodec interface { // DecodeParameters takes the given url.Values in the specified group version and decodes them // into the provided object, or returns an error. DecodeParameters(parameters url.Values, from schema.GroupVersion, into Object) error // EncodeParameters encodes the provided object as query parameters or returns an error. EncodeParameters(obj Object, to schema.GroupVersion) (url.Values, error)}// StreamSerializerInfo contains information about a specific stream serialization formattype StreamSerializerInfo struct { // EncodesAsText indicates this serializer can be encoded to UTF-8 safely. EncodesAsText bool // Serializer is the top level object serializer for this type when streaming Serializer // Framer is the factory for retrieving streams that separate objects on the wire Framer}// 暂时不清楚这两个接口的作用type NestedObjectEncoder interface { EncodeNestedObjects(e Encoder) error}type NestedObjectDecoder interface { DecodeNestedObjects(d Decoder) error}
1、serializerType
// 每一种序列化器对应一个具体的serializerTypetype serializerType struct { AcceptContentTypes []string // 可接受的类型 ContentType string FileExtensions []string EncodesAsText bool Serializer runtime.Serializer PrettySerializer runtime.Serializer AcceptStreamContentTypes []string StreamContentType string Framer runtime.Framer StreamSerializer runtime.Serializer}// 默认初始化了3中序列化器// json/yaml/protobuf(其中json/yaml其实用的是一个实现struct json.Serializer)func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType { jsonSerializer := json.NewSerializerWithOptions( mf, scheme, scheme, json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict}, ) jsonSerializerType := serializerType{ AcceptContentTypes: []string{runtime.ContentTypeJSON}, ContentType: runtime.ContentTypeJSON, FileExtensions: []string{"json"}, EncodesAsText: true, Serializer: jsonSerializer, Framer: json.Framer, StreamSerializer: jsonSerializer, } if options.Pretty { jsonSerializerType.PrettySerializer = json.NewSerializerWithOptions( mf, scheme, scheme, json.SerializerOptions{Yaml: false, Pretty: true, Strict: options.Strict}, ) } yamlSerializer := json.NewSerializerWithOptions( mf, scheme, scheme, json.SerializerOptions{Yaml: true, Pretty: false, Strict: options.Strict}, ) protoSerializer := protobuf.NewSerializer(scheme, scheme) protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme) serializers := []serializerType{ jsonSerializerType, { AcceptContentTypes: []string{runtime.ContentTypeYAML}, ContentType: runtime.ContentTypeYAML, FileExtensions: []string{"yaml"}, EncodesAsText: true, Serializer: yamlSerializer, }, { AcceptContentTypes: []string{runtime.ContentTypeProtobuf}, ContentType: runtime.ContentTypeProtobuf, FileExtensions: []string{"pb"}, Serializer: protoSerializer, Framer: protobuf.LengthDelimitedFramer, StreamSerializer: protoRawSerializer, }, } for _, fn := range serializerExtensions { if serializer, ok := fn(scheme); ok { serializers = append(serializers, serializer) } } return serializers}
2、RecognizingDecoder
func NewDecoder(decoders ...runtime.Decoder) runtime.Decoder { return &decoder{ decoders: decoders, }}type decoder struct { decoders []runtime.Decoder}var _ RecognizingDecoder = &decoder{}func (d *decoder) RecognizesData(peek io.Reader) (bool, bool, error) { var ( lastErr error anyUnknown bool ) data, _ := bufio.NewReaderSize(peek, 1024).Peek(1024) for _, r := range d.decoders { switch t := r.(type) { case RecognizingDecoder: ok, unknown, err := t.RecognizesData(bytes.NewBuffer(data)) if err != nil { lastErr = err continue } anyUnknown = anyUnknown || unknown if !ok { continue } return true, false, nil } } return false, anyUnknown, lastErr}func (d *decoder) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { var ( lastErr error skipped []runtime.Decoder ) // try recognizers, record any decoders we need to give a chance later for _, r := range d.decoders { switch t := r.(type) { case RecognizingDecoder: buf := bytes.NewBuffer(data) ok, unknown, err := t.RecognizesData(buf) if err != nil { lastErr = err continue } if unknown { skipped = append(skipped, t) continue } if !ok { continue } return r.Decode(data, gvk, into) default: skipped = append(skipped, t) } } for _, r := range skipped { out, actual, err := r.Decode(data, gvk, into) if err != nil { lastErr = err continue } return out, actual, nil } if lastErr == nil { lastErr = fmt.Errorf("no serialization format matched the provided data") } return nil, nil, lastErr}
3、CodecFactory
type CodecFactory struct { // scheme对象 scheme *runtime.Scheme // 所有的序列化器 serializers []serializerType // 普遍的解码器,集成了所有的编码器,挨个匹配看是否数据类型是否能匹配到解码器 // 找到则解码,否则抛出异常 universal runtime.Decoder // 所有ContentType的序列化器 accepts []runtime.SerializerInfo // 核心序列化器,默认为json序列化器 legacySerializer runtime.Serializer}func NewCodecFactory(scheme *runtime.Scheme, mutators ...CodecFactoryOptionsMutator) CodecFactory { options := CodecFactoryOptions{Pretty: true} for _, fn := range mutators { fn(&options) } // 获取所有的序列化器 serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory, options) return newCodecFactory(scheme, serializers)}// 新建一个newCodecFactoryfunc newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory { decoders := make([]runtime.Decoder, 0, len(serializers)) var accepts []runtime.SerializerInfo alreadyAccepted := make(map[string]struct{}) var legacySerializer runtime.Serializer for _, d := range serializers { decoders = append(decoders, d.Serializer) for _, mediaType := range d.AcceptContentTypes { if _, ok := alreadyAccepted[mediaType]; ok { continue } alreadyAccepted[mediaType] = struct{}{} info := runtime.SerializerInfo{ MediaType: d.ContentType, EncodesAsText: d.EncodesAsText, Serializer: d.Serializer, PrettySerializer: d.PrettySerializer, } mediaType, _, err := mime.ParseMediaType(info.MediaType) if err != nil { panic(err) } parts := strings.SplitN(mediaType, "/", 2) info.MediaTypeType = parts[0] info.MediaTypeSubType = parts[1] if d.StreamSerializer != nil { info.StreamSerializer = &runtime.StreamSerializerInfo{ Serializer: d.StreamSerializer, EncodesAsText: d.EncodesAsText, Framer: d.Framer, } } accepts = append(accepts, info) if mediaType == runtime.ContentTypeJSON { legacySerializer = d.Serializer } } } if legacySerializer == nil { legacySerializer = serializers[0].Serializer } return CodecFactory{ scheme: scheme, serializers: serializers, universal: recognizer.NewDecoder(decoders...), accepts: accepts, legacySerializer: legacySerializer, }}// 支持的所有序列化器// apiserver一般会调用此函数获取所有的序列化器并根据request的类型进行匹配查找// 然后再将找到的序列化器作为参数再调用CodecFactory的Encode/Decode函数func (f CodecFactory) SupportedMediaTypes() []runtime.SerializerInfo { return f.accepts}// 此方法编码为指定版本,解码为内部版本,固定为json格式。// 此方法已经废弃。apiserver和client应该先确定一个具体的序列化器,然后调用CodecForVersions方法// 如果只是想读数据,则应该调用UniversalDecoder()方法func (f CodecFactory) LegacyCodec(version ...schema.GroupVersion) runtime.Codec { return versioning.NewDefaultingCodecForScheme(f.scheme, f.legacySerializer, f.universal, schema.GroupVersions(version), runtime.InternalGroupVersioner)}// UniversalDeserializer can convert any stored data recognized by this factory into a Go object that satisfies// runtime.Object. It does not perform conversion. It does not perform defaulting.func (f CodecFactory) UniversalDeserializer() runtime.Decoder { return f.universal}// UniversalDecoder returns a runtime.Decoder capable of decoding all known API objects in all known formats. Used// by clients that do not need to encode objects but want to deserialize API objects stored on disk. Only decodes// objects in groups registered with the scheme. The GroupVersions passed may be used to select alternate// versions of objects to return - by default, runtime.APIVersionInternal is used. If any versions are specified,// unrecognized groups will be returned in the version they are encoded as (no conversion). This decoder performs// defaulting.func (f CodecFactory) UniversalDecoder(versions ...schema.GroupVersion) runtime.Decoder { var versioner runtime.GroupVersioner if len(versions) == 0 { versioner = runtime.InternalGroupVersioner } else { versioner = schema.GroupVersions(versions) } return f.CodecForVersions(nil, f.universal, nil, versioner)}// CodecForVersions creates a codec with the provided serializer. If an object is decoded and its group is not in the list,// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not// converted. If encode or decode are nil, no conversion is performed.func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode runtime.GroupVersioner, decode runtime.GroupVersioner) runtime.Codec { // TODO: these are for backcompat, remove them in the future if encode == nil { encode = runtime.DisabledGroupVersioner } if decode == nil { decode = runtime.InternalGroupVersioner } return versioning.NewDefaultingCodecForScheme(f.scheme, encoder, decoder, encode, decode)}func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder { return f.CodecForVersions(nil, decoder, nil, gv)}func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { return f.CodecForVersions(encoder, nil, gv, nil)}// 不带转换器的编码工厂(返回的为WithoutVersionDecoder结构体)func (f CodecFactory) WithoutConversion() runtime.NegotiatedSerializer { return WithoutConversionCodecFactory{f}}// CodecFactory配置项type CodecFactoryOptions struct { Strict bool // 是否为严格模式 Pretty bool // 是否需要开启可读性系列化器}type CodecFactoryOptionsMutator func(*CodecFactoryOptions)func EnablePretty(options *CodecFactoryOptions) { options.Pretty = true}func DisablePretty(options *CodecFactoryOptions) { options.Pretty = false}func EnableStrict(options *CodecFactoryOptions) { options.Strict = true}func DisableStrict(options *CodecFactoryOptions) { options.Strict = false}
4、VersioningCodec
// encoder/decoder 一般是由外部调用者利用CodecFactory获取给定类型的序列化器// 然后再传入到此// encodeVersion/decodeVersion一般为内部版本(即RequeScope里面的HubGroupVersion)// 若decodeVersion为nil,则默认为runtime.InternalGroupVersionerfunc NewDefaultingCodecForScheme( scheme *runtime.Scheme, encoder runtime.Encoder, decoder runtime.Decoder, encodeVersion runtime.GroupVersioner, decodeVersion runtime.GroupVersioner,) runtime.Codec { return NewCodec(encoder, decoder, runtime.UnsafeObjectConvertor(scheme), scheme, scheme, scheme, encodeVersion, decodeVersion, scheme.Name())}func NewCodec( encoder runtime.Encoder, decoder runtime.Decoder, convertor runtime.ObjectConvertor, creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaulter runtime.ObjectDefaulter, encodeVersion runtime.GroupVersioner, decodeVersion runtime.GroupVersioner, originalSchemeName string,) runtime.Codec { internal := &codec{ encoder: encoder, decoder: decoder, convertor: convertor, creater: creater, typer: typer, defaulter: defaulter, encodeVersion: encodeVersion, decodeVersion: decodeVersion, identifier: identifier(encodeVersion, encoder), originalSchemeName: originalSchemeName, } return internal}type codec struct { encoder runtime.Encoder decoder runtime.Decoder convertor runtime.ObjectConvertor creater runtime.ObjectCreater typer runtime.ObjectTyper defaulter runtime.ObjectDefaulter encodeVersion runtime.GroupVersioner decodeVersion runtime.GroupVersioner identifier runtime.Identifier originalSchemeName string}// Decode尝试解码obj然后尝试转换为内部版本// defaultGVK 为当前请求路径下的gvk(即安装api时候指定的scope里面的kind)// 若是从etcd读取出来的数据,则一般defaultGVK为空// into 为安装api时候指定的scope里面NamedCreater创建的对象(一般为内部版本对象)// into是我们的目标对象,需要将data转换为into objfunc (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { decodeInto := into if into != nil { // 若into为Unstructured obj且gv信息为空 // 将decodeInto实力化一份 if _, ok := into.(runtime.Unstructured); ok && !into.GetObjectKind().GroupVersionKind().GroupVersion().Empty() { decodeInto = reflect.New(reflect.TypeOf(into).Elem()).Interface().(runtime.Object) } } obj, gvk, err := c.decoder.Decode(data, defaultGVK, decodeInto) if err != nil { return nil, gvk, err } // 不清楚作用 if d, ok := obj.(runtime.NestedObjectDecoder); ok { if err := d.DecodeNestedObjects(runtime.WithoutVersionDecoder{Decoder: c.decoder}); err != nil { return nil, gvk, err } } if into != nil { if c.defaulter != nil { c.defaulter.Default(obj) } // Short-circuit conversion if the into object is same object if into == obj { return into, gvk, nil } if err := c.convertor.Convert(obj, into, c.decodeVersion); err != nil { return nil, gvk, err } return into, gvk, nil } if c.defaulter != nil { c.defaulter.Default(obj) } out, err := c.convertor.ConvertToVersion(obj, c.decodeVersion) if err != nil { return nil, gvk, err } return out, gvk, nil}func (c *codec) Encode(obj runtime.Object, w io.Writer) error { if co, ok := obj.(runtime.CacheableObject); ok { return co.CacheEncode(c.Identifier(), c.doEncode, w) } return c.doEncode(obj, w)}func (c *codec) doEncode(obj runtime.Object, w io.Writer) error { switch obj := obj.(type) { case *runtime.Unknown: return c.encoder.Encode(obj, w) case runtime.Unstructured: // An unstructured list can contain objects of multiple group version kinds. don't short-circuit just // because the top-level type matches our desired destination type. actually send the object to the converter // to give it a chance to convert the list items if needed. if _, ok := obj.(*unstructured.UnstructuredList); !ok { // avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl) objGVK := obj.GetObjectKind().GroupVersionKind() if len(objGVK.Version) == 0 { return c.encoder.Encode(obj, w) } targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK}) if !ok { return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion) } if targetGVK == objGVK { return c.encoder.Encode(obj, w) } } } gvks, isUnversioned, err := c.typer.ObjectKinds(obj) if err != nil { return err } objectKind := obj.GetObjectKind() old := objectKind.GroupVersionKind() // restore the old GVK after encoding defer objectKind.SetGroupVersionKind(old) if c.encodeVersion == nil || isUnversioned { if e, ok := obj.(runtime.NestedObjectEncoder); ok { if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Encoder: c.encoder, ObjectTyper: c.typer}); err != nil { return err } } objectKind.SetGroupVersionKind(gvks[0]) return c.encoder.Encode(obj, w) } // Perform a conversion if necessary out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion) if err != nil { return err } if e, ok := out.(runtime.NestedObjectEncoder); ok { if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Version: c.encodeVersion, Encoder: c.encoder, ObjectTyper: c.typer}); err != nil { return err } } // Conversion is responsible for setting the proper group, version, and kind onto the outgoing object return c.encoder.Encode(out, w)}func (c *codec) Identifier() runtime.Identifier { return c.identifier}var identifiersMap sync.Maptype codecIdentifier struct { EncodeGV string `json:"encodeGV,omitempty"` Encoder string `json:"encoder,omitempty"` Name string `json:"name,omitempty"`}func identifier(encodeGV runtime.GroupVersioner, encoder runtime.Encoder) runtime.Identifier { result := codecIdentifier{ Name: "versioning", } if encodeGV != nil { result.EncodeGV = encodeGV.Identifier() } if encoder != nil { result.Encoder = string(encoder.Identifier()) } if id, ok := identifiersMap.Load(result); ok { return id.(runtime.Identifier) } identifier, err := json.Marshal(result) if err != nil { klog.Fatalf("Failed marshaling identifier for codec: %v", err) } identifiersMap.Store(result, runtime.Identifier(identifier)) return runtime.Identifier(identifier)}