gRPC 客户端是如何管理与服务端的连接的?

grpc.ClientConn 表示一个客户端实例与服务端之间的连接,主要包含如下数据结构:

1、grpc.connectivityStateManager(grpc.ClientConn.csMgr) 总体的连接状态

状态类型为 connectivity.State,有如下几种状态:

  • Idle
  • Connecting
  • Ready
  • TransientFailure
  • Shutdown

grpc.ClientConn 包含了多个 grpc.addrConn(每个 grpc.addrConn 表示客户端到一个服务端的一条连接),每个 grpc.addrConn 也有自己的连接状态。

  • 当至少有一个 grpc.addrConn.state = Ready,则 grpc.ClientConn.csMgr.state = Ready
  • 否则,当至少有一个 grpc.addrConn.state = Connecting,则 grpc.ClientConn.csMgr.state = Connecting
  • 否则 grpc.ClientConn.csMgr.state = TransientFailure

说明:
  1. 默认实现下客户端与某一个服务端(host:port)只会建立一条连接,所有 RPC 执行都会复用这条连接。
  2. 关于为何只建立一条连接,可以看下这个 issue:Use multiple connections to avoid the server's SETTINGS_MAX_CONCURRENT_STREAMS limit #11704[1]
  3. 不过如果使用 manual.Resolver,把同一个服务地址复制多遍,也能做到与一个服务端建立多个连接。

关于 grpc.addrConn.state 的状态切换可参考设计文档:gRPC Connectivity Semantics and API。

2、grpc.ccResolverWrapper 服务端地址解析模块的封装

grpc 内置的 resolver.Resolver 有:

  • dns.dnsResolver:通过域名解析服务地址
  • manual.Resolver:手动设置服务地址
  • passthrough.passthroughResolver:将 grpc.Dial 参数中的地址直接作为服务地址,这也是默认使用的 resolver

3、grpc.ccBalancerWrapper 负载均衡模块的封装

grpc 内置的 balancer.Balancer 有:

  • grpc.pickfirstBalancer:只使用一个服务地址
  • roundrobin:在多个服务地址中轮转
  • grpclb:使用一个单独的服务提供负载均衡信息(可用的服务地址列表)

可参考设计文档:Load Balancing in gRPC

4、grpc.pickerWrapper 从连接池中选择一个连接

与使用的 balancer.Balancer 具体实现相关:

  • grpc.pickfirstBalancer:grpc.picker,返回当前的连接
  • roundrobin:roundrobin.rrPicker,轮转返回一个可用连接
  • grpclb:grpclb.lbPicker,轮转返回一个可用连接

配置 resolver = dns, balancer = roundrobin 组件之间的关系如下图:
(图 1:resolver = dns、balancer = roundrobin 时各组件之间的关系)

  • dns.dnsResolver 会启动一个 goroutine,负责解析服务地址,发送更新事件
  • grpc.ccBalancerWrapper 会启动一个 goroutine,负责监听服务地址更新事件和连接状态变化事件:
    – 服务地址更新事件会触发负载均衡组件更新连接池
    – 连接状态变化事件会触发负载均衡组件更新连接池中连接的状态,以及更新 picker
  • 当执行 RPC 调用时,会通过 grpc.ClientConn.blockingpicker(即 grpc.pickerWrapper.pick,最终调用 roundrobin.rrPicker.Pick)从连接池中获取连接

代码实现

客户端

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "log"
  7. "time"
  8. pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
  9. _ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/balancer/roundrobin"
  12. "google.golang.org/grpc/resolver"
  13. )
  14. const (
  15. defaultName = "rokety"
  16. )
  17. func main() {
  18. log.SetFlags(log.Lshortfile | log.Ldate)
  19. var address string
  20. var timeout int
  21. flag.IntVar(&timeout, "timeout", 1, "greet rpc call timeout")
  22. flag.StringVar(&address, "address", "localhost:50051", "grpc server addr")
  23. flag.Parse()
  24. // Set resolver
  25. resolver.SetDefaultScheme("custom_dns")
  26. // Set up a connection to the server.
  27. conn, err := grpc.Dial(address, grpc.WithInsecure(),
  28. grpc.WithDefaultServiceConfig(
  29. fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)
  30. ),
  31. grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second))
  32. if err != nil {
  33. log.Fatalf("did not connect: %v", err)
  34. }
  35. defer conn.Close()
  36. c := pb.NewGreeterClient(conn)
  37. // Contact the server and print out its response.
  38. for range time.Tick(time.Second) {
  39. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
  40. r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
  41. if err != nil {
  42. log.Printf("could not greet: %v\n", err)
  43. } else {
  44. log.Printf("Greeting: %s", r.Message)
  45. }
  46. cancel()
  47. }
  48. }

