HTTP

Server

Docker 采用的是 HTTP 的 C/S 结构来实现核心服务,主要由三个核心结构对外提供服务,三者关系如图 1 所示。

docker-overview.svg
图 1:Docker Daemon 示意图

DaemonCli 结构体创建完毕后,直接执行 start 方法启动服务,其中的关键代码如下所示。首先创建的是 apiserver.Server 结构体;然后创建 daemon.Daemon 对象;最后,通过 routerOptions 配合,创建 router.Router 接口实例。

  1. func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
  2. // ...
  3. // Create the daemon root before we create ANY other files (PID, or migrate keys)
  4. // to ensure the appropriate ACL is set (particularly relevant on Windows)
  5. if err := daemon.CreateDaemonRoot(cli.Config); err != nil {
  6. return err
  7. }
  8. if err := system.MkdirAll(cli.Config.ExecRoot, 0700); err != nil {
  9. return err
  10. }
  11. // ...
  12. serverConfig, err := newAPIServerConfig(cli)
  13. if err != nil {
  14. return errors.Wrap(err, "failed to create API server")
  15. }
  16. cli.api = apiserver.New(serverConfig)
  17. hosts, err := loadListeners(cli, serverConfig)
  18. if err != nil {
  19. return errors.Wrap(err, "failed to load listeners")
  20. }
  21. // ...
  22. pluginStore := plugin.NewStore()
  23. if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {
  24. logrus.Fatalf("Error creating middlewares: %v", err)
  25. }
  26. d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore)
  27. if err != nil {
  28. return errors.Wrap(err, "failed to start daemon")
  29. }
  30. // ...
  31. routerOptions, err := newRouterOptions(cli.Config, d)
  32. if err != nil {
  33. return err
  34. }
  35. routerOptions.api = cli.api
  36. routerOptions.cluster = c
  37. initRouter(routerOptions)
  38. // ...
  39. }

router.Router 接口只有一个方法 Routes,返回这个路由注册的全部可用方法、路径及处理方法,如下所示。

  1. // Router defines an interface to specify a group of routes to add to the docker server.
  2. type Router interface {
  3. // Routes returns the list of routes to add to the docker server.
  4. Routes() []Route
  5. }
  6. // Route defines an individual API route in the docker server.
  7. type Route interface {
  8. // Handler returns the raw function to create the http handler.
  9. Handler() httputils.APIFunc
  10. // Method returns the http method that the route responds to.
  11. Method() string
  12. // Path returns the subpath where the route responds to.
  13. Path() string
  14. }

初始化路由时,可以根据路由对应的不同功能,传入在 routerOptions 中保留的 daemon.Daemon 实例。

  1. func initRouter(opts routerOptions) {
  2. decoder := runconfig.ContainerDecoder{
  3. GetSysInfo: func() *sysinfo.SysInfo {
  4. return opts.daemon.RawSysInfo()
  5. },
  6. }
  7. routers := []router.Router{
  8. // we need to add the checkpoint router before the container router or the DELETE gets masked
  9. checkpointrouter.NewRouter(opts.daemon, decoder),
  10. container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified),
  11. image.NewRouter(opts.daemon.ImageService()),
  12. systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features),
  13. volume.NewRouter(opts.daemon.VolumesService()),
  14. build.NewRouter(opts.buildBackend, opts.daemon, opts.features),
  15. sessionrouter.NewRouter(opts.sessionManager),
  16. swarmrouter.NewRouter(opts.cluster),
  17. pluginrouter.NewRouter(opts.daemon.PluginManager()),
  18. distributionrouter.NewRouter(opts.daemon.ImageService()),
  19. }
  20. grpcBackends := []grpcrouter.Backend{}
  21. for _, b := range []interface{}{opts.daemon, opts.buildBackend} {
  22. if b, ok := b.(grpcrouter.Backend); ok {
  23. grpcBackends = append(grpcBackends, b)
  24. }
  25. }
  26. if len(grpcBackends) > 0 {
  27. routers = append(routers, grpcrouter.NewRouter(grpcBackends...))
  28. }
  29. if opts.daemon.NetworkControllerEnabled() {
  30. routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))
  31. }
  32. if opts.daemon.HasExperimental() {
  33. for _, r := range routers {
  34. for _, route := range r.Routes() {
  35. if experimental, ok := route.(router.ExperimentalRoute); ok {
  36. experimental.Enable()
  37. }
  38. }
  39. }
  40. }
  41. opts.api.InitRouter(routers...)
  42. }

我们以容器对应方法为例来简单说明一下,在 containerRouter 中,创建路由路径并记录对应的功能方法,如下所示。

  1. func (r *containerRouter) initRoutes() {
  2. r.routes = []router.Route{
  3. // HEAD
  4. router.NewHeadRoute("/containers/{name:.*}/archive", r.headContainersArchive),
  5. // GET
  6. router.NewGetRoute("/containers/json", r.getContainersJSON),
  7. router.NewGetRoute("/containers/{name:.*}/export", r.getContainersExport),
  8. router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
  9. router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
  10. router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
  11. router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs),
  12. router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats),
  13. router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
  14. router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
  15. router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
  16. // POST
  17. router.NewPostRoute("/containers/create", r.postContainersCreate),
  18. router.NewPostRoute("/containers/{name:.*}/kill", r.postContainersKill),
  19. router.NewPostRoute("/containers/{name:.*}/pause", r.postContainersPause),
  20. router.NewPostRoute("/containers/{name:.*}/unpause", r.postContainersUnpause),
  21. router.NewPostRoute("/containers/{name:.*}/restart", r.postContainersRestart),
  22. router.NewPostRoute("/containers/{name:.*}/start", r.postContainersStart),
  23. router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
  24. router.NewPostRoute("/containers/{name:.*}/wait", r.postContainersWait),
  25. router.NewPostRoute("/containers/{name:.*}/resize", r.postContainersResize),
  26. router.NewPostRoute("/containers/{name:.*}/attach", r.postContainersAttach),
  27. router.NewPostRoute("/containers/{name:.*}/copy", r.postContainersCopy), // Deprecated since 1.8, Errors out since 1.12
  28. router.NewPostRoute("/containers/{name:.*}/exec", r.postContainerExecCreate),
  29. router.NewPostRoute("/exec/{name:.*}/start", r.postContainerExecStart),
  30. router.NewPostRoute("/exec/{name:.*}/resize", r.postContainerExecResize),
  31. router.NewPostRoute("/containers/{name:.*}/rename", r.postContainerRename),
  32. router.NewPostRoute("/containers/{name:.*}/update", r.postContainerUpdate),
  33. router.NewPostRoute("/containers/prune", r.postContainersPrune),
  34. router.NewPostRoute("/commit", r.postCommit),
  35. // PUT
  36. router.NewPutRoute("/containers/{name:.*}/archive", r.putContainersArchive),
  37. // DELETE
  38. router.NewDeleteRoute("/containers/{name:.*}", r.deleteContainers),
  39. }
  40. }

HTTPServer

docker-http-server.svg
图 2:HTTPServer 总览

Server 的 Accept 方法为每个 Listener 创建一个 HTTPServer 结构体并与之绑定,确保每一个 Listener 实例都有一个专属的服务结构体。http.Server 中的 Handler 在 Accept 中并未设置,而是稍后在 serveAPI 中调用 createMux,通过 apiserver.Server 中的全局路由生成的。

  1. func (s *Server) Accept(addr string, listeners ...net.Listener) {
  2. for _, listener := range listeners {
  3. httpServer := &HTTPServer{
  4. srv: &http.Server{
  5. Addr: addr,
  6. },
  7. l: listener,
  8. }
  9. s.servers = append(s.servers, httpServer)
  10. }
  11. }

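Server 的 serveAPI 开启服务:下面代码中的 3 ~ 13 行为每个实际的 HTTPServer 设置 Handler 并创建一个服务协程,协程结束时把结果写入 chErrors;14 ~ 19 行依次接收每个协程的结果,只要有一个协程返回错误,serveAPI 就立即将该错误返回给调用方。其中,因监听器被关闭而产生的 "use of closed network connection" 错误被视为正常退出,不作为错误处理。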

  1. func (s *Server) serveAPI() error {
  2. var chErrors = make(chan error, len(s.servers))
  3. for _, srv := range s.servers {
  4. srv.srv.Handler = s.createMux()
  5. go func(srv *HTTPServer) {
  6. var err error
  7. logrus.Infof("API listen on %s", srv.l.Addr())
  8. if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
  9. err = nil
  10. }
  11. chErrors <- err
  12. }(srv)
  13. }
  14. for range s.servers {
  15. err := <-chErrors
  16. if err != nil {
  17. return err
  18. }
  19. }
  20. return nil
  21. }

HTTPServer 的 Serve 方法只需要简单调用内部的 http.Server 的 Serve 方法,并传入保存的 Listener 接口实例即可。

  1. func (s *HTTPServer) Serve() error {
  2. return s.srv.Serve(s.l)
  3. }

Client

Client 功能实现相对简单,在 http.Client 的基础上做了一些简单封装,其结构如下所示。

  1. type Client struct {
  2. // scheme sets the scheme for the client
  3. scheme string
  4. // host holds the server address to connect to
  5. host string
  6. // proto holds the client protocol i.e. unix.
  7. proto string
  8. // addr holds the client address.
  9. addr string
  10. // basePath holds the path to prepend to the requests.
  11. basePath string
  12. // client used to send and receive http requests.
  13. client *http.Client
  14. // version of the server to talk to.
  15. version string
  16. // custom http headers configured by users.
  17. customHTTPHeaders map[string]string
  18. // manualOverride is set to true when the version was set by users.
  19. manualOverride bool
  20. // negotiateVersion indicates if the client should automatically negotiate
  21. // the API version to use when making requests. API version negotiation is
  22. // performed on the first request, after which negotiated is set to "true"
  23. // so that subsequent requests do not re-negotiate.
  24. negotiateVersion bool
  25. // negotiated indicates that API version negotiation took place
  26. negotiated bool
  27. }

其中的 client 通过 defaultHTTPClient 方法创建。

  1. func defaultHTTPClient(host string) (*http.Client, error) {
  2. url, err := ParseHostURL(host)
  3. if err != nil {
  4. return nil, err
  5. }
  6. transport := new(http.Transport)
  7. sockets.ConfigureTransport(transport, url.Scheme, url.Host)
  8. return &http.Client{
  9. Transport: transport,
  10. CheckRedirect: CheckRedirect,
  11. }, nil
  12. }

sockets.ConfigureTransport 方法值得关注:它对 unix、npipe 类型的协议做了特殊处理;对于普通的 tcp 类型连接,则根据环境变量中的系统代理设置进行配置。

  1. func ConfigureTransport(tr *http.Transport, proto, addr string) error {
  2. switch proto {
  3. case "unix":
  4. return configureUnixTransport(tr, proto, addr)
  5. case "npipe":
  6. return configureNpipeTransport(tr, proto, addr)
  7. default:
  8. tr.Proxy = http.ProxyFromEnvironment
  9. dialer, err := DialerFromEnvironment(&net.Dialer{
  10. Timeout: defaultTimeout,
  11. })
  12. if err != nil {
  13. return err
  14. }
  15. tr.Dial = dialer.Dial
  16. }
  17. return nil
  18. }

Proxy

DialerFromEnvironment 根据系统配置的 ALL_PROXY 与 NO_PROXY 环境变量对默认的 Dialer 进行配置:未设置 ALL_PROXY 时直接返回原始 Dialer;只设置了 ALL_PROXY 时返回代理 Dialer;两者都设置时返回 PerHost 实例。下面代码中的 19 行将系统 NO_PROXY 环境变量中设置的免代理规则保存到 PerHost 中。

  1. func DialerFromEnvironment(direct *net.Dialer) (proxy.Dialer, error) {
  2. allProxy := GetProxyEnv("all_proxy")
  3. if len(allProxy) == 0 {
  4. return direct, nil
  5. }
  6. proxyURL, err := url.Parse(allProxy)
  7. if err != nil {
  8. return direct, err
  9. }
  10. proxyFromURL, err := proxy.FromURL(proxyURL, direct)
  11. if err != nil {
  12. return direct, err
  13. }
  14. noProxy := GetProxyEnv("no_proxy")
  15. if len(noProxy) == 0 {
  16. return proxyFromURL, nil
  17. }
  18. perHost := proxy.NewPerHost(proxyFromURL, direct)
  19. perHost.AddFromString(noProxy)
  20. return perHost, nil
  21. }

PerHost 同时保留代理 Dialer(def)及原始的直连 Dialer(bypass),默认使用代理 Dialer。

  1. func NewPerHost(defaultDialer, bypass Dialer) *PerHost {
  2. return &PerHost{
  3. def: defaultDialer,
  4. bypass: bypass,
  5. }
  6. }

其 Dial 方法先用 net.SplitHostPort 从连接地址中取出主机部分,再据此选择 Dialer 实例并建立连接。

  1. func (p *PerHost) Dial(network, addr string) (c net.Conn, err error) {
  2. host, _, err := net.SplitHostPort(addr)
  3. if err != nil {
  4. return nil, err
  5. }
  6. return p.dialerForRequest(host).Dial(network, addr)
  7. }

dialerForRequest 方法根据过滤规则选择默认的代理 Dialer 或者直连 Dialer。

  1. func (p *PerHost) dialerForRequest(host string) Dialer {
  2. if ip := net.ParseIP(host); ip != nil {
  3. for _, net := range p.bypassNetworks {
  4. if net.Contains(ip) {
  5. return p.bypass
  6. }
  7. }
  8. for _, bypassIP := range p.bypassIPs {
  9. if bypassIP.Equal(ip) {
  10. return p.bypass
  11. }
  12. }
  13. return p.def
  14. }
  15. for _, zone := range p.bypassZones {
  16. if strings.HasSuffix(host, zone) {
  17. return p.bypass
  18. }
  19. if host == zone[1:] {
  20. // For a zone ".example.com", we match "example.com"
  21. // too.
  22. return p.bypass
  23. }
  24. }
  25. for _, bypassHost := range p.bypassHosts {
  26. if bypassHost == host {
  27. return p.bypass
  28. }
  29. }
  30. return p.def
  31. }