0、架构

如下图所示。kube-apiserver作为集群的入口,是唯一直接与etcd交互的组件。其作用主要如下:
1、作为Kubernetes API的服务端,为集群内的节点以及kubectl工具提供API服务
2、与ETCD打交道,实现对k8s内部obj的增删改查
3、保证在分布式存储系统(etcd)中的Kubernetes API对象的状态一致
image.png

1、配置解析/启动

1.1、option(会删除部分代码,不影响理解)

  // ServerRunOptions holds the option groups and settings that configure
  // kube-apiserver. Some upstream fields are omitted from this excerpt.
  type ServerRunOptions struct {
  	// Options for the generic API server (see 1.1.1).
  	GenericServerRunOptions *genericoptions.ServerRunOptions
  	// Options for the etcd storage backend (see 1.1.2).
  	Etcd *genericoptions.EtcdOptions
  	// Serving, audit and feature-gate option groups from the generic
  	// apiserver library (genericoptions).
  	SecureServing *genericoptions.SecureServingOptionsWithLoopback
  	InsecureServing *genericoptions.DeprecatedInsecureServingOptionsWithLoopback
  	Audit *genericoptions.AuditOptions
  	Features *genericoptions.FeatureOptions
  	// Admission, authentication, authorization and cloud-provider option
  	// groups specific to kube-apiserver (kubeoptions).
  	Admission *kubeoptions.AdmissionOptions
  	Authentication *kubeoptions.BuiltInAuthenticationOptions
  	Authorization *kubeoptions.BuiltInAuthorizationOptions
  	CloudProvider *kubeoptions.CloudProviderOptions
  	// Further generic apiserver option groups.
  	APIEnablement *genericoptions.APIEnablementOptions
  	EgressSelector *genericoptions.EgressSelectorOptions
  	// Plain (non-group) settings of kube-apiserver itself.
  	AllowPrivileged bool
  	EnableLogsHandler bool
  	EventTTL time.Duration
  	KubeletConfig kubeletclient.KubeletClientConfig
  	KubernetesServiceNodePort int
  	MaxConnectionBytesPerSec int64
  	// ServiceClusterIPRanges is kept as a raw string; the Primary/Secondary
  	// fields hold net.IPNet values, presumably parsed from it — confirm upstream.
  	ServiceClusterIPRanges string
  	PrimaryServiceClusterIPRange net.IPNet
  	SecondaryServiceClusterIPRange net.IPNet
  	ServiceNodePortRange utilnet.PortRange
  	SSHKeyfile string
  	SSHUser string
  	ProxyClientCertFile string
  	ProxyClientKeyFile string
  	EnableAggregatorRouting bool
  	MasterCount int
  	EndpointReconcilerType string
  	ServiceAccountSigningKeyFile string
  	ServiceAccountIssuer serviceaccount.TokenGenerator
  	ServiceAccountTokenMaxExpiration time.Duration
  	ShowHiddenMetricsForVersion string
  }

