Command

Etcd 使用命令方式创建可执行功能,在 Gateway 部分中,首先创建一个主命令,然后再向主命令下挂接子命令(第 6 行),如下所示。

  1. func newGatewayCommand() *cobra.Command {
  2. lpc := &cobra.Command{
  3. Use: "gateway <subcommand>",
  4. Short: "gateway related command",
  5. }
  6. lpc.AddCommand(newGatewayStartCommand())
  7. return lpc
  8. }

子命令也是一个 Command 实例,其中的 Run 字段指定了该命令执行时调用的关键函数(第 5 行),第 7 ~ 13 行用于注册与子命令相关的命令行参数。

  1. func newGatewayStartCommand() *cobra.Command {
  2. cmd := cobra.Command{
  3. Use: "start",
  4. Short: "start the gateway",
  5. Run: startGateway,
  6. }
  7. cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
  8. cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
  9. cmd.Flags().StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
  10. cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
  11. cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")
  12. cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
  13. cmd.Flags().DurationVar(&gatewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")
  14. return &cmd
  15. }

Execution

Gateway 命令的执行部分为 startGateway。大致过程如下,首先获取目标节点(第 9 ~ 13 行,若服务发现没有返回任何节点,则回退到 --endpoints 参数指定的节点);然后对获得的目标地址进行处理(第 15 ~ 27 行);接下来获取 Gateway 监听地址信息并进行处理(第 28 ~ 68 行),其中会检查目标节点不能解析到 Gateway 自身的监听地址;最后启动 TCPProxy(第 69 ~ 77 行)。

  1. func startGateway(cmd *cobra.Command, args []string) {
  2. lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
  3. if err != nil {
  4. fmt.Fprintln(os.Stderr, err)
  5. os.Exit(1)
  6. }
  7. // We use os.Args to show all the arguments (not only passed-through Cobra).
  8. lg.Info("Running: ", zap.Strings("args", os.Args))
  9. srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
  10. if len(srvs.Endpoints) == 0 {
  11. // no endpoints discovered, fall back to provided endpoints
  12. srvs.Endpoints = gatewayEndpoints
  13. }
  14. // Strip the schema from the endpoints because we start just a TCP proxy
  15. srvs.Endpoints = stripSchema(srvs.Endpoints)
  16. if len(srvs.SRVs) == 0 {
  17. for _, ep := range srvs.Endpoints {
  18. h, p, serr := net.SplitHostPort(ep)
  19. if serr != nil {
  20. fmt.Printf("error parsing endpoint %q", ep)
  21. os.Exit(1)
  22. }
  23. var port uint16
  24. fmt.Sscanf(p, "%d", &port)
  25. srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
  26. }
  27. }
  28. lhost, lport, err := net.SplitHostPort(gatewayListenAddr)
  29. if err != nil {
  30. fmt.Println("failed to validate listen address:", gatewayListenAddr)
  31. os.Exit(1)
  32. }
  33. laddrs, err := net.LookupHost(lhost)
  34. if err != nil {
  35. fmt.Println("failed to resolve listen host:", lhost)
  36. os.Exit(1)
  37. }
  38. laddrsMap := make(map[string]bool)
  39. for _, addr := range laddrs {
  40. laddrsMap[addr] = true
  41. }
  42. for _, srv := range srvs.SRVs {
  43. var eaddrs []string
  44. eaddrs, err = net.LookupHost(srv.Target)
  45. if err != nil {
  46. fmt.Println("failed to resolve endpoint host:", srv.Target)
  47. os.Exit(1)
  48. }
  49. if fmt.Sprintf("%d", srv.Port) != lport {
  50. continue
  51. }
  52. for _, ea := range eaddrs {
  53. if laddrsMap[ea] {
  54. fmt.Printf("SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)
  55. os.Exit(1)
  56. }
  57. }
  58. }
  59. if len(srvs.Endpoints) == 0 {
  60. fmt.Println("no endpoints found")
  61. os.Exit(1)
  62. }
  63. var l net.Listener
  64. l, err = net.Listen("tcp", gatewayListenAddr)
  65. if err != nil {
  66. fmt.Fprintln(os.Stderr, err)
  67. os.Exit(1)
  68. }
  69. tp := tcpproxy.TCPProxy{
  70. Logger: lg,
  71. Listener: l,
  72. Endpoints: srvs.SRVs,
  73. MonitorInterval: gatewayRetryDelay,
  74. }
  75. // At this point, etcd gateway listener is initialized
  76. notifySystemd(lg)
  77. tp.Run()
  78. }

Proxy

TCPProxy 通过 Run 方法启动,主要工作是创建一个 Monitor 协程,及监听 Listener 实例的新连接请求,监听到新连接后通过 serve 协程独立处理。

  1. func (tp *TCPProxy) Run() error {
  2. tp.donec = make(chan struct{})
  3. if tp.MonitorInterval == 0 {
  4. tp.MonitorInterval = 5 * time.Minute
  5. }
  6. for _, srv := range tp.Endpoints {
  7. addr := formatAddr(srv.Target, srv.Port)
  8. tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
  9. }
  10. eps := []string{}
  11. for _, ep := range tp.Endpoints {
  12. eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port))
  13. }
  14. if tp.Logger != nil {
  15. tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))
  16. }
  17. go tp.runMonitor()
  18. for {
  19. in, err := tp.Listener.Accept()
  20. if err != nil {
  21. return err
  22. }
  23. go tp.serve(in)
  24. }
  25. }

