grpc 除了提供四种请求类型之外,还支持很多高级功能:keepalive、请求重试、负载均衡、用户验证等。接下来一一介绍。

GRPC 进阶功能

每个grpc请求都是 stream。

Keepalive

Keepalive 能够让 grpc 的每个 stream 保持长连接状态,适合一些执行时间长的请求。Keepalive 支持在服务端和客户端配置,且只有服务端配置后,客户端的配置才会真正有效。
先给出实例的代码在来说明 grpc keepalive 的使用情况:
server 实现:

  1. // ...
  2. var kaep = keepalive.EnforcementPolicy{
  3. MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
  4. PermitWithoutStream: true, // Allow pings even when there are no active streams
  5. }
  6. var kasp = keepalive.ServerParameters{
  7. MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
  8. MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
  9. MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
  10. Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
  11. Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
  12. }
  13. // server implements EchoServer.
  14. type server struct {
  15. pb.UnimplementedEchoServer
  16. }
  17. func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
  18. return &pb.EchoResponse{Message: req.Message}, nil
  19. }
  20. func main() {
  21. address := "50001"
  22. lis, err := net.Listen("tcp", address)
  23. if err != nil {
  24. log.Fatalf("failed to listen: %v", err)
  25. }
  26. // 创建 grpc server 时配置服务端的 keepalive
  27. s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
  28. pb.RegisterEchoServer(s, &server{})
  29. if err := s.Serve(lis); err != nil {
  30. log.Fatalf("failed to serve: %v", err)
  31. }
  32. }

client 端实现:

  1. // ...
  2. var kacp = keepalive.ClientParameters{
  3. Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
  4. Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
  5. PermitWithoutStream: true, // send pings even without active streams
  6. }
  7. func main() {
  8. conn, err := grpc.Dial("50001", grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
  9. if err != nil {
  10. log.Fatalf("did not connect: %v", err)
  11. }
  12. defer conn.Close()
  13. c := pb.NewEchoClient(conn)
  14. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  15. defer cancel()
  16. fmt.Println("Performing unary request")
  17. res, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "keepalive demo"})
  18. if err != nil {
  19. log.Fatalf("unexpected error from UnaryEcho: %v", err)
  20. }
  21. fmt.Println("RPC response:", res)
  22. }

