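- 0. Architecture
- 1. Configuration parsing / startup
- 2. REST
- 2.1. APIs exposed by apiserver
- 2.2. Request flow
- 2.3. API multi-versioning
- 2.4. Aggregation / CRD
- 2.5. End-to-end debug walkthrough: creating an HPA
- 3. Setting up a debug environment (mac)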
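0. Architecture
As the figure below shows, apiserver is the entry point of the cluster and the only component that talks to etcd directly. Its main responsibilities are:
1. Act as the server side of the Kubernetes API, serving the nodes in the cluster and the kubectl tool.
2. Talk to etcd to create, delete, update and query the objects inside k8s.
3. Keep the state of Kubernetes API objects consistent in the distributed store (etcd).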
1. Configuration parsing / startup
1.1. Options (some code has been removed; this does not affect understanding)
type ServerRunOptions struct {
    GenericServerRunOptions *genericoptions.ServerRunOptions
    Etcd *genericoptions.EtcdOptions
    SecureServing *genericoptions.SecureServingOptionsWithLoopback
    InsecureServing *genericoptions.DeprecatedInsecureServingOptionsWithLoopback
    Audit *genericoptions.AuditOptions
    Features *genericoptions.FeatureOptions
    Admission *kubeoptions.AdmissionOptions
    Authentication *kubeoptions.BuiltInAuthenticationOptions
    Authorization *kubeoptions.BuiltInAuthorizationOptions
    CloudProvider *kubeoptions.CloudProviderOptions
    APIEnablement *genericoptions.APIEnablementOptions
    EgressSelector *genericoptions.EgressSelectorOptions

    AllowPrivileged bool
    EnableLogsHandler bool
    EventTTL time.Duration
    KubeletConfig kubeletclient.KubeletClientConfig
    KubernetesServiceNodePort int
    MaxConnectionBytesPerSec int64
    ServiceClusterIPRanges string
    PrimaryServiceClusterIPRange net.IPNet
    SecondaryServiceClusterIPRange net.IPNet
    ServiceNodePortRange utilnet.PortRange
    SSHKeyfile string
    SSHUser string
    ProxyClientCertFile string
    ProxyClientKeyFile string
    EnableAggregatorRouting bool
    MasterCount int
    EndpointReconcilerType string
    ServiceAccountSigningKeyFile string
    ServiceAccountIssuer serviceaccount.TokenGenerator
    ServiceAccountTokenMaxExpiration time.Duration
    ShowHiddenMetricsForVersion string
}
1.1.1. GenericServerRunOptions
// Options for running the generic server.
type ServerRunOptions struct {
    AdvertiseAddress net.IP
    CorsAllowedOriginList []string
    ExternalHost string
    MaxRequestsInFlight int
    MaxMutatingRequestsInFlight int
    RequestTimeout time.Duration
    GoawayChance float64
    LivezGracePeriod time.Duration
    MinRequestTimeout int
    ShutdownDelayDuration time.Duration
    // We intentionally did not add a flag for this option. Users of the
    // apiserver library can wire it to a flag.
    JSONPatchMaxCopyBytes int64
    // The limit on the request body size that would be accepted and
    // decoded in a write request. 0 means no limit.
    // We intentionally did not add a flag for this option. Users of the
    // apiserver library can wire it to a flag.
    MaxRequestBodyBytes int64
    TargetRAMMB int
    EnablePriorityAndFairness bool
}

// Default values.
func NewServerRunOptions() *ServerRunOptions {
    defaults := server.NewConfig(serializer.CodecFactory{})
    return &ServerRunOptions{
        MaxRequestsInFlight: defaults.MaxRequestsInFlight,
        MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
        RequestTimeout: defaults.RequestTimeout,
        LivezGracePeriod: defaults.LivezGracePeriod,
        MinRequestTimeout: defaults.MinRequestTimeout,
        ShutdownDelayDuration: defaults.ShutdownDelayDuration,
        JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
        MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
        EnablePriorityAndFairness: true,
    }
}

// The NewConfig function called above.
func NewConfig(codecs serializer.CodecFactory) *Config {
    defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
    return &Config{
        Serializer: codecs,
        BuildHandlerChainFunc: DefaultBuildHandlerChain,
        HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
        LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
        DisabledPostStartHooks: sets.NewString(),
        PostStartHooks: map[string]PostStartHookConfigEntry{},
        HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
        ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
        LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
        EnableIndex: true,
        EnableDiscovery: true,
        EnableProfiling: true,
        EnableMetrics: true,
        MaxRequestsInFlight: 400,
        MaxMutatingRequestsInFlight: 200,
        RequestTimeout: time.Duration(60) * time.Second,
        MinRequestTimeout: 1800,
        LivezGracePeriod: time.Duration(0),
        ShutdownDelayDuration: time.Duration(0),
        // 1.5MB is the default client request size in bytes
        // the etcd server should accept. See
        // https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
        // A request body might be encoded in json, and is converted to
        // proto when persisted in etcd, so we allow 2x as the largest size
        // increase the "copy" operations in a json patch may cause.
        JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
        // 1.5MB is the recommended client request size in byte
        // the etcd server should accept. See
        // https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
        // A request body might be encoded in json, and is converted to
        // proto when persisted in etcd, so we allow 2x as the largest request
        // body size to be accepted and decoded in a write request.
        MaxRequestBodyBytes: int64(3 * 1024 * 1024),

        // Default to treating watch as a long-running operation
        // Generic API servers have no inherent long-running subresources
        LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
    }
}
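To make the MaxRequestsInFlight default above more concrete, here is a minimal sketch of what an in-flight limit does. It is not the apiserver filter itself: withMaxInFlight and demo are hypothetical names, the counter-based implementation is a simplification, and the real filter additionally distinguishes mutating from non-mutating requests and exempts long-running ones such as watch.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
)

// withMaxInFlight (hypothetical helper) admits at most `limit` concurrent
// requests and answers 429 Too Many Requests for the rest.
func withMaxInFlight(limit int64, next http.Handler) http.Handler {
	var inFlight int64
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt64(&inFlight, 1) > limit {
			atomic.AddInt64(&inFlight, -1)
			http.Error(w, "too many requests", http.StatusTooManyRequests)
			return
		}
		defer atomic.AddInt64(&inFlight, -1)
		next.ServeHTTP(w, r)
	})
}

// demo holds one request inside the handler and sends a second one
// while the first still occupies the only slot.
func demo() (firstCode, secondCode int) {
	var started, release, done sync.WaitGroup
	started.Add(1)
	release.Add(1)
	done.Add(1)

	slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started.Done() // signal that the slot is taken
		release.Wait() // block until the test lets us finish
		w.WriteHeader(http.StatusOK)
	})
	handler := withMaxInFlight(1, slow)

	first := httptest.NewRecorder()
	go func() {
		defer done.Done()
		handler.ServeHTTP(first, httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil))
	}()
	started.Wait()

	second := httptest.NewRecorder()
	handler.ServeHTTP(second, httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil))

	release.Done()
	done.Wait()
	return first.Code, second.Code
}

func main() {
	first, second := demo()
	fmt.Println("first:", first, "second:", second)
}
```

With the defaults shown above the limits would be 400 for read requests and 200 for mutating requests; the sketch uses a limit of 1 only so the rejection is easy to trigger.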
1.1.2. Etcd
// Backend storage types.
var storageTypes = sets.NewString(
    storagebackend.StorageTypeETCD3,
)

// etcd configuration.
type EtcdOptions struct {
    // The value of Paging on StorageConfig will be overridden by the
    // calculated feature gate value.
    StorageConfig storagebackend.Config
    EncryptionProviderConfigFilepath string
    EtcdServersOverrides []string
    // To enable protobuf as storage format, it is enough
    // to set it to "application/vnd.kubernetes.protobuf".
    DefaultStorageMediaType string
    DeleteCollectionWorkers int
    EnableGarbageCollection bool
    // Set EnableWatchCache to false to disable all watch caches
    EnableWatchCache bool
    // Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
    DefaultWatchCacheSize int
    // WatchCacheSizes represents override to a given resource
    WatchCacheSizes []string
}

func (s *EtcdOptions) ApplyTo(c *server.Config) error {
    if s == nil {
        return nil
    }
    if err := s.addEtcdHealthEndpoint(c); err != nil {
        return err
    }
    c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
    return nil
}

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
    if err := s.addEtcdHealthEndpoint(c); err != nil {
        return err
    }
    c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
    return nil
}

func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
    healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
    if err != nil {
        return err
    }
    c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error {
        return healthCheck()
    }))

    if s.EncryptionProviderConfigFilepath != "" {
        kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
        if err != nil {
            return err
        }
        c.AddHealthChecks(kmsPluginHealthzChecks...)
    }
    return nil
}

type SimpleRestOptionsFactory struct {
    Options EtcdOptions
}

func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    ret := generic.RESTOptions{
        StorageConfig: &f.Options.StorageConfig,
        Decorator: generic.UndecoratedStorage,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        ResourcePrefix: resource.Group + "/" + resource.Resource,
        CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        // depending on cache size this might return an undecorated storage
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }
    return ret, nil
}

type StorageFactoryRestOptionsFactory struct {
    Options EtcdOptions
    StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig: storageConfig,
        Decorator: generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        // depending on cache size this might return an undecorated storage
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }
    return ret, nil
}

// ParseWatchCacheSizes turns a list of cache size values into a map of group resources
// to requested sizes.
func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
    watchCacheSizes := make(map[schema.GroupResource]int)
    for _, c := range cacheSizes {
        tokens := strings.Split(c, "#")
        if len(tokens) != 2 {
            return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
        }

        size, err := strconv.Atoi(tokens[1])
        if err != nil {
            return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
        }
        if size < 0 {
            return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
        }
        watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
    }
    return watchCacheSizes, nil
}

// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
    var cacheSizes []string

    for resource, size := range watchCacheSizes {
        if size < 0 {
            return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
        }
        cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
    }
    return cacheSizes, nil
}
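The "resource#size" format that ParseWatchCacheSizes expects is easier to see with concrete input. The following is a self-contained sketch that mirrors the parsing logic above using only the standard library. GroupResource and parseGroupResource are local stand-ins for schema.GroupResource and schema.ParseGroupResource, and the sample values are made up for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// GroupResource is a local stand-in for schema.GroupResource.
type GroupResource struct {
	Group    string
	Resource string
}

// parseGroupResource follows the "resource.group" convention: everything
// before the first dot is the resource, the rest is the group.
func parseGroupResource(s string) GroupResource {
	if i := strings.Index(s, "."); i >= 0 {
		return GroupResource{Group: s[i+1:], Resource: s[:i]}
	}
	return GroupResource{Resource: s}
}

// parseWatchCacheSizes mirrors ParseWatchCacheSizes from the listing above.
func parseWatchCacheSizes(specs []string) (map[GroupResource]int, error) {
	sizes := make(map[GroupResource]int)
	for _, spec := range specs {
		tokens := strings.Split(spec, "#")
		if len(tokens) != 2 {
			return nil, fmt.Errorf("invalid value of watch cache size: %s", spec)
		}
		size, err := strconv.Atoi(tokens[1])
		if err != nil {
			return nil, fmt.Errorf("invalid size of watch cache size: %s", spec)
		}
		if size < 0 {
			return nil, fmt.Errorf("watch cache size cannot be negative: %s", spec)
		}
		sizes[parseGroupResource(tokens[0])] = size
	}
	return sizes, nil
}

func main() {
	// Illustrative values: a core resource and a resource in the apps group.
	sizes, err := parseWatchCacheSizes([]string{"pods#1000", "deployments.apps#200"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sizes[GroupResource{Resource: "pods"}])
	fmt.Println(sizes[GroupResource{Group: "apps", Resource: "deployments"}])
}
```

Resources that do not appear in the map fall back to DefaultWatchCacheSize, as the GetRESTOptions code above shows.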
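1.2. Config
1.3. Startup flow
2. REST
2.1. APIs exposed by apiserver
The APIs exposed by apiserver fall into two groups: RESTful APIs and non-RESTful APIs.
The RESTful APIs create, delete, update and query the core apiserver objects; the non-RESTful APIs cover health checks, monitoring and similar endpoints.
Among the RESTful APIs:
Paths starting with /api/ serve the core objects, such as pod and node.
Paths starting with /apis/ add the concept of a group; group/version/kind together identify exactly one object type.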
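The difference between the two prefixes can be sketched as a small path builder. resourcePath is a hypothetical helper written for this note, not an apiserver function; it only illustrates that the core group has an empty group name and lives under /api, while every named group lives under /apis/<group>.

```go
package main

import "fmt"

// resourcePath (hypothetical helper) builds the collection URL for a resource.
// An empty group means the core (legacy) group served under /api.
func resourcePath(group, version, namespace, resource string) string {
	prefix := "/apis/" + group
	if group == "" {
		prefix = "/api"
	}
	path := prefix + "/" + version
	if namespace != "" {
		path += "/namespaces/" + namespace
	}
	return path + "/" + resource
}

func main() {
	fmt.Println(resourcePath("", "v1", "default", "pods"))
	fmt.Println(resourcePath("", "v1", "", "nodes"))
	fmt.Println(resourcePath("autoscaling", "v2beta1", "default", "horizontalpodautoscalers"))
}
```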
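2.2. Request flow
2.2.1. The request passes through the built-in filters (authentication, authorization and so on; admission is invoked afterwards from the resource handler). The full list of filters is as follows.
2.2.2. Parse namespace/name.
2.2.3. Look up the gvk for this URL, pick the serializer that matches the content type of the request, and obtain the decoder that targets HubGV. Then decode the request body.
PS:
1. HubGV is normally the internal version, i.e. group/__internal.
2. To support multiple versions, apiserver first decodes the request into an object of the version it was submitted in, then converts it to the internal version, and finally persists it as the storage version.
2.2.4. Convert the internal-version object to the storage-version object (usually v1) and write it to etcd. An etcd transaction makes sure the create only succeeds when the key does not exist yet, so a repeated create cannot overwrite an existing object.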
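The filter step of this flow can be pictured as nested HTTP handlers, each of which either rejects the request or passes it on. The sketch below is an assumption-laden toy: withAuthentication, withAuthorization, buildChain, the static token table and the "only admin may write" rule are all invented for illustration and are not the filters apiserver actually installs.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

type userKey struct{}

// tokens is a made-up static token table standing in for real authenticators.
var tokens = map[string]string{"admin-token": "admin", "viewer-token": "viewer"}

// withAuthentication resolves the bearer token to a user or answers 401.
func withAuthentication(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
		user, ok := tokens[token]
		if !ok {
			http.Error(w, "Unauthorized", http.StatusUnauthorized)
			return
		}
		ctx := context.WithValue(r.Context(), userKey{}, user)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// withAuthorization applies a toy rule: only "admin" may send non-GET requests.
func withAuthorization(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, _ := r.Context().Value(userKey{}).(string)
		if r.Method != http.MethodGet && user != "admin" {
			http.Error(w, "Forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// buildChain wraps the final handler; the outermost filter runs first.
func buildChain(final http.Handler) http.Handler {
	return withAuthentication(withAuthorization(final))
}

// status sends one request through the chain and returns the response code.
func status(method, token string) int {
	final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	req := httptest.NewRequest(method, "/apis/autoscaling/v2beta1/namespaces/default/horizontalpodautoscalers", nil)
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	rec := httptest.NewRecorder()
	buildChain(final).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(status(http.MethodPost, "admin-token"))
	fmt.Println(status(http.MethodPost, "viewer-token"))
	fmt.Println(status(http.MethodGet, ""))
}
```

Only requests that survive every filter reach the final handler, which is where the decode, convert and store steps described above take place.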
2.3. API multi-versioning
2.4. Aggregation / CRD
See the study notes:
k8s source code analysis/apiserver/apiextensions-apiserver
k8s source code analysis/apiserver/kube-aggregator
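2.5. End-to-end debug walkthrough: creating an HPA
2.5.0. Notes
1. This debug session calls the API directly instead of going through kubectl.
2. Some filters were disabled for debugging.
3. The focus is on the version conversion code.
4. The trace/log code can be skipped.
2.5.1. Set a breakpoint in the following file and start apiserver in debug mode
kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go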
2.5.2. Write a JSON file using the v2beta1 type
{
  "apiVersion": "autoscaling/v2beta1",
  "kind": "HorizontalPodAutoscaler",
  "metadata": {
    "name": "test"
  },
  "spec": {
    "scaleTargetRef": {
      "apiVersion": "apps/v1",
      "kind": "Deployment",
      "name": "test"
    },
    "minReplicas": 2,
    "maxReplicas": 10,
    "metrics": [
      {
        "type": "Resource",
        "resource": {
          "name": "cpu",
          "targetAverageUtilization": 60
        }
      },
      {
        "type": "Resource",
        "resource": {
          "name": "memory",
          "targetAverageUtilization": 50
        }
      }
    ]
  }
}
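The top-level apiVersion and kind fields of this file are what the decoder later reads to decide which Go type to instantiate. As a rough sketch of that first look at the body (peekGVK, typeMeta and GVK are names invented here, not apiserver types), one can unmarshal only those two fields and split apiVersion into group and version:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// typeMeta holds just the two identifying fields of a Kubernetes object.
type typeMeta struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
}

// GVK is a local stand-in for a group/version/kind triple.
type GVK struct {
	Group, Version, Kind string
}

// peekGVK (hypothetical helper) reads apiVersion and kind without decoding
// the rest of the object. An apiVersion without a slash belongs to the
// core group, whose group name is empty.
func peekGVK(body []byte) (GVK, error) {
	var tm typeMeta
	if err := json.Unmarshal(body, &tm); err != nil {
		return GVK{}, err
	}
	group, version := "", tm.APIVersion
	if i := strings.LastIndex(tm.APIVersion, "/"); i >= 0 {
		group, version = tm.APIVersion[:i], tm.APIVersion[i+1:]
	}
	return GVK{Group: group, Version: version, Kind: tm.Kind}, nil
}

func main() {
	body := []byte(`{"apiVersion": "autoscaling/v2beta1", "kind": "HorizontalPodAutoscaler", "metadata": {"name": "test"}}`)
	gvk, err := peekGVK(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", gvk)
}
```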
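2.5.3. Call the API to create the HPA
2.5.4. Parse ns/name
2.5.5. Get the corresponding gv information and the serializer

PS: the global serializer is defined in kubernetes/pkg/api/legacyscheme/scheme.go (s.Serializer is Codecs).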
2.5.6. Parse options
Options are passed in as query parameters. This example does not pass any valid option, so this part of the code is skipped.
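Since this step is skipped in the walkthrough, a small sketch may help show what it would do when options are present. dryRun and fieldManager are query parameters that the create endpoint accepts; the createOptions struct and parseCreateOptions function are simplified stand-ins written for this note, not the real decoding code.

```go
package main

import (
	"fmt"
	"net/url"
)

// createOptions is a simplified stand-in for the options of a create call.
type createOptions struct {
	DryRun       []string
	FieldManager string
}

// parseCreateOptions (hypothetical helper) extracts the options from the
// raw query string of the request URL.
func parseCreateOptions(rawQuery string) (createOptions, error) {
	values, err := url.ParseQuery(rawQuery)
	if err != nil {
		return createOptions{}, err
	}
	return createOptions{
		DryRun:       values["dryRun"],
		FieldManager: values.Get("fieldManager"),
	}, nil
}

func main() {
	opts, err := parseCreateOptions("dryRun=All&fieldManager=debug-client")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", opts)

	// The walkthrough sends no query parameters, so everything stays empty.
	empty, _ := parseCreateOptions("")
	fmt.Printf("%+v\n", empty)
}
```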
2.5.7. Prepare the decoding parameters
PS:
The code of r.New() is shown in the figure below; defaultGVK is the gvk of this URL path.
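2.5.8. Decode and convert
2.5.8.1. First decode the request body into an object of its original version (normally this is the object type that defaultGVK refers to)
2.5.8.2. Convert to the internal version
2.5.8.3. Conversion functions (for built-in objects apiserver normally registers default conversion functions); the v2beta1 -> internal conversion function is

The function is implemented as follows: it copies the v2beta1 values into the internal version (the internal version contains all fields).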
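To give a feel for what such a conversion function looks like, here is a toy version that copies one v2beta1-style resource metric into an internal-style struct. The three types and convertV2beta1ToInternal are heavily simplified assumptions made for this note; the real generated and hand-written conversion functions cover many more fields and metric kinds.

```go
package main

import "fmt"

// v2beta1ResourceMetric is a simplified picture of the versioned type:
// the target is a flat field on the metric itself.
type v2beta1ResourceMetric struct {
	Name                     string
	TargetAverageUtilization *int32
}

// internalMetricTarget and internalResourceMetric are a simplified picture
// of the internal type, which nests the target in its own struct.
type internalMetricTarget struct {
	Type               string
	AverageUtilization *int32
}

type internalResourceMetric struct {
	Name   string
	Target internalMetricTarget
}

// convertV2beta1ToInternal (hypothetical) copies field by field, reshaping
// the flat target into the nested internal representation.
func convertV2beta1ToInternal(in *v2beta1ResourceMetric, out *internalResourceMetric) error {
	out.Name = in.Name
	if in.TargetAverageUtilization != nil {
		utilization := *in.TargetAverageUtilization // copy, do not alias the input
		out.Target = internalMetricTarget{Type: "Utilization", AverageUtilization: &utilization}
	}
	return nil
}

func main() {
	sixty := int32(60)
	in := v2beta1ResourceMetric{Name: "cpu", TargetAverageUtilization: &sixty}
	var out internalResourceMetric
	if err := convertV2beta1ToInternal(&in, &out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Name, out.Target.Type, *out.Target.AverageUtilization)
}
```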

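2.5.9. Prepare for storage

As its package path shows, obj is already the internal version before it enters the storage layer.
gvk is empty because apiserver considers gvk information redundant on the in-memory (internal) version: the concrete struct type is already known, so there is no need to carry gvk as well. See: https://github.com/kubernetes/kubernetes/issues/3030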
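2.5.10. Start storing

Before the actual encode, obj is first converted to an object of the version given by encodeVersion.
Before the conversion obj is the internal version; after the conversion out is the v1 version (the storage version), and only then is it encoded. encodeVersion is simply the v1 version, as the figure below shows.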
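2.5.11. Write to etcd
As the figure below shows, once encoding has finished the byte slice gets some extra processing and is then written through an etcd transaction that only creates the key if it does not exist yet.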
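2.5.12. Return after a successful create
Before returning, the data has to be decoded and its version number (the resourceVersion) filled in.
The logic is as follows:
data is converted to the internal version again and the version number is set.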
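2.5.13. Convert back to v2beta1
Finally, before the response goes back to the user, the target version (the version in the URL path) is looked up again and the object is converted to it.
The final encoder version is v2beta1.

This completes the whole flow.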
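2.5.14. Summary
1. Along the way the object version went v2beta1 -> internal -> v1 -> internal -> v2beta1.
2. v1 and v2beta1 do not have the same fields; for example, v2beta1 has metrics and v1 does not (see figures 1 and 2). Are those fields lost when the object is converted to v1? No. When the internal version is converted to v1, the fields that v1 lacks are serialized and placed into annotations. On the way back (v1 -> internal, and from there to v2beta1) they are deserialized and filled into the corresponding struct fields. See figure 3.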
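The annotation trick from point 2 can be shown as a round trip on toy types. Everything below is simplified for illustration: metric, internalHPA, v1HPA and the two conversion functions are invented here, and the annotation key is, to the best of my knowledge, the one the autoscaling conversion code uses for metrics; treat it as an assumption if your source tree differs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metricsAnnotation is assumed to match the key used by the real conversion.
const metricsAnnotation = "autoscaling.alpha.kubernetes.io/metrics"

// metric is a toy metric: a resource name and a target utilization.
type metric struct {
	Name        string `json:"name"`
	Utilization int32  `json:"utilization"`
}

// internalHPA can hold any number of metrics, like the internal version.
type internalHPA struct {
	Metrics []metric
}

// v1HPA has a dedicated field for CPU only, like autoscaling/v1.
type v1HPA struct {
	TargetCPUUtilizationPercentage *int32
	Annotations                    map[string]string
}

// internalToV1 keeps CPU in the dedicated field and serializes every other
// metric into an annotation so that nothing is lost.
func internalToV1(in internalHPA) (v1HPA, error) {
	out := v1HPA{Annotations: map[string]string{}}
	var others []metric
	for _, m := range in.Metrics {
		if m.Name == "cpu" && out.TargetCPUUtilizationPercentage == nil {
			utilization := m.Utilization
			out.TargetCPUUtilizationPercentage = &utilization
			continue
		}
		others = append(others, m)
	}
	if len(others) > 0 {
		raw, err := json.Marshal(others)
		if err != nil {
			return v1HPA{}, err
		}
		out.Annotations[metricsAnnotation] = string(raw)
	}
	return out, nil
}

// v1ToInternal restores the metric list from the CPU field and the annotation.
func v1ToInternal(in v1HPA) (internalHPA, error) {
	var out internalHPA
	if in.TargetCPUUtilizationPercentage != nil {
		out.Metrics = append(out.Metrics, metric{Name: "cpu", Utilization: *in.TargetCPUUtilizationPercentage})
	}
	if raw, ok := in.Annotations[metricsAnnotation]; ok {
		var others []metric
		if err := json.Unmarshal([]byte(raw), &others); err != nil {
			return internalHPA{}, err
		}
		out.Metrics = append(out.Metrics, others...)
	}
	return out, nil
}

func main() {
	original := internalHPA{Metrics: []metric{
		{Name: "cpu", Utilization: 60},
		{Name: "memory", Utilization: 50},
	}}
	stored, _ := internalToV1(original)
	fmt.Println(stored.Annotations[metricsAnnotation])
	restored, _ := v1ToInternal(stored)
	fmt.Printf("%+v\n", restored)
}
```

The memory metric never appears as a v1 field, yet it survives the trip through the storage version because it travels inside the annotation.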






