DNS Cache

DNSCache 可通过缓存 DNS 查询结果来提升域名解析速度。DNSCache 缓存时间根据运行环境来确定,在裸金属主机上,服务变更频率较低,可适当延长缓存有效时间;在 Docker、Kubernetes 环境下则选择较短的时间。

Definition

  1. type DNSCache struct {
  2. sync.RWMutex
  3. lookupHostFn func(ctx context.Context, host string) ([]string, error)
  4. lookupTimeout time.Duration
  5. loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
  6. cache map[string][]string
  7. doneOnce sync.Once
  8. doneCh chan struct{}
  9. }

lookupHostFn 用于执行 DNS 查询操作,返回 []string 是因为同一服务名有可能对应多个 IP 地址;cache 中 key 为 loopupHostFn 中传入的服务名,value 为 lookupHostFn 返回的 IP 地址列表。lookupHostFn 为 net.DefaultResolver.LookupHost 方法:

  1. r := &DNSCache{
  2. lookupHostFn: net.DefaultResolver.LookupHost,
  3. lookupTimeout: lookupTimeout,
  4. loggerOnce: loggerOnce,
  5. cache: make(map[string][]string, cacheSize),
  6. doneCh: make(chan struct{}),
  7. }

doneOnce 用于关闭 doneCh,doneCh 可控制 DNSCache 的后台协程终止运行。

  1. func (r *DNSCache) Stop() {
  2. r.doneOnce.Do(func() {
  3. close(r.doneCh)
  4. })
  5. }

Creation

创建 DNSCache 时,会同时启动一个协程,定时刷新缓存。freq 为缓存失效(刷新)时间,每次触发刷新操作时,更新定时器触发时间为一个随机数是为了避免不同的 minio 实例以相同的频率执行域名请求,造成系统压力。

  1. rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
  2. timer := time.NewTimer(freq)
  3. go func() {
  4. defer timer.Stop()
  5. for {
  6. select {
  7. case <-timer.C:
  8. // Make sure that refreshes on DNS do not be attempted
  9. // at the same time, allows for reduced load on the
  10. // DNS servers.
  11. timer.Reset(time.Duration(rnd.Float64() * float64(freq)))
  12. r.Refresh()
  13. case <-r.doneCh:
  14. return
  15. }
  16. }
  17. }()

Refresh 方法及在 Refresh 中使用的 LookupHost 方法如下

  1. func (r *DNSCache) Refresh() {
  2. r.RLock()
  3. hosts := make([]string, 0, len(r.cache))
  4. for host := range r.cache {
  5. hosts = append(hosts, host)
  6. }
  7. r.RUnlock()
  8. for _, host := range hosts {
  9. ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout)
  10. if _, err := r.LookupHost(ctx, host); err != nil {
  11. r.loggerOnce(ctx, err, host)
  12. }
  13. cancelF()
  14. }
  15. }
  16. func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) {
  17. addrs, err := r.lookupHostFn(ctx, host)
  18. if err != nil {
  19. return nil, err
  20. }
  21. r.Lock()
  22. r.cache[host] = addrs
  23. r.Unlock()
  24. return addrs, nil
  25. }

Fetch

Fetch 方法用于获取服务名对应的 IP 地址列表。与 LookupHost 不同的是,Fetch 优先从缓存中获取,如果不存在服务名记录,再执行 LookupHost 操作。而 LookupHost 则先执行域名查询操作,成功获取后,再更新缓存。

  1. func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) {
  2. r.RLock()
  3. addrs, ok := r.cache[host]
  4. r.RUnlock()
  5. if ok {
  6. return addrs, nil
  7. }
  8. return r.LookupHost(ctx, host)
  9. }

HTTP Forwarder

cli-forwarder.svg
图 1: Forwarder 原理

Forwarder 的作用是将请求重定向至合理的目标服务。其核心原理是通过内置的 HTTP 反向代理服务器进行服务重定向。

Connection Establishment

系统只有一个全局的 Forwarder 实例 globalForwarder,如下所示

  1. globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
  2. PassHost: true,
  3. RoundTripper: newGatewayHTTPTransport(1 * time.Hour),
  4. Logger: func(err error) {
  5. if err != nil && !errors.Is(err, context.Canceled) {
  6. logger.LogIf(GlobalContext, err)
  7. }
  8. },
  9. })

