HTTP
Server
Docker 采用的是 HTTP 的 C/S 结构来实现核心服务,主要由三个核心结构对外提供服务,三者关系如图 1 所示。
图 1:Docker Daemon 示意图
DaemonCli 结构体创建完毕后,直接执行 start 方法启动服务,其中的关键代码如下所示。首先创建的是 apiserver.Server 结构体;然后创建 daemon.Daemon 对象;最后,通过 routerOptions 配合,创建 router.Router 接口实例。
func (cli *DaemonCli) start(opts *daemonOptions) (err error) {// ...// Create the daemon root before we create ANY other files (PID, or migrate keys)// to ensure the appropriate ACL is set (particularly relevant on Windows)if err := daemon.CreateDaemonRoot(cli.Config); err != nil {return err}if err := system.MkdirAll(cli.Config.ExecRoot, 0700); err != nil {return err}// ...serverConfig, err := newAPIServerConfig(cli)if err != nil {return errors.Wrap(err, "failed to create API server")}cli.api = apiserver.New(serverConfig)hosts, err := loadListeners(cli, serverConfig)if err != nil {return errors.Wrap(err, "failed to load listeners")}// ...pluginStore := plugin.NewStore()if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {logrus.Fatalf("Error creating middlewares: %v", err)}d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore)if err != nil {return errors.Wrap(err, "failed to start daemon")}// ...routerOptions, err := newRouterOptions(cli.Config, d)if err != nil {return err}routerOptions.api = cli.apirouterOptions.cluster = cinitRouter(routerOptions)// ...}
router.Router 接口只有一个方法 Routes,返回这个路由注册的全部可用方法、路径及处理方法,如下所示。
// Router defines an interface to specify a group of routes to add to the docker server.type Router interface {// Routes returns the list of routes to add to the docker server.Routes() []Route}// Route defines an individual API route in the docker server.type Route interface {// Handler returns the raw function to create the http handler.Handler() httputils.APIFunc// Method returns the http method that the route responds to.Method() string// Path returns the subpath where the route responds to.Path() string}
初始化路由时,可以根据路由对应的不同功能,传入在 routerOptions 中保留的 daemon.Daemon 实例。
func initRouter(opts routerOptions) {decoder := runconfig.ContainerDecoder{GetSysInfo: func() *sysinfo.SysInfo {return opts.daemon.RawSysInfo()},}routers := []router.Router{// we need to add the checkpoint router before the container router or the DELETE gets maskedcheckpointrouter.NewRouter(opts.daemon, decoder),container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified),image.NewRouter(opts.daemon.ImageService()),systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features),volume.NewRouter(opts.daemon.VolumesService()),build.NewRouter(opts.buildBackend, opts.daemon, opts.features),sessionrouter.NewRouter(opts.sessionManager),swarmrouter.NewRouter(opts.cluster),pluginrouter.NewRouter(opts.daemon.PluginManager()),distributionrouter.NewRouter(opts.daemon.ImageService()),}grpcBackends := []grpcrouter.Backend{}for _, b := range []interface{}{opts.daemon, opts.buildBackend} {if b, ok := b.(grpcrouter.Backend); ok {grpcBackends = append(grpcBackends, b)}}if len(grpcBackends) > 0 {routers = append(routers, grpcrouter.NewRouter(grpcBackends...))}if opts.daemon.NetworkControllerEnabled() {routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))}if opts.daemon.HasExperimental() {for _, r := range routers {for _, route := range r.Routes() {if experimental, ok := route.(router.ExperimentalRoute); ok {experimental.Enable()}}}}opts.api.InitRouter(routers...)}
我们以容器对应方法为例来简单说明一下,在 containerRouter 中,创建路由路径并记录对应的功能方法,如下所示。
func (r *containerRouter) initRoutes() {r.routes = []router.Route{// HEADrouter.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),// GETrouter.NewGetRoute("/containers/json", r.getContainersJSON),router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs),router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats),router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),// POSTrouter.NewPostRoute("/containers/create", r.postContainersCreate),router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12router.NewPostRoute("/containers/{name:.*}/exec", r.postContainerExecCreate),router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),router.NewPostRoute("/containers/prune", r.postContainersPrune),router.NewPostRoute("/commit", r.postCommit),// PUTrouter.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),// DELETErouter.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),}}
HTTPServer
图 2:HTTPServer 总揽
Server 的 Accept 方法将 Listener 绑定至一个 HTTPServer 结构体中,确立每一个 Listener 实例都有一个特定的服务结构体。http.Server 中的 Handler 是通过 apiserver.Server 中的全局路由生成的。
func (s *Server) Accept(addr string, listeners ...net.Listener) {for _, listener := range listeners {httpServer := &HTTPServer{srv: &http.Server{Addr: addr,},l: listener,}s.servers = append(s.servers, httpServer)}}
Server 的 serveAPI 开启服务,通过下面代码中的 3 ~ 13 行,为每个实际的 HTTPServer 创建一个服务协程,并监听每个协程是否有错误发生,只要一个协程出错,全部进程退出。
func (s *Server) serveAPI() error {var chErrors = make(chan error, len(s.servers))for _, srv := range s.servers {srv.srv.Handler = s.createMux()go func(srv *HTTPServer) {var err errorlogrus.Infof("API listen on %s", srv.l.Addr())if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {err = nil}chErrors <- err}(srv)}for range s.servers {err := <-chErrorsif err != nil {return err}}return nil}
HTTPServer 的 Serve 方法只需要简单调用内部的 http.Server 的 Serve 方法,并传入保存的 Listener 接口实例即可。
func (s *HTTPServer) Serve() error {return s.srv.Serve(s.l)}
Client
Client 功能实现相对简单,在 http.Client 的基础上做了一些简单封装,其结构如下所示。
type Client struct {// scheme sets the scheme for the clientscheme string// host holds the server address to connect tohost string// proto holds the client protocol i.e. unix.proto string// addr holds the client address.addr string// basePath holds the path to prepend to the requests.basePath string// client used to send and receive http requests.client *http.Client// version of the server to talk to.version string// custom http headers configured by users.customHTTPHeaders map[string]string// manualOverride is set to true when the version was set by users.manualOverride bool// negotiateVersion indicates if the client should automatically negotiate// the API version to use when making requests. API version negotiation is// performed on the first request, after which negotiated is set to "true"// so that subsequent requests do not re-negotiate.negotiateVersion bool// negotiated indicates that API version negotiation took placenegotiated bool}
其中的 client 通过 defaultHTTPClient 方法创建。
func defaultHTTPClient(host string) (*http.Client, error) {url, err := ParseHostURL(host)if err != nil {return nil, err}transport := new(http.Transport)sockets.ConfigureTransport(transport, url.Scheme, url.Host)return &http.Client{Transport: transport,CheckRedirect: CheckRedirect,}, nil}
其中的 sockets.ConfigureTransport 方法值得关注,其中对于 unix、npipe 类型的协议做了特殊处理,对于普通的 tcp 类型连接,则根据系统代理进行配置。
func ConfigureTransport(tr *http.Transport, proto, addr string) error {switch proto {case "unix":return configureUnixTransport(tr, proto, addr)case "npipe":return configureNpipeTransport(tr, proto, addr)default:tr.Proxy = http.ProxyFromEnvironmentdialer, err := DialerFromEnvironment(&net.Dialer{Timeout: defaultTimeout,})if err != nil {return err}tr.Dial = dialer.Dial}return nil}
Proxy
DailerFromEnvironment 根据系统配置的 ALL_PROXY 与 NO_PROXY 环境变量对默认的 Dailer 进行配置,并返回 PerHost 实例。下面代码中的 23 行将系统 NO_PROXY 环境变量中设置的 URL 进行保存。
func DialerFromEnvironment(direct *net.Dialer) (proxy.Dialer, error) {allProxy := GetProxyEnv("all_proxy")if len(allProxy) == 0 {return direct, nil}proxyURL, err := url.Parse(allProxy)if err != nil {return direct, err}proxyFromURL, err := proxy.FromURL(proxyURL, direct)if err != nil {return direct, err}noProxy := GetProxyEnv("no_proxy")if len(noProxy) == 0 {return proxyFromURL, nil}perHost := proxy.NewPerHost(proxyFromURL, direct)perHost.AddFromString(noProxy)return perHost, nil}
PerHost 保留代理 Dialer 及原始 Dailer,默认使用代理 Dialer。
func NewPerHost(defaultDialer, bypass Dialer) *PerHost {return &PerHost{def: defaultDialer,bypass: bypass,}}
其 Dial 方法根据连接地址中的主机域来选择 Dialer 实例再进行连接建立操作。
func (p *PerHost) Dial(network, addr string) (c net.Conn, err error) {host, _, err := net.SplitHostPort(addr)if err != nil {return nil, err}return p.dialerForRequest(host).Dial(network, addr)}
dialerForRequest 方法根据过滤规则选择默认的代理 Dialer 或者直连 Dialer。
func (p *PerHost) dialerForRequest(host string) Dialer {if ip := net.ParseIP(host); ip != nil {for _, net := range p.bypassNetworks {if net.Contains(ip) {return p.bypass}}for _, bypassIP := range p.bypassIPs {if bypassIP.Equal(ip) {return p.bypass}}return p.def}for _, zone := range p.bypassZones {if strings.HasSuffix(host, zone) {return p.bypass}if host == zone[1:] {// For a zone ".example.com", we match "example.com"// too.return p.bypass}}for _, bypassHost := range p.bypassHosts {if bypassHost == host {return p.bypass}}return p.def}
