缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

GRPC

  1. gRPC是跨平台、跨语言并且效率非常高的RPC方式。
  2. gRPC默认使用protobuf
  3. 可以用proto文件创建gRPC服务，
  4. 并用protobuf消息类型来定义方法的参数和返回类型。
  5. 建议在gRPC里使用proto3
  6. 因为这样可以使用gRPC支持的全部语言,
  7. 并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。
  8. 在实战项目中使用gRPC时，
  9. 一定要注意服务器的防火墙必须支持HTTP/2，
  10. 因为gRPC是基于HTTP/2设计的。

目标

  • 测试验证gRPC的四种通讯模式:
    • 请求-应答模式 (SimpleRequest)
    • 客户端推流模式 (ClientStream, 客户端连续发送, 服务端最后应答一次)
    • 客户端拉流模式 (ServerStream, 客户端请求一次, 服务端连续推送)
    • 双向流模式 (DualStream)

设计

  • hello.proto: 定义gRPC通讯协议
  • HelloServer.go: gRPC服务端的实现
  • pb/hello.pb.go: protoc生成的代码,源码略
  • logger/logger.go: 搜集运行日志以便诊断, 源码略

单元测试

grpc_test.go,依次在四种gRPC通讯模式下发送/接收1000条消息,并对所有消息日志进行校验

  1. package grpc
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "io"
  7. g "learning/gooop/grpc"
  8. "learning/gooop/grpc/logger"
  9. "learning/gooop/grpc/pb"
  10. "strconv"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. func Test_HelloServer(t *testing.T) {
  16. fnAssertTrue := func(b bool, msg string) {
  17. if !b {
  18. t.Fatal(msg)
  19. }
  20. }
  21. logger.Verbose(false)
  22. serverPort := 3333
  23. serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
  24. iTotalMsgCount := 1000
  25. // start server
  26. srv := new(g.HelloServer)
  27. err := srv.BeginServeTCP(serverPort)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. time.Sleep(100 * time.Millisecond)
  32. // connect to grpc server
  33. conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
  34. if err != nil {
  35. t.Fatal(err)
  36. }
  37. defer conn.Close()
  38. // create grpc client
  39. client := pb.NewHelloServerClient(conn)
  40. // test SimpleRequest
  41. ctx := context.Background()
  42. for i := 0; i < iTotalMsgCount; i++ {
  43. msg := &pb.HelloMessage{
  44. Msg: fmt.Sprintf("SimpleRequest %d", i),
  45. }
  46. reply, err := client.SimpleRequest(ctx, msg)
  47. if err != nil {
  48. t.Fatal(err)
  49. }
  50. fnAssertTrue(reply.Msg == "reply "+msg.Msg, "invalid SimpleRequest response")
  51. }
  52. t.Log("passed SimpleRequest")
  53. // test ClientStream
  54. clientStream, err := client.ClientStream(ctx)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. for i := 0; i < iTotalMsgCount; i++ {
  59. msg := &pb.HelloMessage{
  60. Msg: fmt.Sprintf("ClientStream %08d", i),
  61. }
  62. err = clientStream.Send(msg)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. }
  67. reply, err := clientStream.CloseAndRecv()
  68. if err != nil {
  69. t.Fatal(err)
  70. }
  71. fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")
  72. // logger.Logf("HelloServer.ClientStream, recv %s", msg.String())
  73. for i := 0; i < iTotalMsgCount; i++ {
  74. log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
  75. fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
  76. }
  77. t.Log("passed ClientStream")
  78. // test ServerStream
  79. serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. for {
  84. msg, err := serverStream.Recv()
  85. if err == io.EOF {
  86. break
  87. }
  88. if err != nil {
  89. t.Fatal(err)
  90. }
  91. logger.Logf("ServerStream.Recv %s", msg.Msg)
  92. }
  93. for i := 0; i < iTotalMsgCount; i++ {
  94. log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
  95. fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
  96. }
  97. t.Log("passed ServerStream")
  98. // test DualStream
  99. dualStream, err := client.DualStream(ctx)
  100. var wg sync.WaitGroup
  101. wg.Add(1)
  102. go func() {
  103. defer wg.Done()
  104. for i := 0; i < iTotalMsgCount; i++ {
  105. msg := &pb.HelloMessage{
  106. Msg: fmt.Sprintf("DualStream.Send %08d", i),
  107. }
  108. err := dualStream.Send(msg)
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. }
  113. err = dualStream.CloseSend()
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. }()
  118. wg.Add(1)
  119. go func() {
  120. defer wg.Done()
  121. for {
  122. msg, err := dualStream.Recv()
  123. if err == io.EOF {
  124. break
  125. }
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. logger.Logf("DualStream.Recv %s", msg.Msg)
  130. }
  131. }()
  132. wg.Wait()
  133. for i := 0; i < iTotalMsgCount; i++ {
  134. // Msg: "reply " + msg.Msg,
  135. // logger.Logf("DualStream.Recv %s", msg.Msg)
  136. log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
  137. fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
  138. }
  139. t.Log("passed DualStream")
  140. }