1.1.1、GenericServerRunOptions

  // ServerRunOptions contains the options for running a generic API server.
  // NewServerRunOptions fills most of these defaults in from server.NewConfig.
  type ServerRunOptions struct {
  	AdvertiseAddress net.IP
  	CorsAllowedOriginList []string
  	ExternalHost string
  	// Request concurrency limits; NewConfig defaults them to 400 and 200
  	// respectively.
  	MaxRequestsInFlight int
  	MaxMutatingRequestsInFlight int
  	// NewConfig defaults RequestTimeout to 60s.
  	RequestTimeout time.Duration
  	GoawayChance float64
  	LivezGracePeriod time.Duration
  	// NewConfig defaults this to 1800 (a plain int; presumably seconds — confirm).
  	MinRequestTimeout int
  	ShutdownDelayDuration time.Duration
  	// We intentionally did not add a flag for this option. Users of the
  	// apiserver library can wire it to a flag.
  	JSONPatchMaxCopyBytes int64
  	// The limit on the request body size that would be accepted and
  	// decoded in a write request. 0 means no limit.
  	// We intentionally did not add a flag for this option. Users of the
  	// apiserver library can wire it to a flag.
  	MaxRequestBodyBytes int64
  	TargetRAMMB int
  	// NewServerRunOptions defaults this to true.
  	EnablePriorityAndFairness bool
  }
  24. // 默认值
  25. func NewServerRunOptions() *ServerRunOptions {
  26. defaults := server.NewConfig(serializer.CodecFactory{})
  27. return &ServerRunOptions{
  28. MaxRequestsInFlight: defaults.MaxRequestsInFlight,
  29. MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
  30. RequestTimeout: defaults.RequestTimeout,
  31. LivezGracePeriod: defaults.LivezGracePeriod,
  32. MinRequestTimeout: defaults.MinRequestTimeout,
  33. ShutdownDelayDuration: defaults.ShutdownDelayDuration,
  34. JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
  35. MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
  36. EnablePriorityAndFairness: true,
  37. }
  38. }
  39. // 此处为NewConfig函数
  40. func NewConfig(codecs serializer.CodecFactory) *Config {
  41. defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
  42. return &Config{
  43. Serializer: codecs,
  44. BuildHandlerChainFunc: DefaultBuildHandlerChain,
  45. HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
  46. LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
  47. DisabledPostStartHooks: sets.NewString(),
  48. PostStartHooks: map[string]PostStartHookConfigEntry{},
  49. HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
  50. ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
  51. LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
  52. EnableIndex: true,
  53. EnableDiscovery: true,
  54. EnableProfiling: true,
  55. EnableMetrics: true,
  56. MaxRequestsInFlight: 400,
  57. MaxMutatingRequestsInFlight: 200,
  58. RequestTimeout: time.Duration(60) * time.Second,
  59. MinRequestTimeout: 1800,
  60. LivezGracePeriod: time.Duration(0),
  61. ShutdownDelayDuration: time.Duration(0),
  62. // 1.5MB is the default client request size in bytes
  63. // the etcd server should accept. See
  64. // https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
  65. // A request body might be encoded in json, and is converted to
  66. // proto when persisted in etcd, so we allow 2x as the largest size
  67. // increase the "copy" operations in a json patch may cause.
  68. JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
  69. // 1.5MB is the recommended client request size in byte
  70. // the etcd server should accept. See
  71. // https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
  72. // A request body might be encoded in json, and is converted to
  73. // proto when persisted in etcd, so we allow 2x as the largest request
  74. // body size to be accepted and decoded in a write request.
  75. MaxRequestBodyBytes: int64(3 * 1024 * 1024),
  76. // Default to treating watch as a long-running operation
  77. // Generic API servers have no inherent long-running subresources
  78. LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
  79. }
  80. }

