坑点描述

服务A -> 服务B -> 服务C

场景:

  • 由服务A发起调用时没有设置timeout
  • 服务B -> 服务C 是一个异步调用

结果:服务B -> 服务C,时不时出现rpc error: code = Canceled desc = context canceled
但是,而且在服务B调用服务C前,观察传过去的ctx,通过ctx.Deadline() 拿到的ok是false,即没设置timeout

这是为什么?�

gRPC 超时如何做到跨进程传递?

我们测试发现,不仅是 Go gRPC 服务之间超时可以传递(如果你拿到上游的 ctx 继续往下透传的话)。Go 和 Java 服务之间,超时也会随着调用链传递。

那么 gRPC 的超时是如何做到跨进程跨语言传递的?
有朋友可能想到了 metadata,是否 gRPC 请求链上游设置了超时后,gRPC 框架底层将过期时间放在 metadata 里了?遗憾的是我们打印 metadata 后发现并未发现 timeout 字段踪迹。那么超时时间到底是怎样传递的呢?以 “grpc-go”源码为例,我们来找下线索。

我们知道 gRPC 基于 HTTP2,HTTP2 传输的最小单位是 Frame(帧)。HTTP2 的帧包含很多类型:“DATA Frame”、“HEADERS Frame”、“PRIORITY Frame”、“RST_STREAM Frame”、“CONTINUATON Frame”等。一个 HTTP2 请求/响应可以被拆成多个帧并行发送,每一帧都有一个 StreamID 来标记属于哪个 Stream。服务端收到 Frame 后,根据 StreamID 组装出原始请求数据。
image.png
对于 gRPC 而言,Data Frame 用来存放请求的 response payload;Headers Frame 可用来存放一些需要进行跨进程传递的数据,比如“grpc-status(RPC 请求状态码)”、“:path(RPC 完整路径)”等。那么超时时间是否也通过 HEADERS Frame 传递呢?

客户端设置 timeout

我们知道,用户定义好 protobuf 并通过 protoc 生成桩代码后,桩代码中已经包含了 gRPCCient 接口的实现,每一个在 protobuf 中定义的 RPC,底层都会通过 ClientConn. Invoke 向服务端发起调用:
比如对于这样的 protobuf:

  1. syntax = "proto3";
  2. package proto;
  3. service DemoService {
  4. rpc SayHi(HiRequest) returns (HiResponse);
  5. }
  6. message HiRequest {
  7. string name = 1;
  8. }
  9. message HiResponse {
  10. string message = 1;
  11. }

生成的桩代码中已经包含了 Client 实现:

  1. type DemoServiceClient interface {
  2. SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
  3. }
  4. type demoServiceClient struct {
  5. cc *grpc.ClientConn
  6. }
  7. func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {
  8. return &demoServiceClient{cc}
  9. }
  10. func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {
  11. out := new(HiResponse)
  12. // 调用 grpc.ClientConn.Invoke() 函数,grpc.ClientConn.Invoke() 内部最终会调用 invoke() 函数
  13. err := c.cc.Invoke(ctx, "/proto.DemoService/SayHi", in, out, opts...)
  14. if err != nil {
  15. return nil, err
  16. }
  17. return out, nil
  18. }

客户端发起 gRPC 请求时,最终会调用 invoke() 方法,invoke() 源码大概如下:

  1. func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
  2. // 构造 clientStream
  3. cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
  4. if err != nil {
  5. return err
  6. }
  7. // 发送 RPC 请求
  8. if err := cs.SendMsg(req); err != nil {
  9. return err
  10. }
  11. return cs.RecvMsg(reply)
  12. }

我们看下 newClientStream 源码,newClientStream 源码比较复杂,我们挑重点看:

  1. func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  2. ...
  3. // 等待 resolver 解析出可用地址
  4. if err := cc.waitForResolvedAddrs(ctx); err != nil {
  5. return nil, err
  6. }
  7. ...
  8. // 构造 *clientStream
  9. cs := &clientStream{
  10. callHdr: callHdr,
  11. ctx: ctx,
  12. ...
  13. }
  14. // 构造新的 *csAttempt,newAttemptLocked 内部会获取 grpc.ClientTransport 并赋值给 *csAttemp.t
  15. if err := cs.newAttemptLocked(sh, trInfo); err != nil {
  16. cs.finish(err)
  17. return nil, err
  18. }
  19. ...
  20. return cs, nil
  21. }

