在 gRPC 中,大类可分为两种 RPC 方法,与拦截器的对应关系是:
- 普通方法:一元拦截器(grpc.UnaryInterceptor)
- 流方法:流拦截器(grpc.StreamInterceptor)
拦截器可以理解成hook或者middleware,grpc 明确指出只支持一个。
This is intentional. We only provide a hook so that various complex interceptor patterns (e.g., chaining, nested, stack, concurrent, etc.) can be built on top of it without running into the argument of the execution ordering of the multiple interceptors while keeping grpc itself simple.
上面其实说,grpc 故意设计的这样,只能放一个钩子,这一个钩子来创建一些复杂的模式,防止了开发人员将过多的精力放到,函数调用链的执行顺序上。
看看grpc的源码
客户端:
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.unaryInt = f
})
}
// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.streamInt = f
})
}
可以看到,如果多次被调用,会一直覆盖
服务端:
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
})
}
多次调用,直接panic
服务器端流式 RPC
stream, err := client.ListFeatures(context.Background(), rect)
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
在简单 RPC 的例子中,我们给方法传入一个上下文和请求。然而,我们得到返回的是一个 RouteGuide_ListFeaturesClient
实例,而不是一个应答对象。客户端可以使用 RouteGuide_ListFeaturesClient
流去读取服务器的响应。
我们使用 RouteGuide_ListFeaturesClient
的 Recv()
方法去反复读取服务器的响应到一个响应 protocol buffer 对象(在这个场景下是Feature
)直到消息读取完毕:每次调用完成时,客户端都要检查从 Recv()
返回的错误 err
。如果返回为 nil
,流依然完好并且可以继续读取;如果返回为 io.EOF
,则说明消息流已经结束;否则就一定是一个通过 err
传过来的 RPC 错误。
客户端流式 RPC
除了我们需要给方法传入一个上下文而后返回 RouteGuide_RecordRouteClient
流以外,客户端流方法 RecordRoute
和服务器端方法类似,它可以用来读 和 写消息。
stream, err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
RouteGuide_RecordRouteClient
有一个 Send()
方法,我们可以用它来给服务器发送请求。一旦我们完成使用 Send()
方法将客户端请求写入流,就需要调用流的 CloseAndRecv()
方法,让 gRPC 知道我们已经完成了写入同时期待返回应答。我们从 CloseAndRecv()
返回的 err
中获得 RPC 的状态。如果状态为nil
,那么CloseAndRecv()
的第一个返回值将会是合法的服务器应答。
双向流
proto
syntax = "proto3";
package protos;
//订单请求参数
message OrderRequest {
string orderId = 1;
int64 timeStamp = 2;
}
//订单信息
message OrderInfo {
string OrderId = 1;
string OrderName = 2;
string OrderStatus = 3;
}
service OrderService {
rpc SelectOrderList (stream OrderRequest) returns (stream OrderInfo) {}; //双向流模式
//rpc SelectOrderList (stream OrderRequest) returns (OrderInfo) {}; //client流
//rpc SelectOrderList (OrderRequest) returns (stream OrderInfo) {}; //server流
}
client
package main
import (
"fmt"
"google.golang.org/grpc"
"io"
message "study/grpc2/protos"
"time"
"context"
)
func main() {
//1、Dail连接
conn, err := grpc.Dial("localhost:8090", grpc.WithInsecure())
if err != nil {
panic(err.Error())
}
defer conn.Close()
OrderServiceClient := message.NewOrderServiceClient(conn)
orderInfosClient,err:= OrderServiceClient.SelectOrderList(context.Background())
if err!=nil{
panic(err)
}
defer orderInfosClient.CloseSend() //关闭发送
t := time.NewTicker(1 * time.Second)
for {
request := message.OrderRequest{TimeStamp: time.Now().Unix()}
orderInfosClient.Send(&request) //简洁写法
orderInfo,err:=orderInfosClient.Recv()
/*
orderInfosClient.Recv() 是下面的简便得写法
orderInfo := new(message.OrderInfo)
if err := orderInfosClient.RecvMsg(orderInfo); err != nil {
return nil, err
}
return orderInfo, nil
*/
if err != nil {
panic(err.Error())
}
fmt.Println(orderInfo.OrderId)
select {
case <-t.C:
}
}
}
server
package main
import (
"fmt"
"google.golang.org/grpc"
"io"
"net"
message "study/grpc2/protos"
"time"
)
//订单服务实现
type OrderServiceImpl struct {
}
//获取订单信息s
func (os *OrderServiceImpl) SelectOrderList(stream message.OrderService_SelectOrderListServer) error {
for {
orderInfo,err := stream.Recv()
fmt.Println("收到client消息:",orderInfo.TimeStamp)
if err != nil{
return err
}
OrderId:= fmt.Sprint(orderInfo.TimeStamp)
res := message.OrderInfo{OrderId:OrderId}
fmt.Println("模拟业务操作,睡眠2s.......")
time.Sleep(time.Second*2)
if err = stream.Send(&res);err != nil {
return err
}
}
return nil
}
func main() {
server := grpc.NewServer()
//注册
message.RegisterOrderServiceServer(server, new(OrderServiceImpl))
lis, err := net.Listen("tcp", ":8090")
if err != nil {
panic(err.Error())
}
server.Serve(lis)
}
客户端close的意义
对于一个client,不管是客户端还是双向流,如果等client完整的执行完毕,不进行关闭,且整个conn也不关闭,在server端for循环的goroutine会hang住
func main() {
//1、Dail连接
conn, err := grpc.Dial("127.0.0.1:8190", grpc.WithInsecure())
if err != nil {
panic(err.Error())
}
//defer conn.Close()
run(conn)
time.Sleep(time.Hour)
}
func run(conn *grpc.ClientConn) {
OrderServiceClient := message.NewOrderServiceClient(conn)
orderInfosClient, err := OrderServiceClient.SelectOrderList(context.Background())
if err != nil {
panic(err)
}
//defer orderInfosClient.CloseSend() //关闭发送
t := time.NewTicker(1 * time.Second)
for {
request := message.OrderRequest{TimeStamp: time.Now().Unix()}
orderInfosClient.Send(&request) //简洁写法
orderInfo, err := orderInfosClient.Recv()
if err != nil {
panic(err.Error())
}
fmt.Println(orderInfo.OrderId)
select {
case <-t.C:
}
break
}
}
看看server端goroutine的状态
源码部分
如果进行关闭,客户端流CloseAndRecv()或
双向流的CloseSend()
方法,在server端收到的错误就是EOF
很明显,client端发送完毕后,主动的close是非常有必要的