Command
etcd 基于 cobra,以命令(Command)的形式组织可执行功能。在 Gateway 部分中,首先创建一个主命令 gateway,然后通过 AddCommand 向主命令挂接子命令 start,如下所示。
// newGatewayCommand returns the parent "gateway" command. It has no Run
// handler of its own; its single subcommand, "start", is built by
// newGatewayStartCommand.
func newGatewayCommand() *cobra.Command {
	gatewayCmd := &cobra.Command{Use: "gateway <subcommand>", Short: "gateway related command"}
	gatewayCmd.AddCommand(newGatewayStartCommand())
	return gatewayCmd
}
子命令同样是一个 Command 实例,其 Run 字段指定了命令执行时调用的函数(这里是 startGateway);随后的一系列 cmd.Flags() 调用为子命令注册命令行参数,包括监听地址、DNS 发现相关选项、目标端点列表以及失败端点的重试间隔。
// newGatewayStartCommand builds the "start" subcommand, whose Run handler is
// startGateway, and registers its flags. Each flag is bound to one of the
// package-level gateway* variables that startGateway reads.
func newGatewayStartCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "start",
		Short: "start the gateway",
		Run:   startGateway,
	}

	fs := cmd.Flags()
	fs.StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
	fs.StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
	fs.StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
	fs.BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
	fs.StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")
	fs.StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
	fs.DurationVar(&gatewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")

	return cmd
}
Execution
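Gateway 命令的执行入口为 startGateway,大致过程如下:
1. 通过 discoverEndpoints 获取目标节点;若未发现任何节点,则回退到 --endpoints 指定的地址。
2. 去掉地址中的协议前缀。若没有 SRV 记录,就把每个端点转换为一条 net.SRV。
3. 解析 Gateway 的监听地址,并确认没有任何端点解析到与之相同的 IP 和端口;否则 Gateway 会把请求代理给自己。
4. 创建监听器并启动 TCPProxy。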
// startGateway is the Run handler of "gateway start".
//
// It performs these steps:
//   - Resolves the etcd endpoints to proxy to: DNS SRV discovery first, then
//     the --endpoints flag as a fallback.
//   - Refuses to start if any endpoint resolves to the gateway's own listen
//     address, which would make the gateway proxy to itself.
//   - Listens on --listen-addr and runs a tcpproxy.TCPProxy over the
//     resulting SRV list.
//
// Every configuration or setup error is reported on stderr and terminates the
// process with exit status 1. The cmd and args parameters are required by the
// cobra Run signature and are unused.
func startGateway(cmd *cobra.Command, args []string) {
	lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// We use os.Args to show all the arguments (not only passed-through Cobra).
	lg.Info("Running: ", zap.Strings("args", os.Args))

	srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
	if len(srvs.Endpoints) == 0 {
		// no endpoints discovered, fall back to provided endpoints
		srvs.Endpoints = gatewayEndpoints
	}
	// Strip the schema from the endpoints because we start just a TCP proxy
	srvs.Endpoints = stripSchema(srvs.Endpoints)

	if len(srvs.SRVs) == 0 {
		// No SRV records were discovered: synthesize one record per endpoint so
		// the proxy handles static and discovered endpoints the same way.
		for _, ep := range srvs.Endpoints {
			h, p, serr := net.SplitHostPort(ep)
			if serr != nil {
				fmt.Fprintf(os.Stderr, "error parsing endpoint %q: %v\n", ep, serr)
				os.Exit(1)
			}
			// Sscanf reports empty, non-numeric and out-of-range (> 65535)
			// ports as errors. Ignoring the error would silently proxy to
			// port 0.
			var port uint16
			if _, perr := fmt.Sscanf(p, "%d", &port); perr != nil {
				fmt.Fprintf(os.Stderr, "error parsing port of endpoint %q: %v\n", ep, perr)
				os.Exit(1)
			}
			srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
		}
	}

	lhost, lport, err := net.SplitHostPort(gatewayListenAddr)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to validate listen address %q: %v\n", gatewayListenAddr, err)
		os.Exit(1)
	}
	laddrs, err := net.LookupHost(lhost)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to resolve listen host %q: %v\n", lhost, err)
		os.Exit(1)
	}
	laddrsMap := make(map[string]bool, len(laddrs))
	for _, addr := range laddrs {
		laddrsMap[addr] = true
	}

	// Reject any endpoint that resolves to the gateway's own IP and port;
	// otherwise every proxied connection would loop back into the gateway.
	// Every endpoint host must resolve, even when its port differs.
	for _, srv := range srvs.SRVs {
		eaddrs, lerr := net.LookupHost(srv.Target)
		if lerr != nil {
			fmt.Fprintf(os.Stderr, "failed to resolve endpoint host %q: %v\n", srv.Target, lerr)
			os.Exit(1)
		}
		if fmt.Sprintf("%d", srv.Port) != lport {
			continue
		}
		for _, ea := range eaddrs {
			if laddrsMap[ea] {
				fmt.Fprintf(os.Stderr, "SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)
				os.Exit(1)
			}
		}
	}

	if len(srvs.Endpoints) == 0 {
		fmt.Fprintln(os.Stderr, "no endpoints found")
		os.Exit(1)
	}

	l, err := net.Listen("tcp", gatewayListenAddr)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	tp := tcpproxy.TCPProxy{
		Logger:          lg,
		Listener:        l,
		Endpoints:       srvs.SRVs,
		MonitorInterval: gatewayRetryDelay,
	}

	// At this point, etcd gateway listener is initialized
	notifySystemd(lg)

	// Run blocks in its accept loop and only returns the error from
	// Listener.Accept. Record why serving stopped instead of dropping it.
	if runErr := tp.Run(); runErr != nil {
		lg.Warn("gateway stopped accepting connections", zap.Error(runErr))
	}
}
Proxy
TCPProxy 通过 Run 方法启动,它依次完成以下工作:
1. 根据 Endpoints 初始化远端列表。
2. 启动一个 Monitor 协程。
3. 循环接受 Listener 上的新连接,并把每个新连接交给一个独立的 serve 协程处理。

一旦 Accept 返回错误,Run 就返回该错误。
// Run starts the proxy and blocks until it stops. It does the following:
//   - Builds tp.remotes from tp.Endpoints. MonitorInterval defaults to
//     5 minutes when left at zero.
//   - Starts the background monitor that retries inactive remotes.
//   - Accepts client connections on tp.Listener, serving each in its own
//     goroutine.
//
// It returns the first error reported by tp.Listener.Accept.
func (tp *TCPProxy) Run() error {
	tp.donec = make(chan struct{})
	if tp.MonitorInterval == 0 {
		tp.MonitorInterval = 5 * time.Minute
	}

	// One pass builds both the proxy targets and the list used for logging.
	eps := make([]string, 0, len(tp.Endpoints))
	for _, srv := range tp.Endpoints {
		tp.remotes = append(tp.remotes, &remote{srv: srv, addr: formatAddr(srv.Target, srv.Port)})
		eps = append(eps, fmt.Sprintf("%s:%d", srv.Target, srv.Port))
	}
	if tp.Logger != nil {
		tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))
	}

	go tp.runMonitor()

	for {
		conn, err := tp.Listener.Accept()
		if err != nil {
			return err
		}
		go tp.serve(conn)
	}
}
Serve Requests
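代理请求的处理过程非常简单。首先在 tp.mu 保护下通过 pick 方法选出一个目标端,并创建到该目标端的 TCP 连接。如果连接失败,就调用 inactivate 将该目标端标记为未激活,然后重新挑选,直到连接成功或已没有可用的目标端;后一种情况下直接关闭客户端连接。连接成功后,两个协程共同构成一条全双工通道:
- 新启动的协程把目标端返回的数据复制给客户端(io.Copy(in, out))。
- 当前协程把客户端发来的数据复制给目标端(io.Copy(out, in))。

任一方向的复制结束后,都会关闭两端的连接。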
// serve proxies a single accepted client connection.
//
// It repeatedly picks a remote (holding tp.mu around pick) and dials it,
// marking every remote whose dial fails as inactive. It stops when a dial
// succeeds or pick returns nil; in the nil case the client connection is
// simply closed. Once connected, bytes are copied in both directions. When
// either direction finishes, both connections are closed.
func (tp *TCPProxy) serve(in net.Conn) {
	var out net.Conn
	for out == nil {
		tp.mu.Lock()
		target := tp.pick()
		tp.mu.Unlock()
		if target == nil {
			// No active remote is left to try: drop the client.
			in.Close()
			return
		}

		// TODO: add timeout
		conn, err := net.Dial("tcp", target.addr)
		if err != nil {
			target.inactivate()
			if tp.Logger != nil {
				tp.Logger.Warn("deactivated endpoint", zap.String("address", target.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
			}
			continue
		}
		out = conn
	}

	// pipe copies src into dst until src ends or errors, then closes dst and
	// src. Copy errors are intentionally ignored: either direction ending
	// tears down the whole proxied session.
	pipe := func(dst, src net.Conn) {
		io.Copy(dst, src)
		dst.Close()
		src.Close()
	}
	go pipe(in, out) // remote -> client
	pipe(out, in)    // client -> remote
}
Monitor
监控协程每隔 MonitorInterval 触发一次,遍历全部远端节点,并为每个未激活的节点启动一个协程,调用 tryReactivate 检查它是否已经恢复可用。当 donec 被关闭时,监控协程退出。
// runMonitor periodically retries inactive remotes until tp.donec is closed.
// Every tp.MonitorInterval it walks tp.remotes while holding tp.mu. For each
// inactive remote it starts a goroutine that calls tryReactivate and logs the
// outcome.
//
// NOTE(review): a new probe is started on every tick even if an earlier probe
// of the same remote is still dialing (the dial in tryReactivate has no
// timeout). A remote whose dials hang for longer than MonitorInterval can
// therefore accumulate concurrent probes.
func (tp *TCPProxy) runMonitor() {
	probe := func(r *remote) {
		err := r.tryReactivate()
		if tp.Logger == nil {
			return
		}
		if err != nil {
			tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
			return
		}
		tp.Logger.Info("activated", zap.String("address", r.addr))
	}

	for {
		select {
		case <-tp.donec:
			return
		case <-time.After(tp.MonitorInterval):
		}

		tp.mu.Lock()
		for _, rem := range tp.remotes {
			if !rem.isActive() {
				go probe(rem)
			}
		}
		tp.mu.Unlock()
	}
}
检查过程只是尝试建立一次 TCP 连接:
- 连接成功:立即关闭该连接,并把节点重新标记为激活状态。
- 连接失败:返回错误,节点保持未激活。
// tryReactivate probes r by opening a TCP connection to r.addr and closing it
// right away. On success it clears r.inactive under r.mu and returns nil. On
// failure it returns the dial error and leaves r unchanged. The dial has no
// timeout.
func (r *remote) tryReactivate() error {
	probe, err := net.Dial("tcp", r.addr)
	if err != nil {
		return err
	}
	probe.Close()

	r.mu.Lock()
	r.inactive = false
	r.mu.Unlock()
	return nil
}
Pick a Remote
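pick 根据远端是否可达、优先级及权重,综合选择一个合适的远端节点。
- 未激活(不可达)的远端直接跳过。
- 对其余远端,switch 语句只保留优先级数值最小(即优先级最高)的那一组:每遇到数值更小的优先级,就清空已收集的列表和权重和,从该远端重新开始收集。

因此遍历结束后,weighted 与 unweighted 两个切片中的远端都具有同一个最佳优先级。其中 weighted 存放权重大于 0 的远端,unweighted 存放权重为 0 的远端。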
// pick chooses the remote that should receive the next proxied connection.
// It returns nil when no remote is active.
//
// Selection follows the SRV rules of RFC 2782:
//   - Only active remotes in the best (numerically lowest) priority class are
//     candidates.
//   - A remote with a positive weight is chosen with probability proportional
//     to its weight.
//   - While weighted candidates exist, zero-weight remotes get only a 1-in-100
//     chance.
//   - When only zero-weight candidates exist, they are served round-robin.
//
// pick updates tp.pickCount without locking of its own; serve calls it with
// tp.mu held.
func (tp *TCPProxy) pick() *remote {
	var weighted []*remote
	var unweighted []*remote

	bestPr := uint16(65535)
	w := 0
	// find best priority class
	for _, r := range tp.remotes {
		switch {
		case !r.isActive():
		case r.srv.Priority < bestPr:
			// A better priority class: discard everything collected so far.
			bestPr = r.srv.Priority
			w = 0
			weighted = nil
			unweighted = nil
			fallthrough
		case r.srv.Priority == bestPr:
			if r.srv.Weight > 0 {
				weighted = append(weighted, r)
				w += int(r.srv.Weight)
			} else {
				unweighted = append(unweighted, r)
			}
		}
	}

	if len(weighted) > 0 {
		if len(unweighted) > 0 && rand.Intn(100) == 1 {
			// In the presence of records containing weights greater
			// than 0, records with weight 0 should have a very small
			// chance of being selected.
			r := unweighted[tp.pickCount%len(unweighted)]
			tp.pickCount++
			return r
		}
		// choose is uniform in [0, w). Remote i owns the half-open slot
		// [sum of earlier weights, that sum + weight_i), so the test must be
		// strictly below zero. Using <= would hand the first remote an extra
		// slot and starve the last one; with weights {1, 1} the second
		// remote could never be picked.
		choose := rand.Intn(w)
		for _, r := range weighted {
			choose -= int(r.srv.Weight)
			if choose < 0 {
				return r
			}
		}
	}

	if len(unweighted) > 0 {
		// Round-robin within the best priority class only; cycling over all
		// of tp.remotes would ignore priorities.
		r := unweighted[tp.pickCount%len(unweighted)]
		tp.pickCount++
		return r
	}
	return nil
}