测试输出

  1. $ go test -v grpc_test.go
  2. === RUN Test_HelloServer
  3. grpc_test.go:60: passed SimpleRequest
  4. grpc_test.go:87: passed ClientStream
  5. grpc_test.go:110: passed ServerStream
  6. grpc_test.go:159: passed DualStream
  7. --- PASS: Test_HelloServer (0.79s)
  8. PASS
  9. ok command-line-arguments 0.791s

hello.proto

定义四种通讯模式的rpc接口

  1. syntax = "proto3";
  2. package pb;
  3. option go_package="./pb";
  // HelloServer declares one RPC for each of the four gRPC call shapes.
  service HelloServer {
      // Unary: one request message, one reply message.
      rpc SimpleRequest(HelloMessage) returns (HelloMessage);
      // Client streaming: a stream of requests, one reply message.
      rpc ClientStream(stream HelloMessage) returns (HelloMessage);
      // Server streaming: one request message, a stream of replies.
      rpc ServerStream(HelloMessage) returns (stream HelloMessage);
      // Bidirectional streaming: a stream of requests and a stream of replies.
      rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
  }
  // HelloMessage is the single message type used for every request and reply.
  message HelloMessage {
      // Text payload of the message.
      string msg = 1;
  }

HelloServer.go

gRPC服务端的实现

  1. package grpc
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "io"
  7. "learning/gooop/grpc/logger"
  8. "learning/gooop/grpc/pb"
  9. "net"
  10. "strconv"
  11. )
  // HelloServer is the receiver type for the gRPC handlers in this file.
  // None of the methods shown here read its underlying int value.
  type HelloServer int
  13. func (me *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
  14. //logger.Logf("HelloServer.SimpleRequest, %s", msg.Msg)
  15. msg.Msg = "reply " + msg.Msg
  16. return msg, nil
  17. }
  18. func (me *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
  19. for {
  20. msg, err := stream.Recv()
  21. if err == io.EOF {
  22. logger.Logf("HelloServer.ClientStream, EOF")
  23. break
  24. }
  25. if err != nil {
  26. logger.Logf("HelloServer.ClientStream, err=%v", err)
  27. return err
  28. }
  29. logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
  30. }
  31. err := stream.SendAndClose(&pb.HelloMessage{
  32. Msg: "reply ClientStream",
  33. })
  34. if err != nil {
  35. logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
  36. }
  37. return nil
  38. }
  39. func (me *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
  40. iMsgCount, err := strconv.Atoi(msg.Msg)
  41. if err != nil {
  42. return err
  43. }
  44. for i := 0; i < iMsgCount; i++ {
  45. msg := &pb.HelloMessage{
  46. Msg: fmt.Sprintf("ServerStream-%08d", i),
  47. }
  48. err := stream.Send(msg)
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. return nil
  54. }
  55. func (me *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
  56. for {
  57. msg, err := stream.Recv()
  58. if err == io.EOF {
  59. return nil
  60. }
  61. if err != nil {
  62. logger.Logf("HelloServer.DualStream, recv err=%v", err)
  63. return err
  64. }
  65. logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)
  66. ret := &pb.HelloMessage{
  67. Msg: "reply " + msg.Msg,
  68. }
  69. err = stream.Send(ret)
  70. if err != nil {
  71. logger.Logf("HelloServer.DualStream, send err=%v", err)
  72. return err
  73. }
  74. }
  75. }
  76. func (me *HelloServer) BeginServeTCP(port int) error {
  77. listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
  78. if err != nil {
  79. return err
  80. }
  81. server := grpc.NewServer()
  82. pb.RegisterHelloServerServer(server, me)
  83. go func() {
  84. panic(server.Serve(listener))
  85. }()
  86. return nil
  87. }

(end)