1 stream.proto

  1. syntax = "proto3";
  2. option go_package="./;proto";
  3. message StreamReqData {
  4. string data = 1;
  5. }
  6. message StreamResData {
  7. string data = 1;
  8. }
  9. service Greeter {
  10. // 服务端流模式, 客户端发一次请求, 服务端源源不断地推送数据
  11. rpc GetStream(StreamReqData) returns (stream StreamResData);
  12. // 客户端流模式
  13. rpc PutStream(stream StreamReqData) returns (StreamResData);
  14. // 双向流模式
  15. rpc AllStream(stream StreamReqData) returns (stream StreamResData);
  16. }

protoc -I . stream.proto —go_out=plugins=grpc:.

2 server.go

  1. package main
  2. import (
  3. "fmt"
  4. "google.golang.org/grpc"
  5. "net"
  6. "stream_grpc_test/proto"
  7. "sync"
  8. "time"
  9. )
  10. const PORT = ":50052"
  11. type server struct{}
  12. func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
  13. i := 0
  14. for {
  15. i++
  16. _ = res.Send(&proto.StreamResData{
  17. Data: fmt.Sprintf("%v", time.Now().Unix()),
  18. })
  19. time.Sleep(time.Second)
  20. if i > 10 {
  21. break
  22. }
  23. }
  24. return nil
  25. }
  26. func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
  27. for {
  28. a, err := cliStr.Recv()
  29. if err != nil {
  30. fmt.Println(err)
  31. break
  32. }
  33. fmt.Println(a.Data)
  34. }
  35. return nil
  36. }
  37. func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
  38. wg := sync.WaitGroup{}
  39. wg.Add(2)
  40. go func() {
  41. defer wg.Done()
  42. for {
  43. data, _ := allStr.Recv()
  44. fmt.Println("收到客户端消息:" + data.Data)
  45. }
  46. }()
  47. go func() {
  48. defer wg.Done()
  49. for {
  50. _ = allStr.Send(&proto.StreamResData{Data: "我是服务器"})
  51. time.Sleep(time.Second)
  52. }
  53. }()
  54. wg.Wait()
  55. return nil
  56. }
  57. func main() {
  58. lis, err := net.Listen("tcp", PORT)
  59. if err != nil {
  60. panic(err)
  61. }
  62. s := grpc.NewServer()
  63. proto.RegisterGreeterServer(s, &server{})
  64. err = s.Serve(lis)
  65. if err != nil {
  66. panic(err)
  67. }
  68. }

3 client.go

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "stream_grpc_test/proto"
  7. "sync"
  8. "time"
  9. )
  10. func main() {
  11. conn, err := grpc.Dial(":50052", grpc.WithInsecure())
  12. if err != nil {
  13. panic(err)
  14. }
  15. defer conn.Close()
  16. c := proto.NewGreeterClient(conn)
  17. // 服务端流模式
  18. res, _ := c.GetStream(context.Background(), &proto.StreamReqData{Data: "慕课网"})
  19. for {
  20. a, err := res.Recv()
  21. if err != nil {
  22. fmt.Println(err)
  23. break
  24. }
  25. fmt.Println(a)
  26. }
  27. // 客户端流模式
  28. putS, _ := c.PutStream(context.Background())
  29. i := 0
  30. for {
  31. i++
  32. putS.Send(&proto.StreamReqData{
  33. Data: fmt.Sprintf("慕课网%d", i),
  34. })
  35. time.Sleep(time.Second)
  36. if i > 10 {
  37. break
  38. }
  39. }
  40. // 双向流模式
  41. allStr, _ := c.AllStream(context.Background())
  42. wg := sync.WaitGroup{}
  43. wg.Add(2)
  44. go func() {
  45. defer wg.Done()
  46. for {
  47. data, _ := allStr.Recv()
  48. fmt.Println("收到客户端消息:" + data.Data)
  49. }
  50. }()
  51. go func() {
  52. defer wg.Done()
  53. for {
  54. _ = allStr.Send(&proto.StreamReqData{Data: "慕课网"})
  55. time.Sleep(time.Second)
  56. }
  57. }()
  58. wg.Wait()
  59. }