其中 csAttempt.newStream 实现如下:

  1. type csAttempt struct {
  2. cs *clientStream
  3. t transport.ClientTransport // 客户端 Transport
  4. s *transport.Stream // 真正处理RPC 的 Stream
  5. ...
  6. }
  7. func (a *csAttempt) newStream() error {
  8. ...
  9. // 通过 Transport.NewStream 构造RPC Stream
  10. s, err := a.t.NewStream(cs.ctx, cs.callHdr)
  11. cs.attempt.s = s
  12. ...
  13. return nil
  14. }

“transport.ClientTransport”是一个接口,gRPC 内部“internal/transport.http2Client”实现了此接口。
“http2Client.NewStream()”源码如下:

  1. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  2. ctx = peer.NewContext(ctx, t.getPeer())
  3. headerFields, err := t.createHeaderFields(ctx, callHdr)
  4. ...
  5. hdr := &headerFrame{
  6. hf: headerFields,
  7. endStream: false,
  8. ...
  9. }
  10. ...
  11. for {
  12. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  13. if !checkForStreamQuota(it) {
  14. return false
  15. }
  16. if !checkForHeaderListSize(it) {
  17. return false
  18. }
  19. return true
  20. }, hdr)
  21. ...
  22. return s, nil
  23. }

“createHeaderFields”实现如下:

  1. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  2. ...
  3. // 如果透传过来的 ctx 被设置了 timeout/deadline,则在 HTTP2 headers frame 中添加 grpc-timeout 字段,
  4. // grpc-timeout 字段值被转化成 XhYmZs 字符串形式的超时时间
  5. if dl, ok := ctx.Deadline(); ok {
  6. timeout := time.Until(dl)
  7. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  8. }
  9. ...
  10. return headerFields, nil
  11. }

可以看到客户端发起请求时,如果设置了带 timeout 的ctx,则会导致底层 HTTP2 HEADERS Frame 中追加“grpc-timeout”字段。

服务端解析 timeout

服务端通过“Serve”方法启动 grpc Server,监听来自客户端连接。

  1. func (s *Server) Serve(lis net.Listener) error {
  2. ...
  3. for {
  4. // 接收客户端的连接
  5. rawConn, err := lis.Accept()
  6. ...
  7. s.serveWG.Add(1)
  8. go func() {
  9. // 对每一个客户端的连接单独开一个协程来处理
  10. s.handleRawConn(rawConn)
  11. s.serveWG.Done()
  12. }()
  13. }
  14. }
  1. func (s *Server) handleRawConn(rawConn net.Conn) {
  2. ...
  3. // 构造 HTTP2 Transport
  4. st := s.newHTTP2Transport(conn, authInfo)
  5. go func() {
  6. // 处理 HTTP2 Stream
  7. s.serveStreams(st)
  8. s.removeConn(st)
  9. }()
  10. }
  11. func (s *Server) serveStreams(st transport.ServerTransport) {
  12. defer st.Close()
  13. var wg sync.WaitGroup
  14. // http2Server 实现了 transport.ServerTransport 接口,此处会调用 http2Server.HandleSteams方法
  15. // st.HandleStreams 方法签名中第一个参数 handle func(stream *transport.Stream) {}为函数类型,
  16. // handle 随后会在 operateHeaders 中被调用
  17. st.HandleStreams(func(stream *transport.Stream) {
  18. wg.Add(1)
  19. go func() {
  20. defer wg.Done()
  21. // 解析出 gPRC Service, gRPC method, gRPC request message,执行注册到 gRPC.Server 中的 RPC 方法
  22. s.handleStream(st, stream, s.traceInfo(st, stream))
  23. }()
  24. }, ...)
  25. wg.Wait()
  26. }
  1. // http2Server.HandleStreams 会调用传入的 handle 处理 HTTP2 Stream
  2. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  3. defer close(t.readerDone)
  4. for {
  5. t.controlBuf.throttle()
  6. frame, err := t.framer.fr.ReadFrame()
  7. ...
  8. switch frame := frame.(type) {
  9. // 如果是 Headers 帧,则调用 operateHeaders 方法处理 Headers
  10. case *http2.MetaHeadersFrame:
  11. if t.operateHeaders(frame, handle, traceCtx) {
  12. t.Close()
  13. break
  14. }
  15. // 如果是 Data 帧,则调用 handleData 方法处理
  16. case *http2.DataFrame:
  17. t.handleData(frame)
  18. ...
  19. }
  20. }
  21. }
  22. // operateHeaders 解析 Headers 帧
  23. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
  24. // 从HTTP2 Headers 帧中获取 StreamID
  25. streamID := frame.Header().StreamID
  26. state := &decodeState{
  27. serverSide: true,
  28. }
  29. // 从HTTP2 Headers 帧中解析出Header。如果其中包含 grpc-timeout HEADER,
  30. // 则解析出其值并赋值给 state.data.timeout,并将 state.data.timeoutSet 设成 true
  31. if err := state.decodeHeader(frame); err != nil {
  32. if se, ok := status.FromError(err); ok {
  33. ...
  34. }
  35. buf := newRecvBuffer()
  36. // 构造 HTTP2 Stream
  37. s := &Stream{
  38. id: streamID,
  39. st: t,
  40. buf: buf,
  41. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  42. recvCompress: state.data.encoding,
  43. method: state.data.method,
  44. contentSubtype: state.data.contentSubtype,
  45. }
  46. ...
  47. // 如果 state.data.timeoutSet 为 true,则构造一个新的带 timeout 的 ctx 覆盖原 s.ctx
  48. // s.ctx 最终会透传到用户实现的 gRPC Handler 中,参与业务逻辑处理
  49. // 见 server.go 中 processUnaryRPC 内:
  50. // ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  51. // reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
  52. // 此处不再赘述
  53. if state.data.timeoutSet {
  54. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
  55. } else {
  56. s.ctx, s.cancel = context.WithCancel(t.ctx)
  57. }
  58. ...
  59. t.controlBuf.put(&registerStream{
  60. streamID: s.id,
  61. wq: s.wq,
  62. })
  63. // 调用 serveStreams 定义好的 handle,执行gRPC调用
  64. handle(s)
  65. return false
  66. }

