server启动过程
定义server
这里pb.UnimplementedGreeterServer被嵌入了server结构,所以即使没有实现SayHello方法,编译也能通过。
但是,我们通常要强制server在编译期就必须实现对应的方法,所以生产中建议不嵌入。
实现自己的业务逻辑
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error){//}
注册TCP监听端口
lis, err := net.Listen("tcp", port)
因为gRPC的应用层是基于HTTP2的,所以这里不出意外,监听的是tcp端口
grpc.NewServer()
- 入参为选项参数options
- 自带一组defaultServerOptions,最大发送size、最大接收size、连接超时、发送缓冲、接收缓冲
重点:pb.RegisterGreeterServer(s, &server{})
调用的是pb里面的RegisterGreeterServer,该方法会传入_Greeter_serviceDesc,和srv,将_Greeter_serviceDesc,将proto生成的服务描述注册到grpc包的server的map中。上文说过,_Greeter_serviceDesc的method的handle方法,会调用我们重写的 方法。
s.Serve(lis)
- 加锁,修改server状态
- 记录listener,生成唯一ID
- 一直循环,rawConn, err := lis.Accept()
- 起go协程处理rawConn
- 将tcp连接封装对应的creds认证信息【TLS、astl】
- newHTTP2Transport,完成http2协议握手,设置http2相关参数,起go协程处理http2连接,go 调用keepalive方法
- serveStreams 方法处理http2 连接,
/————————————————- 调用过程—————————————————-/
- handleStream处理http2的帧,完成帧处理后,每一个header帧,都会用一个新协程处理,其余帧补充更新传输设置或补充接受的数据。回调进入handleStream,前半段被拆为service,后者为method,通过map查找【即是一个conn,可以发多个header帧,每个header帧可以调用不同的method】
- method在processUnaryRPC或者处理,stream在processStreamingRPC处理
- processUnaryRPC为例:先从流中获取压缩方式,校验本地是否包含解压方式 decomp = encoding.GetCompressor(rc)
- recvAndDecompress,recvMsg读出遍历recvBuffer,解压出[]byte 结构的 data 。
- 定义一个方法df,解码,然后传入MethodDesc.handle方法中。
- 在MethodDesc.handle【实际proto生成时,每个server会生成对应的ServiceDesc,里面的MethodDesc 的Handler最后会调用我们自己重写的方法。】中,回调12步的解码方法,获取请求,然后判断有无interceptor,若无,即直接调用我们重新的对应方法。
- 调用完MethodDesc 的handler方法后,得到reply, appErr,appErr不为空,则尝试封装为grpc内部错误,否则为Unknow。
- 调用sendResponse,根据ContentSubtype编码,然后压缩,成bytes,接着用err = t.Write(stream, hdr, payload, opts),将帧信息传回去,完成返回。
- t.Write() 判断有没发送过HeaderFrame【CAS stream.headerSent字段判断】 ,若无,则先发送HeaderFrame,再将将数据封装成dataFrame,传到ControlBuffer
- loopyWriter 调用 controlBuf.get();然后调用loopyWriter.handle();将dataFrame 传入到 stream的itemList,并将 stream 入队 activityStreams
- loopyWriter 调用 processData();获取activityStreams 的第一个活跃流,然后获取到itemList,将itemList的数据写出到framer中;

客户端调用流程
grpc.Dial
新建一个conn连接,这里是一个支持HTTP2.0的客户端,暂不细讲
pb.NewGreeterClient(conn)
新建一个client,包装对应的method,方便调用SayHello
调用SayHello
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
连接过程
- conn,err 调用dial方法,调用到DialContext方法,提取拦截器,注册频道号,。。。根据Scheme获取 resolverBuilder
- newccResolverWrapper,调用resolverBuilder.build(),将clientConn传入给resolver;
- resolverBuilder build完后,调用 clientConn.resolverWrapper.UpdateState(resolver.State{})
- 然后调用updateResolverState ->maybeApplyDefaultServiceConfig -> cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) ->
- 调用 applyServiceConfigAndBalancer ,获取负载均衡Name,cc.switchBalancer(Name);根据name 查找builder【balancer.Get(name)】; 生成负载均衡器Wrapper
- cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
- go ccb.watch();; 更新状态相关!
- ccb.balancer = b.Build(ccb, bopts);调用build,生成balancer.Balancer,然后返回ccbw
- 调用ccBalancerWrapper.updateClientConnState -> balancer.V2Balancer.UpdateClientConnState;创建subConn; UpdateState 更新成Idle,将addressConn 加入到picker,然后添加picker到clientConn中
- 将cc更新成Idle
- balancer.sc.Connect();更新adressConn 的为connecting ; 调用 go ac.resetTransport()
- 更新解析器状态,然后调用UpdateClientConnState 更新负载均衡器的连接状态
- 调用adressConn .conn(),创建transport
- startHealthCheck,将clientConn的状态 改为Ready;
- 如果阻塞,则一直轮询判断连接状态==connectivity.Ready
- 返回ClientConn
调用过程
- 核心调用的是 Invoke 方法,具体实现要看grpc.ClientConn中
- grpc.ClientConn中实现了Invoke方法,在call.go文件中,详情都在invoke中
- newClientStream 封装clientStream(传入调用包、调用方法,压缩、解压等其他选项)
- 调用newAttemptLocked,从cc.blockingpicker.pick,获取transport
- 创建csAttempt,调用负载均衡器的pick方法 ,获取传输。
- 调用csAttemp.t.sendMsg发送消息
- 调用sendMessage
- 校验连接是否关闭
- hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) ,压缩,解压之类的,加上header
- 校验payload 的大小
- a.t.Write.. 发送数据
- 调用clientStream.RecvMsg()
- 调用recv,recvAndDecompress,解压缩。
- c.Unmarshal(d, m) 解码。
调用流程图:

简单来说:
TCP握手成功后,将连接升级为http2连接,然后通过内部的组件: controlBuf 和 loopyWriter 来处理连接。
server和client 都有这个组件,服务端、客户端 发送或处理请求,都会将相应的请求封装成各种frame 入队到controlBuffer,然后loopyWriter run方法一直会 从 controlbuffer中get() 一个frame【handle it】,然后处理 这个frame,然后发送数据(processData);从中伴随着流量控制 【trInflow,sendQuota ; inflow /outstream】
