gRPC模式

gRPC主要有4种请求/响应模式

  1. 简单模式(Simple RPC)
  2. 服务端数据流模式(Server-side streaming RPC)
  3. 客户端数据流模式(Client-side streaming RPC)
  4. 双向数据流模式(Bidirectional streaming RPC)

image.png

1. 简单模式(Simple RPC)

这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,类似 Http 请求。上篇已实现。

2. 服务端数据流模式(Server-side streaming RPC)

这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。

3. 客户端数据流模式(Client-side streaming RPC)

与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

3.双向数据流模式(Bidirectional streaming RPC)

  1. 顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。

代码实现

stream.proto

  1. syntax ="proto3";
  2. option go_package="proto/";
  3. service Greeter {
  4. rpc GetStream(StreamReqData) returns (stream StreamResData); // 服务端流模式
  5. rpc PutStream(stream StreamReqData) returns (stream StreamResData); // 客户端流模式
  6. rpc AllStream(stream StreamReqData) returns (stream StreamResData); // 双向流模式
  7. }
  8. message StreamReqData {
  9. string data = 1;
  10. }
  11. message StreamResData {
  12. string data = 1;
  13. }
  14. // 在当前目录生成
  15. // protoc -I . stream.proto --go_out=plugins=grpc:../

server.go

  1. package main
  2. import (
  3. "fmt"
  4. "google.golang.org/grpc"
  5. "net"
  6. "sync"
  7. "time"
  8. "ueumd/grpc-stream/proto"
  9. )
  10. const PORT =":8082"
  11. type server struct {}
  12. // 之前的写法 通过不了
  13. //func (s *server)GetStream(ctx context.Context, req *proto.StreamReqData )(*proto.StreamResData, error) {
  14. // return nil, nil
  15. //}
  16. /**
  17. stream.pb.go
  18. // GreeterServer is the server API for Greeter service.
  19. type GreeterServer interface {
  20. GetStream(*StreamReqData, Greeter_GetStreamServer) error
  21. PutStream(Greeter_PutStreamServer) error
  22. AllStream(Greeter_AllStreamServer) error
  23. }
  24. */
  25. // 服务端模式
  26. // 客户端发起一次请求,服务端返回一段连续的数据流
  27. func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer ) error {
  28. i := 0
  29. for {
  30. i ++
  31. _ = res.Send(&proto.StreamResData{
  32. Data: fmt.Sprintf("%v", time.Now().Unix()),
  33. })
  34. time.Sleep(time.Second)
  35. // 发送10次服务停止发送
  36. if i > 10 {
  37. break
  38. }
  39. }
  40. return nil
  41. }
  42. // 客户端流模式
  43. // 与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应
  44. func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer ) error {
  45. for {
  46. if res, err := cliStr.Recv(); err != nil {
  47. fmt.Println(err)
  48. break
  49. } else {
  50. fmt.Println(res.Data)
  51. }
  52. }
  53. return nil
  54. }
  55. // 双向流模式
  56. // 客户端和服务端都可以向对方发送数据流
  57. func (s *server) AllStream(allStr proto.Greeter_AllStreamServer ) error {
  58. wg := sync.WaitGroup{}
  59. wg.Add(2)
  60. go func() {
  61. defer wg.Done()
  62. for {
  63. res, _ := allStr.Recv()
  64. fmt.Println("收到客户端消息:" + res.Data)
  65. }
  66. }()
  67. go func() {
  68. defer wg.Done()
  69. for {
  70. _ = allStr.Send(&proto.StreamResData{
  71. Data: "I am server",
  72. })
  73. time.Sleep(time.Second*2)
  74. }
  75. }()
  76. wg.Wait()
  77. return nil
  78. }
  79. func main() {
  80. lis, err := net.Listen("tcp", PORT)
  81. if err != nil {
  82. panic(err)
  83. }
  84. rpcServer := grpc.NewServer()
  85. proto.RegisterGreeterServer(rpcServer, &server{})
  86. err = rpcServer.Serve(lis)
  87. if err != nil {
  88. panic(err)
  89. }
  90. }

client.go

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "sync"
  7. "time"
  8. "ueumd/grpc-stream/proto"
  9. )
  10. func main() {
  11. // 添加grpc.WithInsecure(),不然没有证书会报错
  12. conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
  13. if err != nil {
  14. panic(err)
  15. }
  16. defer conn.Close()
  17. client := proto.NewGreeterClient(conn)
  18. //服务端流模式
  19. res, _ := client.GetStream(context.Background(), &proto.StreamReqData{Data:"Golang Server GetStream"})
  20. for {
  21. // socket send recv
  22. result, err := res.Recv()
  23. // 服务端发送10次后停止发送,客户端会收到EOF
  24. if err != nil {
  25. fmt.Println(err)
  26. break
  27. }
  28. fmt.Println("Data:", result.Data)
  29. }
  30. // 客户端模式
  31. putS, _ := client.PutStream(context.Background())
  32. i := 0
  33. for {
  34. i++
  35. _ = putS.Send(&proto.StreamReqData{
  36. Data: fmt.Sprintf("Golang Server PutStream %d", i),
  37. })
  38. time.Sleep(time.Second)
  39. if i > 10 {
  40. break
  41. }
  42. }
  43. // 双向
  44. allStr, _ := client.AllStream(context.Background())
  45. wg := sync.WaitGroup{}
  46. wg.Add(2)
  47. go func() {
  48. defer wg.Done()
  49. for {
  50. res, _ := allStr.Recv()
  51. fmt.Println("收到服户端消息:" + res.Data)
  52. }
  53. }()
  54. go func() {
  55. defer wg.Done()
  56. for {
  57. _ = allStr.Send(&proto.StreamReqData{
  58. Data: "Golang Server AllStream ",
  59. })
  60. time.Sleep(time.Second*2)
  61. }
  62. }()
  63. wg.Wait()
  64. }