HTTP Listener
图 1. httpListener 关系图
httpListener 用于处理同一 HTTP Server 实例监听多个地址的需求。创建时,将实际的 TCPListener 记录在一个数组中,每个 TCPListener 独立工作,当有新连接到达某个 TCPListener 时,将这个连接发送至 chan,httpListener 的 Accept 方法等待这个 chan,当有数据时,返回结果,以此实现对多个地址的监听,表现在上层只有一个 Listener。
Creation
func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {var tcpListeners []*net.TCPListener// Close all opened listeners on errordefer func() {if err == nil {return}for _, tcpListener := range tcpListeners {// Ignore error on close.tcpListener.Close()}}()for _, serverAddr := range serverAddrs {var l net.Listenerif l, err = listen("tcp", serverAddr); err != nil {if l, err = fallbackListen("tcp", serverAddr); err != nil {return nil, err}}tcpListener, ok := l.(*net.TCPListener)if !ok {return nil, fmt.Errorf("unexpected listener type found %v, expected net.TCPListener", l)}tcpListeners = append(tcpListeners, tcpListener)}listener = &httpListener{tcpListeners: tcpListeners,}listener.start()return listener, nil}
L6 - L15 在退出时执行,如果执行有误,关闭已经打开的 TCPListener;
L19 - L20 尝试用不同的方式打开一个 TCPListener 用于监听,listen 方法对应 NewListener 方法,如下所示,通过 socket 方法直接打开 TCP,并设置这个 socket(L12);fallbackListen 就是 net.Listen 方法。
func (cfg *Config) NewListener(network, addr string) (net.Listener, error) {sa, soType, err := getSockaddr(network, addr)if err != nil {return nil, err}fd, err := newSocketCloexec(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)if err != nil {return nil, err}if err = cfg.fdSetup(fd, sa, addr); err != nil {syscall.Close(fd)return nil, err}name := fmt.Sprintf("reuseport.%d.%s.%s", os.Getpid(), network, addr)file := os.NewFile(uintptr(fd), name)ln, err := net.FileListener(file)if err != nil {file.Close()return nil, err}if err = file.Close(); err != nil {ln.Close()return nil, err}return ln, nil}
设置部分如下所示
func (cfg *Config) fdSetup(fd int, sa syscall.Sockaddr, addr string) error {var err errorif err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {return fmt.Errorf("cannot enable SO_REUSEADDR: %s", err)}// This should disable Nagle's algorithm in all accepted sockets by default.// Users may enable it with net.TCPConn.SetNoDelay(false).if err = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1); err != nil {return fmt.Errorf("cannot disable Nagle's algorithm: %s", err)}if cfg.ReusePort {if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, soReusePort, 1); err != nil {return fmt.Errorf("cannot enable SO_REUSEPORT: %s", err)}}if cfg.DeferAccept {if err = enableDeferAccept(fd); err != nil {return err}}if cfg.FastOpen {if err = enableFastOpen(fd); err != nil {return err}}if err = syscall.Bind(fd, sa); err != nil {return fmt.Errorf("cannot bind to %q: %s", addr, err)}backlog := cfg.Backlogif backlog <= 0 {if backlog, err = soMaxConn(); err != nil {return fmt.Errorf("cannot determine backlog to pass to listen(2): %s", err)}}if err = syscall.Listen(fd, backlog); err != nil {return fmt.Errorf("cannot listen on %q: %s", addr, err)}return nil}
L36 启动这个 httpListener。
Start
func (listener *httpListener) start() {listener.acceptCh = make(chan acceptResult)listener.doneCh = make(chan struct{})// Closure to send acceptResult to acceptCh.// It returns true if the result is sent else false if returns when doneCh is closed.send := func(result acceptResult, doneCh <-chan struct{}) bool {select {case listener.acceptCh <- result:// Successfully written to acceptChreturn truecase <-doneCh:// As stop signal is received, close accepted connection.if result.conn != nil {result.conn.Close()}return false}}// Closure to handle single connection.handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) {tcpConn.SetKeepAlive(true)send(acceptResult{tcpConn, nil}, doneCh)}// Closure to handle TCPListener until done channel is closed.handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) {for {tcpConn, err := tcpListener.AcceptTCP()if err != nil {// Returns when send fails.if !send(acceptResult{nil, err}, doneCh) {return}} else {go handleConn(tcpConn, doneCh)}}}// Start separate goroutine for each TCP listener to handle connection.for _, tcpListener := range listener.tcpListeners {go handleListener(tcpListener, listener.doneCh)}}
L43 - 45 对全部 TCPListener 执行监听操作;
L28 - 40 是实际监听过程,当有连接错误时,发送给目标 chan,如果正确,则处理该连接;
L22 - L24 处理正常连接,将其发送给目标 chan;
L7 - L19: 处理错误连接,发送给目标 chan 后,返回 true,使 httpListener 能继续正常工作。
Accept
func (listener *httpListener) Accept() (conn net.Conn, err error) {result, ok := <-listener.acceptChif ok {return result.conn, result.err}return nil, syscall.EINVAL}
Server
图 2. Server
Server 同一处理全部 HTTP 请求,主要包含了一个 http.Server 实例及一个 httpListener 实例。真正的 http.Server 监听来自 httpListener 的请求,并处理请求。通过 httpListener 实现监听不同地址,这部分内容已经做了详细解释。Server 创建代码如下
handler, err := configureServerHandler(globalEndpoints)if err != nil {logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")}var getCert certs.GetCertificateFuncif globalTLSCerts != nil {getCert = globalTLSCerts.GetCertificate}httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)httpServer.BaseContext = func(listener net.Listener) context.Context {return GlobalContext}// Turn-off random logging by Go internallyhttpServer.ErrorLog = log.New(&nullWriter{}, "", 0)go func() {globalHTTPServerErrorCh <- httpServer.Start()}()setHTTPServer(httpServer)
Register Handlers
Minio 的 handler 使用了 gorilla/mux 开源库,L26 的 Use 方法是在注册中间件,其他部分是在注册处理方法。传入的参数 EndpointServerPools 仅在全局配置项 globalIsDisterasure 开启时起作用。
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) {// Initialize router. `SkipClean(true)` stops gorilla/mux from// normalizing URL path minio/minio#3256router := mux.NewRouter().SkipClean(true).UseEncodedPath()// Initialize distributed NS lock.if globalIsDistErasure {registerDistErasureRouters(router, endpointServerPools)}// Add Admin router, all APIs are enabled in server mode.registerAdminRouter(router, true)// Add healthcheck routerregisterHealthCheckRouter(router)// Add server metrics routerregisterMetricsRouter(router)// Add STS router always.registerSTSRouter(router)// Add API routerregisterAPIRouter(router)router.Use(globalHandlers...)return router, nil}
Middlewares
var globalHandlers = []mux.MiddlewareFunc{// filters HTTP headers which are treated as metadata and are reserved// for internal use only.filterReservedMetadata,// Enforce rules specific for TLS requestssetSSETLSHandler,// Auth handler verifies incoming authorization headers and// routes them accordingly. Client receives a HTTP error for// invalid/unsupported signatures.setAuthHandler,// Validates all incoming requests to have a valid date header.setTimeValidityHandler,// Validates if incoming request is for restricted buckets.setReservedBucketHandler,// Redirect some pre-defined browser request paths to a static location prefix.setBrowserRedirectHandler,// Adds 'crossdomain.xml' policy handler to serve legacy flash clients.setCrossDomainPolicy,// Limits all header sizes to a maximum fixed limitsetRequestHeaderSizeLimitHandler,// Limits all requests size to a maximum fixed limitsetRequestSizeLimitHandler,// Network statisticssetHTTPStatsHandler,// Validate all the incoming requests.setRequestValidityHandler,// Forward path style requests to actual host in a bucket federated setup.setBucketForwardingHandler,// set HTTP security headers such as Content-Security-Policy.addSecurityHeaders,// set x-amz-request-id header.addCustomHeaders,// add redirect handler to redirect// requests when object layer is not// initialized.setRedirectHandler,// Add new handlers here.}
