gRPC模式
gRPC主要有4种请求/响应模式
- 简单模式(Simple RPC)
- 服务端数据流模式(Server-side streaming RPC)
- 客户端数据流模式(Client-side streaming RPC)
- 双向数据流模式(Bidirectional streaming RPC)
1. 简单模式(Simple RPC)
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,类似 Http 请求。上篇已实现。
2. 服务端数据流模式(Server-side streaming RPC)
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。
3. 客户端数据流模式(Client-side streaming RPC)
与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。
3.双向数据流模式(Bidirectional streaming RPC)
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。
代码实现
stream.proto
syntax ="proto3";
option go_package="proto/";
service Greeter {
rpc GetStream(StreamReqData) returns (stream StreamResData); // 服务端流模式
rpc PutStream(stream StreamReqData) returns (stream StreamResData); // 客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData); // 双向流模式
}
message StreamReqData {
string data = 1;
}
message StreamResData {
string data = 1;
}
// 在当前目录生成
// protoc -I . stream.proto --go_out=plugins=grpc:../
server.go
package main
import (
"fmt"
"google.golang.org/grpc"
"net"
"sync"
"time"
"ueumd/grpc-stream/proto"
)
const PORT =":8082"
type server struct {}
// 之前的写法 通过不了
//func (s *server)GetStream(ctx context.Context, req *proto.StreamReqData )(*proto.StreamResData, error) {
// return nil, nil
//}
/**
stream.pb.go
// GreeterServer is the server API for Greeter service.
type GreeterServer interface {
GetStream(*StreamReqData, Greeter_GetStreamServer) error
PutStream(Greeter_PutStreamServer) error
AllStream(Greeter_AllStreamServer) error
}
*/
// 服务端模式
// 客户端发起一次请求,服务端返回一段连续的数据流
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer ) error {
i := 0
for {
i ++
_ = res.Send(&proto.StreamResData{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
// 发送10次服务停止发送
if i > 10 {
break
}
}
return nil
}
// 客户端流模式
// 与服务端数据流模式相反,客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer ) error {
for {
if res, err := cliStr.Recv(); err != nil {
fmt.Println(err)
break
} else {
fmt.Println(res.Data)
}
}
return nil
}
// 双向流模式
// 客户端和服务端都可以向对方发送数据流
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer ) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
res, _ := allStr.Recv()
fmt.Println("收到客户端消息:" + res.Data)
}
}()
go func() {
defer wg.Done()
for {
_ = allStr.Send(&proto.StreamResData{
Data: "I am server",
})
time.Sleep(time.Second*2)
}
}()
wg.Wait()
return nil
}
func main() {
lis, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
rpcServer := grpc.NewServer()
proto.RegisterGreeterServer(rpcServer, &server{})
err = rpcServer.Serve(lis)
if err != nil {
panic(err)
}
}
client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"sync"
"time"
"ueumd/grpc-stream/proto"
)
func main() {
// 添加grpc.WithInsecure(),不然没有证书会报错
conn, err := grpc.Dial("localhost:8082", grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
client := proto.NewGreeterClient(conn)
//服务端流模式
res, _ := client.GetStream(context.Background(), &proto.StreamReqData{Data:"Golang Server GetStream"})
for {
// socket send recv
result, err := res.Recv()
// 服务端发送10次后停止发送,客户端会收到EOF
if err != nil {
fmt.Println(err)
break
}
fmt.Println("Data:", result.Data)
}
// 客户端模式
putS, _ := client.PutStream(context.Background())
i := 0
for {
i++
_ = putS.Send(&proto.StreamReqData{
Data: fmt.Sprintf("Golang Server PutStream %d", i),
})
time.Sleep(time.Second)
if i > 10 {
break
}
}
// 双向
allStr, _ := client.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
res, _ := allStr.Recv()
fmt.Println("收到服户端消息:" + res.Data)
}
}()
go func() {
defer wg.Done()
for {
_ = allStr.Send(&proto.StreamReqData{
Data: "Golang Server AllStream ",
})
time.Sleep(time.Second*2)
}
}()
wg.Wait()
}