References
- emicklei/go-restful: package for building REST-style Web Services using Go
Routes
Landscape
PathRecorderMux 的 refreshMuxLocked 方法如下所示。可理解为 PathRecorderMux 中除 mux 域外,都是作为 pathHandler 的缓存存在的,这个方法的作用就是将缓存内容替换到 pathHandler 中,并存储在 mux 中。
func (m *PathRecorderMux) refreshMuxLocked() {
newMux := &pathHandler{
muxName: m.name,
pathToHandler: map[string]http.Handler{},
prefixHandlers: []prefixHandler{},
notFoundHandler: http.NotFoundHandler(),
}
if m.notFoundHandler != nil {
newMux.notFoundHandler = m.notFoundHandler
}
for path, handler := range m.pathToHandler {
newMux.pathToHandler[path] = handler
}
keys := sets.StringKeySet(m.prefixToHandler).List()
sort.Sort(sort.Reverse(byPrefixPriority(keys)))
for _, prefix := range keys {
newMux.prefixHandlers = append(newMux.prefixHandlers, prefixHandler{
prefix: prefix,
handler: m.prefixToHandler[prefix],
})
}
m.mux.Store(newMux)
}
Listed Path Provider
ListedpathProvider 用于获取暴露的 API 根路径。GenericAPIServer 对该接口的实现依赖于 APIServerHandler 及 DelegationTarget 的实现。
Install APIs
在创建 GenericAPIServer 的最后阶段,通过方法 installAPI 装入各种路由。装入路由时,会使用 routes 包中内容,routes 包是针对各种辅助功能的路由即其 Handler 信息。
routes.Index
将根路径获取路由加载至 APIServerHandler 中,代码如下所示。routes.Index 表示要装入路由的结构。
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
routes.Index 路由加载方法也极其简单明白
func (i Index) Install(pathProvider ListedPathProvider, mux *mux.PathRecorderMux) {
handler := IndexLister{StatusCode: http.StatusOK, PathProvider: pathProvider}
mux.UnlistedHandle("/", handler)
mux.UnlistedHandle("/index.html", handler)
}
routes.Profiling
过程与 routes.Index 几乎相同,因此只将安装路由的方法记录下来
func (d Profiling) Install(c *mux.PathRecorderMux) {
c.UnlistedHandleFunc("/debug/pprof", redirectTo("/debug/pprof/"))
c.UnlistedHandlePrefix("/debug/pprof/", http.HandlerFunc(pprof.Index))
c.UnlistedHandleFunc("/debug/pprof/profile", pprof.Profile)
c.UnlistedHandleFunc("/debug/pprof/symbol", pprof.Symbol)
c.UnlistedHandleFunc("/debug/pprof/trace", pprof.Trace)
}
routes.Metrics
配合 Profiling 时
func (m MetricsWithReset) Install(c *mux.PathRecorderMux) {
register()
defaultMetricsHandler := prometheus.Handler().ServeHTTP
c.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
if req.Method == "DELETE" {
apimetrics.Reset()
etcdmetrics.Reset()
io.WriteString(w, "metrics reset\n")
return
}
defaultMetricsHandler(w, req)
})
}
不需要配合 Profiling
func (m DefaultMetrics) Install(c *mux.PathRecorderMux) {
register()
c.Handle("/metrics", prometheus.Handler())
}
routes.Version
Version 信息是加载到 RESTFul 上的,其他基本一样。
func (v Version) Install(c *restful.Container) {
if v.Version == nil {
return
}
// Set up a service to return the git code version.
versionWS := new(restful.WebService)
versionWS.Path("/version")
versionWS.Doc("git code version from which this is built")
versionWS.Route(
versionWS.GET("/").To(v.handleVersion).
Doc("get the code version").
Operation("getCodeVersion").
Produces(restful.MIME_JSON).
Consumes(restful.MIME_JSON).
Writes(version.Info{}))
c.Add(versionWS)
}
Health Checker
HealthChecker 由传入的 DelegationTarget 生成,并最终注册给 GenericAPIServer。
Handler
Dispatch Procedure
FullHandlerChain 使用 HandlerChainBuilderFu 函数,将 http.Handler 包装起来,并在最后传递个 director。director 再分发给 http.ServeMux 或 PathRecorderMux。
HandlerChainBuilderFn 声明如下所示
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler
director.ServeHTTP
director 满足 http.Handler 接口,在 ServeHTTP 方法中根据 req.URL.path 实现分发功能
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
// if we are exactly /apis or /apis/, then we need special handling in loop.
// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
if path == "/apis" || path == "/apis/" {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
d.goRestfulContainer.Dispatch(w, req)
return
}
case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// if we didn't find a match, then we just skip gorestful altogether
klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
d.nonGoRestfulMux.ServeHTTP(w, req)
}
DefaultBuildHandlerChain
API Server 中的 handler 来自于 Config 结构,Config 结构在 config.go 中初始化,使用默认配置,默认配置具体实现如下所示
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
API
API Server 在启动前,需要安装好各种基础服务,基础服务安装代码位置在 InstallLegacyAPI,这个方法在 Kubernetes 中,被 InstallLegacyAPI 调用,用于安装 API Group。
server.APIGroupInfo
在注册过程中,最重要的信息是 server.APIGroupInfo,核心结构如下图所示
kube-apiserver 的 APIGroupInfo 在 NewLegacyRESTStorage 中注册生成。APIGroupInfo 中的 Scheme、Codecs、ParameterCodec 在 scheme.go 中初始化。APIGroupInfo.VersionedResourcesStorageMap 初始化如下所示,使用的 RESTOptionsGetter 在 Storage 系列中。
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
if serviceAccountStorage.Token != nil {
restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
Install API Resources
注意本图中的 endpoints.APIGroupVersion 是由 server.APIGroupInfo 通过方法 getAPIGroupVersion 生成的。
APIGroupVersion 通过 InstallREST 方法注册路由处理函数。在该方法中,首先创建了 APIInstaller 结构,再通过其 Install 方法生成 metav1.APIResource 及 restful.WebService 结构。APIResource 大致由以下内容组成
- Name: 资源的名称
- SingularName: 资源的单一名称
- Namespaced: true 表示该资源为一个命名空间
- Group Version Kind: 组、版本、类型
- Verbs: 操作,可为:get、list、watch、create、update、patch、delete、proxy 等
- ShortNames: 建议的资源短名称
- Categories: 资源归属的 Group 名称
NewREST
以 podtemplate.NewREST 为例来简要说明过程,该方法返回值为一个 REST 对象,且只是简单的匿名包含 Store 对象,如下所示
type REST struct {
*genericregistry.Store
}
针对 Pod 的 NewREST 方法,具体代码如下所示,首先创建一个 Store 实例,并与 PodTemplate、PodTemplateList 等核心对象关联,并提供了如何创建对象的策略。
func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.PodTemplate{} },
NewListFunc: func() runtime.Object { return &api.PodTemplateList{} },
DefaultQualifiedResource: api.Resource("podtemplates"),
CreateStrategy: podtemplate.Strategy,
UpdateStrategy: podtemplate.Strategy,
DeleteStrategy: podtemplate.Strategy,
ExportStrategy: podtemplate.Strategy,
ReturnDeletedObject: true,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
return &REST{store}
}
Install API
InstallAPIGroups 将 API 注册于 APIServerHandler,其关键代码如下所示,在注册 API 之前,先将 API 使用的 Resources 注册。
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
// ...
for _, apiGroupInfo := range apiGroupInfos {
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
return fmt.Errorf("unable to install api resources: %v", err)
}
// ...
apiGroup := metav1.APIGroup{
Name: apiGroupInfo.PrioritizedVersions[0].Group,
Versions: apiVersionsForDiscovery,
PreferredVersion: preferredVersionForDiscovery,
}
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
}
return nil
}
该方法在 Kubernetes API Server 中,被 InstallAPIs 使用,且 APIGroupInfo 由 RESTStorageProvider 生成,如下所示
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
GroupManager
在 InstallAPIGroups 中,可以发现,API 最终是发布在 discovery.GroupManager 上的,那么我们来看一下这个接口即其作用。
discovery.rootAPIsHandler 满足 http.Handler 接口,并将 RESTFul 请求重定向到了 http.Handler 接口,如下面代码所示,WebService 返回一个 restful.WebService 对象,并将本组下全部请求交由 s.restfulHandle,而这个方法仅是简单调用 http.Handler 接口方法
func (s *rootAPIsHandler) WebService() *restful.WebService {
mediaTypes, _ := negotiation.MediaTypesForSerializer(s.serializer)
ws := new(restful.WebService)
ws.Path(APIGroupPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(s.restfulHandle).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(metav1.APIGroupList{}))
return ws
}
restfulHandle 方法如下所示
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {
s.ServeHTTP(resp.ResponseWriter, req.Request)
}
在通过 New 方法创建 GenericAPIServer 时,GroupManager 已经通过 installAPI 方法挂载至主路由上,如下所示
func installAPI(s *GenericAPIServer, c *Config) {
// ...
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
}
Extensions API Server
Relationship
CustomeResourcesDefinitions 在 createAPIExtensionsServer 中创建,并在 CreateAPIServer 中作为 DelegationTarget 的实例传递给 GenericAPIServer。
API Aggregator
APIAggregator 通过 NewWithDelegate 方法创建。
Landscape
Handler
从上图可以看到,aggregator 是最终暴露的 GenericAPIServer,每个 GenericAPIServer 都挂载了自己的路由,如下图所示,那么是如何关联起来的就是需要解决的问题。
跟踪 delegateHandler 引用情况,发现在 AddAPIService 中使用。proxyHandler 创建过程如下,创建的 proxyHandler 挂载在 GenericAPIServer 的 Mux 中,proxyHandler 的 ServeHTTP 方法自行阅读。
Group 处理通过注册 apiGroupHandler 完成,自行阅读源码即可。
Serives Controller
References
Hooks
Plugin
Landscape
RegisterAllAdmissionPlugins 方法注册全部内置的 Plugin。