DNS Cache
DNSCache 可通过缓存 DNS 查询结果来提升域名解析速度。DNSCache 缓存时间根据运行环境来确定,在裸金属主机上,服务变更频率较低,可适当延长缓存有效时间;在 Docker、Kubernetes 环境下则选择较短的时间。
Definition
type DNSCache struct {sync.RWMutexlookupHostFn func(ctx context.Context, host string) ([]string, error)lookupTimeout time.DurationloggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})cache map[string][]stringdoneOnce sync.OncedoneCh chan struct{}}
lookupHostFn 用于执行 DNS 查询操作,返回 []string 是因为同一服务名有可能对应多个 IP 地址;cache 中 key 为 loopupHostFn 中传入的服务名,value 为 lookupHostFn 返回的 IP 地址列表。lookupHostFn 为 net.DefaultResolver.LookupHost 方法:
r := &DNSCache{lookupHostFn: net.DefaultResolver.LookupHost,lookupTimeout: lookupTimeout,loggerOnce: loggerOnce,cache: make(map[string][]string, cacheSize),doneCh: make(chan struct{}),}
doneOnce 用于关闭 doneCh,doneCh 可控制 DNSCache 的后台协程终止运行。
func (r *DNSCache) Stop() {r.doneOnce.Do(func() {close(r.doneCh)})}
Creation
创建 DNSCache 时,会同时启动一个协程,定时刷新缓存。freq 为缓存失效(刷新)时间,每次触发刷新操作时,更新定时器触发时间为一个随机数是为了避免不同的 minio 实例以相同的频率执行域名请求,造成系统压力。
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))timer := time.NewTimer(freq)go func() {defer timer.Stop()for {select {case <-timer.C:// Make sure that refreshes on DNS do not be attempted// at the same time, allows for reduced load on the// DNS servers.timer.Reset(time.Duration(rnd.Float64() * float64(freq)))r.Refresh()case <-r.doneCh:return}}}()
Refresh 方法及在 Refresh 中使用的 LookupHost 方法如下
func (r *DNSCache) Refresh() {r.RLock()hosts := make([]string, 0, len(r.cache))for host := range r.cache {hosts = append(hosts, host)}r.RUnlock()for _, host := range hosts {ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout)if _, err := r.LookupHost(ctx, host); err != nil {r.loggerOnce(ctx, err, host)}cancelF()}}func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) {addrs, err := r.lookupHostFn(ctx, host)if err != nil {return nil, err}r.Lock()r.cache[host] = addrsr.Unlock()return addrs, nil}
Fetch
Fetch 方法用于获取服务名对应的 IP 地址列表。与 LookupHost 不同的是,Fetch 优先从缓存中获取,如果不存在服务名记录,再执行 LookupHost 操作。而 LookupHost 则先执行域名查询操作,成功获取后,再更新缓存。
func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) {r.RLock()addrs, ok := r.cache[host]r.RUnlock()if ok {return addrs, nil}return r.LookupHost(ctx, host)}
HTTP Forwarder
图 1: Forwarder 原理
Forwarder 的作用是将请求重定向至合理的目标服务。其核心原理是通过内置的 HTTP 反向代理服务器进行服务重定向。
Connection Establishment
系统只有一个全局的 Forwarder 实例 globalForwarder,如下所示
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{PassHost: true,RoundTripper: newGatewayHTTPTransport(1 * time.Hour),Logger: func(err error) {if err != nil && !errors.Is(err, context.Canceled) {logger.LogIf(GlobalContext, err)}},})
newGatewayHTTPTransport 使用 newCustomHTTPTransport 并通过 DialContext 属性关联 DNS 缓存结构 DNSCache
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {// For more details about various values used here refer// https://golang.org/pkg/net/http/#Transport documentationtr := &http.Transport{Proxy: http.ProxyFromEnvironment,// DialContext 关联上 DNSCacheDialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),MaxIdleConnsPerHost: 1024,WriteBufferSize: 16 << 10, // 16KiB moving up from 4KiB defaultReadBufferSize: 16 << 10, // 16KiB moving up from 4KiB defaultIdleConnTimeout: 15 * time.Second,ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.TLSHandshakeTimeout: 10 * time.Second,ExpectContinueTimeout: 10 * time.Second,TLSClientConfig: tlsConfig,// Go net/http automatically unzip if content-type is// gzip disable this feature, as we are always interested// in raw stream.DisableCompression: true,}// https://github.com/golang/go/issues/23559// https://github.com/golang/go/issues/42534// https://github.com/golang/go/issues/43989// https://github.com/golang/go/issues/33425// https://github.com/golang/go/issues/29246// if tlsConfig != nil {// trhttp2, _ := http2.ConfigureTransports(tr)// if trhttp2 != nil {// // ReadIdleTimeout is the timeout after which a health check using ping// // frame will be carried out if no frame is received on the// // connection. 5 minutes is sufficient time for any idle connection.// trhttp2.ReadIdleTimeout = 5 * time.Minute// // PingTimeout is the timeout after which the connection will be closed// // if a response to Ping is not received.// trhttp2.PingTimeout = dialTimeout// // DisableCompression, if true, prevents the Transport from// // requesting compression with an "Accept-Encoding: gzip"// trhttp2.DisableCompression = true// }// }return func() *http.Transport {return tr}}
DialContext 使用的 DialContextWithDNSCache 方法返回的函数,DialContext 在执行 Transport 的 dial 方法(内部实现)时触发。可以看到,在执行连接时,会在 DNSCache 中获取服务对应的 IP 地址列表,随后使用 net.Dialer 进行连接。
func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext {if baseDialCtx == nil {// This is same as which `http.DefaultTransport` uses.baseDialCtx = (&net.Dialer{Timeout: 30 * time.Second,KeepAlive: 30 * time.Second,}).DialContext}return func(ctx context.Context, network, host string) (net.Conn, error) {h, p, err := net.SplitHostPort(host)if err != nil {return nil, err}// Fetch DNS result from cache.//// ctxLookup is only used for canceling DNS Lookup.ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout)defer cancelF()addrs, err := cache.Fetch(ctxLookup, h) // 获取 DNS 缓存if err != nil {return nil, err}var firstErr errorfor _, randomIndex := range randPerm(len(addrs)) { // 使用随机顺序连接conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p))if err == nil {return conn, nil}if firstErr == nil {firstErr = err}}return nil, firstErr}}
Redirect HTTP Request
重定向 HTTP 请求的实现依赖于 Forwarder 的 ServeHTTP 方法。在每次 Forwarder 接收到 HTTP 请求时,都会创建一个 ReverseProxy 来对该请求执行重定向,具体如下
func (f *Forwarder) ServeHTTP(w http.ResponseWriter, inReq *http.Request) {outReq := new(http.Request)*outReq = *inReq // includes shallow copies of maps, but we handle this in Directorrevproxy := httputil.ReverseProxy{Director: func(req *http.Request) {f.modifyRequest(req, inReq.URL)},Transport: f.RoundTripper, // 由 Forwarder 接管连接创建过程FlushInterval: defaultFlushInterval,ErrorHandler: f.customErrHandler,}if f.ErrorHandler != nil {revproxy.ErrorHandler = f.ErrorHandler}revproxy.ServeHTTP(w, outReq)}
由于 ReverseProxy 的 Transport 使用 Forwarder 的 RoundTripper 属性,因此会使用 DNSCache 关联的 DialContext 进行连接。
