GRPC 简介

grpc 是由 google 开发的一款开源,高性能 rpc(远程进程调用协议)使用 Protocol Buffers 作为数据交换格式。
grpc.svg

GRPC 安装

golang 使用 grpc 要安装 grpc-go, protoc 和 对应的插件。

安装grpc-go

  1. go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
  2. go get -u google.golang.org/grpc

如果是国内用户无法连接到 google.golang.org 的话可以使用 VPN。或者直接从 github.com 直接下载源代码再编译安装

  1. git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc
  2. go get -u google.golang.org/grpc

安装 protoc

golang 要使用 grpc,还需要使用 protoc 工具。因为 golang 不能直接识别 .proto 文件,需要使用 protoc 工具将 .proto 转化成 golang 代码。下面介绍几个平台下安装 protobuf 的方法。

macos

macos 下安装直接使用 brew 命令即可。

  1. brew install protobuf

linux

linux 下需要先从 github.com 下载 protobuf 源码或者二进制文件,下载地址
二进制安装的话就下载 protobuf-all-*.tar.gz 包,解压后进入生成的目录。之后执行命令:

  1. make && make install

windows

下载 protobuf.all-*.zip 包,解压后再配置环境变量,将 protobuf\bin 配置到 $PATH 变量中。

GRPC使用

新建项目

新建一个 grpc 项目,如下:

  1. ../sample
  2. └── pb
  3. └── echo.proto

echo.proto 的内容为:

  1. syntax = "proto3"; // protobuf 语法版本,默认为 proto2
  2. // // 这个是注释
  3. // .proto 所在的包路径
  4. package sample.pb;
  5. option go_package = "pb";
  6. // EchoRequest grpc 请求报文格式.
  7. message EchoRequest {
  8. string message = 1;
  9. }
  10. // EchoResponse grpc 响应报文格式.
  11. message EchoResponse {
  12. string message = 1;
  13. }
  14. // 定义 Echo 服务.
  15. service Echo {
  16. // UnaryEcho 一元请求.
  17. rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  18. // ServerStreamingEcho 服务端 stream 请求.
  19. rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  20. // ClientStreamingEcho 客户端 stream 请求.
  21. rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  22. // BidirectionalStreamingEcho 双向 stream.
  23. rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
  24. }

执行以下命令将 .proto 转化为 golang 代码:

  1. cd sample
  2. # protoc -I<import路径> <...-I$PATH> --go_out=plugins=grpc:<输出路径> *.proto
  3. protoc -I. --go_out=plugins=grpc:. pb/echo.proto

简单描述下 protoc 命令的功能。

  • -I : *.proto 中导入的包的路径,导入的路径为全路径格式。. 表示当前路径。
  • —goout=plugins=grpc: :指定 .proto 输出的格式和路径,生成 _.go 文件的路径为 和 *.proto 的拼接。
    执行成功后成为文件 echo.pb.go 文件:
  1. ../sample
  2. └── pb
  3. ├── echo.pb.go
  4. └── echo.proto

Server

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "google.golang.org/grpc"
  6. "io"
  7. "log"
  8. "mysite/sample/pb"
  9. "net"
  10. )
  11. type server struct {
  12. pb.EchoServer
  13. }
  14. // 简单请求
  15. func (s *server) UnaryEcho(ctx context.Context, request *pb.EchoRequest) (*pb.EchoResponse, error) {
  16. return &pb.EchoResponse{Message: "echo: " + request.Message}, nil
  17. }
  18. // 服务端流式
  19. func (s *server) ServerStreamingEcho(request *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
  20. _ = stream.Send(&pb.EchoResponse{Message: "hello"})
  21. _ = stream.Send(&pb.EchoResponse{Message: " "})
  22. _ = stream.Send(&pb.EchoResponse{Message: "client"})
  23. return nil
  24. }
  25. // 客户端流式
  26. func (s *server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {
  27. for {
  28. recv, err := stream.Recv() // block 直到有数据输出
  29. if errors.Is(err, io.EOF) {
  30. // 表示消息传输完毕
  31. break
  32. }
  33. if err != nil {
  34. log.Printf("recv error: %v", err)
  35. return err
  36. }
  37. // client 断开连接
  38. log.Printf("recv data: %v", recv.Message)
  39. }
  40. // SendAndClose 只存在于客户端 stream 请求
  41. // 发送完关闭 stream
  42. return stream.SendAndClose(&pb.EchoResponse{Message: "bye"})
  43. }
  44. // 双向流式
  45. func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
  46. // 如果服务端 stream 方法退出,客户端请求也直接断开
  47. for {
  48. recv, err := stream.Recv()
  49. if errors.Is(err, io.EOF) {
  50. break
  51. }
  52. if err != nil {
  53. log.Printf("recv error: %v", err)
  54. return err
  55. }
  56. if recv.Message == "bye" {
  57. log.Printf("client send done!")
  58. break
  59. }
  60. if err := stream.Send(&pb.EchoResponse{Message: "reply: " + recv.Message}); err != nil {
  61. log.Printf("send message error: %v", err)
  62. return err
  63. }
  64. }
  65. return nil
  66. }
  67. func main() {
  68. addr := "127.0.0.1:50001"
  69. // grpc 为 http2 请求,传输层协议为 tcp
  70. lis, err := net.Listen("tcp", addr)
  71. if err != nil {
  72. log.Fatalf("binding at %v: %v", addr, err)
  73. }
  74. gRPCServer := grpc.NewServer()
  75. pb.RegisterEchoServer(gRPCServer, &server{})
  76. if err := gRPCServer.Serve(lis); err != nil {
  77. log.Fatalf("start grpc: %v", err)
  78. }
  79. }