服务端

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "net"
  7. "time"
  8. pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/reflection"
  11. )
  12. const (
  13. port = ":50051"
  14. )
  15. var stuckDuration time.Duration
  16. // server is used to implement helloworld.GreeterServer.
  17. type server struct{}
  18. // SayHello implements helloworld.GreeterServer
  19. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  20. time.Sleep(stuckDuration)
  21. return &pb.HelloReply{Message: "Hello " + in.Name + "! From " + GetIP()}, nil
  22. }
  23. func main() {
  24. // simulate busy server
  25. stuckDuration = time.Duration(rand.NewSource(time.Now().UnixNano()).Int63()%2) * time.Second
  26. if stuckDuration == time.Second {
  27. log.Println("I will stuck one second!!!")
  28. }
  29. lis, err := net.Listen("tcp", port)
  30. if err != nil {
  31. log.Fatalf("failed to listen: %v", err)
  32. }
  33. s := grpc.NewServer()
  34. pb.RegisterGreeterServer(s, &server{})
  35. // Register reflection service on gRPC server.
  36. reflection.Register(s)
  37. if err := s.Serve(lis); err != nil {
  38. log.Fatalf("failed to serve: %v", err)
  39. }
  40. }
  // GetIP returns the first non-loopback IP address found on this host's
  // network interfaces (formatted with net.IP.String), or "" if there is none.
  // The result is only informational, so lookup errors yield "" or skip the
  // interface rather than failing.
  func GetIP() string {
  	ifaces, err := net.Interfaces()
  	if err != nil {
  		return ""
  	}
  	for _, iface := range ifaces {
  		addrs, err := iface.Addrs()
  		if err != nil {
  			// Skip interfaces whose addresses cannot be listed.
  			continue
  		}
  		if ip := firstNonLoopbackIP(addrs); ip != "" {
  			return ip
  		}
  	}
  	return ""
  }

  // firstNonLoopbackIP returns the first address in addrs that carries a
  // non-nil, non-loopback IP, or "" if there is none. Loopback covers all of
  // 127.0.0.0/8 and ::1, not only the literal "127.0.0.1".
  func firstNonLoopbackIP(addrs []net.Addr) string {
  	for _, addr := range addrs {
  		var ip net.IP
  		switch v := addr.(type) {
  		case *net.IPNet:
  			ip = v.IP
  		case *net.IPAddr:
  			ip = v.IP
  		default:
  			continue
  		}
  		if ip == nil || ip.IsLoopback() {
  			continue
  		}
  		return ip.String()
  	}
  	return ""
  }

接下来将借助 k8s 部署客户端和服务端代码,测试代码实际效果。
启动 3 个服务端实例,客户端 timeout 设置为 2 秒,客户端日志情况:

  1. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
  2. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
  3. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  4. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
  5. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
  6. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  7. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
  8. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
  9. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163

将服务端缩容为 1 个实例后,客户端日志情况:

  1. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
  2. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
  3. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  4. 2019/10/21 main.go:44: could not greet: rpc error: code = Unavailable desc = transport is closing
  5. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  6. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  7. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  8. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  9. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  10. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
  11. 2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163

可以看到客户端遇到了一次连接被关闭的错误,为了避免缩容和滚动更新导致的此类错误,我们可以在客户端的代码里加上重试机制:
grpc 客户端的重试策略有 2 种实现,具体可参考设计文档:gRPC Retry Design

  • Retry policy:出错时立即重试
  • Hedging policy:定时发送并发的多个请求,根据请求的响应情况决定是否发送下一个同样的请求,还是返回(该策略目前未实现)
注意:
  1. 客户端程序启动时,还需要设置环境变量:GRPC_GO_RETRY=on
  2. MaxAttempts = 2,即最多尝试 2 次,也就是最多重试 1 次
  3. RetryableStatusCodes 只设置了 UNAVAILABLE,用来解决上面出现的错误:rpc error: code = Unavailable desc = transport is closing
  4. 在 RetryableStatusCodes 中设置 DeadlineExceeded、Canceled 是没有作用的,因为重试逻辑的代码里一旦判断到 Context 超时或被取消,就会立即退出重试逻辑。