1.1.2、Etcd

  // storageTypes is the set of supported storage backend types; only etcd3 is
  // listed.
  var storageTypes = sets.NewString(
  	storagebackend.StorageTypeETCD3,
  )

  // EtcdOptions holds the options for the etcd storage backend.
  type EtcdOptions struct {
  	// The value of Paging on StorageConfig will be overridden by the
  	// calculated feature gate value.
  	StorageConfig storagebackend.Config
  	// When non-empty, addEtcdHealthEndpoint also registers the KMS plugin
  	// health checks read from this encryption provider config file.
  	EncryptionProviderConfigFilepath string
  	// NOTE(review): judging by the name, per-resource etcd server overrides;
  	// not used in this excerpt — confirm the format upstream.
  	EtcdServersOverrides []string
  	// To enable protobuf as storage format, it is enough
  	// to set it to "application/vnd.kubernetes.protobuf".
  	DefaultStorageMediaType string
  	// Copied into every RESTOptions produced by the REST options factories.
  	DeleteCollectionWorkers int
  	EnableGarbageCollection bool
  	// Set EnableWatchCache to false to disable all watch caches
  	EnableWatchCache bool
  	// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
  	DefaultWatchCacheSize int
  	// WatchCacheSizes represents override to a given resource.
  	// Each entry has the form "<group-resource>#<size>" (see ParseWatchCacheSizes).
  	WatchCacheSizes []string
  }
  24. func (s *EtcdOptions) ApplyTo(c *server.Config) error {
  25. if s == nil {
  26. return nil
  27. }
  28. if err := s.addEtcdHealthEndpoint(c); err != nil {
  29. return err
  30. }
  31. c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
  32. return nil
  33. }
  34. func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
  35. if err := s.addEtcdHealthEndpoint(c); err != nil {
  36. return err
  37. }
  38. c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
  39. return nil
  40. }
  41. func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
  42. healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
  43. if err != nil {
  44. return err
  45. }
  46. c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error {
  47. return healthCheck()
  48. }))
  49. if s.EncryptionProviderConfigFilepath != "" {
  50. kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
  51. if err != nil {
  52. return err
  53. }
  54. c.AddHealthChecks(kmsPluginHealthzChecks...)
  55. }
  56. return nil
  57. }
  58. type SimpleRestOptionsFactory struct {
  59. Options EtcdOptions
  60. }
  61. func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
  62. ret := generic.RESTOptions{
  63. StorageConfig: &f.Options.StorageConfig,
  64. Decorator: generic.UndecoratedStorage,
  65. EnableGarbageCollection: f.Options.EnableGarbageCollection,
  66. DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
  67. ResourcePrefix: resource.Group + "/" + resource.Resource,
  68. CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
  69. }
  70. if f.Options.EnableWatchCache {
  71. sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
  72. if err != nil {
  73. return generic.RESTOptions{}, err
  74. }
  75. cacheSize, ok := sizes[resource]
  76. if !ok {
  77. cacheSize = f.Options.DefaultWatchCacheSize
  78. }
  79. // depending on cache size this might return an undecorated storage
  80. ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
  81. }
  82. return ret, nil
  83. }
  84. type StorageFactoryRestOptionsFactory struct {
  85. Options EtcdOptions
  86. StorageFactory serverstorage.StorageFactory
  87. }
  88. func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
  89. storageConfig, err := f.StorageFactory.NewConfig(resource)
  90. if err != nil {
  91. return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
  92. }
  93. ret := generic.RESTOptions{
  94. StorageConfig: storageConfig,
  95. Decorator: generic.UndecoratedStorage,
  96. DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
  97. EnableGarbageCollection: f.Options.EnableGarbageCollection,
  98. ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
  99. CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
  100. }
  101. if f.Options.EnableWatchCache {
  102. sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
  103. if err != nil {
  104. return generic.RESTOptions{}, err
  105. }
  106. cacheSize, ok := sizes[resource]
  107. if !ok {
  108. cacheSize = f.Options.DefaultWatchCacheSize
  109. }
  110. // depending on cache size this might return an undecorated storage
  111. ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
  112. }
  113. return ret, nil
  114. }
  115. // ParseWatchCacheSizes turns a list of cache size values into a map of group resources
  116. // to requested sizes.
  117. func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
  118. watchCacheSizes := make(map[schema.GroupResource]int)
  119. for _, c := range cacheSizes {
  120. tokens := strings.Split(c, "#")
  121. if len(tokens) != 2 {
  122. return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
  123. }
  124. size, err := strconv.Atoi(tokens[1])
  125. if err != nil {
  126. return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
  127. }
  128. if size < 0 {
  129. return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
  130. }
  131. watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
  132. }
  133. return watchCacheSizes, nil
  134. }
  135. // WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
  136. func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
  137. var cacheSizes []string
  138. for resource, size := range watchCacheSizes {
  139. if size < 0 {
  140. return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
  141. }
  142. cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
  143. }
  144. return cacheSizes, nil
  145. }

1.2、config

1.3、启动流程

2、REST

2.1、apiserver提供的api

可以看到apiserver提供的api主要分为restful api和非restful api。
restful api主要是对apiserver核心obj的增删改查,非restful api主要是健康检查、监控等api。
其中:
/api/开头的api用于操作核心(core,也称legacy组)对象,比如pod、node等
/apis/开头的api增加了group概念,通过group/version/kind(GVK)唯一确定一种对象类型

image.png

2.2、请求流程

如下图,一个完整的请求会有以下路径(以Create为例):

2.1、经过内置filter的过滤(如认证、审计、授权、限流等;注意准入(admission)并不属于filter链,而是在handler中解码请求体之后才执行),所有filter如下

image.png

2.2、解析namespace/name

image.png

2.3、获取此url的gvk信息,根据请求的Content-Type获取对应格式的序列化器,并基于HubGV获取对应的解码器,然后解码请求体数据

PS:
1、HubGV一般为内部版本,即group/__internal
2、为了支持多版本,apiserver会将请求体解码为其实际版本的obj,再转换为内部版本,最后以存储版本写入存储
image.png
image.png

2.4、将obj的内部版本对象转换为存储版本对象(一般为v1)并存储到etcd。利用etcd的事务保证只有在key不存在时才写入,从而避免重复创建(key已存在时返回AlreadyExists)

image.png
image.png

2.3、api多版本

概念参照https://www.jianshu.com/p/86130b138a0f

2.4、聚合/CRD

