创建时间: 2019/10/7 19:47
作者: sunpengwei1992@aliyun.com

gRPC已经断断续续写了七篇文章了,但基本都是属于gRPC的使用上,旨在帮助大家如何使用gRPC,了解gRPC的功能以及特性,并通过示例代码让大家能快速入门,对开发人员而言,一个可运行的demo是最好的教程,文章连接和可运行demo如下,今天我们开始深入一点gRPC服务是怎么启动的,一起揭开他的神秘面纱。

  1. gRPC的特性和背后设计的原则
  2. gRPC的接口描述语言ProtoBuffer
  3. gRPC之GoLang入门HelloWord
  4. gRPC之流式调用原理http2协议分析
  5. gRPC认证的多种方式实践
  6. gRPC拦截器那点事,希望帮到你
  7. gRPC注册中心,常用的注册中心你懂了吗?AP还是CP

    1. https://github.com/sunpengwei1992/go_common/tree/master/grpc/helloworld_demo

    一个gRPC-Server启动主要以下几行代码,如下一个简单的gRPC-Server就启动起来了,但其流程可不简单,简单的背后意味着封装,一行一行来分析

    1. func StartServer() {
    2. lis, _ := net.Listen("tcp", "127.0.0.1:8090")
    3. //创建一个grpc服务器对象
    4. gRpcServer := grpc.NewServer()
    5. pb.RegisterHelloServiceServer(gRpcServer, &impl.HelloServiceServer{})
    6. //开启服务端
    7. gRpcServer.Serve(lis)
    8. }

    下面这一行代码从表面很简单,创建了一个grpServer实例,但是这个实例的参数以及入参的参数是非常多了,弄明白了这些参数的含义,后面代码的阅读会舒畅很多

    1. gRpcServer := grpc.NewServer()`

    分析NewServer源码

    1. // NewServer creates a gRPC server which has no service registered and has not started to accept requests yet.
    2. //NewServer创建一个grpc服务器,该服务器没有注册服务,还不能开始接收请求
    3. func NewServer(opt ...ServerOption) *Server {
    4. opts := defaultServerOptions
    5. for _, o := range opt {
    6. o.apply(&opts)
    7. }
    8. s := &Server{
    9. lis: make(map[net.Listener]bool),
    10. opts: opts,
    11. conns: make(map[transport.ServerTransport]bool),
    12. m: make(map[string]*service),
    13. quit: grpcsync.NewEvent(),//退出事件
    14. done: grpcsync.NewEvent(),//完成事件
    15. czData: new(channelzData),
    16. }
    17. s.cv = sync.NewCond(&s.mu)
    18. if EnableTracing {
    19. _, file, line, _ := runtime.Caller(1)
    20. s.events = trace.NewEventLog("grpc.Server"))
    21. }
    22. if channelz.IsOn() {
    23. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
    24. }
    25. return s
    26. }

    上面的代码,总体四个步骤,当然,其中也有诸多细节,暂不深追究,后面会说到

  8. 接受入参参数切片,并把默认参数全部赋值到入参上

  9. 构造Server实例
  10. 判断是否tracing(链路跟踪),IsOn(返回channelz数据收集是否打开)
  11. 返回server实例

然后我们说一下入参和返回Server的结构体的参数组成都是什么含义?

ServerOption入参
  1. type serverOptions struct {
  2. creds credentials.TransportCredentials //cred证书
  3. codec baseCodec //序列化和反序列化
  4. cp Compressor //压缩接口
  5. dc Decompressor //解压缩接口
  6. unaryInt UnaryServerInterceptor //一元拦截器
  7. streamInt StreamServerInterceptor //流拦截器
  8. inTapHandle tap.ServerInHandle //见下面文档
  9. statsHandler stats.Handler //见下面文档
  10. maxConcurrentStreams uint32 //http2中最大的并发流个数
  11. maxReceiveMessageSize int //最大接受消息大小
  12. maxSendMessageSize int //最大发送消息大小
  13. unknownStreamDesc *StreamDesc
  14. keepaliveParams keepalive.ServerParameters //长连接的server参数
  15. keepalivePolicy keepalive.EnforcementPolicy
  16. //初始化stream的Window大小,下限值是64K,上限2^31
  17. initialWindowSize int32
  18. //初始化conn大小,一个conn会有多个stream, 等于上面的值 * 16 ,http2的限 制是大于0,默认一个连接有100个流,超过了就被拒绝
  19. initialConnWindowSize int32
  20. writeBufferSize int //写缓冲大小
  21. readBufferSize int //读缓冲大小
  22. connectionTimeout time.Duration //连接超时时间
  23. maxHeaderListSize *uint32 //最大头部大小
  24. }

cred证书是接口grpc内部是有实现的代码包如下,我们使用是只需要,调用方法传入证书文件就可以了

  1. \google.golang.org\grpc@v1.23.1\credentials\credentials.go
  2. creds, err := credentials.NewClientTLSFromFile("E:\\server.pem", "")

inTapHandle tap.ServerInHandle定义在服务器端创建新流之前运行的函数,如果你定义的这个ServerInhandler返回非nil错误,则服务端不会创建流,并将一个rst_stream流发送给具有refused_stream的客户端,它旨在用于您不想服务端创建新的流浪费资源来接受新客户端的情况,比如微服务常见的限流功能,注意,他是在每个conn的协程中执行,而不是在每个rpc的协程中执行中执行。

statsHandler stats.Handler这个接口中定义的方法主要是为统计做处理的,比如一次调用中的rpc和conn,默认的实现有如下

  1. ClientHandler //主要是Client端服务的(rpc,conn)跟踪和状态统计
  2. ServerHandler //主要是Server端服务的(rpc,conn)跟踪和状态统计
  3. statshandler //grpc测试用的

Server参数
  1. // Server is a gRPC server to serve RPC requests.
  2. type Server struct {
  3. opts serverOptions //上面介绍的就是
  4. mu sync.Mutex // guards following
  5. lis map[net.Listener]bool //服务端的监听地址
  6. //server transport是所有grpc服务器端传输实现的通用接口。
  7. conns map[transport.ServerTransport]bool
  8. serve bool //表示服务是否开启,在Serve()方法中赋值为true
  9. drain bool //在调用GracefulStop(优雅的停止服务)方法被赋值为true
  10. cv *sync.Cond // 当连接关闭以正常停止时发出信号
  11. m map[string]*service // service name -> service info
  12. events trace.EventLog //跟踪事件日志
  13. quit *grpcsync.Event //同步退出事件
  14. done *grpcsync.Event //同步完成事件
  15. channelzRemoveOnce sync.Once
  16. serveWG sync.WaitGroup //counts active Serve goroutines for GracefulStop
  17. channelzID int64 // channelz unique identification number
  18. czData *channelzData //存储一些conn的自增id数据
  19. }

分析RegisterHelloServiceServer源码

我们看到最上面的StartServer代码中调用了pb(proto buffer)的代码,这是自动生成的,这个方法的作用要把HelloService的实现注册到Server上,我们看下面的代码,这个方法的入参有两个,一个是NewServer创建的grpcServer实例,一个是HelloService的实现类,然后调用grpcServer的RegisterService的方法。

  1. func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
  2. s.RegisterService(&_HelloService_serviceDesc, srv)
  3. }

RegisterService方法如下,registerservice将服务及其实现注册到grpc服务器。它是从idl(接口描述语言 Interface Description Lanauage)的代码中调用的。这必须是在调用SERVE方法之前调用。s.register(sd, ss) 方法最终是吧服务的名称的和服务的描述信息注册到上面Server中的map[string]*service

  1. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  2. ht := reflect.TypeOf(sd.HandlerType).Elem()
  3. st := reflect.TypeOf(ss)
  4. if !st.Implements(ht) {
  5. grpclog.Fatalf("grpc: Server.RegisterService found")
  6. }
  7. //注册服务
  8. s.register(sd, ss)
  9. }
  10. //删除了非核心代码
  11. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  12. s.mu.Lock()
  13. defer s.mu.Unlock()
  14. //构造服务的实例
  15. srv := &service{
  16. server: ss, md: make(map[string]*MethodDesc),
  17. sd: make(map[string]*StreamDesc), mdata:sd.Metadata,
  18. }
  19. //放入方法
  20. for i := range sd.Methods {
  21. d := &sd.Methods[i]
  22. srv.md[d.MethodName] = d
  23. }
  24. //放入流
  25. for i := range sd.Streams {
  26. d := &sd.Streams[i]
  27. srv.sd[d.StreamName] = d
  28. }
  29. s.m[sd.ServiceName] = srv
  30. }

分析gRpcServer.Serve(lis)源码

Server()方法就正式开始监听客户端的连接,并开启协程处理客户端连接,方法核心步骤如下

  1. 加锁,初始化一些参数
  2. defer处理最后的资源情况
  3. for循环接受客户端的连接,每一个客户端的连接,开启一个协程处理
    1. func (s *Server) Serve(lis net.Listener) error {
    2. s.mu.Lock()
    3. s.serve = true
    4. s.serveWG.Add(1)
    5. //优雅的停止服务
    6. defer func() {
    7. s.serveWG.Done()
    8. if s.quit.HasFired() {
    9. <-s.done.Done()
    10. }
    11. }()
    12. //包装连接对象,并声明为true,代表有效
    13. ls := &listenSocket{Listener: lis}
    14. s.lis[ls] = true
    15. s.mu.Unlock()
    16. //清理资源
    17. defer func() {
    18. s.mu.Lock()
    19. if s.lis != nil && s.lis[ls] {
    20. ls.Close()
    21. delete(s.lis, ls)
    22. }
    23. s.mu.Unlock()
    24. }()
    25. //监听客户端连接
    26. for {
    27. rawConn, err := lis.Accept()
    28. s.serveWG.Add(1)
    29. //处理客户端请求
    30. go func() {
    31. s.handleRawConn(rawConn)
    32. s.serveWG.Done()
    33. }()
    34. }
    35. }
    我们可以看到 grpcServer整体启动流程是非常清晰的,而且go的代码阅读起来也比较清爽,但命名上大多都是缩写,需要结合上下文以及注释来仔细观察,今天的内容你掌握了吗?希望大家还是亲自去阅读一下源码,结合文章一起对照学习,这样记忆和理解也更深入,包括前面的一些基础练习也要自己实践,实践才是真理。

欢迎大家关注微信公众号:“golang那点事”,更多精彩期待你的到来
GoLang公众号.jpg