newGatewayHTTPTransport 使用 newCustomHTTPTransport 并通过 DialContext 属性关联 DNS 缓存结构 DNSCache

  1. func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
  2. // For more details about various values used here refer
  3. // https://golang.org/pkg/net/http/#Transport documentation
  4. tr := &http.Transport{
  5. Proxy: http.ProxyFromEnvironment,
  6. // DialContext 关联上 DNSCache
  7. DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
  8. MaxIdleConnsPerHost: 1024,
  9. WriteBufferSize: 16 << 10, // 16KiB moving up from 4KiB default
  10. ReadBufferSize: 16 << 10, // 16KiB moving up from 4KiB default
  11. IdleConnTimeout: 15 * time.Second,
  12. ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
  13. TLSHandshakeTimeout: 10 * time.Second,
  14. ExpectContinueTimeout: 10 * time.Second,
  15. TLSClientConfig: tlsConfig,
  16. // Go net/http automatically unzip if content-type is
  17. // gzip disable this feature, as we are always interested
  18. // in raw stream.
  19. DisableCompression: true,
  20. }
  21. // https://github.com/golang/go/issues/23559
  22. // https://github.com/golang/go/issues/42534
  23. // https://github.com/golang/go/issues/43989
  24. // https://github.com/golang/go/issues/33425
  25. // https://github.com/golang/go/issues/29246
  26. // if tlsConfig != nil {
  27. // trhttp2, _ := http2.ConfigureTransports(tr)
  28. // if trhttp2 != nil {
  29. // // ReadIdleTimeout is the timeout after which a health check using ping
  30. // // frame will be carried out if no frame is received on the
  31. // // connection. 5 minutes is sufficient time for any idle connection.
  32. // trhttp2.ReadIdleTimeout = 5 * time.Minute
  33. // // PingTimeout is the timeout after which the connection will be closed
  34. // // if a response to Ping is not received.
  35. // trhttp2.PingTimeout = dialTimeout
  36. // // DisableCompression, if true, prevents the Transport from
  37. // // requesting compression with an "Accept-Encoding: gzip"
  38. // trhttp2.DisableCompression = true
  39. // }
  40. // }
  41. return func() *http.Transport {
  42. return tr
  43. }
  44. }

DialContext 使用的 DialContextWithDNSCache 方法返回的函数,DialContext 在执行 Transport 的 dial 方法(内部实现)时触发。可以看到,在执行连接时,会在 DNSCache 中获取服务对应的 IP 地址列表,随后使用 net.Dialer 进行连接。

  1. func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext {
  2. if baseDialCtx == nil {
  3. // This is same as which `http.DefaultTransport` uses.
  4. baseDialCtx = (&net.Dialer{
  5. Timeout: 30 * time.Second,
  6. KeepAlive: 30 * time.Second,
  7. }).DialContext
  8. }
  9. return func(ctx context.Context, network, host string) (net.Conn, error) {
  10. h, p, err := net.SplitHostPort(host)
  11. if err != nil {
  12. return nil, err
  13. }
  14. // Fetch DNS result from cache.
  15. //
  16. // ctxLookup is only used for canceling DNS Lookup.
  17. ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout)
  18. defer cancelF()
  19. addrs, err := cache.Fetch(ctxLookup, h) // 获取 DNS 缓存
  20. if err != nil {
  21. return nil, err
  22. }
  23. var firstErr error
  24. for _, randomIndex := range randPerm(len(addrs)) { // 使用随机顺序连接
  25. conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p))
  26. if err == nil {
  27. return conn, nil
  28. }
  29. if firstErr == nil {
  30. firstErr = err
  31. }
  32. }
  33. return nil, firstErr
  34. }
  35. }

Redirect HTTP Request

重定向 HTTP 请求的实现依赖于 Forwarder 的 ServeHTTP 方法。在每次 Forwarder 接收到 HTTP 请求时,都会创建一个 ReverseProxy 来对该请求执行重定向,具体如下

  1. func (f *Forwarder) ServeHTTP(w http.ResponseWriter, inReq *http.Request) {
  2. outReq := new(http.Request)
  3. *outReq = *inReq // includes shallow copies of maps, but we handle this in Director
  4. revproxy := httputil.ReverseProxy{
  5. Director: func(req *http.Request) {
  6. f.modifyRequest(req, inReq.URL)
  7. },
  8. Transport: f.RoundTripper, // 由 Forwarder 接管连接创建过程
  9. FlushInterval: defaultFlushInterval,
  10. ErrorHandler: f.customErrHandler,
  11. }
  12. if f.ErrorHandler != nil {
  13. revproxy.ErrorHandler = f.ErrorHandler
  14. }
  15. revproxy.ServeHTTP(w, outReq)
  16. }

由于 ReverseProxy 的 Transport 使用 Forwarder 的 RoundTripper 属性,因此会使用 DNSCache 关联的 DialContext 进行连接。