0、interfaces
// Identifier is the name of a serializer; Encoder.Identifier returns one.
type Identifier string
// Encoder writes an object to a stream.
type Encoder interface {
// Encode writes obj to w. It must return an error if the object's version is
// incompatible or undefined.
Encode(obj Object, w io.Writer) error
// To stay compatible with CacheableObject, an implementation's Encode should
// have the following shape:
// func (e *MyEncoder) Encode(obj Object, w io.Writer) error {
// if co, ok := obj.(CacheableObject); ok {
// return co.CacheEncode(e.Identifier(), e.doEncode, w)
// }
// return e.doEncode(obj, w)
// }
//
// Identifier returns the identifier of this encoder.
Identifier() Identifier
}
// Decoder reads an object from serialized bytes.
type Decoder interface {
// Decode attempts to deserialize data using either the typing carried by the
// data itself or the provided defaults.
// It returns the decoded object together with its GroupVersionKind.
// If into is non-nil it is used as the target type, and the decoder may choose
// to fill the given object instead of allocating a new one.
// into is not guaranteed to be populated, and the returned object is not
// guaranteed to match into.
// If defaults are provided they are applied to the data; otherwise the type
// of into is used to decide which version of the object to produce.
Decode(data []byte, defaults *schema.GroupVersionKind, into Object) (Object, *schema.GroupVersionKind, error)
}
// Serializer is the core interface: it combines an Encoder and a Decoder.
type Serializer interface {
Encoder
Decoder
}
// Codec is a named type whose underlying type is Serializer (Encoder + Decoder).
type Codec Serializer
// Framer wraps raw readers and writers into frame-aware ones. It is used
// together with a stream serializer (see StreamSerializerInfo).
type Framer interface {
// NewFrameReader wraps r and returns a frame reader.
NewFrameReader(r io.ReadCloser) io.ReadCloser
// NewFrameWriter wraps w and returns a frame writer.
NewFrameWriter(w io.Writer) io.Writer
}
// SerializerInfo describes a single serializer.
type SerializerInfo struct {
// MediaType is the full media type, e.g. application/json.
MediaType string
// MediaTypeType is the "application" in application/json.
MediaTypeType string
// MediaTypeSubType is the "json" in application/json.
MediaTypeSubType string
// EncodesAsText reports whether the output can safely be encoded as UTF-8.
EncodesAsText bool
// Serializer is the concrete serializer.
Serializer Serializer
// PrettySerializer is a more human-readable serializer (used for the JSON format).
PrettySerializer Serializer
// StreamSerializer is the streaming serializer.
StreamSerializer *StreamSerializerInfo
}
// NegotiatedSerializer bundles serializers for multiple formats. According to
// the original notes this is the interface held by apiserver handlers, and in
// practice it is implemented by CodecFactory.
type NegotiatedSerializer interface {
// SupportedMediaTypes returns the serializers for every supported media type.
SupportedMediaTypes() []SerializerInfo
// EncoderForVersion returns an encoder that ensures objects are written in
// the given gv.
EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
// DecoderToVersion returns a decoder that ensures decoded objects are
// converted to the given gv (the GroupVersioner passed here is usually the
// __internal version).
DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
}
// RecognizingDecoder is a decoder that can also report whether a piece of data
// is in the format it handles.
type RecognizingDecoder interface {
runtime.Decoder
// RecognizesData inspects the data available from peek. ok reports that the
// format is recognized; unknown reports that the decoder cannot tell.
RecognizesData(peek io.Reader) (ok, unknown bool, err error)
}
// ClientNegotiator selects encoders and decoders by content type; it is
// typically used by REST clients.
type ClientNegotiator interface {
// Encoder returns the encoder for the given content type and parameters.
Encoder(contentType string, params map[string]string) (Encoder, error)
// Decoder returns the decoder for the given content type and parameters.
Decoder(contentType string, params map[string]string) (Decoder, error)
// StreamDecoder returns a decoder, a serializer and a framer for the given
// content type and parameters.
StreamDecoder(contentType string, params map[string]string) (Decoder, Serializer, Framer, error)
}
// StorageSerializer is the serializer interface used for storage; it can be
// used to persist RESTful objects.
type StorageSerializer interface {
// SupportedMediaTypes returns the serializers for every supported media type.
SupportedMediaTypes() []SerializerInfo
// UniversalDeserializer returns a decoder for stored data.
UniversalDeserializer() Decoder
// EncoderForVersion returns an encoder targeting the version selected by gv.
EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
// DecoderToVersion returns a decoder targeting the version selected by gv.
DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder
}
// ParameterCodec defines methods for serializing and deserializing API objects to and from
// url.Values, performing any necessary conversion. Unlike the normal Codec, query parameters
// are not self-describing, so the desired version must be specified explicitly.
type ParameterCodec interface {
// DecodeParameters takes the given url.Values in the specified group version and decodes them
// into the provided object, or returns an error.
DecodeParameters(parameters url.Values, from schema.GroupVersion, into Object) error
// EncodeParameters encodes the provided object as query parameters in the given group
// version, or returns an error.
EncodeParameters(obj Object, to schema.GroupVersion) (url.Values, error)
}
// StreamSerializerInfo contains information about a specific stream serialization format.
// Both Serializer and Framer are embedded, so their methods are promoted onto this struct.
type StreamSerializerInfo struct {
// EncodesAsText indicates this serializer can be encoded to UTF-8 safely.
EncodesAsText bool
// Serializer is the top level object serializer for this type when streaming.
Serializer
// Framer is the factory for retrieving streams that separate objects on the wire.
Framer
}
// NestedObjectEncoder is a hook implemented by objects that contain other
// objects (inferred from the name and call site). codec.doEncode calls
// EncodeNestedObjects on such an object, passing a runtime.WithVersionEncoder,
// right before the object itself is handed to the underlying encoder.
type NestedObjectEncoder interface {
EncodeNestedObjects(e Encoder) error
}
// NestedObjectDecoder is the decoding counterpart of NestedObjectEncoder.
// codec.Decode calls DecodeNestedObjects on the freshly decoded object, passing
// a runtime.WithoutVersionDecoder, before defaulting and conversion happen.
type NestedObjectDecoder interface {
DecodeNestedObjects(d Decoder) error
}
1、serializerType
// serializerType describes one serialization format; every serializer has its
// own serializerType entry. newCodecFactory turns these entries into
// runtime.SerializerInfo values.
type serializerType struct {
AcceptContentTypes []string // accepted content types
// ContentType is the media type reported for this serializer.
ContentType string
FileExtensions []string
// EncodesAsText is copied into SerializerInfo.EncodesAsText.
EncodesAsText bool
Serializer runtime.Serializer
// PrettySerializer is optional; it is only set for JSON when the Pretty option is on.
PrettySerializer runtime.Serializer
AcceptStreamContentTypes []string
StreamContentType string
// Framer and StreamSerializer are used together to build the
// StreamSerializerInfo; a nil StreamSerializer means no streaming support.
Framer runtime.Framer
StreamSerializer runtime.Serializer
}
// newSerializersForScheme builds the default serializer set for scheme: JSON,
// YAML and protobuf, in that order, followed by whatever the registered
// serializerExtensions contribute. JSON and YAML share one implementation
// (json.NewSerializerWithOptions) and differ only in their options.
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType {
	// build creates one serializer from the shared JSON/YAML implementation;
	// strictness always follows the factory options.
	build := func(asYAML, pretty bool) runtime.Serializer {
		return json.NewSerializerWithOptions(
			mf, scheme, scheme,
			json.SerializerOptions{Yaml: asYAML, Pretty: pretty, Strict: options.Strict},
		)
	}

	// JSON: the same serializer instance is used for single objects and streams.
	plainJSON := build(false, false)
	jsonType := serializerType{
		AcceptContentTypes: []string{runtime.ContentTypeJSON},
		ContentType:        runtime.ContentTypeJSON,
		FileExtensions:     []string{"json"},
		EncodesAsText:      true,
		Serializer:         plainJSON,
		Framer:             json.Framer,
		StreamSerializer:   plainJSON,
	}
	if options.Pretty {
		jsonType.PrettySerializer = build(false, true)
	}

	// YAML: no framer and no stream serializer.
	yamlType := serializerType{
		AcceptContentTypes: []string{runtime.ContentTypeYAML},
		ContentType:        runtime.ContentTypeYAML,
		FileExtensions:     []string{"yaml"},
		EncodesAsText:      true,
		Serializer:         build(true, false),
	}

	// Protobuf: streams use the raw serializer with length-delimited framing.
	protoType := serializerType{
		AcceptContentTypes: []string{runtime.ContentTypeProtobuf},
		ContentType:        runtime.ContentTypeProtobuf,
		FileExtensions:     []string{"pb"},
		Serializer:         protobuf.NewSerializer(scheme, scheme),
		Framer:             protobuf.LengthDelimitedFramer,
		StreamSerializer:   protobuf.NewRawSerializer(scheme, scheme),
	}

	result := []serializerType{jsonType, yamlType, protoType}
	for _, extension := range serializerExtensions {
		extra, ok := extension(scheme)
		if !ok {
			continue
		}
		result = append(result, extra)
	}
	return result
}
2、RecognizingDecoder
// NewDecoder returns a decoder that tries the given decoders in order, both
// when recognizing data and when decoding it.
func NewDecoder(decoders ...runtime.Decoder) runtime.Decoder {
	d := new(decoder)
	d.decoders = decoders
	return d
}
// decoder tries a list of decoders in turn when recognizing or decoding data.
type decoder struct {
// decoders are the candidates, consulted in slice order.
decoders []runtime.Decoder
}
// Compile-time check that *decoder satisfies RecognizingDecoder.
var _ RecognizingDecoder = (*decoder)(nil)
// RecognizesData reports whether any wrapped RecognizingDecoder recognizes the
// data readable from peek.
//
// It returns (true, false, nil) as soon as one decoder recognizes the data.
// Otherwise it returns false, whether any decoder answered "unknown", and the
// last error a decoder reported (nil if none did). Decoders that do not
// implement RecognizingDecoder are ignored.
func (d *decoder) RecognizesData(peek io.Reader) (bool, bool, error) {
	// Look at up to 1024 bytes. The Peek error is discarded: whatever bytes
	// were read are still handed to every recognizer.
	head, _ := bufio.NewReaderSize(peek, 1024).Peek(1024)

	var (
		sawUnknown bool
		failure    error
	)
	for _, candidate := range d.decoders {
		recognizer, isRecognizer := candidate.(RecognizingDecoder)
		if !isRecognizer {
			continue
		}
		ok, unknown, err := recognizer.RecognizesData(bytes.NewBuffer(head))
		switch {
		case err != nil:
			failure = err
		case ok:
			return true, false, nil
		default:
			sawUnknown = sawUnknown || unknown
		}
	}
	return false, sawUnknown, failure
}
// Decode picks a wrapped decoder for data and decodes with it.
//
// Pass one asks every RecognizingDecoder, in order, whether it recognizes the
// data; the first that does decodes it and its result is returned as-is.
// Decoders that answer "unknown", and decoders that cannot recognize at all,
// are set aside. Pass two tries those in order and returns the first success.
// If nothing succeeds, the most recent error is returned, or a generic "no
// serialization format matched" error when no decoder reported one.
func (d *decoder) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	var (
		failure  error
		deferred []runtime.Decoder
	)

	for _, candidate := range d.decoders {
		recognizer, isRecognizer := candidate.(RecognizingDecoder)
		if !isRecognizer {
			// Cannot be asked up front; give it a chance in pass two.
			deferred = append(deferred, candidate)
			continue
		}
		ok, unknown, err := recognizer.RecognizesData(bytes.NewBuffer(data))
		switch {
		case err != nil:
			failure = err
		case unknown:
			deferred = append(deferred, recognizer)
		case ok:
			return candidate.Decode(data, gvk, into)
		}
	}

	for _, fallback := range deferred {
		out, actual, err := fallback.Decode(data, gvk, into)
		if err == nil {
			return out, actual, nil
		}
		failure = err
	}

	if failure != nil {
		return nil, nil, failure
	}
	return nil, nil, fmt.Errorf("no serialization format matched the provided data")
}
3、CodecFactory
// CodecFactory provides access to the serializers, encoders and decoders built
// for a scheme.
type CodecFactory struct {
// scheme is the scheme object the factory was built for.
scheme *runtime.Scheme
// serializers holds every serializer known to the factory.
serializers []serializerType
// universal is a decoder wrapping the decoders of all serializers. It tries
// them one by one to find one that recognizes the data, decodes with it,
// and returns an error when none matches.
universal runtime.Decoder
// accepts holds the SerializerInfo for every accepted content type.
accepts []runtime.SerializerInfo
// legacySerializer is the core serializer: the JSON serializer when one is
// registered, otherwise the first serializer (see newCodecFactory).
legacySerializer runtime.Serializer
}
// NewCodecFactory builds a CodecFactory for scheme with the default
// serializers. Options start with Pretty enabled and Strict disabled; each
// mutator is applied in order and may change them.
func NewCodecFactory(scheme *runtime.Scheme, mutators ...CodecFactoryOptionsMutator) CodecFactory {
	var options CodecFactoryOptions
	options.Pretty = true
	for i := range mutators {
		mutators[i](&options)
	}
	// Build every serializer for the scheme, then assemble the factory from them.
	return newCodecFactory(scheme, newSerializersForScheme(scheme, json.DefaultMetaFactory, options))
}
// newCodecFactory assembles a CodecFactory from the given serializers.
//
// Every serializer contributes its Serializer to the universal decoder. For
// each accepted content type not seen before, one SerializerInfo is recorded;
// it describes the serializer's own ContentType. The legacy serializer is the
// last one recorded whose parsed media type equals runtime.ContentTypeJSON,
// falling back to the first serializer.
//
// It panics if a ContentType cannot be parsed as a media type. The code also
// indexes parts[1] and serializers[0] unconditionally, so a media type without
// a "/" or an empty serializer list with no JSON entry will panic as well.
func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory {
	var (
		accepts []runtime.SerializerInfo
		legacy  runtime.Serializer
	)
	decoders := make([]runtime.Decoder, 0, len(serializers))
	seen := make(map[string]struct{})

	for _, st := range serializers {
		decoders = append(decoders, st.Serializer)

		for _, accepted := range st.AcceptContentTypes {
			if _, dup := seen[accepted]; dup {
				continue
			}
			seen[accepted] = struct{}{}

			// Parse the serializer's own content type (parameters are dropped)
			// and split it into type and subtype.
			parsed, _, err := mime.ParseMediaType(st.ContentType)
			if err != nil {
				panic(err)
			}
			parts := strings.SplitN(parsed, "/", 2)

			info := runtime.SerializerInfo{
				MediaType:        st.ContentType,
				MediaTypeType:    parts[0],
				MediaTypeSubType: parts[1],
				EncodesAsText:    st.EncodesAsText,
				Serializer:       st.Serializer,
				PrettySerializer: st.PrettySerializer,
			}
			if st.StreamSerializer != nil {
				info.StreamSerializer = &runtime.StreamSerializerInfo{
					Serializer:    st.StreamSerializer,
					EncodesAsText: st.EncodesAsText,
					Framer:        st.Framer,
				}
			}
			accepts = append(accepts, info)

			if parsed == runtime.ContentTypeJSON {
				legacy = st.Serializer
			}
		}
	}

	if legacy == nil {
		legacy = serializers[0].Serializer
	}

	return CodecFactory{
		scheme:           scheme,
		serializers:      serializers,
		universal:        recognizer.NewDecoder(decoders...),
		accepts:          accepts,
		legacySerializer: legacy,
	}
}
// SupportedMediaTypes returns the SerializerInfo of every serializer the
// factory supports.
// According to the original notes, the apiserver usually calls this to obtain
// all serializers, picks the one matching the request's content type, and then
// passes it back to the factory to wrap it (presumably via EncoderForVersion /
// DecoderToVersion).
func (f CodecFactory) SupportedMediaTypes() []runtime.SerializerInfo {
return f.accepts
}
// LegacyCodec returns a codec that encodes to the given versions and decodes to
// the internal version. Encoding always uses the factory's legacy serializer
// (JSON by default); decoding uses the universal decoder.
//
// Deprecated: API servers and clients should first settle on a concrete
// serializer and then call CodecForVersions. Callers that only need to read
// data should use UniversalDecoder instead.
func (f CodecFactory) LegacyCodec(version ...schema.GroupVersion) runtime.Codec {
	encodeVersions := schema.GroupVersions(version)
	return versioning.NewDefaultingCodecForScheme(
		f.scheme,
		f.legacySerializer,
		f.universal,
		encodeVersions,
		runtime.InternalGroupVersioner,
	)
}
// UniversalDeserializer can convert any stored data recognized by this factory into a Go object that satisfies
// runtime.Object. It does not perform conversion. It does not perform defaulting.
// It returns the factory's universal decoder directly, without a versioning wrapper.
func (f CodecFactory) UniversalDeserializer() runtime.Decoder {
return f.universal
}
// UniversalDecoder returns a runtime.Decoder capable of decoding all known API objects in all known formats.
// It is meant for clients that do not need to encode objects but want to deserialize API objects stored on
// disk. Only objects in groups registered with the scheme are decoded. The GroupVersions passed may be used
// to select alternate versions of objects to return; with no versions, runtime.InternalGroupVersioner is
// used. If any versions are specified, unrecognized groups will be returned in the version they are encoded
// as (no conversion). This decoder performs defaulting.
func (f CodecFactory) UniversalDecoder(versions ...schema.GroupVersion) runtime.Decoder {
	// Default to the internal version; explicit versions override it.
	var versioner runtime.GroupVersioner = runtime.InternalGroupVersioner
	if len(versions) > 0 {
		versioner = schema.GroupVersions(versions)
	}
	return f.CodecForVersions(nil, f.universal, nil, versioner)
}
// CodecForVersions creates a versioning codec around the provided encoder and decoder.
// encode selects the version objects are converted to before encoding, and decode selects
// the version decoded objects are converted to.
//
// A nil encode is replaced with runtime.DisabledGroupVersioner and a nil decode with
// runtime.InternalGroupVersioner before the codec is built.
func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode runtime.GroupVersioner, decode runtime.GroupVersioner) runtime.Codec {
	encodeVersioner, decodeVersioner := encode, decode

	// TODO: these are for backcompat, remove them in the future
	if encodeVersioner == nil {
		encodeVersioner = runtime.DisabledGroupVersioner
	}
	if decodeVersioner == nil {
		decodeVersioner = runtime.InternalGroupVersioner
	}

	return versioning.NewDefaultingCodecForScheme(f.scheme, encoder, decoder, encodeVersioner, decodeVersioner)
}
// DecoderToVersion wraps decoder in a versioning codec that targets the version
// selected by gv. It is CodecForVersions with only the decoding side set, so
// the returned value must not be used for encoding.
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return f.CodecForVersions(nil, decoder, nil, gv)
}
// EncoderForVersion wraps encoder in a versioning codec that targets the version
// selected by gv. It is CodecForVersions with only the encoding side set, so
// the returned value must not be used for decoding.
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return f.CodecForVersions(encoder, nil, gv, nil)
}
// WithoutConversion returns a NegotiatedSerializer that does not convert
// objects: the factory is wrapped in a WithoutConversionCodecFactory.
func (f CodecFactory) WithoutConversion() runtime.NegotiatedSerializer {
return WithoutConversionCodecFactory{f}
}
// CodecFactoryOptions holds the options used to construct a CodecFactory.
type CodecFactoryOptions struct {
Strict bool // whether the JSON/YAML serializers run in strict mode
Pretty bool // whether to also build the pretty (human-readable) JSON serializer
}
// CodecFactoryOptionsMutator adjusts CodecFactoryOptions in place; NewCodecFactory
// applies the mutators it is given in order.
type CodecFactoryOptionsMutator func(*CodecFactoryOptions)
// EnablePretty is a CodecFactoryOptionsMutator that turns the Pretty option on.
func EnablePretty(options *CodecFactoryOptions) {
options.Pretty = true
}
// DisablePretty is a CodecFactoryOptionsMutator that turns the Pretty option off.
func DisablePretty(options *CodecFactoryOptions) {
options.Pretty = false
}
// EnableStrict is a CodecFactoryOptionsMutator that turns the Strict option on.
func EnableStrict(options *CodecFactoryOptions) {
options.Strict = true
}
// DisableStrict is a CodecFactoryOptionsMutator that turns the Strict option off.
func DisableStrict(options *CodecFactoryOptions) {
options.Strict = false
}
4、VersioningCodec
// NewDefaultingCodecForScheme builds a versioning codec in which scheme acts as
// creater, typer and defaulter, and conversion goes through
// runtime.UnsafeObjectConvertor(scheme).
//
// encoder and decoder are normally obtained by the caller from a CodecFactory
// for the desired media type and then passed in here. encodeVersion and
// decodeVersion are usually the internal version (i.e. the HubGroupVersion in
// the request scope). This function does not default them itself;
// CodecFactory.CodecForVersions substitutes runtime.InternalGroupVersioner for
// a nil decode versioner before calling it.
func NewDefaultingCodecForScheme(
	scheme *runtime.Scheme,
	encoder runtime.Encoder,
	decoder runtime.Decoder,
	encodeVersion runtime.GroupVersioner,
	decodeVersion runtime.GroupVersioner,
) runtime.Codec {
	convertor := runtime.UnsafeObjectConvertor(scheme)
	return NewCodec(
		encoder, decoder,
		convertor,
		scheme, // creater
		scheme, // typer
		scheme, // defaulter
		encodeVersion, decodeVersion,
		scheme.Name(),
	)
}
// NewCodec assembles a versioning codec from its collaborators. The codec's
// identifier is derived once, here, from encodeVersion and encoder.
func NewCodec(
	encoder runtime.Encoder,
	decoder runtime.Decoder,
	convertor runtime.ObjectConvertor,
	creater runtime.ObjectCreater,
	typer runtime.ObjectTyper,
	defaulter runtime.ObjectDefaulter,
	encodeVersion runtime.GroupVersioner,
	decodeVersion runtime.GroupVersioner,
	originalSchemeName string,
) runtime.Codec {
	id := identifier(encodeVersion, encoder)
	return &codec{
		encoder:   encoder,
		decoder:   decoder,
		convertor: convertor,
		creater:   creater,
		typer:     typer,
		defaulter: defaulter,

		encodeVersion: encodeVersion,
		decodeVersion: decodeVersion,

		identifier:         id,
		originalSchemeName: originalSchemeName,
	}
}
// codec wraps an encoder and a decoder with defaulting and version conversion.
type codec struct {
// encoder and decoder do the actual (de)serialization.
encoder runtime.Encoder
decoder runtime.Decoder
// convertor converts objects between versions on both paths.
convertor runtime.ObjectConvertor
creater runtime.ObjectCreater
// typer resolves the kinds of an object when encoding.
typer runtime.ObjectTyper
// defaulter, when non-nil, is applied to decoded objects before conversion.
defaulter runtime.ObjectDefaulter
// encodeVersion and decodeVersion select the conversion targets.
encodeVersion runtime.GroupVersioner
decodeVersion runtime.GroupVersioner
// identifier is computed once in NewCodec and returned by Identifier.
identifier runtime.Identifier
// originalSchemeName is only used when building not-registered errors.
originalSchemeName string
}
// Decode decodes data and then converts the result to c.decodeVersion (normally
// the internal version).
//
// defaultGVK is the GVK of the current request path (the kind configured in the
// scope when the API was installed); it is usually nil for data read from etcd.
// into is the target object, typically an internal-version object created by
// the scope's NamedCreater. When into is non-nil the decoded data is converted
// into it and into is returned; otherwise a newly converted object is returned.
// The GVK reported by the underlying decoder is returned in every case,
// including on error.
func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	// If into is Unstructured and already carries a non-empty group/version,
	// decode into a fresh instance of the same type instead, so the result is
	// a different object from into and the conversion below still runs.
	target := into
	if into != nil {
		_, isUnstructured := into.(runtime.Unstructured)
		if isUnstructured && !into.GetObjectKind().GroupVersionKind().GroupVersion().Empty() {
			target = reflect.New(reflect.TypeOf(into).Elem()).Interface().(runtime.Object)
		}
	}

	decoded, gvk, err := c.decoder.Decode(data, defaultGVK, target)
	if err != nil {
		return nil, gvk, err
	}

	// Let objects that implement the nested-decoding hook decode their
	// contents with the unwrapped decoder.
	if nested, ok := decoded.(runtime.NestedObjectDecoder); ok {
		if err := nested.DecodeNestedObjects(runtime.WithoutVersionDecoder{Decoder: c.decoder}); err != nil {
			return nil, gvk, err
		}
	}

	// Defaulting happens before conversion on both paths below.
	if c.defaulter != nil {
		c.defaulter.Default(decoded)
	}

	// No target supplied: convert to the decode version and return the result.
	if into == nil {
		converted, err := c.convertor.ConvertToVersion(decoded, c.decodeVersion)
		if err != nil {
			return nil, gvk, err
		}
		return converted, gvk, nil
	}

	// Short-circuit conversion if the into object is same object
	if into == decoded {
		return into, gvk, nil
	}
	if err := c.convertor.Convert(decoded, into, c.decodeVersion); err != nil {
		return nil, gvk, err
	}
	return into, gvk, nil
}
// Encode writes obj to w. Objects implementing runtime.CacheableObject are
// encoded through their CacheEncode method, keyed by this codec's identifier,
// with doEncode as the callback; everything else goes to doEncode directly.
func (c *codec) Encode(obj runtime.Object, w io.Writer) error {
	cacheable, ok := obj.(runtime.CacheableObject)
	if !ok {
		return c.doEncode(obj, w)
	}
	return cacheable.CacheEncode(c.Identifier(), c.doEncode, w)
}
// doEncode writes obj to w using c.encoder, converting obj to c.encodeVersion
// first when a conversion is required. It returns any error from kind lookup,
// conversion, nested-object encoding or the underlying encoder.
func (c *codec) doEncode(obj runtime.Object, w io.Writer) error {
// Note: inside the switch, obj is shadowed by the type-asserted value.
switch obj := obj.(type) {
case *runtime.Unknown:
// Unknown objects are handed to the underlying encoder as-is.
return c.encoder.Encode(obj, w)
case runtime.Unstructured:
// An unstructured list can contain objects of multiple group version kinds. don't short-circuit just
// because the top-level type matches our desired destination type. actually send the object to the converter
// to give it a chance to convert the list items if needed.
if _, ok := obj.(*unstructured.UnstructuredList); !ok {
// avoid conversion roundtrip if GVK is the right one already or is empty (yes, this is a hack, but the old behaviour we rely on in kubectl)
objGVK := obj.GetObjectKind().GroupVersionKind()
if len(objGVK.Version) == 0 {
return c.encoder.Encode(obj, w)
}
// NOTE(review): c.encodeVersion is used here before the nil check further
// down — confirm callers never reach this path with a nil encodeVersion.
targetGVK, ok := c.encodeVersion.KindForGroupVersionKinds([]schema.GroupVersionKind{objGVK})
if !ok {
return runtime.NewNotRegisteredGVKErrForTarget(c.originalSchemeName, objGVK, c.encodeVersion)
}
if targetGVK == objGVK {
return c.encoder.Encode(obj, w)
}
}
}
// Everything else (including unstructured objects that need conversion)
// falls through to the generic path below.
gvks, isUnversioned, err := c.typer.ObjectKinds(obj)
if err != nil {
return err
}
objectKind := obj.GetObjectKind()
old := objectKind.GroupVersionKind()
// restore the old GVK after encoding
defer objectKind.SetGroupVersionKind(old)
// No target version, or an unversioned type: encode without conversion,
// stamping the first kind reported by the typer onto the object.
if c.encodeVersion == nil || isUnversioned {
if e, ok := obj.(runtime.NestedObjectEncoder); ok {
if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
return err
}
}
objectKind.SetGroupVersionKind(gvks[0])
return c.encoder.Encode(obj, w)
}
// Perform a conversion if necessary
out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
if err != nil {
return err
}
// Nested objects of the converted object are encoded with the same target version.
if e, ok := out.(runtime.NestedObjectEncoder); ok {
if err := e.EncodeNestedObjects(runtime.WithVersionEncoder{Version: c.encodeVersion, Encoder: c.encoder, ObjectTyper: c.typer}); err != nil {
return err
}
}
// Conversion is responsible for setting the proper group, version, and kind onto the outgoing object
return c.encoder.Encode(out, w)
}
// Identifier returns the identifier computed for this codec in NewCodec.
func (c *codec) Identifier() runtime.Identifier {
return c.identifier
}
// identifiersMap caches computed identifiers: the key is a codecIdentifier and
// the value is its JSON form as a runtime.Identifier (see identifier).
var identifiersMap sync.Map
// codecIdentifier is the structure that is JSON-marshaled to form a versioning
// codec's identifier; it is also used as the cache key in identifiersMap.
type codecIdentifier struct {
// EncodeGV is the identifier of the encode GroupVersioner, if any.
EncodeGV string `json:"encodeGV,omitempty"`
// Encoder is the identifier of the wrapped encoder, if any.
Encoder string `json:"encoder,omitempty"`
// Name is always "versioning" for codecs built by this package.
Name string `json:"name,omitempty"`
}
// identifier computes the identifier of a versioning codec from its encode
// versioner and wrapped encoder (either may be nil). The identifier is the JSON
// encoding of a codecIdentifier; results are cached in identifiersMap so each
// distinct combination is marshaled only when it is not already cached.
// A marshaling failure is fatal (klog.Fatalf).
func identifier(encodeGV runtime.GroupVersioner, encoder runtime.Encoder) runtime.Identifier {
	key := codecIdentifier{Name: "versioning"}
	if encodeGV != nil {
		key.EncodeGV = encodeGV.Identifier()
	}
	if encoder != nil {
		key.Encoder = string(encoder.Identifier())
	}

	if cached, found := identifiersMap.Load(key); found {
		return cached.(runtime.Identifier)
	}

	encoded, err := json.Marshal(key)
	if err != nil {
		klog.Fatalf("Failed marshaling identifier for codec: %v", err)
	}
	id := runtime.Identifier(encoded)
	identifiersMap.Store(key, id)
	return id
}