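How does a gRPC client manage its connections to the server?
grpc.ClientConn represents the connection between one client instance and the server. It is built mainly from the following data structures:
1. grpc.connectivityStateManager (grpc.ClientConn.csMgr): the overall connection state
The state has type connectivity.State and takes one of the following values: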
- Idle
- Connecting
- Ready
- TransientFailure
- Shutdown
A grpc.ClientConn contains multiple grpc.addrConn values (each grpc.addrConn represents one connection from the client to one server), and every grpc.addrConn has its own connection state.
- If at least one grpc.addrConn.state = Ready, then grpc.ClientConn.csMgr.state = Ready
- Otherwise, if at least one grpc.addrConn.state = Connecting, then grpc.ClientConn.csMgr.state = Connecting
- Otherwise grpc.ClientConn.csMgr.state = TransientFailure
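The three aggregation rules can be sketched in a few lines of Go. This is a simplified illustration and not grpc-go's actual code: the `State` type and the `aggregate` function are hypothetical stand-ins for `connectivity.State` and the library's internal state evaluation, and special handling of Idle and Shutdown is left out.

```go
package main

import "fmt"

// State is a hypothetical stand-in for connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

func (s State) String() string {
	return [...]string{"Idle", "Connecting", "Ready", "TransientFailure", "Shutdown"}[s]
}

// aggregate derives the overall ClientConn state from the states of the
// individual connections, following the three rules listed in the text.
func aggregate(conns []State) State {
	connecting := false
	for _, s := range conns {
		switch s {
		case Ready:
			return Ready // one Ready connection is enough
		case Connecting:
			connecting = true
		}
	}
	if connecting {
		return Connecting
	}
	return TransientFailure
}

func main() {
	fmt.Println(aggregate([]State{TransientFailure, Connecting, Ready}))
	fmt.Println(aggregate([]State{TransientFailure, Connecting}))
	fmt.Println(aggregate([]State{TransientFailure, TransientFailure}))
}
```

The order of the checks matters: Ready wins over Connecting, and Connecting wins over TransientFailure.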
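For the state transitions of grpc.addrConn.state, see the design document: gRPC Connectivity Semantics and API.
With the default implementation, a client establishes only one connection to a given server (host:port), and all RPCs reuse that connection. For why only one connection is established, see this issue: Use multiple connections to avoid the server's SETTINGS_MAX_CONCURRENT_STREAMS limit #11704[1].
However, if you use manual.Resolver and list the same server address several times, the client can establish multiple connections to a single server.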
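2. grpc.ccResolverWrapper: a wrapper around the server address resolution module
The resolver.Resolver implementations built into grpc are:
- dns.dnsResolver: resolves server addresses through DNS
- manual.Resolver: server addresses are set manually
- passthrough.passthroughResolver: uses the address passed to grpc.Dial as the server address; this is the default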
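How a dial target ends up with one of these resolvers can be illustrated roughly as follows. This is a deliberately simplified sketch, not the parsing code grpc-go actually uses: `parseTarget` and the `registered` set are hypothetical, and the only behavior modeled is "use the scheme in front of `://` if a resolver is registered for it, otherwise fall back to the default scheme and treat the whole target as the address".

```go
package main

import (
	"fmt"
	"strings"
)

// parseTarget returns the resolver scheme and the endpoint for a dial target.
// Targets without a registered scheme fall back to defaultScheme unchanged.
func parseTarget(target, defaultScheme string, registered map[string]bool) (string, string) {
	parts := strings.SplitN(target, "://", 2)
	if len(parts) != 2 || !registered[parts[0]] {
		return defaultScheme, target
	}
	// What follows the scheme is "authority/endpoint"; the authority is ignored here.
	rest := strings.SplitN(parts[1], "/", 2)
	if len(rest) != 2 {
		return defaultScheme, target
	}
	return parts[0], rest[1]
}

func main() {
	registered := map[string]bool{"dns": true, "passthrough": true}

	scheme, endpoint := parseTarget("dns:///greeter:50051", "passthrough", registered)
	fmt.Println(scheme, endpoint)

	scheme, endpoint = parseTarget("localhost:50051", "passthrough", registered)
	fmt.Println(scheme, endpoint)
}
```

Under this model, a bare `host:port` target is handled by whatever the default scheme is, which is why changing the default scheme is enough to switch resolvers without touching the address.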
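3. grpc.ccBalancerWrapper: a wrapper around the load balancing module
The balancer.Balancer implementations built into grpc are:
- grpc.pickfirstBalancer: uses only one server address
- roundrobin: rotates across multiple server addresses
- grpclb: uses a separate service to supply load balancing information (the list of available server addresses)
See the design document: Load Balancing in gRPC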
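4. grpc.pickerWrapper: selects one connection from the connection pool
The behavior depends on the balancer.Balancer implementation in use:
- grpc.pickfirstBalancer: grpc.picker, returns the current connection
- roundrobin: roundrobin.rrPicker, returns an available connection in rotation
- grpclb: grpclb.lbPicker, returns an available connection in rotation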
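With resolver = dns and balancer = roundrobin configured, the components work together as follows:
- dns.dnsResolver starts a goroutine that resolves the server addresses and emits update events
- grpc.ccBalancerWrapper starts a goroutine that listens for server address update events and connection state change events
  - A server address update event causes the load balancing component to update the connection pool
  - A connection state change event causes the load balancing component to update the state of the connections in the pool and to update the picker
- When an RPC is executed, a connection is obtained from the pool through grpc.ClientConn.blockingpicker (that is, grpc.pickerWrapper.pick, which ultimately calls roundrobin.rrPicker.Pick)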
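The event flow just described can be modeled with a single goroutine that serializes both kinds of events. The sketch below is an assumption-laden toy, not the grpc-go implementation: `event`, `balancer`, `handle`, and `run` are hypothetical names, connections are reduced to a ready/not-ready flag per address, and the "picker" is just the sorted list of ready addresses.

```go
package main

import (
	"fmt"
	"sort"
)

// event is either an address update (addrs != nil) or a state change of one connection.
type event struct {
	addrs []string
	addr  string
	ready bool
}

type balancer struct {
	pool map[string]bool // address -> connection is ready
}

// handle applies one event and returns the addresses the new picker may use.
func (b *balancer) handle(ev event) []string {
	if ev.addrs != nil {
		// Address update: rebuild the pool, keeping the state of surviving connections.
		next := make(map[string]bool, len(ev.addrs))
		for _, a := range ev.addrs {
			next[a] = b.pool[a]
		}
		b.pool = next
	} else if _, ok := b.pool[ev.addr]; ok {
		// State change: record it for a connection that is still in the pool.
		b.pool[ev.addr] = ev.ready
	}
	var ready []string
	for a, ok := range b.pool {
		if ok {
			ready = append(ready, a)
		}
	}
	sort.Strings(ready)
	return ready
}

// run is the single goroutine that processes all events in order.
func (b *balancer) run(events <-chan event, pickers chan<- []string) {
	for ev := range events {
		pickers <- b.handle(ev)
	}
	close(pickers)
}

func main() {
	events := make(chan event)
	pickers := make(chan []string)
	b := &balancer{}
	go b.run(events, pickers)

	go func() { // plays the role of the resolver and the connections
		events <- event{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051"}}
		events <- event{addr: "10.0.0.1:50051", ready: true}
		events <- event{addr: "10.0.0.2:50051", ready: true}
		events <- event{addrs: []string{"10.0.0.2:50051"}} // scale-down
		close(events)
	}()

	for p := range pickers {
		fmt.Println("picker addresses:", p)
	}
}
```

Funneling every update through one goroutine means the pool needs no locking of its own; only the picker handed to RPC callers has to be safe for concurrent use.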
Code implementation
Client

    package main

    import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"

        pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
        _ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
        "google.golang.org/grpc"
        "google.golang.org/grpc/balancer/roundrobin"
        "google.golang.org/grpc/resolver"
    )

    const (
        defaultName = "rokety"
    )

    func main() {
        log.SetFlags(log.Lshortfile | log.Ldate)
        var address string
        var timeout int
        flag.IntVar(&timeout, "timeout", 1, "greet rpc call timeout")
        flag.StringVar(&address, "address", "localhost:50051", "grpc server addr")
        flag.Parse()

        // Set resolver
        resolver.SetDefaultScheme("custom_dns")

        // Set up a connection to the server.
        conn, err := grpc.Dial(address, grpc.WithInsecure(),
            grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
            grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second))
        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
        c := pb.NewGreeterClient(conn)

        // Contact the server and print out its response.
        for range time.Tick(time.Second) {
            ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
            r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
            if err != nil {
                log.Printf("could not greet: %v\n", err)
            } else {
                log.Printf("Greeting: %s", r.Message)
            }
            cancel()
        }
    }

Server

    package main

    import (
        "context"
        "log"
        "math/rand"
        "net"
        "time"

        pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
        "google.golang.org/grpc"
        "google.golang.org/grpc/reflection"
    )

    const (
        port = ":50051"
    )

    var stuckDuration time.Duration

    // server is used to implement helloworld.GreeterServer.
    type server struct{}

    // SayHello implements helloworld.GreeterServer
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
        time.Sleep(stuckDuration)
        return &pb.HelloReply{Message: "Hello " + in.Name + "! From " + GetIP()}, nil
    }

    func main() {
        // simulate busy server
        stuckDuration = time.Duration(rand.NewSource(time.Now().UnixNano()).Int63()%2) * time.Second
        if stuckDuration == time.Second {
            log.Println("I will stuck one second!!!")
        }
        lis, err := net.Listen("tcp", port)
        if err != nil {
            log.Fatalf("failed to listen: %v", err)
        }
        s := grpc.NewServer()
        pb.RegisterGreeterServer(s, &server{})
        // Register reflection service on gRPC server.
        reflection.Register(s)
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }

    func GetIP() string {
        ifaces, _ := net.Interfaces()
        // handle err
        for _, i := range ifaces {
            addrs, _ := i.Addrs()
            // handle err
            for _, addr := range addrs {
                var ip net.IP
                switch v := addr.(type) {
                case *net.IPNet:
                    ip = v.IP
                case *net.IPAddr:
                    ip = v.IP
                default:
                    continue
                }
                if ip.String() != "127.0.0.1" {
                    return ip.String()
                }
            }
        }
        return ""
    }

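Next, we deploy the client and the server to k8s to see how the code behaves in practice.
With 3 server instances running and the client timeout set to 2 seconds, the client log looks like this: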

    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163

After scaling the server down to 1 instance, the client log looks like this:

    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.235.181
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.251.161
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:44: could not greet: rpc error: code = Unavailable desc = transport is closing
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163
    2019/10/21 main.go:46: Greeting: Hello rokety! From 10.200.105.163

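The client hit one connection-closed error. To avoid such errors during scale-down and rolling updates, we can add a retry mechanism to the client code.
The grpc client has 2 kinds of retry policy; for details see the design document gRPC Retry Design:
- Retry policy: retry a failed call, subject to the configured backoff
- Hedging policy: sends several copies of the request concurrently on a timer, and decides from the responses received so far whether to send the next copy or to return (this policy was not yet implemented at the time of writing)
Notes:
- When starting the client program, you also need to set the environment variable GRPC_GO_RETRY=on
- MaxAttempts = 2 means at most 2 attempts, that is, at most 1 retry
- RetryableStatusCodes contains only UNAVAILABLE, which addresses the error seen above: rpc error: code = Unavailable desc = transport is closing
- Setting DeadlineExceeded or Canceled in RetryableStatusCodes has no effect, because the retry logic exits immediately once it detects that the Context has timed out or been canceled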
Client code with the retry logic added:
```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
	_ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/resolver"
)

const (
	defaultName = "rokety"
)

func main() {
	log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime)
	var address string
	var timeout int
	flag.IntVar(&timeout, "timeout", 1, "greet rpc call timeout")
	flag.StringVar(&address, "address", "localhost:50051", "grpc server addr")
	flag.Parse()

	// Set up a connection to the server.
	resolver.SetDefaultScheme("custom_dns")
	conn, err := grpc.Dial(address, grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
			"LoadBalancingPolicy": "%s",
			"MethodConfig": [{
				"Name": [{"Service": "helloworld.Greeter"}],
				"RetryPolicy": {
					"MaxAttempts": 2,
					"InitialBackoff": "0.1s",
					"MaxBackoff": "1s",
					"BackoffMultiplier": 2.0,
					"RetryableStatusCodes": ["UNAVAILABLE"]
				}
			}]
		}`, roundrobin.Name)),
		grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	for range time.Tick(time.Second) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
		r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
		if err != nil {
			log.Printf("could not greet: %v\n", err)
		} else {
			log.Printf("Greeting: %s", r.Message)
		}
		cancel()
	}
}
```
With the server restored to 3 instances and the client timeout changed to 1 second, the client log looks like this:
```
2019/10/21 13:02:54 main.go:46: Greeting: Hello rokety! From 10.200.38.220
2019/10/21 13:02:56 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
2019/10/21 13:02:57 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
2019/10/21 13:02:57 main.go:46: Greeting: Hello rokety! From 10.200.38.220
2019/10/21 13:02:59 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
2019/10/21 13:03:00 main.go:44: could not greet: rpc error: code = DeadlineExceeded desc = context deadline exceeded
```
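We can see that the responses from 2 of the servers always time out. In real business use we want to avoid this kind of error, and grpc's health checking feature (design document: GRPC Health Checking Protocol) can help. It requires the server to implement the health check interface, so both the client and the server code need to change.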
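The effect of health checking on load balancing can be pictured as a filter in front of the round-robin rotation. The sketch below is a conceptual model with hypothetical names (`backend`, `healthy`) and made-up addresses; in grpc-go the health status arrives over a Watch stream per connection and is folded into the connection state, which this toy reduces to a boolean.

```go
package main

import "fmt"

// backend pairs an address with the last health status reported for it.
type backend struct {
	addr    string
	serving bool // true for SERVING, false for NOT_SERVING
}

// healthy returns the addresses that may receive traffic.
func healthy(backends []backend) []string {
	var out []string
	for _, b := range backends {
		if b.serving {
			out = append(out, b.addr)
		}
	}
	return out
}

func main() {
	// Hypothetical pool: one of three servers reports NOT_SERVING.
	pool := []backend{
		{addr: "10.0.0.1:50051", serving: true},
		{addr: "10.0.0.2:50051", serving: false},
		{addr: "10.0.0.3:50051", serving: true},
	}
	addrs := healthy(pool)
	if len(addrs) == 0 {
		fmt.Println("no healthy backend")
		return
	}
	for i := 0; i < 4; i++ {
		fmt.Println("send to", addrs[i%len(addrs)])
	}
}
```

Requests then rotate only over the servers that report themselves as serving, so a server that knows it is slow can take itself out of rotation instead of causing client timeouts.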
Adding health checks
Client
Notes:
- HealthCheckConfig must be configured in grpc.WithDefaultServiceConfig
- The health package must be imported: _ "google.golang.org/grpc/health"

    package main

    import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"

        pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
        _ "github.com/yangxikun/go-grpc-client-side-lb-example/resolver/dns"
        "google.golang.org/grpc"
        "google.golang.org/grpc/balancer/roundrobin"
        _ "google.golang.org/grpc/health"
        "google.golang.org/grpc/resolver"
    )

    const (
        defaultName = "rokety"
    )

    func main() {
        log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime)
        var address string
        var timeout int
        flag.IntVar(&timeout, "timeout", 1, "greet rpc call timeout")
        flag.StringVar(&address, "address", "localhost:50051", "grpc server addr")
        flag.Parse()

        // Set up a connection to the server.
        resolver.SetDefaultScheme("custom_dns")
        conn, err := grpc.Dial(address, grpc.WithInsecure(),
            grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
                "LoadBalancingPolicy": "%s",
                "MethodConfig": [{
                    "Name": [{"Service": "helloworld.Greeter"}],
                    "RetryPolicy": {
                        "MaxAttempts": 2,
                        "InitialBackoff": "0.1s",
                        "MaxBackoff": "1s",
                        "BackoffMultiplier": 2.0,
                        "RetryableStatusCodes": ["UNAVAILABLE", "CANCELLED"]
                    }
                }],
                "HealthCheckConfig": {"ServiceName": "helloworld.Greeter"}
            }`, roundrobin.Name)),
            grpc.WithBlock(), grpc.WithBackoffMaxDelay(time.Second))
        if err != nil {
            log.Fatalf("did not connect: %v", err)
        }
        defer conn.Close()
        c := pb.NewGreeterClient(conn)

        // Contact the server and print out its response.
        for range time.Tick(time.Second) {
            ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
            r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName})
            if err != nil {
                log.Printf("could not greet: %v\n", err)
            } else {
                log.Printf("Greeting: %s", r.Message)
            }
            cancel()
        }
    }

Server
Notes:
- Implement the grpc_health_v1 interface
- Register it with the server: grpc_health_v1.RegisterHealthServer(s, &healthServer{})

    package main

    import (
        "context"
        "log"
        "math/rand"
        "net"
        "time"

        pb "github.com/yangxikun/go-grpc-client-side-lb-example/pb"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/health/grpc_health_v1"
        "google.golang.org/grpc/reflection"
        "google.golang.org/grpc/status"
    )

    const (
        port = ":50051"
    )

    var stuckDuration time.Duration

    type healthServer struct{}

    func (h *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
        log.Println("recv health check for service:", req.Service)
        if stuckDuration == time.Second {
            return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
        }
        return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
    }

    func (h *healthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
        log.Println("recv health watch for service:", req.Service)
        resp := new(grpc_health_v1.HealthCheckResponse)
        if stuckDuration == time.Second {
            resp.Status = grpc_health_v1.HealthCheckResponse_NOT_SERVING
        } else {
            resp.Status = grpc_health_v1.HealthCheckResponse_SERVING
        }
        for range time.NewTicker(time.Second).C {
            err := stream.Send(resp)
            if err != nil {
                return status.Error(codes.Canceled, "Stream has ended.")
            }
        }
        return nil
    }

    // server is used to implement helloworld.GreeterServer.
    type server struct{}

    // SayHello implements helloworld.GreeterServer
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
        time.Sleep(stuckDuration)
        return &pb.HelloReply{Message: "Hello " + in.Name + "! From " + GetIP()}, nil
    }

    func main() {
        // simulate busy server
        stuckDuration = time.Duration(rand.NewSource(time.Now().UnixNano()).Int63()%2) * time.Second
        if stuckDuration == time.Second {
            log.Println("I will stuck one second!!!")
        }
        lis, err := net.Listen("tcp", port)
        if err != nil {
            log.Fatalf("failed to listen: %v", err)
        }
        s := grpc.NewServer()
        pb.RegisterGreeterServer(s, &server{})
        grpc_health_v1.RegisterHealthServer(s, &healthServer{})
        // Register reflection service on gRPC server.
        reflection.Register(s)
        if err := s.Serve(lis); err != nil {
            log.Fatalf("failed to serve: %v", err)
        }
    }

    func GetIP() string {
        ifaces, _ := net.Interfaces()
        // handle err
        for _, i := range ifaces {
            addrs, _ := i.Addrs()
            // handle err
            for _, addr := range addrs {
                var ip net.IP
                switch v := addr.(type) {
                case *net.IPNet:
                    ip = v.IP
                case *net.IPAddr:
                    ip = v.IP
                default:
                    continue
                }
                if ip.String() != "127.0.0.1" {
                    return ip.String()
                }
            }
        }
        return ""
    }

After updating the service image, check the client log:

    2019/10/21 14:25:01 main.go:47: Greeting: Hello rokety! From 10.200.251.170
    2019/10/21 14:25:02 main.go:47: Greeting: Hello rokety! From 10.200.177.145
    2019/10/21 14:25:03 main.go:47: Greeting: Hello rokety! From 10.200.251.170
    2019/10/21 14:25:04 main.go:47: Greeting: Hello rokety! From 10.200.177.145

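Of the 3 server instances, requests were sent to only 2. Checking the logs of the 3 servers confirms that one of them returns HealthCheckResponse_NOT_SERVING from its health check interface.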
