Middleware
Install
API Server 启动时,通过 createMux 方法将中间件功能安装在每一个路由的 Handler 上。其中关键代码在 5 ~ 13 行,核心方法是通过 makeHTTPHandler 将中间件安装在每一个路由上。
func (s *Server) createMux() *mux.Router {
m := mux.NewRouter()
logrus.Debug("Registering routers")
for _, apiRouter := range s.routers {
for _, r := range apiRouter.Routes() {
f := s.makeHTTPHandler(r.Handler())
logrus.Debugf("Registering %s, %s", r.Method(), r.Path())
m.Path(versionMatcher + r.Path()).Methods(r.Method()).Handler(f)
m.Path(r.Path()).Methods(r.Method()).Handler(f)
}
}
debugRouter := debug.NewRouter()
s.routers = append(s.routers, debugRouter)
for _, r := range debugRouter.Routes() {
f := s.makeHTTPHandler(r.Handler())
m.Path("/debug" + r.Path()).Handler(f)
}
notFoundHandler := httputils.MakeErrorHandler(pageNotFoundError{})
m.HandleFunc(versionMatcher+"/{path:.*}", notFoundHandler)
m.NotFoundHandler = notFoundHandler
m.MethodNotAllowedHandler = notFoundHandler
return m
}
Server 的 makeHTTPHandler 方法,通过 15 行的 handlerWithGlobalMiddlewares 方法将全局中间件添加到路由 Handler 上,并返回新的带中间件检查的 Handler。
func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Define the context that we'll pass around to share info
// like the docker-request-id.
//
// The 'context' will be used for global data that should
// apply to all requests. Data that is specific to the
// immediate function being called should still be passed
// as 'args' on the function call.
// use intermediate variable to prevent "should not use basic type
// string as key in context.WithValue" golint errors
ctx := context.WithValue(r.Context(), dockerversion.UAStringKey{}, r.Header.Get("User-Agent"))
r = r.WithContext(ctx)
handlerFunc := s.handlerWithGlobalMiddlewares(handler)
vars := mux.Vars(r)
if vars == nil {
vars = make(map[string]string)
}
if err := handlerFunc(ctx, w, r, vars); err != nil {
statusCode := errdefs.GetHTTPErrorStatusCode(err)
if statusCode >= 500 {
logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
}
httputils.MakeErrorHandler(err)(w, r)
}
}
}
Server 在安装全局中间件时,首先保存初始的全局中间件,并按照中间件先后顺序分别增加中间件功能,并在下轮循环中继续对新的 Handler 进行处理,最后加装调试中间件。
func (s *Server) handlerWithGlobalMiddlewares(handler httputils.APIFunc) httputils.APIFunc {
next := handler
for _, m := range s.middlewares {
next = m.WrapHandler(next)
}
if s.cfg.Logging && logrus.GetLevel() == logrus.DebugLevel {
next = middleware.DebugRequestMiddleware(next)
}
return next
}
Interface & Implementations
Middleware 定义了通用中间件接口,对于具体的中间件可以通过实现该接口变为中间件,同时可以添加与中间件功能相关的特定信息,注意这个方法的参数与返回值类型是完全相同的。
type Middleware interface {
WrapHandler(func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error
}
以 cors 中间件为例,通过实现 WrapHandler 方法将 cors 相关的设置添加在应答报文头上,然后再执行传入的 Handler。
type CORSMiddleware struct {
defaultHeaders string
}
// NewCORSMiddleware creates a new CORSMiddleware with default headers.
func NewCORSMiddleware(d string) CORSMiddleware {
return CORSMiddleware{defaultHeaders: d}
}
// WrapHandler returns a new handler function wrapping the previous one in the request chain.
func (c CORSMiddleware) WrapHandler(handler func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
// If "api-cors-header" is not given, but "api-enable-cors" is true, we set cors to "*"
// otherwise, all head values will be passed to HTTP handler
corsHeaders := c.defaultHeaders
if corsHeaders == "" {
corsHeaders = "*"
}
logrus.Debugf("CORS header is enabled and set to: %s", corsHeaders)
w.Header().Add("Access-Control-Allow-Origin", corsHeaders)
w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, X-Registry-Auth")
w.Header().Add("Access-Control-Allow-Methods", "HEAD, GET, POST, DELETE, PUT, OPTIONS")
return handler(ctx, w, r, vars)
}
}
全局中间件在 Docker 后台进程中初始化,唯一特殊的中间件是认证中间件,在构建时传入了一个名为 pluginStore 变量。
func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config, pluginStore plugingetter.PluginGetter) error {
v := cfg.Version
exp := middleware.NewExperimentalMiddleware(cli.Config.Experimental)
s.UseMiddleware(exp)
vm := middleware.NewVersionMiddleware(v, api.DefaultVersion, api.MinVersion)
s.UseMiddleware(vm)
if cfg.CorsHeaders != "" {
c := middleware.NewCORSMiddleware(cfg.CorsHeaders)
s.UseMiddleware(c)
}
cli.authzMiddleware = authorization.NewMiddleware(cli.Config.AuthorizationPlugins, pluginStore)
cli.Config.AuthzMiddleware = cli.authzMiddleware
s.UseMiddleware(cli.authzMiddleware)
return nil
}
根据调用的上下文可以找到这个变量对应的创建方法
pluginStore := plugin.NewStore()
if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {
logrus.Fatalf("Error creating middlewares: %v", err)
}
Plugins
Overview
图 1:Plugin 全局概览
从图 1 可以看出插件管理涉及的主要功能模组。通过 http.Client 不难确认,插件也是 C/S 模型的。其中 Manager 全局唯一的 Store,Store 里储存了插件名称到插件的一个映射,并会保存一些列针对插件的处理方法。
Client
Client 的方法调用使用 Call 方法,参数为服务名(目标方法)、参数及返回值,这几个参数的含义非常明确,返回值是目标方法返回内容,Call 方法执行完成后,会被修改。
func (c *Client) Call(serviceMethod string, args, ret interface{}) error {
return c.CallWithOptions(serviceMethod, args, ret)
}
Client 的另外一个方法是 Stream,请求插件服务端,并返回一个流。
func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(args); err != nil {
return nil, err
}
return c.callWithRetry(serviceMethod, &buf, true)
}
无论使用哪个方法执行调用,最终都会执行到 callWithRetry 方法,这个方法的第四个参数用于对请求选项进行设定,在 5 ~ 8 行。请求选项设定完成后,使用 RequestFactory 接口实例创建请求,然后通过网络请求(http.Client.Do)调用远程方法并返回结果。
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool, reqOpts ...func(*RequestOpts)) (io.ReadCloser, error) {
var retries int
start := time.Now()
var opts RequestOpts
for _, o := range reqOpts {
o(&opts)
}
for {
req, err := c.requestFactory.NewRequest(serviceMethod, data)
if err != nil {
return nil, err
}
cancelRequest := func() {}
if opts.Timeout > 0 {
var ctx context.Context
ctx, cancelRequest = context.WithTimeout(req.Context(), opts.Timeout)
req = req.WithContext(ctx)
}
resp, err := c.http.Do(req)
if err != nil {
cancelRequest()
if !retry {
return nil, err
}
timeOff := backoff(retries)
if abort(start, timeOff) {
return nil, err
}
retries++
logrus.Warnf("Unable to connect to plugin: %s%s: %v, retrying in %v", req.URL.Host, req.URL.Path, err, timeOff)
time.Sleep(timeOff)
continue
}
if resp.StatusCode != http.StatusOK {
b, err := io.ReadAll(resp.Body)
resp.Body.Close()
cancelRequest()
if err != nil {
return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()}
}
// Plugins' Response(s) should have an Err field indicating what went
// wrong. Try to unmarshal into ResponseErr. Otherwise fallback to just
// return the string(body)
type responseErr struct {
Err string
}
remoteErr := responseErr{}
if err := json.Unmarshal(b, &remoteErr); err == nil {
if remoteErr.Err != "" {
return nil, &statusError{resp.StatusCode, serviceMethod, remoteErr.Err}
}
}
// old way...
return nil, &statusError{resp.StatusCode, serviceMethod, string(b)}
}
return ioutils.NewReadCloserWrapper(resp.Body, func() error {
err := resp.Body.Close()
cancelRequest()
return err
}), nil
}
}
选择设置采用函数方式修改,因此,创建选项修改函数时注意返回一个闭包方法,下面的代码是修改超时选项的实现。
func WithRequestTimeout(t time.Duration) func(*RequestOpts) {
return func(o *RequestOpts) {
o.Timeout = t
}
}
Daemon
图 2:Plugin 与 Daemon 关系图
Daemon 实例中保留了 Store 与 Manager 的实例引用,并在创建 Cluster 实例时,通过创建配置对象 ClusterConfig 将 Daemon 当作 executor.Backend 的实现,并将 Manager 当作 plugin.Backend 的实现结构保存下来,注意这里的 plugin.Backend 正确位置位于 daemon/cluster/controllers/plugin 目录。
Backend 接口定义了与容器相关、插件、网络等相关的功能性方法,其实现是 daemon.Daemon 实例,接口定义如下所示。
type Backend interface {
CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
DeleteManagedNetwork(networkID string) error
FindNetwork(idName string) (libnetwork.Network, error)
SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)
ReleaseIngress() (<-chan struct{}, error)
CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error
ContainerStop(name string, seconds *int) error
ContainerLogs(context.Context, string, *types.ContainerLogsOptions) (msgs <-chan *backend.LogMessage, tty bool, err error)
ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
ActivateContainerServiceBinding(containerName string) error
DeactivateContainerServiceBinding(containerName string) error
UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
ContainerWait(ctx context.Context, name string, condition containerpkg.WaitCondition) (<-chan containerpkg.StateStatus, error)
ContainerRm(name string, config *types.ContainerRmConfig) error
ContainerKill(name string, sig uint64) error
SetContainerDependencyStore(name string, store exec.DependencyGetter) error
SetContainerSecretReferences(name string, refs []*swarmtypes.SecretReference) error
SetContainerConfigReferences(name string, refs []*swarmtypes.ConfigReference) error
SystemInfo() *types.Info
Containers(config *types.ContainerListOptions) ([]*types.Container, error)
SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
DaemonJoinsCluster(provider cluster.Provider)
DaemonLeavesCluster()
IsSwarmCompatible() error
SubscribeToEvents(since, until time.Time, filter filters.Args) ([]events.Message, chan interface{})
UnsubscribeFromEvents(listener chan interface{})
UpdateAttachment(string, string, string, *network.NetworkingConfig) error
WaitForDetachment(context.Context, string, string, string, string) error
PluginManager() *plugin.Manager
PluginGetter() *plugin.Store
GetAttachmentStore() *networkSettings.AttachmentStore
HasExperimental() bool
}
在 controllers/plugin 目录中定义的 Backend 接口,是 Cluster 实例与 Manager 交互的关键接口,这里还牵涉到另外一个本目录下的 Controller 结构,对 Controller 的操作最终会通过 Manager 实例真正得到执行。
type Backend interface {
Disable(name string, config *enginetypes.PluginDisableConfig) error
Enable(name string, config *enginetypes.PluginEnableConfig) error
Remove(name string, config *enginetypes.PluginRmConfig) error
Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
Get(name string) (*v2.Plugin, error)
SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
}
Backend 与 Controller 实例关系如图 3 所示。这里的 nodeRunner 是与节点控制相关的数据结构,暂时不针对它展开,这个实例在启动时,会创建 container.executor 实例,其中包含了 plugin.Manager,可以对比图 2,会更加清晰。
图 3:Backend 与 Controller 关系图
Controller 可以通过两种方法获取,一种是图 3 中使用 nodeRunner 的 Controller 方法,另外一种可以通过 Executor 接口的 Controller 方法来获取。
type Executor interface {
// Describe returns the underlying node description.
Describe(ctx context.Context) (*api.NodeDescription, error)
// Configure uses the node object state to propagate node
// state to the underlying executor.
Configure(ctx context.Context, node *api.Node) error
// Controller provides a controller for the given task.
Controller(t *api.Task) (Controller, error)
// SetNetworkBootstrapKeys passes the symmetric keys from the
// manager to the executor.
SetNetworkBootstrapKeys([]*api.EncryptionKey) error
}
我们以 Controller 实例的 Start 方式为例来看一下,可以清楚的看到代码第 4、11 和 16 行都是对实例对象 Manager 的直接调用。
func (p *Controller) Start(ctx context.Context) error {
p.logger.Debug("Start")
pl, err := p.backend.Get(p.pluginID)
if err != nil {
return err
}
if p.spec.Disabled {
if pl.IsEnabled() {
return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
}
return nil
}
if !pl.IsEnabled() {
return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
}
return nil
}
最后,看一下 exec.Controller 接口方法定义,注意这里的 Controller 管理的仍然是插件功能。
type Controller interface {
// Update the task definition seen by the controller. Will return
// ErrTaskUpdateFailed if the provided task definition changes fields that
// cannot be changed.
//
// Will be ignored if the task has exited.
Update(ctx context.Context, t *api.Task) error
// Prepare the task for execution. This should ensure that all resources
// are created such that a call to start should execute immediately.
Prepare(ctx context.Context) error
// Start the target and return when it has started successfully.
Start(ctx context.Context) error
// Wait blocks until the target has exited.
Wait(ctx context.Context) error
// Shutdown requests to exit the target gracefully.
Shutdown(ctx context.Context) error
// Terminate the target.
Terminate(ctx context.Context) error
// Remove all resources allocated by the controller.
Remove(ctx context.Context) error
// Close closes any ephemeral resources associated with controller instance.
Close() error
}