Middleware
Install
API Server 启动时,通过 createMux 方法将中间件功能安装在每一个路由的 Handler 上。其中关键代码在 5 ~ 13 行,核心方法是通过 makeHTTPHandler 将中间件安装在每一个路由上。
func (s *Server) createMux() *mux.Router {m := mux.NewRouter()logrus.Debug("Registering routers")for _, apiRouter := range s.routers {for _, r := range apiRouter.Routes() {f := s.makeHTTPHandler(r.Handler())logrus.Debugf("Registering %s, %s", r.Method(), r.Path())m.Path(versionMatcher + r.Path()).Methods(r.Method()).Handler(f)m.Path(r.Path()).Methods(r.Method()).Handler(f)}}debugRouter := debug.NewRouter()s.routers = append(s.routers, debugRouter)for _, r := range debugRouter.Routes() {f := s.makeHTTPHandler(r.Handler())m.Path("/debug" + r.Path()).Handler(f)}notFoundHandler := httputils.MakeErrorHandler(pageNotFoundError{})m.HandleFunc(versionMatcher+"/{path:.*}", notFoundHandler)m.NotFoundHandler = notFoundHandlerm.MethodNotAllowedHandler = notFoundHandlerreturn m}
Server 的 makeHTTPHandler 方法,通过 15 行的 handlerWithGlobalMiddlewares 方法将全局中间件添加到路由 Handler 上,并返回新的带中间件检查的 Handler。
func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {return func(w http.ResponseWriter, r *http.Request) {// Define the context that we'll pass around to share info// like the docker-request-id.//// The 'context' will be used for global data that should// apply to all requests. Data that is specific to the// immediate function being called should still be passed// as 'args' on the function call.// use intermediate variable to prevent "should not use basic type// string as key in context.WithValue" golint errorsctx := context.WithValue(r.Context(), dockerversion.UAStringKey{}, r.Header.Get("User-Agent"))r = r.WithContext(ctx)handlerFunc := s.handlerWithGlobalMiddlewares(handler)vars := mux.Vars(r)if vars == nil {vars = make(map[string]string)}if err := handlerFunc(ctx, w, r, vars); err != nil {statusCode := errdefs.GetHTTPErrorStatusCode(err)if statusCode >= 500 {logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)}httputils.MakeErrorHandler(err)(w, r)}}}
Server 在安装全局中间件时,首先保存初始的全局中间件,并按照中间件先后顺序分别增加中间件功能,并在下轮循环中继续对新的 Handler 进行处理,最后加装调试中间件。
func (s *Server) handlerWithGlobalMiddlewares(handler httputils.APIFunc) httputils.APIFunc {next := handlerfor _, m := range s.middlewares {next = m.WrapHandler(next)}if s.cfg.Logging && logrus.GetLevel() == logrus.DebugLevel {next = middleware.DebugRequestMiddleware(next)}return next}
Interface & Implementations
Middleware 定义了通用中间件接口,对于具体的中间件可以通过实现该接口变为中间件,同时可以添加与中间件功能相关的特定信息,注意这个方法的参数与返回值类型是完全相同的。
type Middleware interface {WrapHandler(func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error}
以 cors 中间件为例,通过实现 WrapHandler 方法将 cors 相关的设置添加在应答报文头上,然后再执行传入的 Handler。
type CORSMiddleware struct {defaultHeaders string}// NewCORSMiddleware creates a new CORSMiddleware with default headers.func NewCORSMiddleware(d string) CORSMiddleware {return CORSMiddleware{defaultHeaders: d}}// WrapHandler returns a new handler function wrapping the previous one in the request chain.func (c CORSMiddleware) WrapHandler(handler func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {return func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {// If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"// otherwise, all head values will be passed to HTTP handlercorsHeaders := c.defaultHeadersif corsHeaders == "" {corsHeaders = "*"}logrus.Debugf("CORS header is enabled and set to: %s", corsHeaders)w.Header().Add("Access-Control-Allow-Origin", corsHeaders)w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, X-Registry-Auth")w.Header().Add("Access-Control-Allow-Methods", "HEAD, GET, POST, DELETE, PUT, OPTIONS")return handler(ctx, w, r, vars)}}
全局中间件在 Docker 后台进程中初始化,唯一特殊的中间件是认证中间件,在构建时传入了一个名为 pluginStore 变量。
func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config, pluginStore plugingetter.PluginGetter) error {v := cfg.Versionexp := middleware.NewExperimentalMiddleware(cli.Config.Experimental)s.UseMiddleware(exp)vm := middleware.NewVersionMiddleware(v, api.DefaultVersion, api.MinVersion)s.UseMiddleware(vm)if cfg.CorsHeaders != "" {c := middleware.NewCORSMiddleware(cfg.CorsHeaders)s.UseMiddleware(c)}cli.authzMiddleware = authorization.NewMiddleware(cli.Config.AuthorizationPlugins, pluginStore)cli.Config.AuthzMiddleware = cli.authzMiddlewares.UseMiddleware(cli.authzMiddleware)return nil}
根据调用的上下文可以找到这个变量对应的创建方法
pluginStore := plugin.NewStore()if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {logrus.Fatalf("Error creating middlewares: %v", err)}
Plugins
Overview
图 1:Plugin 全局概览
从图 1 可以看出插件管理涉及的主要功能模组。通过 http.Client 不难确认,插件也是 C/S 模型的。其中 Manager 全局唯一的 Store,Store 里储存了插件名称到插件的一个映射,并会保存一些列针对插件的处理方法。
Client
Client 的方法调用使用 Call 方法,参数为服务名(目标方法)、参数及返回值,这几个参数的含义非常明确,返回值是目标方法返回内容,Call 方法执行完成后,会被修改。
func (c *Client) Call(serviceMethod string, args, ret interface{}) error {return c.CallWithOptions(serviceMethod, args, ret)}
Client 的另外一个方法是 Stream,请求插件服务端,并返回一个流。
func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) {var buf bytes.Bufferif err := json.NewEncoder(&buf).Encode(args); err != nil {return nil, err}return c.callWithRetry(serviceMethod, &buf, true)}
无论使用哪个方法执行调用,最终都会执行到 callWithRetry 方法,这个方法的第四个参数用于对请求选项进行设定,在 5 ~ 8 行。请求选项设定完成后,使用 RequestFactory 接口实例创建请求,然后通过网络请求(http.Client.Do)调用远程方法并返回结果。
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool, reqOpts ...func(*RequestOpts)) (io.ReadCloser, error) {var retries intstart := time.Now()var opts RequestOptsfor _, o := range reqOpts {o(&opts)}for {req, err := c.requestFactory.NewRequest(serviceMethod, data)if err != nil {return nil, err}cancelRequest := func() {}if opts.Timeout > 0 {var ctx context.Contextctx, cancelRequest = context.WithTimeout(req.Context(), opts.Timeout)req = req.WithContext(ctx)}resp, err := c.http.Do(req)if err != nil {cancelRequest()if !retry {return nil, err}timeOff := backoff(retries)if abort(start, timeOff) {return nil, err}retries++logrus.Warnf("Unable to connect to plugin: %s%s: %v, retrying in %v", req.URL.Host, req.URL.Path, err, timeOff)time.Sleep(timeOff)continue}if resp.StatusCode != http.StatusOK {b, err := io.ReadAll(resp.Body)resp.Body.Close()cancelRequest()if err != nil {return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()}}// Plugins' Response(s) should have an Err field indicating what went// wrong. Try to unmarshal into ResponseErr. Otherwise fallback to just// return the string(body)type responseErr struct {Err string}remoteErr := responseErr{}if err := json.Unmarshal(b, &remoteErr); err == nil {if remoteErr.Err != "" {return nil, &statusError{resp.StatusCode, serviceMethod, remoteErr.Err}}}// old way...return nil, &statusError{resp.StatusCode, serviceMethod, string(b)}}return ioutils.NewReadCloserWrapper(resp.Body, func() error {err := resp.Body.Close()cancelRequest()return err}), nil}}
选择设置采用函数方式修改,因此,创建选项修改函数时注意返回一个闭包方法,下面的代码是修改超时选项的实现。
func WithRequestTimeout(t time.Duration) func(*RequestOpts) {return func(o *RequestOpts) {o.Timeout = t}}
Daemon
图 2:Plugin 与 Daemon 关系图
Daemon 实例中保留了 Store 与 Manager 的实例引用,并在创建 Cluster 实例时,通过创建配置对象 ClusterConfig 将 Daemon 当作 executor.Backend 的实现,并将 Manager 当作 plugin.Backend 的实现结构保存下来,注意这里的 plugin.Backend 正确位置位于 daemon/cluster/controllers/plugin 目录。
Backend 接口定义了与容器相关、插件、网络等相关的功能性方法,其实现是 daemon.Daemon 实例,接口定义如下所示。
type Backend interface {CreateManagedNetwork(clustertypes.NetworkCreateRequest) errorDeleteManagedNetwork(networkID string) errorFindNetwork(idName string) (libnetwork.Network, error)SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)ReleaseIngress() (<-chan struct{}, error)CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) errorContainerStop(name string, seconds *int) errorContainerLogs(context.Context, string, *types.ContainerLogsOptions) (msgs <-chan *backend.LogMessage, tty bool, err error)ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) errorActivateContainerServiceBinding(containerName string) errorDeactivateContainerServiceBinding(containerName string) errorUpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) errorContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)ContainerRm(name string, config *types.ContainerRmConfig) errorContainerKill(name string, sig uint64) errorSetContainerDependencyStore(name string, store exec.DependencyGetter) errorSetContainerSecretReferences(name string, refs []*swarmtypes.SecretReference) errorSetContainerConfigReferences(name string, refs []*swarmtypes.ConfigReference) errorSystemInfo() *types.InfoContainers(config *types.ContainerListOptions) ([]*types.Container, error)SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) errorDaemonJoinsCluster(provider cluster.Provider)DaemonLeavesCluster()IsSwarmCompatible() errorSubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{})UnsubscribeFromEvents(listener chan interface{})UpdateAttachment(string, string, string, *network.NetworkingConfig) errorWaitForDetachment(context.Context, string, string, string, string) errorPluginManager() *plugin.ManagerPluginGetter() *plugin.StoreGetAttachmentStore() *networkSettings.AttachmentStoreHasExperimental() bool}
在 controllers/plugin 目录中定义的 Backend 接口,是 Cluster 实例与 Manager 交互的关键接口,这里还牵涉到另外一个本目录下的 Controller 结构,对 Controller 的操作最终会通过 Manager 实例真正得到执行。
type Backend interface {Disable(name string, config *enginetypes.PluginDisableConfig) errorEnable(name string, config *enginetypes.PluginEnableConfig) errorRemove(name string, config *enginetypes.PluginRmConfig) errorPull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) errorUpgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) errorGet(name string) (*v2.Plugin, error)SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())}
Backend 与 Controller 实例关系如图 3 所示。这里的 nodeRunner 是与节点控制相关的数据结构,暂时不针对它展开,这个实例在启动时,会创建 container.executor 实例,其中包含了 plugin.Manager,可以对比图 2,会更加清晰。
图 3:Backend 与 Controller 关系图
Controller 可以通过两种方法获取,一种是图 3 中使用 nodeRunner 的 Controller 方法,另外一种可以通过 Executor 接口的 Controller 方法来获取。
type Executor interface {// Describe returns the underlying node description.Describe(ctx context.Context) (*api.NodeDescription, error)// Configure uses the node object state to propagate node// state to the underlying executor.Configure(ctx context.Context, node *api.Node) error// Controller provides a controller for the given task.Controller(t *api.Task) (Controller, error)// SetNetworkBootstrapKeys passes the symmetric keys from the// manager to the executor.SetNetworkBootstrapKeys([]*api.EncryptionKey) error}
我们以 Controller 实例的 Start 方式为例来看一下,可以清楚的看到代码第 4、11 和 16 行都是对实例对象 Manager 的直接调用。
func (p *Controller) Start(ctx context.Context) error {p.logger.Debug("Start")pl, err := p.backend.Get(p.pluginID)if err != nil {return err}if p.spec.Disabled {if pl.IsEnabled() {return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})}return nil}if !pl.IsEnabled() {return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})}return nil}
最后,看一下 exec.Controller 接口方法定义,注意这里的 Controller 管理的仍然是插件功能。
type Controller interface {// Update the task definition seen by the controller. Will return// ErrTaskUpdateFailed if the provided task definition changes fields that// cannot be changed.//// Will be ignored if the task has exited.Update(ctx context.Context, t *api.Task) error// Prepare the task for execution. This should ensure that all resources// are created such that a call to start should execute immediately.Prepare(ctx context.Context) error// Start the target and return when it has started successfully.Start(ctx context.Context) error// Wait blocks until the target has exited.Wait(ctx context.Context) error// Shutdown requests to exit the target gracefully.Shutdown(ctx context.Context) error// Terminate the target.Terminate(ctx context.Context) error// Remove all resources allocated by the controller.Remove(ctx context.Context) error// Close closes any ephemeral resources associated with controller instance.Close() error}