“decodeHeader”会遍历 frame 中所有 Fields,并调用“processHeaderField”对 HTTP2 HEADERS 帧中的特定的 Field 进行处理。

  • 比如可以从“:path”中解析出包含 protobuf package、service name 和 RPC method name 的完整路径;
  • 比如可以从“grpc-timeout” 中解析出上游传递过来的 timeout;

“decodeHeader”内部实现如下:

  1. func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
  2. ...
  3. // 遍历Headers帧,解析Field
  4. for _, hf := range frame.Fields {
  5. d.processHeaderField(hf)
  6. }
  7. }
  8. func (d *decodeState) processHeaderField(f hpack.HeaderField) {
  9. switch f.Name {
  10. ...
  11. // 解析出 grpc-timeout
  12. case "grpc-timeout":
  13. d.data.timeoutSet = true
  14. var err error
  15. if d.data.timeout, err = decodeTimeout(f.Value); err != nil {
  16. d.data.grpcErr = status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
  17. }
  18. ...
  19. // 解析出 grpc 带 protobuf package path、Service name、RPC method name 的完整路径
  20. // 形如 /package.service/method
  21. case ":path":
  22. d.data.method = f.Value
  23. }
  24. }

至此可以看到,gRPC 框架确实是通过 HTTP2 HEADERS Frame 中的 “grpc-timeout”字段来实现跨进程传递超时时间。

总结

  • 客户端客户端发起 RPC 调用时传入了带 timeout 的 ctx
  • gRPC 框架底层通过 HTTP2 协议发送 RPC 请求时,将 timeout 值写入到 “grpc-timeout” HEADERS Frame 中
  • 服务端接收 RPC 请求时,gRPC 框架底层解析 HTTP2 HEADERS 帧,读取 “grpc-timeout”值,并覆盖透传到实际处理 RPC 请求的业务 gPRC Handle 中

转自:https://github.com/Zeb-D/my-review/blob/master/go/grpc%20%E8%B6%85%E6%97%B6%E4%BC%A0%E9%80%92%E5%8E%9F%E7%90%86.md#%E6%9C%8D%E5%8A%A1%E7%AB%AF%E8%A7%A3%E6%9E%90-timeout