缘起
最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
gRPC
gRPC是跨平台、跨语言并且效率非常高的RPC方式。gRPC默认使用protobuf。可以用 .proto 文件创建gRPC服务,用protobuf消息类型来定义方法参数和返回类型。建议在gRPC里使用proto3,因为这样可以使用gRPC支持的全部语言,并且能避免proto2客户端与proto3服务端交互时出现兼容性问题。在实战项目中使用gRPC时,一定要注意服务器的防火墙必须支持HTTP/2,因为gRPC是基于HTTP/2设计的。
目标
- 测试验证gRPC的四种通讯模式:
- 请求-应答模式
- 客户端推流模式
- 服务端推流模式(即客户端拉流)
- 双向流模式
设计
- hello.proto: 定义gRPC通讯协议
- HelloServer.go: gRPC服务端的实现
- pb/hello.pb.go: protoc生成的代码,源码略
- logger/logger.go: 搜集运行日志以便诊断, 源码略
单元测试
grpc_test.go,依次在四种gRPC通讯模式下发送/接收1000条消息,并对所有消息日志进行校验
package grpc

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"testing"
	"time"

	"google.golang.org/grpc"

	g "learning/gooop/grpc"
	"learning/gooop/grpc/logger"
	"learning/gooop/grpc/pb"
)

// Test_HelloServer starts a HelloServer on a local TCP port and exercises the
// four gRPC communication modes in turn (unary, client streaming, server
// streaming, bidirectional streaming). Each mode transfers iTotalMsgCount
// messages; the streaming modes are verified by counting the log lines that
// were recorded through the logger package.
func Test_HelloServer(t *testing.T) {
	fnAssertTrue := func(b bool, msg string) {
		t.Helper()
		if !b {
			t.Fatal(msg)
		}
	}

	logger.Verbose(false)
	serverPort := 3333
	serverAddress := fmt.Sprintf("127.0.0.1:%d", serverPort)
	iTotalMsgCount := 1000

	// start server
	srv := new(g.HelloServer)
	err := srv.BeginServeTCP(serverPort)
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(100 * time.Millisecond)

	// connect to grpc server
	conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// create grpc client
	client := pb.NewHelloServerClient(conn)

	// test SimpleRequest
	ctx := context.Background()
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("SimpleRequest %d", i),
		}
		// Capture the expected text before the call so the assertion does not
		// depend on the request object staying untouched.
		want := "reply " + msg.Msg
		reply, err := client.SimpleRequest(ctx, msg)
		if err != nil {
			t.Fatal(err)
		}
		fnAssertTrue(reply.Msg == want, "invalid SimpleRequest response")
	}
	t.Log("passed SimpleRequest")

	// test ClientStream
	clientStream, err := client.ClientStream(ctx)
	if err != nil {
		t.Fatal(err)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		msg := &pb.HelloMessage{
			Msg: fmt.Sprintf("ClientStream %08d", i),
		}
		err = clientStream.Send(msg)
		if err != nil {
			t.Fatal(err)
		}
	}
	reply, err := clientStream.CloseAndRecv()
	if err != nil {
		t.Fatal(err)
	}
	fnAssertTrue(reply.Msg == "reply ClientStream", "invalid ClientStream response")

	// The server logs: logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("HelloServer.ClientStream, recv ClientStream %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ClientStream")

	// test ServerStream
	serverStream, err := client.ServerStream(ctx, &pb.HelloMessage{Msg: strconv.Itoa(iTotalMsgCount)})
	if err != nil {
		t.Fatal(err)
	}
	for {
		msg, err := serverStream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		logger.Logf("ServerStream.Recv %s", msg.Msg)
	}
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("ServerStream.Recv ServerStream-%08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed ServerStream")

	// test DualStream
	// A cancellable context lets a failure in one half of the stream unblock
	// the other half instead of leaving it waiting forever.
	dualCtx, cancelDual := context.WithCancel(ctx)
	defer cancelDual()
	dualStream, err := client.DualStream(dualCtx)
	if err != nil {
		t.Fatal(err)
	}

	// t.Fatal must only be called from the goroutine running the test, so the
	// workers report their errors through a channel that is drained after
	// wg.Wait(). The buffer holds one error per worker.
	errCh := make(chan error, 2)
	var wg sync.WaitGroup
	wg.Add(2)

	// sender
	go func() {
		defer wg.Done()
		for i := 0; i < iTotalMsgCount; i++ {
			msg := &pb.HelloMessage{
				Msg: fmt.Sprintf("DualStream.Send %08d", i),
			}
			if err := dualStream.Send(msg); err != nil {
				errCh <- err
				cancelDual()
				return
			}
		}
		if err := dualStream.CloseSend(); err != nil {
			errCh <- err
			cancelDual()
		}
	}()

	// receiver
	go func() {
		defer wg.Done()
		for {
			msg, err := dualStream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				errCh <- err
				cancelDual()
				return
			}
			logger.Logf("DualStream.Recv %s", msg.Msg)
		}
	}()

	wg.Wait()
	close(errCh)
	for err := range errCh {
		t.Fatal(err)
	}

	// The server replies with Msg: "reply " + msg.Msg, and the receiver above
	// logs it as "DualStream.Recv %s".
	for i := 0; i < iTotalMsgCount; i++ {
		log := fmt.Sprintf("DualStream.Recv reply DualStream.Send %08d", i)
		fnAssertTrue(logger.Count(log) == 1, "expecting log "+log)
	}
	t.Log("passed DualStream")
}
测试输出
$ go test -v grpc_test.go
=== RUN Test_HelloServer
grpc_test.go:60: passed SimpleRequest
grpc_test.go:87: passed ClientStream
grpc_test.go:110: passed ServerStream
grpc_test.go:159: passed DualStream
--- PASS: Test_HelloServer (0.79s)
PASS
ok command-line-arguments 0.791s
hello.proto
定义四种通讯模式的rpc接口
// Protocol definition for the HelloServer demo service.
syntax = "proto3";

