- 0、架构
- 1、配置解析/启动
- 2、REST
- 2.1、apiserver提供的api
- 2.2、请求流程
- 2.3、api多版本
- 2.4、聚合/CRD
- 2.5、创建HPA全流程debug例子
- 3、搭建debug环境(mac)
0、架构
如下图所示。k8s作为集群的入口,是唯一能够和etcd打交道的模块。其作用主要如下:
1、作为Kubernetes API的服务端,为集群内的节点以及kubectl工具提供API服务
2、与ETCD打交道,实现对k8s内部obj的增删改查
3、保证在分布式存储系统(etcd)中的Kubernetes API对象的状态一致**
1、配置解析/启动
1.1、option(会删除部分代码,不影响理解)
type ServerRunOptions struct {
GenericServerRunOptions *genericoptions.ServerRunOptions
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
InsecureServing *genericoptions.DeprecatedInsecureServingOptionsWithLoopback
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
Admission *kubeoptions.AdmissionOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
Authorization *kubeoptions.BuiltInAuthorizationOptions
CloudProvider *kubeoptions.CloudProviderOptions
APIEnablement *genericoptions.APIEnablementOptions
EgressSelector *genericoptions.EgressSelectorOptions
AllowPrivileged bool
EnableLogsHandler bool
EventTTL time.Duration
KubeletConfig kubeletclient.KubeletClientConfig
KubernetesServiceNodePort int
MaxConnectionBytesPerSec int64
ServiceClusterIPRanges string
PrimaryServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
EnableAggregatorRouting bool
MasterCount int
EndpointReconcilerType string
ServiceAccountSigningKeyFile string
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountTokenMaxExpiration time.Duration
ShowHiddenMetricsForVersion string
}
1.1.1、GenericServerRunOptions
// 运行generic server的options
type ServerRunOptions struct {
AdvertiseAddress net.IP
CorsAllowedOriginList []string
ExternalHost string
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
RequestTimeout time.Duration
GoawayChance float64
LivezGracePeriod time.Duration
MinRequestTimeout int
ShutdownDelayDuration time.Duration
// We intentionally did not add a flag for this option. Users of the
// apiserver library can wire it to a flag.
JSONPatchMaxCopyBytes int64
// The limit on the request body size that would be accepted and
// decoded in a write request. 0 means no limit.
// We intentionally did not add a flag for this option. Users of the
// apiserver library can wire it to a flag.
MaxRequestBodyBytes int64
TargetRAMMB int
EnablePriorityAndFairness bool
}
// 默认值
func NewServerRunOptions() *ServerRunOptions {
defaults := server.NewConfig(serializer.CodecFactory{})
return &ServerRunOptions{
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
}
}
// 此处为NewConfig函数
func NewConfig(codecs serializer.CodecFactory) *Config {
defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
PostStartHooks: map[string]PostStartHookConfigEntry{},
HealthzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
ReadyzChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
LivezChecks: append([]healthz.HealthChecker{}, defaultHealthChecks...),
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
EnableMetrics: true,
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(60) * time.Second,
MinRequestTimeout: 1800,
LivezGracePeriod: time.Duration(0),
ShutdownDelayDuration: time.Duration(0),
// 1.5MB is the default client request size in bytes
// the etcd server should accept. See
// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
// A request body might be encoded in json, and is converted to
// proto when persisted in etcd, so we allow 2x as the largest size
// increase the "copy" operations in a json patch may cause.
JSONPatchMaxCopyBytes: int64(3 * 1024 * 1024),
// 1.5MB is the recommended client request size in byte
// the etcd server should accept. See
// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
// A request body might be encoded in json, and is converted to
// proto when persisted in etcd, so we allow 2x as the largest request
// body size to be accepted and decoded in a write request.
MaxRequestBodyBytes: int64(3 * 1024 * 1024),
// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
}
}
1.1.2、Etcd
// 后端存储类型
var storageTypes = sets.NewString(
storagebackend.StorageTypeETCD3,
)
// etcd配置文件
type EtcdOptions struct {
// The value of Paging on StorageConfig will be overridden by the
// calculated feature gate value.
StorageConfig storagebackend.Config
EncryptionProviderConfigFilepath string
EtcdServersOverrides []string
// To enable protobuf as storage format, it is enough
// to set it to "application/vnd.kubernetes.protobuf".
DefaultStorageMediaType string
DeleteCollectionWorkers int
EnableGarbageCollection bool
// Set EnableWatchCache to false to disable all watch caches
EnableWatchCache bool
// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
DefaultWatchCacheSize int
// WatchCacheSizes represents override to a given resource
WatchCacheSizes []string
}
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
return nil
}
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
if err != nil {
return err
}
c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error {
return healthCheck()
}))
if s.EncryptionProviderConfigFilepath != "" {
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
if err != nil {
return err
}
c.AddHealthChecks(kmsPluginHealthzChecks...)
}
return nil
}
type SimpleRestOptionsFactory struct {
Options EtcdOptions
}
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// depending on cache size this might return an undecorated storage
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
type StorageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
}
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// depending on cache size this might return an undecorated storage
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
// ParseWatchCacheSizes turns a list of cache size values into a map of group resources
// to requested sizes.
func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
watchCacheSizes := make(map[schema.GroupResource]int)
for _, c := range cacheSizes {
tokens := strings.Split(c, "#")
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
}
size, err := strconv.Atoi(tokens[1])
if err != nil {
return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
}
if size < 0 {
return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
}
watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
}
return watchCacheSizes, nil
}
// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
var cacheSizes []string
for resource, size := range watchCacheSizes {
if size < 0 {
return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
}
cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
}
return cacheSizes, nil
}
1.2、config
1.3、启动流程
2、REST
2.1、apiserver提供的api
可以看到apiserver提供的api主要分为restful api和非restful api。
restful api主要是对apiserver核心obj的增删改查,非restful api主要是健康检查、监控等api。
其中:
/api/开头的api控制apiserver核心对象,比如pod、node等
/apis/开头的api,增加了group概念,并通过group/kind/version确定唯一一个obj
2.2、请求流程
2.1、经过内置filter的过滤(也即认证、授权、准入),所有filter如下
2.2、解析namespace/name
2.3、获取此url的gvk信息,基于请求的content获取对应格式的序列化器,同时基于HubGV获取对应的解码器。同时解码请求体数据
PS:
1、HubGV一般为内部版本,即group/__internal
2、为了支持多版本,apiserver会讲请求对象解码为实际obj,再转换为内部版本,最后存储为存储版本
2.4、将obj的内部版本对象转换为存储版本对象(一般为v1)并存储etcd。利用etcd的事务保证创建幂等
2.3、api多版本
2.4、聚合/CRD
参见学习笔记
k8s源码解析/apiserver/apiextensions-apiserver
k8s源码解析/apiserver/kube-aggregator
2.5、创建HPA全流程debug例子
2.5.0、说明
1、本次debug**通过api而非kubectl
2、debug关闭了部分filter
3、主要针对版本转换代码
4、trace/log可直接跳过**
2.5.1、在如下文件处打断点,并用debug模式启动apiserver
kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go
2.5.2、利用v2beta1 type创建json文件
{
"apiVersion": "autoscaling/v2beta1",
"kind": "HorizontalPodAutoscaler",
"metadata": {
"name": "test"
},
"spec": {
"scaleTargetRef": {
"apiVersion": "extensions/v2beta1",
"kind": "Deployment",
"name": "test"
},
"minReplicas": 2,
"maxReplicas": 10,
"metrics": [
{
"type": "Resource",
"resource": {
"name": "cpu",
"targetAverageUtilization": 60
}
},
{
"type": "Resource",
"resource": {
"name": "mem",
"targetAverageUtilization": 50
}
}
]
}
}
2.5.3、调用api创建hpa
2.5.4、解析ns/name
2.5.5、获取相应的gv信息和序列化器
PS:全局序列化器为kubernetes/pkg/api/legacyscheme/scheme.go(s.Serializer即Codecs)
2.5.6、解析option
option是通过query参数传递进来的,由于本案例没有传如有效option,因此此段代码跳过
2.5.7、准备解码参数
PS:
r.New()代码位于下图,defaultGVK就是此路径的gvk信息
2.5.8、开始解码并转换
2.5.8.1、先将请求体转换为原本的obj(通常情况下此处会转换为defaultGVK对应的obj)
2.5.8.2、转换为内部版本
2.5.8.2、转换函数(通常对于内置对象,apiserver注册了默认的转换函数),v2beta1 -> internal的转换函数为
函数实现为如下,就是将v2beta1的值赋值到内部版本(内部版本包含了所有的字段)
2.5.9、准备存储
可以看到再进入存储之前obj已经为内部版本(通过其路径)。
gvk之所以为空是因为apiserver认为内存版本(即内部版本gvk信息多余,因为已经知道是哪个具体的结构体,因此不需要在存储gvk信息,参见:https://github.com/kubernetes/kubernetes/issues/3030)
2.5.10、开始存储
可以看到在真正的encode之前会先将obj转换为encodeVersion对应版本的obj
可以看到转换之前obj为内部版本,转换后的out为v1版本(存储版本),然后在进行encode,encodeVersion其实就是对应的v1版本,如下图
2.5.11、存储到etcd
如下图,当encode结束后,对byte切片进行一些额外的处理,利用etcd的事务进行幂等创建
2.5.12、创建成功返回
可以看到返回之前需要进行解码和填充版本号的处理
实现逻辑如下:
可以看到会将data再次转换为内部版本并设置版本号
2.5.13、重新转换为v2beta1
可以看到,再最后返回给用户之前,重新获取目标版本,也就是路径版本,并进行转换
可以看到最终的编码器版本为v2beta1
至此整个流程结束
2.5.14、总结
1、期间obj版本经历了v2beta1->internal->v1->internal->v2beta1
2、我们知道v1和v2beta1的字段是不一样的,比如v2beta1有metric而v1没有。如下图1/2。则当转换为v1的时候是不是属性就丢了呢。其实不会。当内部版本转换为v1版本的时候,对于v1版本没有的字段会序列化之后放入到注解里面。当内部版本转换到v2beta1的时候,会反序列化并填充到对应的结构体里面。参见图3