Command
etcd 以命令(基于 cobra 的 Command)的方式组织可执行功能。在 Gateway 部分中,首先创建一个主命令 gateway,然后通过 `AddCommand` 向主命令挂接子命令 start,如下所示。
func newGatewayCommand() *cobra.Command {lpc := &cobra.Command{Use: "gateway <subcommand>",Short: "gateway related command",}lpc.AddCommand(newGatewayStartCommand())return lpc}
子命令同样是一个 Command 实例,其中的 Run 字段指定了该命令执行时调用的函数(此处为 startGateway),是子命令的关键部分;随后通过 `cmd.Flags()` 注册的各个 flag(如 --listen-addr、--endpoints 等)用于定义和解析该子命令的参数。
func newGatewayStartCommand() *cobra.Command {cmd := cobra.Command{Use: "start",Short: "start the gateway",Run: startGateway,}cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")cmd.Flags().StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")cmd.Flags().DurationVar(&gatewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")return &cmd}
Execution
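Gateway 命令的执行入口为 startGateway,大致过程如下:首先通过 `discoverEndpoints`(基于 --discovery-srv 指定的 DNS 域名)获取目标节点,未发现时回退到 --endpoints 参数提供的地址;然后去除目标地址中的 scheme,并在没有 SRV 记录时把每个 host:port 转换成一条 `net.SRV` 记录;接下来解析 Gateway 的监听地址,并检查是否有目标节点与监听地址解析到相同的 IP 和端口(若是则会形成代理回环,程序直接退出);最后创建监听器并启动 TCPProxy。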
func startGateway(cmd *cobra.Command, args []string) {lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)if err != nil {fmt.Fprintln(os.Stderr, err)os.Exit(1)}// We use os.Args to show all the arguments (not only passed-through Cobra).lg.Info("Running: ", zap.Strings("args", os.Args))srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)if len(srvs.Endpoints) == 0 {// no endpoints discovered, fall back to provided endpointssrvs.Endpoints = gatewayEndpoints}// Strip the schema from the endpoints because we start just a TCP proxysrvs.Endpoints = stripSchema(srvs.Endpoints)if len(srvs.SRVs) == 0 {for _, ep := range srvs.Endpoints {h, p, serr := net.SplitHostPort(ep)if serr != nil {fmt.Printf("error parsing endpoint %q", ep)os.Exit(1)}var port uint16fmt.Sscanf(p, "%d", &port)srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})}}lhost, lport, err := net.SplitHostPort(gatewayListenAddr)if err != nil {fmt.Println("failed to validate listen address:", gatewayListenAddr)os.Exit(1)}laddrs, err := net.LookupHost(lhost)if err != nil {fmt.Println("failed to resolve listen host:", lhost)os.Exit(1)}laddrsMap := make(map[string]bool)for _, addr := range laddrs {laddrsMap[addr] = true}for _, srv := range srvs.SRVs {var eaddrs []stringeaddrs, err = net.LookupHost(srv.Target)if err != nil {fmt.Println("failed to resolve endpoint host:", srv.Target)os.Exit(1)}if fmt.Sprintf("%d", srv.Port) != lport {continue}for _, ea := range eaddrs {if laddrsMap[ea] {fmt.Printf("SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)os.Exit(1)}}}if len(srvs.Endpoints) == 0 {fmt.Println("no endpoints found")os.Exit(1)}var l net.Listenerl, err = net.Listen("tcp", gatewayListenAddr)if err != nil {fmt.Fprintln(os.Stderr, err)os.Exit(1)}tp := tcpproxy.TCPProxy{Logger: lg,Listener: l,Endpoints: srvs.SRVs,MonitorInterval: gatewayRetryDelay,}// At 
this point, etcd gateway listener is initializednotifySystemd(lg)tp.Run()}
Proxy
TCPProxy 通过 Run 方法启动,主要工作是:为每个目标端点创建一个 remote 记录,启动一个 Monitor 协程(runMonitor),然后循环接受 Listener 上的新连接请求;每接受一个新连接,就启动一个独立的 serve 协程进行处理。
func (tp *TCPProxy) Run() error {tp.donec = make(chan struct{})if tp.MonitorInterval == 0 {tp.MonitorInterval = 5 * time.Minute}for _, srv := range tp.Endpoints {addr := formatAddr(srv.Target, srv.Port)tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})}eps := []string{}for _, ep := range tp.Endpoints {eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port))}if tp.Logger != nil {tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))}go tp.runMonitor()for {in, err := tp.Listener.Accept()if err != nil {return err}go tp.serve(in)}}
Serve Requests
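代理请求的处理过程非常简单:首先通过 pick 方法选出一个目标端,并尝试创建到该目标端的 TCP 连接;如果目标端不可达,则将其标记为未激活状态(`remote.inactivate()`),并重新 pick 下一个目标端,直到连接成功或已没有可用目标端为止(此时直接关闭客户端连接)。连接成功后,新启动的协程负责把目标端返回的数据复制给客户端(`io.Copy(in, out)`),当前协程则负责把客户端的数据复制到目标端(`io.Copy(out, in)`),两个协程共同构成一条全双工通道;任一方向的复制结束后,都会关闭两端的连接。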
func (tp *TCPProxy) serve(in net.Conn) {var (err errorout net.Conn)for {tp.mu.Lock()remote := tp.pick()tp.mu.Unlock()if remote == nil {break}// TODO: add timeoutout, err = net.Dial("tcp", remote.addr)if err == nil {break}remote.inactivate()if tp.Logger != nil {tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))}}if out == nil {in.Close()return}go func() {io.Copy(in, out)in.Close()out.Close()}()io.Copy(out, in)out.Close()in.Close()}
Monitor
监控协程每隔 MonitorInterval 触发一次,遍历全部远端节点,并为每个未激活的节点启动一个协程,检查其是否已经恢复可用;当从 donec 通道收到信号时,监控协程退出。
func (tp *TCPProxy) runMonitor() {for {select {case <-time.After(tp.MonitorInterval):tp.mu.Lock()for _, rem := range tp.remotes {if rem.isActive() {continue}go func(r *remote) {if err := r.tryReactivate(); err != nil {if tp.Logger != nil {tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))}} else {if tp.Logger != nil {tp.Logger.Info("activated", zap.String("address", r.addr))}}}(rem)}tp.mu.Unlock()case <-tp.donec:return}}}
检查过程非常简单:只是尝试与远端建立一次 TCP 连接;连接成功后立即关闭该连接,并将远端重新标记为激活状态;连接失败则返回错误,远端保持未激活。
func (r *remote) tryReactivate() error {conn, err := net.Dial("tcp", r.addr)if err != nil {return err}conn.Close()r.mu.Lock()defer r.mu.Unlock()r.inactive = falsereturn nil}
Pick a Remote
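根据远端是否可用、优先级及权重综合选择一个合适的远端节点。未激活的远端直接跳过,不参与选择。随后对优先级进行筛选:每当遇到优先级数值更小(即优先级更高)的远端时,就会清空 weighted、unweighted 两个切片并重新累计权重。因此最终两个切片中只包含优先级数值最小的那一组激活远端:weighted 存放权重大于 0 的节点,unweighted 存放权重为 0 的节点。之后若 weighted 非空,则按权重随机选择;同时存在权重为 0 的节点时,它们仅有约 1% 的机会被选中。否则以轮询方式选择一个激活的远端。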
func (tp *TCPProxy) pick() *remote {var weighted []*remotevar unweighted []*remotebestPr := uint16(65535)w := 0// find best priority classfor _, r := range tp.remotes {switch {case !r.isActive():case r.srv.Priority < bestPr:bestPr = r.srv.Priorityw = 0weighted = nilunweighted = nilfallthroughcase r.srv.Priority == bestPr:if r.srv.Weight > 0 {weighted = append(weighted, r)w += int(r.srv.Weight)} else {unweighted = append(unweighted, r)}}}if weighted != nil {if len(unweighted) > 0 && rand.Intn(100) == 1 {// In the presence of records containing weights greater// than 0, records with weight 0 should have a very small// chance of being selected.r := unweighted[tp.pickCount%len(unweighted)]tp.pickCount++return r}// choose a uniform random number between 0 and the sum computed// (inclusive), and select the RR whose running sum value is the// first in the selected orderchoose := rand.Intn(w)for i := 0; i < len(weighted); i++ {choose -= int(weighted[i].srv.Weight)if choose <= 0 {return weighted[i]}}}if unweighted != nil {for i := 0; i < len(tp.remotes); i++ {picked := tp.remotes[tp.pickCount%len(tp.remotes)]tp.pickCount++if picked.isActive() {return picked}}}return nil}
