DNS Cache
DNSCache 可通过缓存 DNS 查询结果来提升域名解析速度。DNSCache 缓存时间根据运行环境来确定,在裸金属主机上,服务变更频率较低,可适当延长缓存有效时间;在 Docker、Kubernetes 环境下则选择较短的时间。
Definition
type DNSCache struct {
sync.RWMutex
lookupHostFn func(ctx context.Context, host string) ([]string, error)
lookupTimeout time.Duration
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
cache map[string][]string
doneOnce sync.Once
doneCh chan struct{}
}
lookupHostFn 用于执行 DNS 查询操作,返回 []string 是因为同一服务名有可能对应多个 IP 地址;cache 中 key 为 loopupHostFn 中传入的服务名,value 为 lookupHostFn 返回的 IP 地址列表。lookupHostFn 为 net.DefaultResolver.LookupHost 方法:
r := &DNSCache{
lookupHostFn: net.DefaultResolver.LookupHost,
lookupTimeout: lookupTimeout,
loggerOnce: loggerOnce,
cache: make(map[string][]string, cacheSize),
doneCh: make(chan struct{}),
}
doneOnce 用于关闭 doneCh,doneCh 可控制 DNSCache 的后台协程终止运行。
func (r *DNSCache) Stop() {
r.doneOnce.Do(func() {
close(r.doneCh)
})
}
Creation
创建 DNSCache 时,会同时启动一个协程,定时刷新缓存。freq 为缓存失效(刷新)时间,每次触发刷新操作时,更新定时器触发时间为一个随机数是为了避免不同的 minio 实例以相同的频率执行域名请求,造成系统压力。
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
timer := time.NewTimer(freq)
go func() {
defer timer.Stop()
for {
select {
case <-timer.C:
// Make sure that refreshes on DNS do not be attempted
// at the same time, allows for reduced load on the
// DNS servers.
timer.Reset(time.Duration(rnd.Float64() * float64(freq)))
r.Refresh()
case <-r.doneCh:
return
}
}
}()
Refresh 方法及在 Refresh 中使用的 LookupHost 方法如下
func (r *DNSCache) Refresh() {
r.RLock()
hosts := make([]string, 0, len(r.cache))
for host := range r.cache {
hosts = append(hosts, host)
}
r.RUnlock()
for _, host := range hosts {
ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout)
if _, err := r.LookupHost(ctx, host); err != nil {
r.loggerOnce(ctx, err, host)
}
cancelF()
}
}
func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) {
addrs, err := r.lookupHostFn(ctx, host)
if err != nil {
return nil, err
}
r.Lock()
r.cache[host] = addrs
r.Unlock()
return addrs, nil
}
Fetch
Fetch 方法用于获取服务名对应的 IP 地址列表。与 LookupHost 不同的是,Fetch 优先从缓存中获取,如果不存在服务名记录,再执行 LookupHost 操作。而 LookupHost 则先执行域名查询操作,成功获取后,再更新缓存。
func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) {
r.RLock()
addrs, ok := r.cache[host]
r.RUnlock()
if ok {
return addrs, nil
}
return r.LookupHost(ctx, host)
}
HTTP Forwarder
图 1: Forwarder 原理
Forwarder 的作用是将请求重定向至合理的目标服务。其核心原理是通过内置的 HTTP 反向代理服务器进行服务重定向。
Connection Establishment
系统只有一个全局的 Forwarder 实例 globalForwarder,如下所示
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
PassHost: true,
RoundTripper: newGatewayHTTPTransport(1 * time.Hour),
Logger: func(err error) {
if err != nil && !errors.Is(err, context.Canceled) {
logger.LogIf(GlobalContext, err)
}
},
})
newGatewayHTTPTransport 使用 newCustomHTTPTransport 并通过 DialContext 属性关联 DNS 缓存结构 DNSCache
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
// DialContext 关联上 DNSCache
DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 16 << 10, // 16KiB moving up from 4KiB default
ReadBufferSize: 16 << 10, // 16KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
// https://github.com/golang/go/issues/23559
// https://github.com/golang/go/issues/42534
// https://github.com/golang/go/issues/43989
// https://github.com/golang/go/issues/33425
// https://github.com/golang/go/issues/29246
// if tlsConfig != nil {
// trhttp2, _ := http2.ConfigureTransports(tr)
// if trhttp2 != nil {
// // ReadIdleTimeout is the timeout after which a health check using ping
// // frame will be carried out if no frame is received on the
// // connection. 5 minutes is sufficient time for any idle connection.
// trhttp2.ReadIdleTimeout = 5 * time.Minute
// // PingTimeout is the timeout after which the connection will be closed
// // if a response to Ping is not received.
// trhttp2.PingTimeout = dialTimeout
// // DisableCompression, if true, prevents the Transport from
// // requesting compression with an "Accept-Encoding: gzip"
// trhttp2.DisableCompression = true
// }
// }
return func() *http.Transport {
return tr
}
}
DialContext 使用的 DialContextWithDNSCache 方法返回的函数,DialContext 在执行 Transport 的 dial 方法(内部实现)时触发。可以看到,在执行连接时,会在 DNSCache 中获取服务对应的 IP 地址列表,随后使用 net.Dialer 进行连接。
func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext {
if baseDialCtx == nil {
// This is same as which `http.DefaultTransport` uses.
baseDialCtx = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
}
return func(ctx context.Context, network, host string) (net.Conn, error) {
h, p, err := net.SplitHostPort(host)
if err != nil {
return nil, err
}
// Fetch DNS result from cache.
//
// ctxLookup is only used for canceling DNS Lookup.
ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout)
defer cancelF()
addrs, err := cache.Fetch(ctxLookup, h) // 获取 DNS 缓存
if err != nil {
return nil, err
}
var firstErr error
for _, randomIndex := range randPerm(len(addrs)) { // 使用随机顺序连接
conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p))
if err == nil {
return conn, nil
}
if firstErr == nil {
firstErr = err
}
}
return nil, firstErr
}
}
Redirect HTTP Request
重定向 HTTP 请求的实现依赖于 Forwarder 的 ServeHTTP 方法。在每次 Forwarder 接收到 HTTP 请求时,都会创建一个 ReverseProxy 来对该请求执行重定向,具体如下
func (f *Forwarder) ServeHTTP(w http.ResponseWriter, inReq *http.Request) {
outReq := new(http.Request)
*outReq = *inReq // includes shallow copies of maps, but we handle this in Director
revproxy := httputil.ReverseProxy{
Director: func(req *http.Request) {
f.modifyRequest(req, inReq.URL)
},
Transport: f.RoundTripper, // 由 Forwarder 接管连接创建过程
FlushInterval: defaultFlushInterval,
ErrorHandler: f.customErrHandler,
}
if f.ErrorHandler != nil {
revproxy.ErrorHandler = f.ErrorHandler
}
revproxy.ServeHTTP(w, outReq)
}
由于 ReverseProxy 的 Transport 使用 Forwarder 的 RoundTripper 属性,因此会使用 DNSCache 关联的 DialContext 进行连接。