package pb;

option go_package = "./pb";

// HelloServer exposes one RPC per gRPC communication mode.
service HelloServer {
  // Unary: one request, one response.
  rpc SimpleRequest(HelloMessage) returns (HelloMessage);

  // Client streaming: a stream of requests, one response.
  rpc ClientStream(stream HelloMessage) returns (HelloMessage);

  // Server streaming: one request, a stream of responses.
  rpc ServerStream(HelloMessage) returns (stream HelloMessage);

  // Bidirectional streaming: a stream of requests and a stream of responses.
  rpc DualStream(stream HelloMessage) returns (stream HelloMessage);
}

// HelloMessage is the single payload type shared by every RPC.
message HelloMessage {
  string msg = 1;
}
HelloServer.go
gRPC服务端的实现
package grpc

import (
	"context"
	"fmt"
	"io"
	"net"
	"strconv"

	"google.golang.org/grpc"

	"learning/gooop/grpc/logger"
	"learning/gooop/grpc/pb"
)

// HelloServer is the server-side implementation of the HelloServer gRPC
// service. It carries no state; the underlying int type only exists so that
// methods can be attached and callers can create it with new(HelloServer).
type HelloServer int

// SimpleRequest handles the unary RPC: it answers with the request text
// prefixed by "reply ".
//
// A fresh message is returned rather than mutating the incoming request.
func (s *HelloServer) SimpleRequest(ctx context.Context, msg *pb.HelloMessage) (*pb.HelloMessage, error) {
	return &pb.HelloMessage{Msg: "reply " + msg.Msg}, nil
}

// ClientStream handles the client-streaming RPC: it logs every received
// message until the client closes its side, then sends the single response
// "reply ClientStream".
//
// Any receive or send failure is logged and returned, so the RPC ends with an
// error instead of silently reporting success.
func (s *HelloServer) ClientStream(stream pb.HelloServer_ClientStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			logger.Logf("HelloServer.ClientStream, EOF")
			break
		}
		if err != nil {
			logger.Logf("HelloServer.ClientStream, err=%v", err)
			return err
		}
		logger.Logf("HelloServer.ClientStream, recv %s", msg.Msg)
	}

	err := stream.SendAndClose(&pb.HelloMessage{
		Msg: "reply ClientStream",
	})
	if err != nil {
		logger.Logf("HelloServer.ClientStream, SendAndClose err=%v", err)
		return err
	}
	return nil
}

// ServerStream handles the server-streaming RPC: msg.Msg is parsed as a
// decimal message count N, and N messages "ServerStream-%08d" (numbered from
// 0) are sent back. A count of zero or less sends nothing.
//
// It returns an error if the count cannot be parsed or a send fails.
func (s *HelloServer) ServerStream(msg *pb.HelloMessage, stream pb.HelloServer_ServerStreamServer) error {
	iMsgCount, err := strconv.Atoi(msg.Msg)
	if err != nil {
		return fmt.Errorf("parsing message count %q: %w", msg.Msg, err)
	}

	for i := 0; i < iMsgCount; i++ {
		reply := &pb.HelloMessage{
			Msg: fmt.Sprintf("ServerStream-%08d", i),
		}
		if err := stream.Send(reply); err != nil {
			return err
		}
	}
	return nil
}

// DualStream handles the bidirectional-streaming RPC: every received message
// is logged and answered immediately with "reply " + its text. The method
// returns nil once the client closes its sending side, and returns the
// receive or send error otherwise.
func (s *HelloServer) DualStream(stream pb.HelloServer_DualStreamServer) error {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			logger.Logf("HelloServer.DualStream, recv err=%v", err)
			return err
		}
		logger.Logf("HelloServer.DualStream, recv msg=%v", msg.Msg)

		ret := &pb.HelloMessage{
			Msg: "reply " + msg.Msg,
		}
		if err := stream.Send(ret); err != nil {
			logger.Logf("HelloServer.DualStream, send err=%v", err)
			return err
		}
	}
}

// BeginServeTCP opens a TCP listener on 0.0.0.0:port, registers the receiver
// as the HelloServer service, and starts serving on a background goroutine.
//
// It returns an error only if the listener cannot be opened. Once the
// listener is open the function returns immediately; a later failure of
// Serve is logged rather than panicking, so a stopped server does not crash
// the process.
func (s *HelloServer) BeginServeTCP(port int) error {
	listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	pb.RegisterHelloServerServer(server, s)

	go func() {
		if err := server.Serve(listener); err != nil {
			logger.Logf("HelloServer.BeginServeTCP, serve err=%v", err)
		}
	}()
	return nil
}
(end)
