1. package main
    import (
    	"context"
    	"fmt"
    	"io"
    	"net"
    	"strconv"
    	"time"

    	"google.golang.org/grpc"
    	"grpc_test_stream/pb/person"
    )
    11. type Sever struct {
    12. person.UnimplementedSearchServiceServer
    13. }
    14. func (Sever) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
    15. name := req.GetName()
    16. p := new(person.PersonRes)
    17. p.Name = "我收到了来自" + name + "的请求"
    18. fmt.Println(req)
    19. return p, nil
    20. }
    21. func (Sever) SearchIn(stream_server person.SearchService_SearchInServer) error {
    22. for {
    23. recv, err := stream_server.Recv()
    24. fmt.Println("读取到的内容是:", recv)
    25. if err != nil {
    26. err1 := stream_server.SendAndClose(&person.PersonRes{Name: "读取完成"})
    27. if err1 != nil {
    28. fmt.Println(err1)
    29. }
    30. break
    31. }
    32. }
    33. return nil
    34. }
    35. func (Sever) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
    36. name := req.Name
    37. for i := 0; i < 10; i++ {
    38. server.Send(&person.PersonRes{Name: name + "服务端收到了请求" + strconv.Itoa(i)})
    39. time.Sleep(1 * time.Second)
    40. }
    41. fmt.Println("服务端响应完成")
    42. return nil
    43. }
    44. func (Sever) SearchAll(server person.SearchService_SearchAllServer) error {
    45. c := make(chan string)
    46. //接收流式请求
    47. go func() {
    48. for i := 0; i <= 10; i++ {
    49. recv, err := server.Recv()
    50. if err != nil {
    51. c <- "over"
    52. break
    53. }
    54. c <- recv.Name
    55. }
    56. }()
    57. //发送请求
    58. for {
    59. s := <-c
    60. if s == "over" {
    61. break
    62. }
    63. server.Send(&person.PersonRes{Name: "已接收到:" + s})
    64. }
    65. return nil
    66. }
    67. func main() {
    68. listener, err := net.Listen("tcp", ":8888")
    69. if err != nil {
    70. fmt.Println(err)
    71. fmt.Println("监听失败1")
    72. }
    73. grpc_server := grpc.NewServer()
    74. person.RegisterSearchServiceServer(grpc_server, &Sever{})
    75. err = grpc_server.Serve(listener)
    76. if err != nil {
    77. fmt.Println(err)
    78. fmt.Println("监听失败2")
    79. }
    80. fmt.Println("服务端启动成功")
    81. }
    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "google.golang.org/grpc"
    6. "google.golang.org/grpc/credentials/insecure"
    7. "grpc_test_stream/pb/person"
    8. "strconv"
    9. "sync"
    10. "time"
    11. )
    12. func main() {
    13. conn, err := grpc.Dial(":8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
    14. defer conn.Close()
    15. if err != nil {
    16. fmt.Println("客户端启用失败")
    17. fmt.Println(err)
    18. }
    19. client := person.NewSearchServiceClient(conn)
    20. fmt.Println("客户端启动成功")
    21. //personRes1, err := client.Search(context.Background(), &person.PersonReq{
    22. // Name: "黄宽",
    23. // Age: 18,
    24. //})
    25. //fmt.Println(personRes1.GetAge(), personRes1.GetName())
    26. //流式传入。
    27. //cin, err := client.SearchIn(context.Background())
    28. //for i := 0; i < 10; i++ {
    29. // cin.Send(&person.PersonReq{Name: "传入消息" + strconv.Itoa(i)})
    30. // time.Sleep(1 * time.Second)
    31. //}
    32. //res, err := cin.CloseAndRecv()
    33. //if err != nil {
    34. // fmt.Println(err)
    35. //}
    36. //流式接收
    37. //out, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "黄宽1"})
    38. //if err != nil {
    39. // fmt.Println(err)
    40. //}
    41. //for {
    42. // recv, err := out.Recv()
    43. // if err != nil {
    44. // fmt.Println(err)
    45. // break
    46. // }
    47. // fmt.Println(recv)
    48. //}
    49. //双向流
    50. all, err := client.SearchAll(context.Background())
    51. if err != nil {
    52. fmt.Print(err)
    53. }
    54. wg := sync.WaitGroup{}
    55. wg.Add(2)
    56. go func() {
    57. for i := 0; i <= 10; i++ {
    58. all.Send(&person.PersonReq{Name: "黄宽" + strconv.Itoa(i)})
    59. time.Sleep(1 * time.Second)
    60. }
    61. wg.Done()
    62. }()
    63. go func() {
    64. for {
    65. recv, err := all.Recv()
    66. if nil != err {
    67. fmt.Println(err)
    68. wg.Done()
    69. break
    70. }
    71. fmt.Println(recv.GetName())
    72. }
    73. }()
    74. wg.Wait()
    75. fmt.Println("客户端完成")
    76. }
    1. syntax = "proto3";
    2. package person;
    3. option go_package = "./;person";
    // PersonReq is the request message taken by every SearchService RPC.
    message PersonReq {
      string name = 1;
      int32 age = 2;
    }
    // PersonRes is the response message returned by every SearchService RPC.
    message PersonRes {
      string name = 1;
      int32 age = 2;
    }
    // SearchService exposes one RPC for each of the four gRPC call shapes, all
    // using PersonReq as the request and PersonRes as the response.
    service SearchService {
      // Unary: a conventional call answered immediately.
      rpc Search(PersonReq) returns (PersonRes);

      // Client streaming: the request is a stream.
      rpc SearchIn(stream PersonReq) returns (PersonRes);

      // Server streaming: the response is a stream.
      rpc SearchOut(PersonReq) returns (stream PersonRes);

      // Bidirectional streaming: both request and response are streams.
      rpc SearchAll(stream PersonReq) returns (stream PersonRes);
    }