Serve Requests

代理请求处理过程非常简单,首先通过 pick 方法获取一个目标端,并创建到目标端的 TCP 连接,如果目标端不可达,那么标记该目标端为未激活状态(第 18 行),然后重新选择下一个目标端。如果目标端可以到达,那么通过一个新协程把目标端的数据转发回请求端(第 27 ~ 31 行),同时在自己所在的协程中把请求端的数据转发到目标端(第 32 ~ 34 行),由这两个协程共同构建一个全双工通道。

  1. func (tp *TCPProxy) serve(in net.Conn) {
  2. var (
  3. err error
  4. out net.Conn
  5. )
  6. for {
  7. tp.mu.Lock()
  8. remote := tp.pick()
  9. tp.mu.Unlock()
  10. if remote == nil {
  11. break
  12. }
  13. // TODO: add timeout
  14. out, err = net.Dial("tcp", remote.addr)
  15. if err == nil {
  16. break
  17. }
  18. remote.inactivate()
  19. if tp.Logger != nil {
  20. tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
  21. }
  22. }
  23. if out == nil {
  24. in.Close()
  25. return
  26. }
  27. go func() {
  28. io.Copy(in, out)
  29. in.Close()
  30. out.Close()
  31. }()
  32. io.Copy(out, in)
  33. out.Close()
  34. in.Close()
  35. }

Monitor

监控协程会在固定时间间隔后触发,遍历全部的远端节点,并尝试检查未激活的远端节点是否已经恢复可用。

  1. func (tp *TCPProxy) runMonitor() {
  2. for {
  3. select {
  4. case <-time.After(tp.MonitorInterval):
  5. tp.mu.Lock()
  6. for _, rem := range tp.remotes {
  7. if rem.isActive() {
  8. continue
  9. }
  10. go func(r *remote) {
  11. if err := r.tryReactivate(); err != nil {
  12. if tp.Logger != nil {
  13. tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
  14. }
  15. } else {
  16. if tp.Logger != nil {
  17. tp.Logger.Info("activated", zap.String("address", r.addr))
  18. }
  19. }
  20. }(rem)
  21. }
  22. tp.mu.Unlock()
  23. case <-tp.donec:
  24. return
  25. }
  26. }
  27. }

检查过程非常简单,只是尝试与该远端节点建立一次 TCP 连接,连接成功后立即关闭该连接,并把节点重新标记为激活状态;连接失败则直接返回错误,节点保持未激活状态。

  1. func (r *remote) tryReactivate() error {
  2. conn, err := net.Dial("tcp", r.addr)
  3. if err != nil {
  4. return err
  5. }
  6. conn.Close()
  7. r.mu.Lock()
  8. defer r.mu.Unlock()
  9. r.inactive = false
  10. return nil
  11. }

Pick a Remote

根据远端是否可达,优先级及权重综合选择一个合适的远端节点。如果远端目标处于未激活状态,不选择(第 9 行);第 10 ~ 23 行对优先级进行选择,注意,每当遇到 Priority 数值更小的节点时,已收集的结果会被清空并重新收集,因此循环结束后 weighted、unweighted 两个切片中只保留 Priority 数值最小的那一组远端节点,其中 Weight 大于 0 的放入 weighted,其余放入 unweighted。

  1. func (tp *TCPProxy) pick() *remote {
  2. var weighted []*remote
  3. var unweighted []*remote
  4. bestPr := uint16(65535)
  5. w := 0
  6. // find best priority class
  7. for _, r := range tp.remotes {
  8. switch {
  9. case !r.isActive():
  10. case r.srv.Priority < bestPr:
  11. bestPr = r.srv.Priority
  12. w = 0
  13. weighted = nil
  14. unweighted = nil
  15. fallthrough
  16. case r.srv.Priority == bestPr:
  17. if r.srv.Weight > 0 {
  18. weighted = append(weighted, r)
  19. w += int(r.srv.Weight)
  20. } else {
  21. unweighted = append(unweighted, r)
  22. }
  23. }
  24. }
  25. if weighted != nil {
  26. if len(unweighted) > 0 && rand.Intn(100) == 1 {
  27. // In the presence of records containing weights greater
  28. // than 0, records with weight 0 should have a very small
  29. // chance of being selected.
  30. r := unweighted[tp.pickCount%len(unweighted)]
  31. tp.pickCount++
  32. return r
  33. }
  34. // choose a uniform random number between 0 and the sum computed
  35. // (inclusive), and select the RR whose running sum value is the
  36. // first in the selected order
  37. choose := rand.Intn(w)
  38. for i := 0; i < len(weighted); i++ {
  39. choose -= int(weighted[i].srv.Weight)
  40. if choose <= 0 {
  41. return weighted[i]
  42. }
  43. }
  44. }
  45. if unweighted != nil {
  46. for i := 0; i < len(tp.remotes); i++ {
  47. picked := tp.remotes[tp.pickCount%len(tp.remotes)]
  48. tp.pickCount++
  49. if picked.isActive() {
  50. return picked
  51. }
  52. }
  53. }
  54. return nil
  55. }