Middleware

Install

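When the API Server starts, createMux wraps the Handler of every route with the middleware chain. The key code is the loop on lines 4 to 11: line 6 calls makeHTTPHandler to wrap each route's Handler, and lines 8 and 9 register the result both with and without the version prefix.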

   1. func (s *Server) createMux() *mux.Router {
   2.     m := mux.NewRouter()
   3.     logrus.Debug("Registering routers")
   4.     for _, apiRouter := range s.routers {
   5.         for _, r := range apiRouter.Routes() {
   6.             f := s.makeHTTPHandler(r.Handler())
   7.             logrus.Debugf("Registering %s, %s", r.Method(), r.Path())
   8.             m.Path(versionMatcher + r.Path()).Methods(r.Method()).Handler(f)
   9.             m.Path(r.Path()).Methods(r.Method()).Handler(f)
  10.         }
  11.     }
  12.     debugRouter := debug.NewRouter()
  13.     s.routers = append(s.routers, debugRouter)
  14.     for _, r := range debugRouter.Routes() {
  15.         f := s.makeHTTPHandler(r.Handler())
  16.         m.Path("/debug" + r.Path()).Handler(f)
  17.     }
  18.     notFoundHandler := httputils.MakeErrorHandler(pageNotFoundError{})
  19.     m.HandleFunc(versionMatcher+"/{path:.*}", notFoundHandler)
  20.     m.NotFoundHandler = notFoundHandler
  21.     m.MethodNotAllowedHandler = notFoundHandler
  22.     return m
  23. }

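The Server's makeHTTPHandler method calls handlerWithGlobalMiddlewares (line 14) to add the global middlewares to the route Handler and obtain a new Handler that performs the middleware checks. It then invokes that Handler (line 19) and turns any returned error into an HTTP error response.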

   1. func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {
   2.     return func(w http.ResponseWriter, r *http.Request) {
   3.         // Define the context that we'll pass around to share info
   4.         // like the docker-request-id.
   5.         //
   6.         // The 'context' will be used for global data that should
   7.         // apply to all requests. Data that is specific to the
   8.         // immediate function being called should still be passed
   9.         // as 'args' on the function call.
  10.         // use intermediate variable to prevent "should not use basic type
  11.         // string as key in context.WithValue" golint errors
  12.         ctx := context.WithValue(r.Context(), dockerversion.UAStringKey{}, r.Header.Get("User-Agent"))
  13.         r = r.WithContext(ctx)
  14.         handlerFunc := s.handlerWithGlobalMiddlewares(handler)
  15.         vars := mux.Vars(r)
  16.         if vars == nil {
  17.             vars = make(map[string]string)
  18.         }
  19.         if err := handlerFunc(ctx, w, r, vars); err != nil {
  20.             statusCode := errdefs.GetHTTPErrorStatusCode(err)
  21.             if statusCode >= 500 {
  22.                 logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
  23.             }
  24.             httputils.MakeErrorHandler(err)(w, r)
  25.         }
  26.     }
  27. }
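
The adapter idea behind makeHTTPHandler can be tried in isolation. The following is a minimal sketch using only the standard library, not the Docker implementation: apiFunc, uaKey, adapt, and serve are hypothetical names, and every error is mapped to HTTP 500 here, whereas the real code derives the status code from the error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// apiFunc mirrors the shape of httputils.APIFunc from the text: a handler
// that receives a context and path variables and reports failure as an error.
type apiFunc func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error

// uaKey is a hypothetical context key type (a stand-in for
// dockerversion.UAStringKey); a struct type avoids key collisions.
type uaKey struct{}

// adapt converts an apiFunc into a standard http.HandlerFunc. It stores the
// User-Agent in the request context and, as a simplification, answers every
// returned error with HTTP 500.
func adapt(h apiFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), uaKey{}, r.Header.Get("User-Agent"))
		r = r.WithContext(ctx)
		if err := h(ctx, w, r, map[string]string{}); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	}
}

// serve runs one GET request through the adapted handler and returns the
// resulting status code and body.
func serve(h apiFunc, userAgent string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, "/info", nil)
	req.Header.Set("User-Agent", userAgent)
	rec := httptest.NewRecorder()
	adapt(h)(rec, req)
	return rec.Code, rec.Body.String()
}

// echoUA writes back the User-Agent it finds in the context.
func echoUA(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	ua, _ := ctx.Value(uaKey{}).(string)
	_, err := fmt.Fprint(w, ua)
	return err
}

// alwaysFail simulates a handler that reports an error.
func alwaysFail(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	return errors.New("boom")
}

func main() {
	code, body := serve(echoUA, "demo-client/1.0")
	fmt.Println(code, body)
	code, body = serve(alwaysFail, "demo-client/1.0")
	fmt.Println(code, body)
}
```

Returning an error instead of writing the response directly keeps the error-to-status mapping in one place, which is why the handlers themselves never call http.Error.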

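When installing the global middlewares, the Server first stores the original Handler in next and then wraps it with each middleware in registration order, so every iteration of the loop wraps the Handler produced by the previous one. The debug middleware is added last, and only when logging is enabled at debug level. Because each wrapper encloses the previous one, the middleware registered last is the first to run on an incoming request.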

  func (s *Server) handlerWithGlobalMiddlewares(handler httputils.APIFunc) httputils.APIFunc {
      next := handler
      for _, m := range s.middlewares {
          next = m.WrapHandler(next)
      }
      if s.cfg.Logging && logrus.GetLevel() == logrus.DebugLevel {
          next = middleware.DebugRequestMiddleware(next)
      }
      return next
  }
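
The effect of this wrapping loop on execution order is easy to get backwards, so here is a small sketch that records the order in which wrappers run. The handler and middleware types are simplified stand-ins for httputils.APIFunc and the Middleware interface, and the middleware names are only labels.

```go
package main

import "fmt"

// handler is a simplified stand-in for httputils.APIFunc; it appends to a
// trace so the call order is observable.
type handler func(trace *[]string)

// middleware is a simplified stand-in for the Middleware interface.
type middleware func(next handler) handler

// named returns a middleware that records its name before calling next.
func named(name string) middleware {
	return func(next handler) handler {
		return func(trace *[]string) {
			*trace = append(*trace, name)
			next(trace)
		}
	}
}

// wrapAll applies the middlewares in slice order, using the same loop shape
// as handlerWithGlobalMiddlewares: each iteration wraps the previous result.
func wrapAll(h handler, mws []middleware) handler {
	next := h
	for _, m := range mws {
		next = m(next)
	}
	return next
}

// run builds a chain from the given names, executes it once, and returns
// the recorded call order.
func run(names ...string) []string {
	var mws []middleware
	for _, n := range names {
		mws = append(mws, named(n))
	}
	var trace []string
	final := func(t *[]string) { *t = append(*t, "handler") }
	wrapAll(final, mws)(&trace)
	return trace
}

func main() {
	// Registered in this order, executed in the reverse order.
	fmt.Println(run("experimental", "version", "authz"))
}
```

In this sketch the last name passed to run is the outermost wrapper, so it is recorded first and the original handler is recorded last.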

Interface & Implementations

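Middleware defines the common middleware interface. A concrete type becomes a middleware by implementing it, and it can carry whatever state its function needs. Note that the parameter of WrapHandler and its return value have exactly the same type.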

  type Middleware interface {
      WrapHandler(func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error
  }

Taking the CORS middleware as an example, its WrapHandler implementation adds the CORS headers to the response and then calls the Handler that was passed in.

  type CORSMiddleware struct {
      defaultHeaders string
  }

  // NewCORSMiddleware creates a new CORSMiddleware with default headers.
  func NewCORSMiddleware(d string) CORSMiddleware {
      return CORSMiddleware{defaultHeaders: d}
  }

  // WrapHandler returns a new handler function wrapping the previous one in the request chain.
  func (c CORSMiddleware) WrapHandler(handler func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
      return func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
          // If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"
          // otherwise, all head values will be passed to HTTP handler
          corsHeaders := c.defaultHeaders
          if corsHeaders == "" {
              corsHeaders = "*"
          }
          logrus.Debugf("CORS header is enabled and set to: %s", corsHeaders)
          w.Header().Add("Access-Control-Allow-Origin", corsHeaders)
          w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, X-Registry-Auth")
          w.Header().Add("Access-Control-Allow-Methods", "HEAD, GET, POST, DELETE, PUT, OPTIONS")
          return handler(ctx, w, r, vars)
      }
  }
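
To see how little is needed to satisfy the interface, the sketch below defines a hypothetical headerMiddleware (not part of Docker) that sets a single response header, and exercises it with net/http/httptest. The Middleware interface is repeated, with a type alias for the handler signature, only so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// apiFunc is an alias for the handler signature used by the Middleware
// interface in the text.
type apiFunc = func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error

// Middleware repeats the interface shown above so this sketch compiles alone.
type Middleware interface {
	WrapHandler(apiFunc) apiFunc
}

// headerMiddleware is a hypothetical middleware that sets one fixed
// response header before delegating to the wrapped handler.
type headerMiddleware struct {
	key, value string
}

// WrapHandler returns a handler that adds the header and then calls next.
func (h headerMiddleware) WrapHandler(next apiFunc) apiFunc {
	return func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
		w.Header().Set(h.key, h.value)
		return next(ctx, w, r, vars)
	}
}

// headerAfter wraps a trivial handler with m, serves one request, and
// returns the value of the named response header.
func headerAfter(m Middleware, name string) string {
	ok := func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
		w.WriteHeader(http.StatusNoContent)
		return nil
	}
	req := httptest.NewRequest(http.MethodGet, "/_ping", nil)
	rec := httptest.NewRecorder()
	if err := m.WrapHandler(ok)(req.Context(), rec, req, nil); err != nil {
		return ""
	}
	return rec.Header().Get(name)
}

func main() {
	m := headerMiddleware{key: "X-Example", value: "demo"}
	fmt.Println(headerAfter(m, "X-Example"))
}
```

As in the CORS middleware, the header is set before the wrapped handler runs, because headers written after the handler has sent the status line would no longer reach the client.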

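The global middlewares are initialized in the Docker daemon process. The only special one is the authorization middleware, which receives a variable named pluginStore when it is constructed.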

  func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config, pluginStore plugingetter.PluginGetter) error {
      v := cfg.Version
      exp := middleware.NewExperimentalMiddleware(cli.Config.Experimental)
      s.UseMiddleware(exp)
      vm := middleware.NewVersionMiddleware(v, api.DefaultVersion, api.MinVersion)
      s.UseMiddleware(vm)
      if cfg.CorsHeaders != "" {
          c := middleware.NewCORSMiddleware(cfg.CorsHeaders)
          s.UseMiddleware(c)
      }
      cli.authzMiddleware = authorization.NewMiddleware(cli.Config.AuthorizationPlugins, pluginStore)
      cli.Config.AuthzMiddleware = cli.authzMiddleware
      s.UseMiddleware(cli.authzMiddleware)
      return nil
  }

The call site shows where this variable is created:

  pluginStore := plugin.NewStore()
  if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {
      logrus.Fatalf("Error creating middlewares: %v", err)
  }

Plugins

Overview

plugin-plugin-overview.svg
Figure 1: Global overview of plugins

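Figure 1 shows the main modules involved in plugin management. The presence of http.Client confirms that plugins also follow a client/server model. The Manager holds the globally unique Store, which keeps a mapping from plugin name to plugin together with a series of methods for handling plugins.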

Client

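Client invokes a plugin method through Call, which takes the service name (the target method), the arguments, and the return value. args carries the request payload, and ret receives whatever the target method returns: it is filled in by the time Call completes, so the caller passes a pointer.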

  func (c *Client) Call(serviceMethod string, args, ret interface{}) error {
      return c.CallWithOptions(serviceMethod, args, ret)
  }
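
The way ret is filled in can be illustrated without Docker. The sketch below is an assumption-laden miniature, not the plugin Client: miniClient, call, echoRequest, echoResponse, and the "Demo.Echo" method name are all hypothetical, and the plugin is replaced by an in-process httptest server. It only demonstrates the pattern of JSON-encoding args and decoding the response into ret.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// miniClient is a hypothetical, stripped-down stand-in for the plugin Client.
type miniClient struct {
	baseURL string
	http    *http.Client
}

// call JSON-encodes args, POSTs them to baseURL/serviceMethod, and decodes
// the JSON response into ret, which must therefore be a pointer.
func (c *miniClient) call(serviceMethod string, args, ret interface{}) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(args); err != nil {
		return err
	}
	resp, err := c.http.Post(c.baseURL+"/"+serviceMethod, "application/json", &buf)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%s: unexpected status %d", serviceMethod, resp.StatusCode)
	}
	return json.NewDecoder(resp.Body).Decode(ret)
}

// echoRequest and echoResponse are hypothetical payload types.
type echoRequest struct{ Name string }
type echoResponse struct{ Greeting string }

// greet starts an in-process fake plugin server and calls it once.
func greet(name string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var req echoRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		json.NewEncoder(w).Encode(echoResponse{Greeting: "hello " + req.Name})
	}))
	defer srv.Close()

	c := &miniClient{baseURL: srv.URL, http: srv.Client()}
	var ret echoResponse // empty before the call, populated afterwards
	if err := c.call("Demo.Echo", echoRequest{Name: name}, &ret); err != nil {
		return "", err
	}
	return ret.Greeting, nil
}

func main() {
	fmt.Println(greet("plugin"))
}
```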

Client's other method is Stream, which sends a request to the plugin server and returns the response as a stream.

  func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) {
      var buf bytes.Buffer
      if err := json.NewEncoder(&buf).Encode(args); err != nil {
          return nil, err
      }
      return c.callWithRetry(serviceMethod, &buf, true)
  }

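Whichever method is used, the call eventually reaches callWithRetry. Its fourth parameter is a variadic list of functions that set request options; they are applied on lines 4 to 7. Once the options are set, the RequestFactory interface instance creates the request (line 9), and http.Client.Do (line 19) performs the remote call and returns the result. If the request fails and retry is true, the call is repeated after a backoff delay until abort reports that it is time to give up.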

   1. func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool, reqOpts ...func(*RequestOpts)) (io.ReadCloser, error) {
   2.     var retries int
   3.     start := time.Now()
   4.     var opts RequestOpts
   5.     for _, o := range reqOpts {
   6.         o(&opts)
   7.     }
   8.     for {
   9.         req, err := c.requestFactory.NewRequest(serviceMethod, data)
  10.         if err != nil {
  11.             return nil, err
  12.         }
  13.         cancelRequest := func() {}
  14.         if opts.Timeout > 0 {
  15.             var ctx context.Context
  16.             ctx, cancelRequest = context.WithTimeout(req.Context(), opts.Timeout)
  17.             req = req.WithContext(ctx)
  18.         }
  19.         resp, err := c.http.Do(req)
  20.         if err != nil {
  21.             cancelRequest()
  22.             if !retry {
  23.                 return nil, err
  24.             }
  25.             timeOff := backoff(retries)
  26.             if abort(start, timeOff) {
  27.                 return nil, err
  28.             }
  29.             retries++
  30.             logrus.Warnf("Unable to connect to plugin: %s%s: %v, retrying in %v", req.URL.Host, req.URL.Path, err, timeOff)
  31.             time.Sleep(timeOff)
  32.             continue
  33.         }
  34.         if resp.StatusCode != http.StatusOK {
  35.             b, err := io.ReadAll(resp.Body)
  36.             resp.Body.Close()
  37.             cancelRequest()
  38.             if err != nil {
  39.                 return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()}
  40.             }
  41.             // Plugins' Response(s) should have an Err field indicating what went
  42.             // wrong. Try to unmarshal into ResponseErr. Otherwise fallback to just
  43.             // return the string(body)
  44.             type responseErr struct {
  45.                 Err string
  46.             }
  47.             remoteErr := responseErr{}
  48.             if err := json.Unmarshal(b, &remoteErr); err == nil {
  49.                 if remoteErr.Err != "" {
  50.                     return nil, &statusError{resp.StatusCode, serviceMethod, remoteErr.Err}
  51.                 }
  52.             }
  53.             // old way...
  54.             return nil, &statusError{resp.StatusCode, serviceMethod, string(b)}
  55.         }
  56.         return ioutils.NewReadCloserWrapper(resp.Body, func() error {
  57.             err := resp.Body.Close()
  58.             cancelRequest()
  59.             return err
  60.         }), nil
  61.     }
  62. }
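
The listing calls backoff and abort without showing them. The sketch below is one plausible shape for such a retry loop, assuming an exponentially growing delay with a cap and a total wait budget; the helper names and the concrete durations are illustrative assumptions, not the values Docker uses. The sleep function is injected so the example runs instantly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffFor doubles the wait for every retry so far, capped at max.
// The base and cap passed by callers here are illustrative values.
func backoffFor(retries int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < retries && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

// retry calls op until it succeeds or the accumulated wait would reach
// budget. sleep is injected so the sketch can run without real delays.
func retry(op func() error, budget time.Duration, sleep func(time.Duration)) (int, error) {
	var waited time.Duration
	attempts := 0
	for retries := 0; ; retries++ {
		attempts++
		err := op()
		if err == nil {
			return attempts, nil
		}
		wait := backoffFor(retries, time.Second, 8*time.Second)
		if waited+wait >= budget {
			return attempts, err // give up: the budget is exhausted
		}
		sleep(wait)
		waited += wait
	}
}

// failNTimes returns an operation that fails n times and then succeeds.
func failNTimes(n int) func() error {
	return func() error {
		if n > 0 {
			n--
			return errors.New("connection refused")
		}
		return nil
	}
}

// simulate runs retry against an operation that fails the given number of
// times, with a 30 second budget and no real sleeping.
func simulate(failures int) (int, bool) {
	noSleep := func(time.Duration) {}
	n, err := retry(failNTimes(failures), 30*time.Second, noSleep)
	return n, err == nil
}

func main() {
	fmt.Println(simulate(2))   // recovers after a few attempts
	fmt.Println(simulate(100)) // gives up once the budget is spent
}
```

Capping the delay and bounding the total wait keeps a dead plugin from blocking its caller indefinitely, which is the role abort plays in the listing above.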

Options are modified through functions, so an option constructor has to return a closure. The code below implements the timeout option.

  func WithRequestTimeout(t time.Duration) func(*RequestOpts) {
      return func(o *RequestOpts) {
          o.Timeout = t
      }
  }
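
This is the functional options pattern. A minimal, self-contained sketch of how such closures compose follows; the Retries field, withRetries, and buildOpts are hypothetical additions for illustration, and only Timeout and WithRequestTimeout come from the text.

```go
package main

import (
	"fmt"
	"time"
)

// RequestOpts mirrors the options struct from the text. Only Timeout is
// taken from the source; Retries is a hypothetical field added to show how
// a second option composes with the first.
type RequestOpts struct {
	Timeout time.Duration
	Retries int // hypothetical
}

// WithRequestTimeout returns a closure that sets the timeout.
func WithRequestTimeout(t time.Duration) func(*RequestOpts) {
	return func(o *RequestOpts) {
		o.Timeout = t
	}
}

// withRetries is a hypothetical second option.
func withRetries(n int) func(*RequestOpts) {
	return func(o *RequestOpts) {
		o.Retries = n
	}
}

// buildOpts starts from the zero value and applies each option in order,
// the same way callWithRetry does at the top of its body.
func buildOpts(opts ...func(*RequestOpts)) RequestOpts {
	var o RequestOpts
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", buildOpts())
	fmt.Printf("%+v\n", buildOpts(WithRequestTimeout(5*time.Second), withRetries(3)))
}
```

The benefit of this design is that callers who need no options pass nothing, and new options can be added later without changing the signature of the function that accepts them.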

Daemon

plugin-manager-in-daemon.svg
Figure 2: Relationship between Plugin and Daemon

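The Daemon instance keeps references to both the Store and the Manager. When the Cluster instance is created, the ClusterConfig configuration object stores the Daemon as the implementation of executor.Backend and the Manager as the implementation of plugin.Backend. Note that this plugin.Backend lives in the daemon/cluster/controllers/plugin directory.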

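The executor.Backend interface defines functional methods related to containers, plugins, networks, and so on. It is implemented by the daemon.Daemon instance and is defined as follows.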

  type Backend interface {
      CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
      DeleteManagedNetwork(networkID string) error
      FindNetwork(idName string) (libnetwork.Network, error)
      SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)
      ReleaseIngress() (<-chan struct{}, error)
      CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
      ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
      ContainerStop(name string, seconds *int) error
      ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (msgs <-chan *backend.LogMessage, tty bool, err error)
      ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
      ActivateContainerServiceBinding(containerName string) error
      DeactivateContainerServiceBinding(containerName string) error
      UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
      ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
      ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)
      ContainerRm(name string, config *types.ContainerRmConfig) error
      ContainerKill(name string, sig uint64) error
      SetContainerDependencyStore(name string, store exec.DependencyGetter) error
      SetContainerSecretReferences(name string, refs []*swarmtypes.SecretReference) error
      SetContainerConfigReferences(name string, refs []*swarmtypes.ConfigReference) error
      SystemInfo() *types.Info
      Containers(config *types.ContainerListOptions) ([]*types.Container, error)
      SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
      DaemonJoinsCluster(provider cluster.Provider)
      DaemonLeavesCluster()
      IsSwarmCompatible() error
      SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{})
      UnsubscribeFromEvents(listener chan interface{})
      UpdateAttachment(string, string, string, *network.NetworkingConfig) error
      WaitForDetachment(context.Context, string, string, string, string) error
      PluginManager() *plugin.Manager
      PluginGetter() *plugin.Store
      GetAttachmentStore() *networkSettings.AttachmentStore
      HasExperimental() bool
  }

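The Backend interface defined in the controllers/plugin directory is the key interface through which the Cluster instance interacts with the Manager. It also involves the Controller struct in the same directory: operations on a Controller are ultimately carried out by the Manager instance.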

  type Backend interface {
      Disable(name string, config *enginetypes.PluginDisableConfig) error
      Enable(name string, config *enginetypes.PluginEnableConfig) error
      Remove(name string, config *enginetypes.PluginRmConfig) error
      Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
      Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
      Get(name string) (*v2.Plugin, error)
      SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
  }

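Figure 3 shows the relationship between Backend and Controller. nodeRunner is a data structure related to node control and is not covered here. When it starts, it creates a container.executor instance that contains the plugin.Manager; comparing this with Figure 2 makes the picture clearer.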
plugin-controller.svg
Figure 3: Relationship between Backend and Controller

A Controller can be obtained in two ways: through the Controller method of nodeRunner shown in Figure 3, or through the Controller method of the Executor interface.

  type Executor interface {
      // Describe returns the underlying node description.
      Describe(ctx context.Context) (*api.NodeDescription, error)
      // Configure uses the node object state to propagate node
      // state to the underlying executor.
      Configure(ctx context.Context, node *api.Node) error
      // Controller provides a controller for the given task.
      Controller(t *api.Task) (Controller, error)
      // SetNetworkBootstrapKeys passes the symmetric keys from the
      // manager to the executor.
      SetNetworkBootstrapKeys([]*api.EncryptionKey) error
  }

Take the Start method of a Controller instance as an example. Lines 3, 9, and 14 all call directly into the Manager instance through p.backend.

   1. func (p *Controller) Start(ctx context.Context) error {
   2.     p.logger.Debug("Start")
   3.     pl, err := p.backend.Get(p.pluginID)
   4.     if err != nil {
   5.         return err
   6.     }
   7.     if p.spec.Disabled {
   8.         if pl.IsEnabled() {
   9.             return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
  10.         }
  11.         return nil
  12.     }
  13.     if !pl.IsEnabled() {
  14.         return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
  15.     }
  16.     return nil
  17. }
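
Start is a small reconciliation step: it compares the desired state in the spec with the actual state of the plugin and calls the backend only when they differ. The sketch below reproduces that decision table against a hypothetical in-memory fakeBackend; the type and the reconcile and callsFor helpers are illustrative and not part of Docker.

```go
package main

import "fmt"

// fakeBackend is a hypothetical in-memory stand-in for the plugin Manager.
// It tracks whether each plugin is enabled and records the calls it receives.
type fakeBackend struct {
	enabled map[string]bool
	calls   []string
}

// Enable marks the plugin as enabled and records the call.
func (b *fakeBackend) Enable(name string) error {
	b.enabled[name] = true
	b.calls = append(b.calls, "enable")
	return nil
}

// Disable marks the plugin as disabled and records the call.
func (b *fakeBackend) Disable(name string) error {
	b.enabled[name] = false
	b.calls = append(b.calls, "disable")
	return nil
}

// reconcile follows the same decision table as Controller.Start: look the
// plugin up, then enable or disable it only if its current state differs
// from the desired one.
func reconcile(b *fakeBackend, name string, wantDisabled bool) error {
	isEnabled, ok := b.enabled[name]
	if !ok {
		return fmt.Errorf("plugin %q not found", name)
	}
	if wantDisabled {
		if isEnabled {
			return b.Disable(name)
		}
		return nil
	}
	if !isEnabled {
		return b.Enable(name)
	}
	return nil
}

// callsFor reports which backend calls one reconcile pass makes for a
// plugin in the given state.
func callsFor(isEnabled, wantDisabled bool) []string {
	b := &fakeBackend{enabled: map[string]bool{"demo": isEnabled}}
	if err := reconcile(b, "demo", wantDisabled); err != nil {
		return []string{"error"}
	}
	return b.calls
}

func main() {
	for _, isEnabled := range []bool{false, true} {
		for _, wantDisabled := range []bool{false, true} {
			fmt.Printf("enabled=%v disabled-in-spec=%v -> %v\n",
				isEnabled, wantDisabled, callsFor(isEnabled, wantDisabled))
		}
	}
}
```

Only two of the four combinations touch the backend, which makes the operation safe to repeat: running it again on an already reconciled plugin does nothing.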

Finally, here is the definition of the exec.Controller interface. Note that the Controller discussed here still manages plugins.

  type Controller interface {
      // Update the task definition seen by the controller. Will return
      // ErrTaskUpdateFailed if the provided task definition changes fields that
      // cannot be changed.
      //
      // Will be ignored if the task has exited.
      Update(ctx context.Context, t *api.Task) error
      // Prepare the task for execution. This should ensure that all resources
      // are created such that a call to start should execute immediately.
      Prepare(ctx context.Context) error
      // Start the target and return when it has started successfully.
      Start(ctx context.Context) error
      // Wait blocks until the target has exited.
      Wait(ctx context.Context) error
      // Shutdown requests to exit the target gracefully.
      Shutdown(ctx context.Context) error
      // Terminate the target.
      Terminate(ctx context.Context) error
      // Remove all resources allocated by the controller.
      Remove(ctx context.Context) error
      // Close closes any ephemeral resources associated with controller instance.
      Close() error
  }