Client

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "google.golang.org/grpc"
  7. "io"
  8. "log"
  9. "mysite/sample/pb"
  10. )
  11. // 简单请求
  12. func unaryEcho(cli pb.EchoClient, msg string) {
  13. recv, err := cli.UnaryEcho(context.Background(), &pb.EchoRequest{Message: msg})
  14. if err != nil {
  15. log.Fatalf("unaryEcho %v", err)
  16. }
  17. log.Println("recv data => " + recv.Message)
  18. }
  19. // 服务端流式
  20. func serverStreamingEcho(cli pb.EchoClient, msg string) {
  21. stream, err := cli.ServerStreamingEcho(context.Background(), &pb.EchoRequest{Message: msg})
  22. if err != nil {
  23. log.Fatalf("serverStreamingEcho %v", err)
  24. }
  25. ctx := stream.Context()
  26. for {
  27. select {
  28. case <-ctx.Done():
  29. log.Println("serverStreamingEcho done!")
  30. break
  31. default:
  32. }
  33. msg, err := stream.Recv()
  34. if errors.Is(err, io.EOF) {
  35. break
  36. }
  37. if err == nil {
  38. log.Println("serverStreaming reply => ", msg.Message)
  39. }
  40. }
  41. }
  42. // 客户端流式
  43. func clientStreamingEcho(cli pb.EchoClient) {
  44. stream, err := cli.ClientStreamingEcho(context.Background())
  45. if err != nil {
  46. log.Printf("connect client Streaming: %v\n", err)
  47. return
  48. }
  49. err = stream.Send(&pb.EchoRequest{Message: "hello"})
  50. if err != nil {
  51. log.Printf("clientStreamingEcho send data: %v", err)
  52. return
  53. }
  54. err = stream.Send(&pb.EchoRequest{Message: " "})
  55. if err != nil {
  56. log.Printf("clientStreamingEcho send data: %v", err)
  57. return
  58. }
  59. err = stream.Send(&pb.EchoRequest{Message: "world"})
  60. if err != nil {
  61. log.Printf("clientStreamingEcho send data: %v", err)
  62. return
  63. }
  64. if recv, err := stream.CloseAndRecv(); err == nil {
  65. fmt.Printf("recv data: %v\n", recv.Message)
  66. }
  67. }
  68. // 双向流式
  69. func bidirectionalStreamingEcho(cli pb.EchoClient) {
  70. stream, err := cli.BidirectionalStreamingEcho(context.Background())
  71. if err != nil {
  72. log.Printf("bidirectionalStreamingEcho error: %v\n", err)
  73. return
  74. }
  75. stream.Send(&pb.EchoRequest{Message: "dataset 1"})
  76. recv, err := stream.Recv()
  77. if err == nil {
  78. fmt.Printf("recv from bidirectionalStreamingEcho => %v\n", recv.Message)
  79. }
  80. stream.Send(&pb.EchoRequest{Message: "dataset 2"})
  81. recv, err = stream.Recv()
  82. if err == nil {
  83. fmt.Printf("recv from bidirectionalStreamingEcho => %v\n", recv.Message)
  84. }
  85. stream.Send(&pb.EchoRequest{Message: "dataset 3"})
  86. recv, err = stream.Recv()
  87. if err == nil {
  88. fmt.Printf("recv from bidirectionalStreamingEcho => %v\n", recv.Message)
  89. }
  90. stream.Send(&pb.EchoRequest{Message: "bye"})
  91. stream.CloseSend()
  92. }
  93. func main() {
  94. addr := "127.0.0.1:50001"
  95. ctx, cancel := context.WithCancel(context.Background())
  96. defer cancel()
  97. conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure())
  98. if err != nil {
  99. log.Fatalf("connect %v: %v", addr, err)
  100. }
  101. cli := pb.NewEchoClient(conn)
  102. unaryEcho(cli, "hello")
  103. serverStreamingEcho(cli, "hello")
  104. clientStreamingEcho(cli)
  105. bidirectionalStreamingEcho(cli)
  106. }