keepalive 的实现核心在于 keepalive.EnforcementPolicykeepalive.ServerParameters。首先是 `keepalive.ServerParameters。它包含几个属性:

  • MaxConnectionIdle : 最大空闲连接时间,默认为无限制。这段时间为客户端 stream 请求为0 或者建立连接。超出这段时间后,serve 会发送一个 GoWay,强制 client stream 断开。
  • MaxConnectionAge:最大连接时间,默认为无限制。stream 连接超出这个值是发送一个 GoWay
  • MaxConnectionAgeGrace :超出MaxConnectionAge之后的宽限时长,默认无限制,最小为 1s。
  • Time :如果一段时间客户端存活但没有 pings 请求,服务端发送一次 ping 请求,默认是 2hour。
  • Timeout:服务端发送 ping 请求超时的时间,默认20s。

keepalive.EnforcementPolicy在服务端强制执行策略,如果客户端违反改策略则断开连接。它有两个属性:

  • MinTime : 如果在指定时间内收到 pings 请求大于一次,强制断开连接,默认 5min。
  • PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭。

keepalive.ClientParameters是在客户端这侧使用的 keepalive 配置:

  • Time :pings 请求间隔时间,默认无限制,最小为 10s。
  • Timeout :pings 超时时间,默认是 20s。
  • PermitWithoutStream:没有活动的 stream 也允许pings。默认关闭。

请求重试

grpc 支持请求重试,在客户端配置好规则之后,客户端会在请求失败之后尝试重新发起请求。

  1. var (
  2. retryPolicy = `{
  3. "methodConfig": [{
  4. "name": [{"service": "mysite.pb.Echo"}],
  5. "waitForReady": true,
  6. "retryPolicy": {
  7. "MaxAttempts": 3,
  8. "InitialBackoff": ".01s",
  9. "MaxBackoff": "1s",
  10. "BackoffMultiplier": 2.0,
  11. "RetryableStatusCodes": [ "UNAVAILABLE" ]
  12. }
  13. }]}`
  14. )
  15. // use grpc.WithDefaultServiceConfig() to set service config
  16. func retryDial() (*grpc.ClientConn, error) {
  17. return grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
  18. }
  19. // ...

retry 配置只需要在客户端设置即可生效。主要是配置ServerConfig,格式为该链接

  • MaxAttempts :重试的最大次数,最大值是5。
  • InitialBackoff : 初始化重试间隔时间,第一次重试去 Randon(0,initialBackoff)
  • MaxBackoff : 最大重试间隔时间,多次重试是,间隔时间取 random(0,min(initial_backoff*backoff_multiplier**(n-1), max_backoff))
  • RetryableStatusCodes : 设置需要重试的状态码。

负载均衡

grpc 支持客户端负载均衡策略,负载均衡在 grpc name_resolver 的基础上实现:

  1. const (
  2. exampleScheme = "example"
  3. exampleServiceName = "lb.example.grpc.io"
  4. )
  5. // ...
  6. func main() {
  7. // ...
  8. // round_robin 指定负载均衡策略为轮询策略
  9. roundrobinConn, err := grpc.Dial(
  10. fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
  11. grpc.WithBalancerName("round_robin"), // This sets the initial balancing policy.
  12. grpc.WithInsecure(),
  13. grpc.WithBlock(),
  14. )
  15. // ...
  16. }
  17. // 配置 name resolver
  18. type exampleResolverBuilder struct{}
  19. func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  20. r := &exampleResolver{
  21. target: target,
  22. cc: cc,
  23. addrsStore: map[string][]string{
  24. exampleServiceName: addrs,
  25. },
  26. }
  27. r.start()
  28. return r, nil
  29. }
  30. func (*exampleResolverBuilder) Scheme() string { return exampleScheme }
  31. type exampleResolver struct {
  32. target resolver.Target
  33. cc resolver.ClientConn
  34. addrsStore map[string][]string
  35. }
  36. func (r *exampleResolver) start() {
  37. addrStrs := r.addrsStore[r.target.Endpoint]
  38. addrs := make([]resolver.Address, len(addrStrs))
  39. for i, s := range addrStrs {
  40. addrs[i] = resolver.Address{Addr: s}
  41. }
  42. r.cc.UpdateState(resolver.State{Addresses: addrs})
  43. }
  44. func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
  45. func (*exampleResolver) Close() {}
  46. func init() {
  47. resolver.Register(&exampleResolverBuilder{})
  48. }

主要是要实现 resolver.Builder接口

  1. // Builder creates a resolver that will be used to watch name resolution updates.
  2. type Builder interface {
  3. // Build creates a new resolver for the given target.
  4. //
  5. // gRPC dial calls Build synchronously, and fails if the returned error is
  6. // not nil.
  7. Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  8. // Scheme returns the scheme supported by this resolver.
  9. // Scheme is defined at <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
  10. Scheme() string
  11. }

上面的实现方式不支持动态增减服务端地址,可以使用 etcd 实现负载均衡:

  1. type etcdBuilder struct {
  2. prefix string
  3. endpoints []string
  4. }
  5. func ETCDBuilder(prefix string, endpoints []string) resolver.Builder {
  6. return &etcdBuilder{prefix, endpoints}
  7. }
  8. func (b *etcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  9. cli, err := clientv3.New(clientv3.Config{
  10. Endpoints: b.endpoints,
  11. DialTimeout: 3 * time.Second,
  12. })
  13. if err != nil {
  14. return nil, fmt.Errorf("connect to etcd endpoints error")
  15. }
  16. ctx, cancel := context.WithCancel(context.Background())
  17. rlv := &etcdResolver{
  18. cc: cc,
  19. cli: cli,
  20. ctx: ctx,
  21. cancel: cancel,
  22. watchKeyPrefix: b.prefix,
  23. freq: 5 * time.Second,
  24. t: time.NewTimer(0),
  25. rn: make(chan struct{}, 1),
  26. im: make(chan []resolver.Address),
  27. wg: sync.WaitGroup{},
  28. }
  29. rlv.wg.Add(2)
  30. go rlv.watcher()
  31. go rlv.FetchBackendsWithWatch()
  32. return rlv, nil
  33. }
  34. func (b *etcdBuilder) Scheme() string {
  35. return "etcd"
  36. }
  37. type etcdResolver struct {
  38. retry int
  39. freq time.Duration
  40. ctx context.Context
  41. cancel context.CancelFunc
  42. cc resolver.ClientConn
  43. cli *clientv3.Client
  44. t *time.Timer
  45. watchKeyPrefix string
  46. rn chan struct{}
  47. im chan []resolver.Address
  48. wg sync.WaitGroup
  49. }
  50. func (r *etcdResolver) ResolveNow(opt resolver.ResolveNowOptions) {
  51. select {
  52. case r.rn <- struct{}{}:
  53. default:
  54. }
  55. }
  56. func (r *etcdResolver) Close() {
  57. r.cancel()
  58. r.wg.Wait()
  59. r.t.Stop()
  60. }
  61. func (r *etcdResolver) watcher() {
  62. defer r.wg.Done()
  63. for {
  64. select {
  65. case <-r.ctx.Done():
  66. return
  67. case addrs := <-r.im:
  68. if len(addrs) > 0 {
  69. r.retry = 0
  70. r.t.Reset(r.freq)
  71. r.cc.UpdateState(resolver.State{Addresses: addrs})
  72. continue
  73. }
  74. case <-r.t.C:
  75. case <-r.rn:
  76. }
  77. result := r.FetchBackends()
  78. if len(result) == 0 {
  79. r.retry++
  80. r.t.Reset(r.freq)
  81. } else {
  82. r.retry = 0
  83. r.t.Reset(r.freq)
  84. }
  85. r.cc.UpdateState(resolver.State{Addresses: result})
  86. }
  87. }
  88. func (r *etcdResolver) FetchBackendsWithWatch() {
  89. defer r.wg.Done()
  90. for {
  91. select {
  92. case <-r.ctx.Done():
  93. return
  94. case _ = <-r.cli.Watch(r.ctx, r.watchKeyPrefix, clientv3.WithPrefix()):
  95. result := r.FetchBackends()
  96. r.im <- result
  97. }
  98. }
  99. }
  100. func (r *etcdResolver) FetchBackends() []resolver.Address {
  101. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  102. defer cancel()
  103. result := make([]resolver.Address, 0)
  104. resp, err := r.cli.Get(ctx, r.watchKeyPrefix, clientv3.WithPrefix())
  105. if err != nil {
  106. return result
  107. }
  108. for _, kv := range resp.Kvs {
  109. if strings.TrimSpace(string(kv.Value)) == "" {
  110. continue
  111. }
  112. result = append(result, resolver.Address{Addr: string(kv.Value)})
  113. }
  114. return result
  115. }

grpc 加密传输

以上的请求中,grpc 都是通过明文传输数据。但这种方式是很容易泄露数据内容的,grpc 支持 TLS 格式的加密通讯,来保存数据传输的安全性。

TLS 证书

我们首先来生成 TLS 证书

  1. openssl ecparam -genkey -name secp384r1 -out server.key
  2. openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650

这里需要填写相关信息

  1. Country Name (2 letter code) []:
  2. State or Province Name (full name) []:
  3. Locality Name (eg, city) []:
  4. Organization Name (eg, company) []:
  5. Organizational Unit Name (eg, section) []:
  6. Common Name (eg, fully qualified host name) []: mysite
  7. Email Address []:

填写完成后就生成对应的证书:

  1. ssl
  2. ├── server.key
  3. └── server.pem

服务端实现

  1. // ...
  2. const PORT = "50001"
  3. func main() {
  4. // 通过 credentials 加载服务端的TLS证书
  5. c, err := credentials.NewServerTLSFromFile("../ssl/server.pem", "../ssl/server.key")
  6. if err != nil {
  7. log.Fatalf("credentials.NewServerTLSFromFile err: %v", err)
  8. }
  9. // 添加 credentials 配置
  10. server := grpc.NewServer(grpc.Creds(c))
  11. pb.RegisterSearchServiceServer(server, &SearchService{})
  12. lis, err := net.Listen("tcp", ":"+PORT)
  13. if err != nil {
  14. log.Fatalf("net.Listen err: %v", err)
  15. }
  16. server.Serve(lis)
  17. }

客户端实现

  1. const PORT = "9001"
  2. func main() {
  3. // 添加 credentials 配置
  4. c, err := credentials.NewClientTLSFromFile("../ssl/server.pem", "mysite")
  5. if err != nil {
  6. log.Fatalf("credentials.NewClientTLSFromFile err: %v", err)
  7. }
  8. // 客户端开启证书验证
  9. conn, err := grpc.Dial(":"+PORT, grpc.WithTransportCredentials(c))
  10. if err != nil {
  11. log.Fatalf("grpc.Dial err: %v", err)
  12. }
  13. defer conn.Close()
  14. client := pb.NewSearchServiceClient(conn)
  15. resp, err := client.Search(context.Background(), &pb.SearchRequest{
  16. Request: "gRPC",
  17. })
  18. if err != nil {
  19. log.Fatalf("client.Search err: %v", err)
  20. }
  21. log.Printf("resp: %s", resp.GetResponse())
  22. }

CA TLS 证书

TLS 证书的安全性还不够高,特别在证书生成之后,server.key文件的传输就成为一个问题。所以 CA 来签发 TLS 证书来解决这个问题。使用开源工具 cfssl 生成对应的证书:
1.ca 配置

  1. cat << EOF | tee ca-config.json
  2. {
  3. "signing": {
  4. "default": {
  5. "expiry": "87600h"
  6. },
  7. "profiles": {
  8. "mysite": {
  9. "expiry": "87600h",
  10. "usages": [
  11. "signing",
  12. "key encipherment",
  13. "server auth",
  14. "client auth"
  15. ]
  16. }
  17. }
  18. }}
  19. EOF

配置 mysite 机构证书可以进行服务端和客户端双向验证。
2.ca 证书

  1. cat << EOF | tee ca-csr.json
  2. {
  3. "CN": "mysite CA",
  4. "key": {
  5. "algo": "rsa",
  6. "size": 2048
  7. },
  8. "names": [
  9. {
  10. "C": "CN",
  11. "L": "Beijing",
  12. "ST": "Beijing"
  13. }
  14. ]}
  15. EOF

3.服务端证书

  1. cat << EOF | tee server-csr.json
  2. {
  3. "CN": "mysite",
  4. "hosts": [
  5. "127.0.0.1"
  6. ],
  7. "key": {
  8. "algo": "rsa",
  9. "size": 2048
  10. },
  11. "names": [
  12. {
  13. "C": "CN",
  14. "L": "Beijing",
  15. "ST": "Beijing"
  16. }
  17. ]}
  18. EOF

生成 mysite ca 证书和私钥,初始化 ca

  1. cfssl gencert -initca ca-csr.json | cfssljson -bare ca

生成server证书

  1. cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=ca-config.json -profile=mysite -hostname=mysite server-csr.json | cfssljson -bare server

最后的结果为:

  1. ../ssl
  2. ├── ca-config.json
  3. ├── ca-csr.json
  4. ├── ca-key.pem
  5. ├── ca.csr
  6. ├── ca.pem
  7. ├── server-csr.json
  8. ├── server-key.pem
  9. ├── server.csr
  10. └── server.pem

接下来是代码实现,先是服务端:

  1. // ...
  2. type ecServer struct {
  3. pb.UnimplementedEchoServer
  4. }
  5. func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
  6. return &pb.EchoResponse{Message: req.Message}, nil
  7. }
  8. func main() {
  9. lis, err := net.Listen("tcp", "127.0.0.1:50001")
  10. if err != nil {
  11. log.Fatalf("failed to listen: %v", err)
  12. }
  13. // Create tls based credential.
  14. cert, err := tls.LoadX509KeyPair("ssl/server.pem", "ssl/server-key.pem")
  15. if err != nil {
  16. log.Fatalf("tls.LoadX509KeyPair err: %v", err)
  17. }
  18. certPool := x509.NewCertPool()
  19. ca, err := ioutil.ReadFile("ssl/ca.pem")
  20. if err != nil {
  21. log.Fatalf("ioutil.ReadFile err: %v", err)
  22. }
  23. if ok := certPool.AppendCertsFromPEM(ca); !ok {
  24. log.Fatalf("certPool.AppendCertsFromPEM err")
  25. }
  26. creds := credentials.NewTLS(&tls.Config{
  27. Certificates: []tls.Certificate{cert},
  28. ClientAuth: tls.RequireAndVerifyClientCert,
  29. ClientCAs: certPool,
  30. })
  31. s := grpc.NewServer(grpc.Creds(creds))
  32. // Register EchoServer on the server.
  33. pb.RegisterEchoServer(s, &ecServer{})
  34. log.Println("server start")
  35. if err := s.Serve(lis); err != nil {
  36. log.Fatalf("failed to serve: %v", err)
  37. }
  38. }

然后是客户端:

  1. // ...
  2. func callUnaryEcho(client pb.EchoClient, message string) {
  3. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  4. defer cancel()
  5. resp, err := client.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
  6. if err != nil {
  7. log.Fatalf("client.UnaryEcho(_) = _, %v: ", err)
  8. }
  9. fmt.Println("UnaryEcho: ", resp.Message)
  10. }
  11. func main() {
  12. // Create tls based credential.
  13. cert, err := tls.LoadX509KeyPair("ssl/server.pem", "ssl/server-key.pem")
  14. if err != nil {
  15. log.Fatalf("tls.LoadX509KeyPair err: %v", err)
  16. }
  17. certPool := x509.NewCertPool()
  18. ca, err := ioutil.ReadFile("ssl/ca.pem")
  19. if err != nil {
  20. log.Fatalf("ioutil.ReadFile err: %v", err)
  21. }
  22. if ok := certPool.AppendCertsFromPEM(ca); !ok {
  23. log.Fatalf("certPool.AppendCertsFromPEM err")
  24. }
  25. creds := credentials.NewTLS(&tls.Config{
  26. Certificates: []tls.Certificate{cert},
  27. ServerName: "mysite",
  28. RootCAs: certPool,
  29. })
  30. // Set up a connection to the server.
  31. conn, err := grpc.Dial("127.0.0.1:50001", grpc.WithTransportCredentials(creds))
  32. if err != nil {
  33. log.Fatalf("did not connect: %v", err)
  34. }
  35. defer conn.Close()
  36. // Make a echo client and send an RPC.
  37. rgc := pb.NewEchoClient(conn)
  38. callUnaryEcho(rgc, "hello world")
  39. }

拦截器

grpc 支持服务端和客户端的拦截器,可以在请求发起或返回前进行处理,而不用修改原来的代码。接下来来看服务端和客户端各自怎么使用拦截器:

  1. // unary 请求拦截器
  2. func UnaryInterceptor(ctx context.Context,
  3. req interface{},
  4. info *grpc.UnaryServerInfo,
  5. handler grpc.UnaryHandler,
  6. ) (resp interface{}, err error) {
  7. var ip string
  8. p, ok := peer.FromContext(ctx)
  9. if ok {
  10. ip = p.Addr.String()
  11. }
  12. md, _ := metadata.FromIncomingContext(ctx)
  13. start := time.Now()
  14. resp, err = handler(ctx, req)
  15. end := time.Now()
  16. log.Printf("%10s | %14s | %10v | md=%v | reply = %v", ip, info.FullMethod, end.Sub(start), md, resp)
  17. return
  18. }
  19. // stream 请求拦截器
  20. func StreamInterceptor(srv interface{},
  21. ss grpc.ServerStream,
  22. info *grpc.StreamServerInfo,
  23. handler grpc.StreamHandler,
  24. ) (err error) {
  25. var ip string
  26. p, ok := peer.FromContext(ss.Context())
  27. if ok {
  28. ip = p.Addr.String()
  29. }
  30. err = handler(srv, ss)
  31. log.Printf("stream %v | %v | %s\\n", srv, ip, info.FullMethod)
  32. return
  33. }
  34. type server struct {
  35. pb.UnimplementedEchoServer
  36. }
  37. func (s *server) UnaryEcho(ctx context.Context, request *pb.EchoRequest) (*pb.EchoResponse, error) {
  38. return &pb.EchoResponse{Message: request.Message}, nil
  39. }
  40. func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
  41. ctx := stream.Context()
  42. for {
  43. select {
  44. case <-ctx.Done():
  45. break
  46. default:
  47. }
  48. msg, err := stream.Recv()
  49. if errors.Is(err, io.EOF) {
  50. break
  51. }
  52. if err != nil {
  53. log.Printf("recv failed: %v\\n", err)
  54. }
  55. if err := stream.Send(&pb.EchoResponse{Message: "reply: " + msg.Message}); err != nil {
  56. log.Printf("send to client: %v\\n", err)
  57. }
  58. }
  59. return nil
  60. }
  61. func main() {
  62. addr := "127.0.0.1:50001"
  63. lis, err := net.Listen("tcp", addr)
  64. if err != nil {
  65. log.Fatalf("network at %v: %v\\n", addr, err)
  66. }
  67. s := grpc.NewServer(grpc.ChainUnaryInterceptor(UnaryInterceptor), grpc.ChainStreamInterceptor(StreamInterceptor))
  68. pb.RegisterEchoServer(s, &server{})
  69. if err := s.Serve(lis); err != nil {
  70. log.Fatalf("start server at %v: %v\\n", addr, err)
  71. }
  72. }

grpc 中的拦截器分两种,一元请求的拦截器和流式请求的拦截器。其中流式请求的连接器同时作用于服务端流式、客户端流式和双向流式三种请求模式。

接下来是客户端:

  1. func clientUnaryInterceptor(
  2. ctx context.Context,
  3. method string,
  4. req, reply interface{},
  5. cc *grpc.ClientConn,
  6. invoker grpc.UnaryInvoker,
  7. opts ...grpc.CallOption,
  8. ) (err error) {
  9. ctx = metadata.AppendToOutgoingContext(ctx, "username", "OOB")
  10. err = invoker(ctx, method, req, reply, cc, opts...)
  11. return
  12. }
  13. func clientStreamInterceptor(ctx context.Context,
  14. desc *grpc.StreamDesc,
  15. cc *grpc.ClientConn,
  16. method string,
  17. streamer grpc.Streamer,
  18. opts ...grpc.CallOption,
  19. ) (stream grpc.ClientStream, err error) {
  20. // before stream
  21. stream, err = streamer(ctx, desc, cc, method, opts...)
  22. // after stream
  23. return
  24. }
  25. func callUnaryEcho(cc pb.EchoClient, msg string) {
  26. reply, err := cc.UnaryEcho(context.Background(), &pb.EchoRequest{Message: msg})
  27. if err == nil {
  28. log.Printf("reply => %v\\n", reply)
  29. }
  30. }
  31. func callBidirectionalEcho(cc pb.EchoClient, msg string) {
  32. stream, err := cc.BidirectionalStreamingEcho(context.TODO())
  33. if err != nil {
  34. log.Fatalf("call BidirectionalEcho: %v\\n", err)
  35. }
  36. _ = stream.Send(&pb.EchoRequest{Message: msg})
  37. _ = stream.CloseSend()
  38. ctx := stream.Context()
  39. for {
  40. select {
  41. case <-ctx.Done():
  42. break
  43. default:
  44. }
  45. reply, err := stream.Recv()
  46. if errors.Is(err, io.EOF) {
  47. break
  48. }
  49. if err != nil {
  50. log.Fatalf("stream recv: %v\\n", err)
  51. }
  52. log.Printf("stream reply => %v\\n", reply.Message)
  53. }
  54. }
  55. func main() {
  56. addr := "127.0.0.1:50001"
  57. ctx, cancel := context.WithCancel(context.Background())
  58. defer cancel()
  59. conn, err := grpc.DialContext(
  60. ctx,
  61. addr,
  62. grpc.WithInsecure(),
  63. grpc.WithChainUnaryInterceptor(clientUnaryInterceptor),
  64. grpc.WithChainStreamInterceptor(clientStreamInterceptor))
  65. if err != nil {
  66. log.Fatalf("connect %v: %v\\n", addr, err)
  67. }
  68. cc := pb.NewEchoClient(conn)
  69. callUnaryEcho(cc, "unary")
  70. callBidirectionalEcho(cc, "start")
  71. }

grpc 的拦截器同时支持单个拦截器和链式拦截器。

grpc 添加 pprof 接口

grpc 本身是使用 http2 作为底层协议,所以它也能和 golang 的 pprof 结合提供 pprof 接口。下面给出代码:

  1. type server struct {
  2. pb.UnimplementedEchoServer
  3. }
  4. func (s *server) UnaryEcho(ctx context.Context, request *pb.EchoRequest) (*pb.EchoResponse, error) {
  5. return &pb.EchoResponse{Message: request.Message}, nil
  6. }
  7. func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
  8. ctx := stream.Context()
  9. for {
  10. select {
  11. case <-ctx.Done():
  12. break
  13. default:
  14. }
  15. msg, err := stream.Recv()
  16. if errors.Is(err, io.EOF) {
  17. break
  18. }
  19. if err != nil {
  20. log.Printf("recv failed: %v\\n", err)
  21. }
  22. if err := stream.Send(&pb.EchoResponse{Message: "reply: " + msg.Message}); err != nil {
  23. log.Printf("send to client: %v\\n", err)
  24. }
  25. }
  26. return nil
  27. }
  28. func main() {
  29. addr := "127.0.0.1:50001"
  30. // 这里可以添加服务段启动配置和各种拦截器
  31. s := grpc.NewServer()
  32. pb.RegisterEchoServer(s, &server{})
  33. mux := http.NewServeMux()
  34. mux.HandleFunc("/debug/pprof/", pprof.Index)
  35. mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  36. mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  37. mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  38. mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  39. // 启动 http2 服务,golang http 启动时添加证书会自动转化为 http2 服务。
  40. // 将 Content-Type 为 application/grpc 请求转交给 grpc 即可。
  41. err := http.ListenAndServeTLS(
  42. addr,
  43. "ssl/server.pem",
  44. "ssl/server-key.pem",
  45. http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  46. if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
  47. log.Println("call grpc service")
  48. s.ServeHTTP(rw, r)
  49. } else {
  50. mux.ServeHTTP(rw, r)
  51. }
  52. }))
  53. if err != nil {
  54. log.Fatalf("start server at %v: %v", addr, err)
  55. }
  56. }

grpc 请求断开处理

grpc 的请求没有自己设置请求的超时时间,而是将这部分的处理交给 golang 的 context 包。通过 context 的功能实现客户端的登录超时,请求超时。
服务端代码:

  1. type server struct {
  2. pb.UnimplementedEchoServer
  3. }
  4. func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
  5. // 该函数内是 stream 的整个生命周期,该函数退出后,stream 的上下文结束
  6. // 每个stream函数相互独立
  7. // 服务端的 stream 不能直接发起请求终止,但可以通过提前结束该函数,停止该 stream
  8. for {
  9. in, err := stream.Recv()
  10. if err != nil {
  11. fmt.Printf("server: error receiving from stream: %v\n", err)
  12. if err == io.EOF {
  13. return nil
  14. }
  15. return err
  16. }
  17. fmt.Printf("echoing message %q\n", in.Message)
  18. stream.Send(&pb.EchoResponse{Message: in.Message})
  19. }
  20. }
  21. func main() {
  22. lis, err := net.Listen("tcp", "127.0.0.1:10050")
  23. if err != nil {
  24. log.Fatalf("failed to listen: %v", err)
  25. }
  26. fmt.Printf("server listening at port %v\n", lis.Addr())
  27. s := grpc.NewServer()
  28. pb.RegisterEchoServer(s, &server{})
  29. s.Serve(lis)
  30. }

客户端:

  1. func sendMessage(stream pb.Echo_BidirectionalStreamingEchoClient, msg string) error {
  2. fmt.Printf("sending message %q\n", msg)
  3. return stream.Send(&pb.EchoRequest{Message: msg})
  4. }
  5. func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient, wantErrCode codes.Code) {
  6. res, err := stream.Recv()
  7. if status.Code(err) != wantErrCode {
  8. log.Fatalf("stream.Recv() = %v, %v; want _, status.Code(err)=%v", res, err, wantErrCode)
  9. }
  10. if err != nil {
  11. fmt.Printf("stream.Recv() returned expected error %v\n", err)
  12. return
  13. }
  14. fmt.Printf("received message %q\n", res.Message)
  15. }
  16. func main() {
  17. addr := "127.0.0.1:10050"
  18. // 建立连接
  19. // 建立连接的 ctx 和请求的 ctx 是独立的
  20. conn, err := grpc.DialContext(context.Background(), addr, grpc.WithInsecure())
  21. if err != nil {
  22. log.Fatalf("did not connect: %v", err)
  23. }
  24. defer conn.Close()
  25. c := pb.NewEchoClient(conn)
  26. // Initiate the stream with a context that supports cancellation.
  27. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  28. stream, err := c.BidirectionalStreamingEcho(ctx)
  29. if err != nil {
  30. log.Fatalf("error creating stream: %v", err)
  31. }
  32. // Send some test messages.
  33. if err := sendMessage(stream, "hello"); err != nil {
  34. log.Fatalf("error sending on stream: %v", err)
  35. }
  36. if err := sendMessage(stream, "world"); err != nil {
  37. log.Fatalf("error sending on stream: %v", err)
  38. }
  39. // Ensure the RPC is working.
  40. recvMessage(stream, codes.OK)
  41. recvMessage(stream, codes.OK)
  42. fmt.Println("cancelling context")
  43. cancel()
  44. // This Send may or may not return an error, depending on whether the
  45. // monitored context detects cancellation before the call is made.
  46. sendMessage(stream, "closed")
  47. // This Recv should never succeed.
  48. recvMessage(stream, codes.Canceled)
  49. }

GRPC 性能优化

虽然 grpc 的官方自诩是高性能的框架,但是 grpc 内部使用大量的反射,使得 grpc 在性能上并不算很好,所以还是有必要优化。
grpc 的优化思路比较简单,不需要直接修改源码,只需要在 protoc 命令生成 golang 代码是,将 golang/protobuf 换成第三方的 gogo/protobuf
gogo库基于官方库开发,增加了很多的功能,包括:

  • 快速的序列化和反序列化
  • 更规范的Go数据结构
  • goprotobuf兼容
  • 可选择的产生一些辅助方法,减少使用中的代码输入
  • 可以选择产生测试代码和benchmark代码
  • 其它序列化格式

比如etcd、k8s、dgraph、docker swarmkit都使用它。
基于速度和定制化的考虑,gogo有三种产生代码的方式

  • gofast: 速度优先,不支持其它gogoprotobuf extensions。
  1. go get github.com/gogo/protobuf/protoc-gen-gofast
  2. protoc --gofast_out=. myproto.proto
  • gogofast类似gofast,但是会导入gogoprotobuf
  • gogofaster类似gogofast, 不会产生XXX_unrecognized指针字段,可以减少垃圾回收时间。
  • gogoslick类似gogofaster,但是可以增加一些额外的方法gostringequal等等。
  1. go get github.com/gogo/protobuf/proto
  2. go get github.com/gogo/protobuf/{binary} //protoc-gen-gogofastprotoc-gen-gogofaster protoc-gen-gogoslick
  3. go get github.com/gogo/protobuf/gogoproto
  4. protoc -I=. -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf --{binary}_out=. myproto.proto
  • protoc-gen-gogo: 最快的速度,最多的可定制化

你可以通过扩展定制序列化: 扩展.

  1. go get github.com/gogo/protobuf/proto
  2. go get github.com/gogo/protobuf/jsonpb
  3. go get github.com/gogo/protobuf/protoc-gen-gogo
  4. go get github.com/gogo/protobuf/gogoproto

gogo同样支持grpc: protoc --gofast_out=plugins=grpc:. my.proto
同时还有 protobuf 对应的教程