参见学习笔记
k8s源码解析/apiserver/apiextensions-apiserver
k8s源码解析/apiserver/kube-aggregator

2.5、创建HPA全流程debug例子

2.5.0、说明

1、本次debug通过api而非kubectl
2、debug关闭了部分filter
3、主要针对版本转换代码
4、trace/log可直接跳过
image.png

2.5.1、在如下文件处打断点,并用debug模式启动apiserver

kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
image.png

2.5.2、利用v2beta1 type创建json文件

  1. {
  2. "apiVersion": "autoscaling/v2beta1",
  3. "kind": "HorizontalPodAutoscaler",
  4. "metadata": {
  5. "name": "test"
  6. },
  7. "spec": {
  8. "scaleTargetRef": {
  9. "apiVersion": "apps/v1",
  10. "kind": "Deployment",
  11. "name": "test"
  12. },
  13. "minReplicas": 2,
  14. "maxReplicas": 10,
  15. "metrics": [
  16. {
  17. "type": "Resource",
  18. "resource": {
  19. "name": "cpu",
  20. "targetAverageUtilization": 60
  21. }
  22. },
  23. {
  24. "type": "Resource",
  25. "resource": {
  26. "name": "memory",
  27. "targetAverageUtilization": 50
  28. }
  29. }
  30. ]
  31. }
  32. }

2.5.3、调用api创建hpa

image.png

2.5.4、解析ns/name

image.png

2.5.5、获取相应的gv信息和序列化器

image.png
PS:全局序列化器为kubernetes/pkg/api/legacyscheme/scheme.go(s.Serializer即Codecs)
image.png

2.5.6、解析option

option是通过query参数传递进来的,由于本案例没有传入有效的option,因此跳过此段代码
image.png

2.5.7、准备解码参数

PS:
r.New()代码位于下图,defaultGVK就是此路径的gvk信息
image.png
image.png

2.5.8、开始解码并转换

2.5.8.1、先将请求体转换为原本的obj(通常情况下此处会转换为defaultGVK对应的obj)

image.png
如下图,可以看到obj为我们v2beta1的obj
image.png

2.5.8.2、转换为内部版本

image.png
可以看到into此时为内部版本对象
image.png

2.5.8.3、转换函数(对于内置对象,apiserver通常注册了默认的转换函数),v2beta1 -> internal的转换函数为

image.png
函数实现如下,就是将v2beta1对象的字段值赋值到内部版本(内部版本包含了所有的字段)
image.png
image.png
image.png

2.5.9、准备存储

image.png
可以看到在进入存储之前obj已经是内部版本(可以通过其类型的包路径看出)。
gvk之所以为空,是因为apiserver认为内存中的内部版本对象携带gvk信息是多余的:已经知道它是哪个具体的结构体,因此不需要再存储gvk信息(参见:https://github.com/kubernetes/kubernetes/issues/3030)
image.png

2.5.10、开始存储

image.png
可以看到在真正的encode之前会先将obj转换为encodeVersion对应版本的obj
image.png
可以看到转换之前obj为内部版本,转换后的out为v1版本(存储版本),然后再进行encode。encodeVersion其实就是对应的v1版本,如下图
image.png
image.png

2.5.11、存储到etcd

如下图,当encode结束后,对byte切片进行一些额外的处理,然后利用etcd的事务进行创建:只有在key不存在时才写入,从而避免重复创建
image.png

2.5.12、创建成功返回

可以看到返回之前需要进行解码和填充版本号的处理
image.png
实现逻辑如下:
可以看到会将data再次转换为内部版本并设置版本号
image.png
image.png

2.5.13、重新转换为v2beta1

可以看到,在最后返回给用户之前,会重新获取目标版本(也就是请求路径中的版本)并进行转换
image.png
可以看到最终的编码器版本为v2beta1
image.png
image.png
至此整个流程结束

2.5.14、总结

1、期间obj版本经历了v2beta1->internal->v1->internal->v2beta1
2、我们知道v1和v2beta1的字段是不一样的,比如v2beta1有metrics字段而v1没有(见图1/2)。那么转换为v1的时候这些属性会不会丢失呢?其实不会:当内部版本转换为v1版本时,v1版本中没有的字段会被序列化后放入注解(annotation)中(例如autoscaling.alpha.kubernetes.io/metrics);当内部版本再转换为v2beta1时,会从注解中反序列化并填充到对应的结构体字段里。参见图3
image.png
image.png
image.png

3、搭建debug环境(mac)