server启动过程

定义server

这里pb.UnimplementedGreeterServer被嵌入了server结构,所以即使没有实现SayHello方法,编译也能通过。
但是,我们通常要强制server在编译期就必须实现对应的方法,所以生产中建议不嵌入。

实现自己的业务逻辑

  1. func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error){
  2. //
  3. }

注册TCP监听端口

lis, err := net.Listen("tcp", port)

因为gRPC的应用层是基于HTTP2的,所以这里不出意外,监听的是tcp端口

grpc.NewServer()

  1. 入参为选项参数options
  2. 自带一组defaultServerOptions,最大发送size、最大接收size、连接超时、发送缓冲、接收缓冲


    重点:

    pb.RegisterGreeterServer(s, &server{})

    调用的是pb里面的RegisterGreeterServer,该方法会传入_Greeter_serviceDesc,和srv,将_Greeter_serviceDesc,将proto生成的服务描述注册到grpc包的server的map中。上文说过,_Greeter_serviceDesc的method的handle方法,会调用我们重写的 方法。

s.Serve(lis)

  1. 加锁,修改server状态
  2. 记录listener,生成唯一ID
  3. 一直循环,rawConn, err := lis.Accept()
  4. 起go协程处理rawConn
  5. 将tcp连接封装对应的creds认证信息【TLS、astl】
  6. newHTTP2Transport,完成http2协议握手,设置http2相关参数,起go协程处理http2连接,go 调用keepalive方法
  7. serveStreams 方法处理http2 连接,

/————————————————- 调用过程—————————————————-/

  1. handleStream处理http2的帧,完成帧处理后,每一个header帧,都会用一个新协程处理,其余帧补充更新传输设置或补充接受的数据。回调进入handleStream,前半段被拆为service,后者为method,通过map查找【即是一个conn,可以发多个header帧,每个header帧可以调用不同的method】
  2. method在processUnaryRPC或者处理,stream在processStreamingRPC处理
  3. processUnaryRPC为例:先从流中获取压缩方式,校验本地是否包含解压方式 decomp = encoding.GetCompressor(rc)
  4. recvAndDecompress,recvMsg读出遍历recvBuffer,解压出[]byte 结构的 data 。
  5. 定义一个方法df,解码,然后传入MethodDesc.handle方法中。
  6. 在MethodDesc.handle【实际proto生成时,每个server会生成对应的ServiceDesc,里面的MethodDesc 的Handler最后会调用我们自己重写的方法。】中,回调12步的解码方法,获取请求,然后判断有无interceptor,若无,即直接调用我们重新的对应方法。
  7. 调用完MethodDesc 的handler方法后,得到reply, appErr,appErr不为空,则尝试封装为grpc内部错误,否则为Unknow。
  8. 调用sendResponse,根据ContentSubtype编码,然后压缩,成bytes,接着用err = t.Write(stream, hdr, payload, opts),将帧信息传回去,完成返回。
  9. t.Write() 判断有没发送过HeaderFrame【CAS stream.headerSent字段判断】 ,若无,则先发送HeaderFrame,再将将数据封装成dataFrame,传到ControlBuffer
  10. loopyWriter 调用 controlBuf.get();然后调用loopyWriter.handle();将dataFrame 传入到 stream的itemList,并将 stream 入队 activityStreams
  11. loopyWriter 调用 processData();获取activityStreams 的第一个活跃流,然后获取到itemList,将itemList的数据写出到framer中;

服务启动与调用 - 图1

客户端调用流程

grpc.Dial

新建一个conn连接,这里是一个支持HTTP2.0的客户端,暂不细讲

pb.NewGreeterClient(conn)
新建一个client,包装对应的method,方便调用SayHello


调用SayHello

r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}


func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

连接过程

  1. conn,err 调用dial方法,调用到DialContext方法,提取拦截器,注册频道号,。。。根据Scheme获取 resolverBuilder
  2. newccResolverWrapper,调用resolverBuilder.build(),将clientConn传入给resolver;
  3. resolverBuilder build完后,调用 clientConn.resolverWrapper.UpdateState(resolver.State{})
  4. 然后调用updateResolverState ->maybeApplyDefaultServiceConfig -> cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) ->
  5. 调用 applyServiceConfigAndBalancer ,获取负载均衡Name,cc.switchBalancer(Name);根据name 查找builder【balancer.Get(name)】; 生成负载均衡器Wrapper
  6. cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
  7. go ccb.watch();; 更新状态相关!
  8. ccb.balancer = b.Build(ccb, bopts);调用build,生成balancer.Balancer,然后返回ccbw
  9. 调用ccBalancerWrapper.updateClientConnState -> balancer.V2Balancer.UpdateClientConnState;创建subConn; UpdateState 更新成Idle,将addressConn 加入到picker,然后添加picker到clientConn中
  10. 将cc更新成Idle
  11. balancer.sc.Connect();更新adressConn 的为connecting ; 调用 go ac.resetTransport()
  12. 更新解析器状态,然后调用UpdateClientConnState 更新负载均衡器的连接状态
  13. 调用adressConn .conn(),创建transport
  14. startHealthCheck,将clientConn的状态 改为Ready;
  15. 如果阻塞,则一直轮询判断连接状态==connectivity.Ready
  16. 返回ClientConn

调用过程

  1. 核心调用的是 Invoke 方法,具体实现要看grpc.ClientConn中
  2. grpc.ClientConn中实现了Invoke方法,在call.go文件中,详情都在invoke中
  3. newClientStream 封装clientStream(传入调用包、调用方法,压缩、解压等其他选项)
  4. 调用newAttemptLocked,从cc.blockingpicker.pick,获取transport
  5. 创建csAttempt,调用负载均衡器的pick方法 ,获取传输。
  6. 调用csAttemp.t.sendMsg发送消息
  7. 调用sendMessage
    1. 校验连接是否关闭
    2. hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) ,压缩,解压之类的,加上header
    3. 校验payload 的大小
    4. a.t.Write.. 发送数据
  8. 调用clientStream.RecvMsg()
  9. 调用recv,recvAndDecompress,解压缩。
  10. c.Unmarshal(d, m) 解码。

调用流程图:

服务启动与调用 - 图2

简单来说:

TCP握手成功后,将连接升级为http2连接,然后通过内部的组件: controlBuf 和 loopyWriter 来处理连接。

server和client 都有这个组件,服务端、客户端 发送或处理请求,都会将相应的请求封装成各种frame 入队到controlBuffer,然后loopyWriter run方法一直会 从 controlbuffer中get() 一个frame【handle it】,然后处理 这个frame,然后发送数据(processData);从中伴随着流量控制 【trInflow,sendQuota ; inflow /outstream】