HTTP Listener

server-http-listener.svg
图 1. httpListener 关系图

httpListener 用于处理同一 HTTP Server 实例监听多个地址的需求。创建时,将实际的 TCPListener 记录在一个数组中,每个 TCPListener 独立工作,当有新连接到达某个 TCPListener 时,将这个连接发送至 chan,httpListener 的 Accept 方法等待这个 chan,当有数据时,返回结果,以此实现对多个地址的监听,表现在上层只有一个 Listener。

Creation

  // newHTTPListener opens one TCP listener for every address in serverAddrs,
  // collects them in a single httpListener and starts it before returning.
  // On any failure it returns a nil listener and the error; the deferred
  // cleanup then closes every TCP listener that had already been opened.
  1. func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
  2. var tcpListeners []*net.TCPListener
  3. // Close all opened listeners on error (the named result err is inspected at return time).
  4. defer func() {
  5. if err == nil {
  6. return
  7. }
  8. for _, tcpListener := range tcpListeners {
  9. // Ignore error on close.
  10. tcpListener.Close()
  11. }
  12. }()
  13. for _, serverAddr := range serverAddrs {
  14. var l net.Listener
     // Try listen first; only if it fails, retry with fallbackListen.
  15. if l, err = listen("tcp", serverAddr); err != nil {
  16. if l, err = fallbackListen("tcp", serverAddr); err != nil {
  17. return nil, err
  18. }
  19. }
  20. tcpListener, ok := l.(*net.TCPListener)
  21. if !ok {
      // NOTE(review): l has not been appended to tcpListeners yet, so the
      // deferred cleanup does not close it on this path.
  22. return nil, fmt.Errorf("unexpected listener type found %v, expected net.TCPListener", l)
  23. }
  24. tcpListeners = append(tcpListeners, tcpListener)
  25. }
  26. listener = &httpListener{
  27. tcpListeners: tcpListeners,
  28. }
      // start launches the accept goroutines for all collected listeners.
  29. listener.start()
  30. return listener, nil
  31. }

L3 - L12 在函数退出时执行,如果返回的 err 不为空,关闭已经打开的 TCPListener;
L15 - L16 尝试用不同的方式打开一个 TCPListener 用于监听:先调用 listen,失败后再调用 fallbackListen。listen 方法对应 NewListener 方法,如下所示,通过 socket 方法直接打开 TCP,并设置这个 socket(L10);fallbackListen 就是 net.Listen 方法。

  // NewListener creates a TCP stream socket for addr, configures, binds and
  // listens on it through cfg.fdSetup, and returns it wrapped as a
  // net.Listener obtained from net.FileListener.
  // Every failure path closes whatever descriptor, file or listener has been
  // created so far and returns a nil listener with the error.
  1. func (cfg *Config) NewListener(network, addr string) (net.Listener, error) {
  2. sa, soType, err := getSockaddr(network, addr)
  3. if err != nil {
  4. return nil, err
  5. }
  6. fd, err := newSocketCloexec(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
  7. if err != nil {
  8. return nil, err
  9. }
  10. if err = cfg.fdSetup(fd, sa, addr); err != nil {
      // The descriptor is still a raw fd at this point, so close it directly.
  11. syscall.Close(fd)
  12. return nil, err
  13. }
      // The name only labels the os.File wrapping fd.
  14. name := fmt.Sprintf("reuseport.%d.%s.%s", os.Getpid(), network, addr)
  15. file := os.NewFile(uintptr(fd), name)
  16. ln, err := net.FileListener(file)
  17. if err != nil {
  18. file.Close()
  19. return nil, err
  20. }
      // net.FileListener returns a copy of the listener, so file is no longer
      // needed once ln exists; if closing it fails, ln is closed as well.
  21. if err = file.Close(); err != nil {
  22. ln.Close()
  23. return nil, err
  24. }
  25. return ln, nil
  26. }

设置部分如下所示

  // fdSetup configures the socket fd and puts it into the listening state.
  // It sets SO_REUSEADDR and TCP_NODELAY unconditionally, applies the optional
  // settings selected by cfg (ReusePort, DeferAccept, FastOpen), binds fd to
  // sa and finally calls listen(2) with the configured backlog.
  // addr is only used in error messages. The first failing step aborts the
  // setup and its error is returned; fd is not closed here.
  1. func (cfg *Config) fdSetup(fd int, sa syscall.Sockaddr, addr string) error {
  2. var err error
  3. if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {
  4. return fmt.Errorf("cannot enable SO_REUSEADDR: %s", err)
  5. }
  6. // This should disable Nagle's algorithm in all accepted sockets by default.
  7. // Users may enable it with net.TCPConn.SetNoDelay(false).
  8. if err = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1); err != nil {
  9. return fmt.Errorf("cannot disable Nagle's algorithm: %s", err)
  10. }
  11. if cfg.ReusePort {
  12. if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, soReusePort, 1); err != nil {
  13. return fmt.Errorf("cannot enable SO_REUSEPORT: %s", err)
  14. }
  15. }
  16. if cfg.DeferAccept {
  17. if err = enableDeferAccept(fd); err != nil {
  18. return err
  19. }
  20. }
  21. if cfg.FastOpen {
  22. if err = enableFastOpen(fd); err != nil {
  23. return err
  24. }
  25. }
      // All socket options above are applied before the socket is bound.
  26. if err = syscall.Bind(fd, sa); err != nil {
  27. return fmt.Errorf("cannot bind to %q: %s", addr, err)
  28. }
      // A non-positive cfg.Backlog means "use the value reported by soMaxConn".
  29. backlog := cfg.Backlog
  30. if backlog <= 0 {
  31. if backlog, err = soMaxConn(); err != nil {
  32. return fmt.Errorf("cannot determine backlog to pass to listen(2): %s", err)
  33. }
  34. }
  35. if err = syscall.Listen(fd, backlog); err != nil {
  36. return fmt.Errorf("cannot listen on %q: %s", addr, err)
  37. }
  38. return nil
  39. }

newHTTPListener 的 L29 调用 start 方法启动这个 httpListener。

Start

  // start creates the accept and done channels and launches one goroutine per
  // TCP listener. Each goroutine accepts connections in a loop and forwards
  // every result (a connection or an accept error) to acceptCh, which the
  // Accept method reads from.
  1. func (listener *httpListener) start() {
     // acceptCh is unbuffered: a send completes only when a receiver takes
     // the result or doneCh is closed.
  2. listener.acceptCh = make(chan acceptResult)
  3. listener.doneCh = make(chan struct{})
  4. // Closure to send acceptResult to acceptCh.
  5. // It returns true if the result was sent, or false if doneCh was closed first.
  6. send := func(result acceptResult, doneCh <-chan struct{}) bool {
  7. select {
  8. case listener.acceptCh <- result:
  9. // Successfully written to acceptCh
  10. return true
  11. case <-doneCh:
  12. // As stop signal is received, close accepted connection.
  13. if result.conn != nil {
  14. result.conn.Close()
  15. }
  16. return false
  17. }
  18. }
  19. // Closure to handle single connection.
  20. handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) {
      // The error returned by SetKeepAlive is ignored.
  21. tcpConn.SetKeepAlive(true)
  22. send(acceptResult{tcpConn, nil}, doneCh)
  23. }
  24. // Closure to handle TCPListener until done channel is closed.
  25. handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) {
  26. for {
  27. tcpConn, err := tcpListener.AcceptTCP()
  28. if err != nil {
  29. // Returns when send fails.
      // NOTE(review): while send keeps succeeding, the loop calls AcceptTCP
      // again after every error; it only exits once doneCh is closed.
  30. if !send(acceptResult{nil, err}, doneCh) {
  31. return
  32. }
  33. } else {
      // Hand the connection to its own goroutine so this loop can go
      // straight back to AcceptTCP.
  34. go handleConn(tcpConn, doneCh)
  35. }
  36. }
  37. }
  38. // Start separate goroutine for each TCP listener to handle connection.
  39. for _, tcpListener := range listener.tcpListeners {
  40. go handleListener(tcpListener, listener.doneCh)
  41. }
  42. }

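L39 - L41 为每个 TCPListener 启动一个独立的 goroutine 执行监听操作;
L25 - L37 是实际监听过程,当 AcceptTCP 出错时,将错误发送给目标 chan,如果正确,则另起 goroutine 处理该连接;
L20 - L23 处理正常连接,开启 KeepAlive 后将其发送给目标 chan;
L6 - L18 是发送函数 send:将结果(连接或错误)发送给目标 chan,发送成功返回 true,使 httpListener 能继续正常工作;如果 doneCh 已关闭,则关闭该连接并返回 false(L30 据此结束监听循环)。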

Accept

  // Accept blocks until a result is available on acceptCh and returns the
  // connection and error carried by that result. If acceptCh has been closed,
  // it returns a nil connection and syscall.EINVAL.
  1. func (listener *httpListener) Accept() (conn net.Conn, err error) {
  2. result, ok := <-listener.acceptCh
  3. if ok {
  4. return result.conn, result.err
  5. }
     // ok is false only when acceptCh is closed.
  6. return nil, syscall.EINVAL
  7. }

Server

server-server.svg
图 2. Server

Server 统一处理全部 HTTP 请求,主要包含了一个 http.Server 实例及一个 httpListener 实例。真正的 http.Server 监听来自 httpListener 的请求,并处理请求。通过 httpListener 实现监听不同地址,这部分内容已经做了详细解释。Server 创建代码如下

  // Build the HTTP handler, create the server for globalMinioAddr and start
  // it in the background.
  1. handler, err := configureServerHandler(globalEndpoints)
  2. if err != nil {
  3. logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
  4. }
     // getCert stays nil when globalTLSCerts is not set.
  5. var getCert certs.GetCertificateFunc
  6. if globalTLSCerts != nil {
  7. getCert = globalTLSCerts.GetCertificate
  8. }
     // The router is wrapped by corsHandler and then criticalErrorHandler
     // before being handed to the server.
  9. httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)
     // Every listener gets GlobalContext as its base context.
  10. httpServer.BaseContext = func(listener net.Listener) context.Context {
  11. return GlobalContext
  12. }
  13. // Turn-off random logging by Go internally
  14. httpServer.ErrorLog = log.New(&nullWriter{}, "", 0)
      // Start runs in its own goroutine; its return value is forwarded to
      // globalHTTPServerErrorCh.
  15. go func() {
  16. globalHTTPServerErrorCh <- httpServer.Start()
  17. }()
  18. setHTTPServer(httpServer)

Register Handlers

Minio 的 handler 使用了 gorilla/mux 开源库,L19 的 Use 方法是在注册中间件,其他部分是在注册处理方法。传入的参数 EndpointServerPools 仅在全局配置项 globalIsDistErasure 开启时起作用。

  // configureServerHandler builds the gorilla/mux router used by the server:
  // it registers the individual routers and then attaches globalHandlers as
  // middlewares. endpointServerPools is only used when globalIsDistErasure is
  // set. The returned error is always nil in this version.
  1. func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) {
  2. // Initialize router. `SkipClean(true)` stops gorilla/mux from
  3. // normalizing URL path minio/minio#3256
  4. router := mux.NewRouter().SkipClean(true).UseEncodedPath()
  5. // Register the distributed erasure routers only when globalIsDistErasure is set.
  6. if globalIsDistErasure {
  7. registerDistErasureRouters(router, endpointServerPools)
  8. }
  9. // Add Admin router, all APIs are enabled in server mode.
  10. registerAdminRouter(router, true)
  11. // Add healthcheck router
  12. registerHealthCheckRouter(router)
  13. // Add server metrics router
  14. registerMetricsRouter(router)
  15. // Add STS router always.
  16. registerSTSRouter(router)
  17. // Add API router
  18. registerAPIRouter(router)
      // Attach every middleware listed in globalHandlers to the router.
  19. router.Use(globalHandlers...)
  20. return router, nil
  21. }

Middlewares

  // globalHandlers is the list of mux middlewares that configureServerHandler
  // passes to router.Use, in the order given here.
  1. var globalHandlers = []mux.MiddlewareFunc{
  2. // filters HTTP headers which are treated as metadata and are reserved
  3. // for internal use only.
  4. filterReservedMetadata,
  5. // Enforce rules specific for TLS requests
  6. setSSETLSHandler,
  7. // Auth handler verifies incoming authorization headers and
  8. // routes them accordingly. Client receives a HTTP error for
  9. // invalid/unsupported signatures.
  10. setAuthHandler,
  11. // Validates all incoming requests to have a valid date header.
  12. setTimeValidityHandler,
  13. // Validates if incoming request is for restricted buckets.
  14. setReservedBucketHandler,
  15. // Redirect some pre-defined browser request paths to a static location prefix.
  16. setBrowserRedirectHandler,
  17. // Adds 'crossdomain.xml' policy handler to serve legacy flash clients.
  18. setCrossDomainPolicy,
  19. // Limits all header sizes to a maximum fixed limit
  20. setRequestHeaderSizeLimitHandler,
  21. // Limits all requests size to a maximum fixed limit
  22. setRequestSizeLimitHandler,
  23. // Network statistics
  24. setHTTPStatsHandler,
  25. // Validate all the incoming requests.
  26. setRequestValidityHandler,
  27. // Forward path style requests to actual host in a bucket federated setup.
  28. setBucketForwardingHandler,
  29. // set HTTP security headers such as Content-Security-Policy.
  30. addSecurityHeaders,
  31. // set x-amz-request-id header.
  32. addCustomHeaders,
  33. // add redirect handler to redirect
  34. // requests when object layer is not
  35. // initialized.
  36. setRedirectHandler,
  37. // Add new handlers here.
  38. }