添加重试逻辑后的客户端代码:

    ```go package main

import ( “context” “flag” “fmt” “log” “time”

  1. pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
  2. _ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
  3. "google.golang.org/grpc"
  4. "google.golang.org/grpc/balancer/roundrobin"
  5. "google.golang.org/grpc/resolver"

)

const ( defaultName = “rokety” )

func main() { log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime) var address string var timeout int flag.IntVar(&timeout, “timeout”, 1, “greet rpc call timeout”) flag.StringVar(&address, “address”, “localhost:50051”, “grpc server addr”) flag.Parse() // Set up a connection to the server. resolver.SetDefaultScheme(“custom_dns”) conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig( fmt.Sprintf( {"LoadBalancingPolicy": "%s", "MethodConfig":[{ "Name": [{"Service": "helloworld.Greeter"}], "RetryPolicy":{ "MaxAttempts":2, "InitialBackoff": "0.1s", "MaxBackoff": "1s", "BackoffMultiplier": 2.0, "RetryableStatusCodes": ["UNAVAILABLE"] } }] },roundrobin.Name)), grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second)) if err != nil { log.Fatalf(“did not connect: %v”, err) } defer conn.Close() c := pb.NewGreeterClient(conn)

  1. // Contact the server and print out its response.
  2. for range time.Tick(time.Second) {
  3. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
  4. r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
  5. if err != nil {
  6. log.Printf("could not greet: %v\n", err)
  7. } else {
  8. log.Printf("Greeting: %s", r.Message)
  9. }
  10. cancel()
  11. }

}

服务端实例恢复为 3 个,修改客户端 timeout 设置为 1 秒后,客户端日志情况:

  1. 2019/10/21 13:02:54 main.go:46: Greeting: Hello rokety! From 10.200.38.220
  2. 2019/10/21 13:02:56 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
  3. 2019/10/21 13:02:57 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
  4. 2019/10/21 13:02:57 main.go:46: Greeting: Hello rokety! From 10.200.38.220
  5. 2019/10/21 13:02:59 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
  6. 2019/10/21 13:03:00 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded

可以看到有 2 个服务端的响应一直超时。实际业务中我们希望避免这种错误,这时可以使用 gRPC 的健康检查功能(设计文档:gRPC Health Checking Protocol)。该功能要求服务端实现健康检查接口,客户端和服务端的代码都需要调整:

添加健康检查

客户端
注意:
  • 需要在 grpc.WithDefaultServiceConfig 中配置 HealthCheckConfig
  • 需要导入 _ "google.golang.org/grpc/health":该包在初始化时为客户端注册健康检查实现,不导入的话 HealthCheckConfig 不会生效

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "log"
  7. "time"
  8. pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
  9. _ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/balancer/roundrobin"
  12. _ "google.golang.org/grpc/health"
  13. "google.golang.org/grpc/resolver"
  14. )
  15. const (
  16. defaultName = "rokety"
  17. )
  18. func main() {
  19. log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime)
  20. var address string
  21. var timeout int
  22. flag.IntVar(&timeout, "timeout", 1, "greet rpc call timeout")
  23. flag.StringVar(&address, "address", "localhost:50051", "grpc server addr")
  24. flag.Parse()
  25. // Set up a connection to the server.
  26. resolver.SetDefaultScheme("custom_dns")
  27. conn, err := grpc.Dial(address, grpc.WithInsecure(),
  28. grpc.WithDefaultServiceConfig(
  29. fmt.Sprintf(
  30. `{"LoadBalancingPolicy": "%s",
  31. "MethodConfig":[{
  32. "Name": [{"Service": "helloworld.Greeter"}],
  33. "RetryPolicy":{"MaxAttempts":2,
  34. "InitialBackoff": "0.1s",
  35. "MaxBackoff": "1s",
  36. "BackoffMultiplier": 2.0,
  37. "RetryableStatusCodes": ["UNAVAILABLE", "CANCELLED"]}
  38. }],
  39. "HealthCheckConfig": {"ServiceName": "helloworld.Greeter"}
  40. }`, roundrobin.Name),
  41. grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second))
  42. if err != nil {
  43. log.Fatalf("did not connect: %v", err)
  44. }
  45. defer conn.Close()
  46. c := pb.NewGreeterClient(conn)
  47. // Contact the server and print out its response.
  48. for range time.Tick(time.Second) {
  49. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
  50. r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
  51. if err != nil {
  52. log.Printf("could not greet: %v\n", err)
  53. } else {
  54. log.Printf("Greeting: %s", r.Message)
  55. }
  56. cancel()
  57. }
  58. }

服务端
注意:
  • 实现 grpc_health_v1.HealthServer 接口(Check 和 Watch 两个方法)
  • 注册到服务中:grpc_health_v1.RegisterHealthServer(s, &healthServer{})

  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "net"
  7. "time"
  8. pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/health/grpc_health_v1"
  12. "google.golang.org/grpc/reflection"
  13. "google.golang.org/grpc/status"
  14. )
  15. const (
  16. port = ":50051"
  17. )
  18. var stuckDuration time.Duration
  19. type healthServer struct{}
  20. func (h *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
  21. log.Println("recv health check for service:", req.Service)
  22. if stuckDuration == time.Second {
  23. return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
  24. }
  25. return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
  26. }
  27. func (h *healthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
  28. log.Println("recv health watch for service:", req.Service)
  29. resp := new(grpc_health_v1.HealthCheckResponse)
  30. if stuckDuration == time.Second {
  31. resp.Status = grpc_health_v1.HealthCheckResponse_NOT_SERVING
  32. } else {
  33. resp.Status = grpc_health_v1.HealthCheckResponse_SERVING
  34. }
  35. for range time.NewTicker(time.Second).C {
  36. err := stream.Send(resp)
  37. if err != nil {
  38. return status.Error(codes.Canceled, "Stream has ended.")
  39. }
  40. }
  41. return nil
  42. }
  43. // server is used to implement helloworld.GreeterServer.
  44. type server struct{}
  45. // SayHello implements helloworld.GreeterServer
  46. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  47. time.Sleep(stuckDuration)
  48. return &pb.HelloReply{Message: "Hello " + in.Name + "! From " + GetIP()}, nil
  49. }
  50. func main() {
  51. // simulate busy server
  52. stuckDuration = time.Duration(rand.NewSource(time.Now().UnixNano()).Int63()%2) * time.Second
  53. if stuckDuration == time.Second {
  54. log.Println("I will stuck one second!!!")
  55. }
  56. lis, err := net.Listen("tcp", port)
  57. if err != nil {
  58. log.Fatalf("failed to listen: %v", err)
  59. }
  60. s := grpc.NewServer()
  61. pb.RegisterGreeterServer(s, &server{})
  62. grpc_health_v1.RegisterHealthServer(s, &healthServer{})
  63. // Register reflection service on gRPC server.
  64. reflection.Register(s)
  65. if err := s.Serve(lis); err != nil {
  66. log.Fatalf("failed to serve: %v", err)
  67. }
  68. }
  // GetIP returns the first non-loopback IP address found on this host's
  // network interfaces (formatted with net.IP.String), or "" if there is none.
  // The result is only informational, so lookup errors yield "" or skip the
  // interface rather than failing.
  func GetIP() string {
  	ifaces, err := net.Interfaces()
  	if err != nil {
  		return ""
  	}
  	for _, iface := range ifaces {
  		addrs, err := iface.Addrs()
  		if err != nil {
  			// Skip interfaces whose addresses cannot be listed.
  			continue
  		}
  		if ip := firstNonLoopbackIP(addrs); ip != "" {
  			return ip
  		}
  	}
  	return ""
  }

  // firstNonLoopbackIP returns the first address in addrs that carries a
  // non-nil, non-loopback IP, or "" if there is none. Loopback covers all of
  // 127.0.0.0/8 and ::1, not only the literal "127.0.0.1".
  func firstNonLoopbackIP(addrs []net.Addr) string {
  	for _, addr := range addrs {
  		var ip net.IP
  		switch v := addr.(type) {
  		case *net.IPNet:
  			ip = v.IP
  		case *net.IPAddr:
  			ip = v.IP
  		default:
  			continue
  		}
  		if ip == nil || ip.IsLoopback() {
  			continue
  		}
  		return ip.String()
  	}
  	return ""
  }

更新服务镜像,查看客户端日志:

  1. 2019/10/21 14:25:01 main.go:47: Greeting: Hello rokety! From 10.200.251.170
  2. 2019/10/21 14:25:02 main.go:47: Greeting: Hello rokety! From 10.200.177.145
  3. 2019/10/21 14:25:03 main.go:47: Greeting: Hello rokety! From 10.200.251.170
  4. 2019/10/21 14:25:04 main.go:47: Greeting: Hello rokety! From 10.200.177.145

3 个服务端实例,只向其中 2 个发送了请求,通过查看 3 个服务端日志,确认其中有一个会在健康检查接口中返回 HealthCheckResponse_NOT_SERVING。

参考:go语言中文网站长 https://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651437638&idx=1&sn=89d63780012cb505ab5350aed9180ca4&chksm=80bb64b4b7cceda29da232ad4ce81823ff72bd053f226254a1bdb5a6f8488925cc6c59b4c9a1