HTTP Listener
图 1. httpListener 关系图
httpListener 用于处理同一 HTTP Server 实例监听多个地址的需求。创建时,将实际的 TCPListener 记录在一个数组中,每个 TCPListener 独立工作,当有新连接到达某个 TCPListener 时,将这个连接发送至 chan,httpListener 的 Accept 方法等待这个 chan,当有数据时,返回结果,以此实现对多个地址的监听,表现在上层只有一个 Listener。
Creation
func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener
// Close all opened listeners on error
defer func() {
if err == nil {
return
}
for _, tcpListener := range tcpListeners {
// Ignore error on close.
tcpListener.Close()
}
}()
for _, serverAddr := range serverAddrs {
var l net.Listener
if l, err = listen("tcp", serverAddr); err != nil {
if l, err = fallbackListen("tcp", serverAddr); err != nil {
return nil, err
}
}
tcpListener, ok := l.(*net.TCPListener)
if !ok {
return nil, fmt.Errorf("unexpected listener type found %v, expected net.TCPListener", l)
}
tcpListeners = append(tcpListeners, tcpListener)
}
listener = &httpListener{
tcpListeners: tcpListeners,
}
listener.start()
return listener, nil
}
L6 - L15 在退出时执行,如果执行有误,关闭已经打开的 TCPListener;
L19 - L20 尝试用不同的方式打开一个 TCPListener 用于监听,listen 方法对应 NewListener 方法,如下所示,通过 socket 方法直接打开 TCP,并设置这个 socket(L12);fallbackListen 就是 net.Listen 方法。
func (cfg *Config) NewListener(network, addr string) (net.Listener, error) {
sa, soType, err := getSockaddr(network, addr)
if err != nil {
return nil, err
}
fd, err := newSocketCloexec(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
if err != nil {
return nil, err
}
if err = cfg.fdSetup(fd, sa, addr); err != nil {
syscall.Close(fd)
return nil, err
}
name := fmt.Sprintf("reuseport.%d.%s.%s", os.Getpid(), network, addr)
file := os.NewFile(uintptr(fd), name)
ln, err := net.FileListener(file)
if err != nil {
file.Close()
return nil, err
}
if err = file.Close(); err != nil {
ln.Close()
return nil, err
}
return ln, nil
}
设置部分如下所示
func (cfg *Config) fdSetup(fd int, sa syscall.Sockaddr, addr string) error {
var err error
if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil {
return fmt.Errorf("cannot enable SO_REUSEADDR: %s", err)
}
// This should disable Nagle's algorithm in all accepted sockets by default.
// Users may enable it with net.TCPConn.SetNoDelay(false).
if err = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1); err != nil {
return fmt.Errorf("cannot disable Nagle's algorithm: %s", err)
}
if cfg.ReusePort {
if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, soReusePort, 1); err != nil {
return fmt.Errorf("cannot enable SO_REUSEPORT: %s", err)
}
}
if cfg.DeferAccept {
if err = enableDeferAccept(fd); err != nil {
return err
}
}
if cfg.FastOpen {
if err = enableFastOpen(fd); err != nil {
return err
}
}
if err = syscall.Bind(fd, sa); err != nil {
return fmt.Errorf("cannot bind to %q: %s", addr, err)
}
backlog := cfg.Backlog
if backlog <= 0 {
if backlog, err = soMaxConn(); err != nil {
return fmt.Errorf("cannot determine backlog to pass to listen(2): %s", err)
}
}
if err = syscall.Listen(fd, backlog); err != nil {
return fmt.Errorf("cannot listen on %q: %s", addr, err)
}
return nil
}
L36 启动这个 httpListener。
Start
func (listener *httpListener) start() {
listener.acceptCh = make(chan acceptResult)
listener.doneCh = make(chan struct{})
// Closure to send acceptResult to acceptCh.
// It returns true if the result is sent else false if returns when doneCh is closed.
send := func(result acceptResult, doneCh <-chan struct{}) bool {
select {
case listener.acceptCh <- result:
// Successfully written to acceptCh
return true
case <-doneCh:
// As stop signal is received, close accepted connection.
if result.conn != nil {
result.conn.Close()
}
return false
}
}
// Closure to handle single connection.
handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) {
tcpConn.SetKeepAlive(true)
send(acceptResult{tcpConn, nil}, doneCh)
}
// Closure to handle TCPListener until done channel is closed.
handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) {
for {
tcpConn, err := tcpListener.AcceptTCP()
if err != nil {
// Returns when send fails.
if !send(acceptResult{nil, err}, doneCh) {
return
}
} else {
go handleConn(tcpConn, doneCh)
}
}
}
// Start separate goroutine for each TCP listener to handle connection.
for _, tcpListener := range listener.tcpListeners {
go handleListener(tcpListener, listener.doneCh)
}
}
L43 - 45 对全部 TCPListener 执行监听操作;
L28 - 40 是实际监听过程,当有连接错误时,发送给目标 chan,如果正确,则处理该连接;
L22 - L24 处理正常连接,将其发送给目标 chan;
L7 - L19: 处理错误连接,发送给目标 chan 后,返回 true,使 httpListener 能继续正常工作。
Accept
func (listener *httpListener) Accept() (conn net.Conn, err error) {
result, ok := <-listener.acceptCh
if ok {
return result.conn, result.err
}
return nil, syscall.EINVAL
}
Server
图 2. Server
Server 同一处理全部 HTTP 请求,主要包含了一个 http.Server 实例及一个 httpListener 实例。真正的 http.Server 监听来自 httpListener 的请求,并处理请求。通过 httpListener 实现监听不同地址,这部分内容已经做了详细解释。Server 创建代码如下
handler, err := configureServerHandler(globalEndpoints)
if err != nil {
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
}
var getCert certs.GetCertificateFunc
if globalTLSCerts != nil {
getCert = globalTLSCerts.GetCertificate
}
httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}
// Turn-off random logging by Go internally
httpServer.ErrorLog = log.New(&nullWriter{}, "", 0)
go func() {
globalHTTPServerErrorCh <- httpServer.Start()
}()
setHTTPServer(httpServer)
Register Handlers
Minio 的 handler 使用了 gorilla/mux 开源库,L26 的 Use 方法是在注册中间件,其他部分是在注册处理方法。传入的参数 EndpointServerPools 仅在全局配置项 globalIsDisterasure 开启时起作用。
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) {
// Initialize router. `SkipClean(true)` stops gorilla/mux from
// normalizing URL path minio/minio#3256
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
// Initialize distributed NS lock.
if globalIsDistErasure {
registerDistErasureRouters(router, endpointServerPools)
}
// Add Admin router, all APIs are enabled in server mode.
registerAdminRouter(router, true)
// Add healthcheck router
registerHealthCheckRouter(router)
// Add server metrics router
registerMetricsRouter(router)
// Add STS router always.
registerSTSRouter(router)
// Add API router
registerAPIRouter(router)
router.Use(globalHandlers...)
return router, nil
}
Middlewares
var globalHandlers = []mux.MiddlewareFunc{
// filters HTTP headers which are treated as metadata and are reserved
// for internal use only.
filterReservedMetadata,
// Enforce rules specific for TLS requests
setSSETLSHandler,
// Auth handler verifies incoming authorization headers and
// routes them accordingly. Client receives a HTTP error for
// invalid/unsupported signatures.
setAuthHandler,
// Validates all incoming requests to have a valid date header.
setTimeValidityHandler,
// Validates if incoming request is for restricted buckets.
setReservedBucketHandler,
// Redirect some pre-defined browser request paths to a static location prefix.
setBrowserRedirectHandler,
// Adds 'crossdomain.xml' policy handler to serve legacy flash clients.
setCrossDomainPolicy,
// Limits all header sizes to a maximum fixed limit
setRequestHeaderSizeLimitHandler,
// Limits all requests size to a maximum fixed limit
setRequestSizeLimitHandler,
// Network statistics
setHTTPStatsHandler,
// Validate all the incoming requests.
setRequestValidityHandler,
// Forward path style requests to actual host in a bucket federated setup.
setBucketForwardingHandler,
// set HTTP security headers such as Content-Security-Policy.
addSecurityHeaders,
// set x-amz-request-id header.
addCustomHeaders,
// add redirect handler to redirect
// requests when object layer is not
// initialized.
setRedirectHandler,
// Add new handlers here.
}