DialContext

DialContext is the entry point for establishing a connection on the client side. Let's see what happens inside it:

    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        // 1. Create the ClientConn struct
        cc := &ClientConn{
            target: target,
            ...
        }
        // 2. Parse the target
        cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
        // 3. Find a resolverBuilder that fits the parsed target
        resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
        // 4. Create the Resolver
        rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
        // 5. Done
        return cc, nil
    }

As you can see, after omitting a mountain of detail, establishing a connection is actually quite simple. Let's walk through it:
gRPC provides no service registration or service discovery of its own, so developers must write the discovery logic themselves: this is the Resolver.
Once resolution yields a result, that is, a list of IP addresses, one of them must be chosen for each call: this is the Balancer's job.
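
Before diving into the source, here is a minimal sketch of what such a custom Resolver can look like. Everything here is an assumption for illustration: the "static" scheme, the package name, and the hard-coded address list are made up; only the resolver.Builder / resolver.Resolver interfaces and resolver.Register come from gRPC:

    package staticresolver

    import "google.golang.org/grpc/resolver"

    const scheme = "static" // hypothetical scheme

    // staticBuilder builds resolvers that always report a fixed address list.
    type staticBuilder struct {
        addrs []string
    }

    func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
        r := &staticResolver{cc: cc, addrs: b.addrs}
        r.start()
        return r, nil
    }

    func (*staticBuilder) Scheme() string { return scheme }

    type staticResolver struct {
        cc    resolver.ClientConn
        addrs []string
    }

    // start pushes the fixed address list into gRPC, which hands it to the Balancer.
    func (r *staticResolver) start() {
        addrs := make([]resolver.Address, 0, len(r.addrs))
        for _, a := range r.addrs {
            addrs = append(addrs, resolver.Address{Addr: a})
        }
        r.cc.UpdateState(resolver.State{Addresses: addrs})
    }

    func (*staticResolver) ResolveNow(resolver.ResolveNowOptions) {}
    func (*staticResolver) Close()                                {}

    func init() {
        resolver.Register(&staticBuilder{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051"}})
    }

With this registered, a target like static:///anything would deliver both addresses to the Balancer.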

Obtaining the Builder

Let's start with the Builder.

    cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)

The logic of ParseTarget boils down to one sentence: determine the address scheme of the target the developer passed in, so that a Builder suited to that scheme can be looked up next.
Now for the Builder lookup itself. The code is fairly simple; I've added some comments:

    resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)

    func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
        // First check whether a resolver was supplied via the dial options
        for _, rb := range cc.dopts.resolvers {
            if scheme == rb.Scheme() {
                return rb
            }
        }
        // If the options contain no matching resolver, fall back to the registered ones
        return resolver.Get(scheme)
    }

    // As we can see, the ResolverBuilder is looked up in the map m
    func Get(scheme string) Builder {
        if b, ok := m[scheme]; ok {
            return b
        }
        return nil
    }
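
For the record, m is just a package-level map in the resolver package, declared alongside the default scheme that will matter shortly:

    var (
        // m is a map from scheme to resolver builder.
        m = make(map[string]Builder)
        // defaultScheme is the default scheme to use.
        defaultScheme = "passthrough"
    )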

From this we can infer that every **ResolverBuilder** must be registered in advance.
Sure enough, looking at the Resolver's source, it registers itself in init():

    func init() {
        resolver.Register(&passthroughBuilder{})
    }

    // Registering a Resolver simply means adding itself to the map
    func Register(b Builder) {
        m[b.Scheme()] = b
    }
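
Because registration happens in init(), a custom resolver is typically pulled in with a blank import, purely for that side effect. A sketch, reusing the hypothetical staticresolver package from earlier:

    // Imported only for its side effect: init() registers the "static" scheme.
    import _ "example.com/myapp/staticresolver"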

At this point we have covered how a Builder is registered and retrieved.

Creating the ResolverWrapper

Back in the ClientConn creation flow, once we have the ResolverBuilder the next step is:

    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    ...
    cc.resolverWrapper = rWrapper
    ...

After the ResolverWrapper is created it is simply stored on cc; there is no further logic here.
To make Resolvers pluggable, gRPC uses the decorator pattern: the resolver is wrapped in a ResolverWrapper.
Let's look at the details of creating it:

    func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
        ccr := &ccResolverWrapper{
            cc:   cc,
            done: grpcsync.NewEvent(),
        }
        // Build a resolver from the given Builder and keep it in the wrapper
        ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
        return ccr, nil
    }

Alright, let's pause here for a moment.
Consider the functionality we need: to decouple the Resolver from the Balancer, we want something in the middle that receives the addresses the Resolver produced and load-balances across them. So as we read on, keep this question in mind: **what does the communication between the Resolver and the Balancer look like?**
Looking at the code above, the creation of the ClientConn is already finished. We can therefore deduce that the remaining logic lives in this one line: rb.Build(cc.parsedTarget, ccr, rbo).

Which Builder do we have at this point?

Because the demo registers no resolver of its own, execution falls into the following branch:

    if resolverBuilder == nil {
        channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
        cc.parsedTarget = resolver.Target{
            Scheme:   resolver.GetDefaultScheme(),
            Endpoint: target,
        }
        resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
        if resolverBuilder == nil {
            return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
        }
    }

cc.getResolver is called to obtain the resolverBuilder:

    func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
        for _, rb := range cc.dopts.resolvers {
            if scheme == rb.Scheme() {
                return rb
            }
        }
        return resolver.Get(scheme)
    }

Note that the Scheme at this point is the default one: passthrough.
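
To make the fallback concrete, here is a hedged sketch of how a few target strings map to schemes; dialDemo and the addresses are illustrative, not from the source:

    import (
        "context"

        "google.golang.org/grpc"
    )

    func dialDemo(ctx context.Context) (*grpc.ClientConn, error) {
        // "dns:///foo.example:50051"      -> Scheme "dns"
        // "passthrough:///10.1.1.1:50051" -> Scheme "passthrough"
        // "localhost:50051"               -> no registered scheme, so DialContext
        //                                    falls back to the default "passthrough"
        return grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure())
    }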

Where does passthrough come from?

It ships with the framework, registering itself via init():

    package passthrough

    import "google.golang.org/grpc/resolver"

    const scheme = "passthrough"

    type passthroughBuilder struct{}

    func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
        r := &passthroughResolver{
            target: target,
            cc:     cc,
        }
        r.start()
        return r, nil
    }

    func (*passthroughBuilder) Scheme() string {
        return scheme
    }

    type passthroughResolver struct {
        target resolver.Target
        cc     resolver.ClientConn
    }

    func (r *passthroughResolver) start() {
        r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
    }

    func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

    func (*passthroughResolver) Close() {}

    func init() {
        resolver.Register(&passthroughBuilder{})
    }

Build

Build's logic is trivial: it calls start().
start is just as simple: it calls r.cc.UpdateState.

1. r.cc.UpdateState

Note that cc here is ccr, whose type is *ccResolverWrapper.

    func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
        if ccr.done.HasFired() {
            return
        }
        channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
        if channelz.IsOn() {
            ccr.addChannelzTraceEvent(s)
        }
        ccr.curState = s
        ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
    }

After assigning s to ccr.curState, it calls ccr.cc.updateResolverState().

2. ccr.cc.updateResolverState

    func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
        .......
        var ret error
        if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
            // The first call into this function takes this branch
            cc.maybeApplyDefaultServiceConfig(s.Addresses)
            // TODO: do we need to apply a failing LB policy if there is no
            // default, per the error handling design?
        } else {
            if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
                configSelector := iresolver.GetConfigSelector(s)
                if configSelector != nil {
                    if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
                        channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
                    }
                } else {
                    configSelector = &defaultConfigSelector{sc}
                }
                cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
            } else {
                ret = balancer.ErrBadResolverState
                if cc.balancerWrapper == nil {
                    var err error
                    if s.ServiceConfig.Err != nil {
                        err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
                    } else {
                        err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
                    }
                    cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
                    cc.blockingpicker.updatePicker(base.NewErrPicker(err))
                    cc.csMgr.updateState(connectivity.TransientFailure)
                    cc.mu.Unlock()
                    return ret
                }
            }
        }
        var balCfg serviceconfig.LoadBalancingConfig
        if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
            balCfg = cc.sc.lbConfig.cfg
        }
        cbn := cc.curBalancerName
        bw := cc.balancerWrapper
        cc.mu.Unlock()
        if cbn != grpclbName {
            // Filter any grpclb addresses since we don't have the grpclb balancer.
            for i := 0; i < len(s.Addresses); {
                if s.Addresses[i].Type == resolver.GRPCLB {
                    copy(s.Addresses[i:], s.Addresses[i+1:])
                    s.Addresses = s.Addresses[:len(s.Addresses)-1]
                    continue
                }
                i++
            }
        }
        uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
        if ret == nil {
            // Prefer ErrBadResolverState since any other error is
            // currently meaningless to the caller.
            ret = uccsErr
        }
        return ret
    }

Creating the Balancer

3. cc.maybeApplyDefaultServiceConfig

ccr.cc.updateResolverState above calls cc.maybeApplyDefaultServiceConfig:

    func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
        if cc.sc != nil {
            cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
            return
        }
        if cc.dopts.defaultServiceConfig != nil {
            cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
        } else {
            cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
        }
    }

4. cc.applyServiceConfigAndBalancer

    func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
        .....
        if cc.dopts.balancerBuilder == nil {
            var newBalancerName string
            if cc.sc != nil && cc.sc.lbConfig != nil {
                newBalancerName = cc.sc.lbConfig.name
            } else {
                var isGRPCLB bool
                for _, a := range addrs {
                    if a.Type == resolver.GRPCLB {
                        isGRPCLB = true
                        break
                    }
                }
                if isGRPCLB {
                    newBalancerName = grpclbName
                } else if cc.sc != nil && cc.sc.LB != nil {
                    newBalancerName = *cc.sc.LB
                } else {
                    newBalancerName = PickFirstBalancerName
                }
            }
            cc.switchBalancer(newBalancerName)
        } else if cc.balancerWrapper == nil {
            // Balancer dial option was set, and this is the first time handling
            // resolved addresses. Build a balancer with dopts.balancerBuilder.
            cc.curBalancerName = cc.dopts.balancerBuilder.Name()
            cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
        }
    }

We can see that newBalancerName ends up as pick_first.

5. cc.switchBalancer

    func (cc *ClientConn) switchBalancer(name string) {
        // In our case the builder is grpc.pickfirstBuilder
        builder := balancer.Get(name)
        ......
        cc.curBalancerName = builder.Name()
        cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
    }

The default balancerBuilder is obtained exactly like the resolverBuilder: it is registered in advance.

    func init() {
        balancer.Register(newPickfirstBuilder())
    }
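
Since balancers live in a registry keyed by name, just like resolvers, a user can select a registered policy through the service config instead of accepting the pick_first default. A sketch; round_robin ships with grpc-go and is registered the same way:

    import (
        "context"

        "google.golang.org/grpc"
    )

    func dialRoundRobin(ctx context.Context, target string) (*grpc.ClientConn, error) {
        return grpc.DialContext(ctx, target,
            grpc.WithInsecure(),
            // Looked up via balancer.Get("round_robin") during applyServiceConfigAndBalancer.
            grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
        )
    }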

6. newCCBalancerWrapper

Notice that the Balancer gets a Wrapper as well:

    type ccBalancerWrapper struct {
        cc         *ClientConn
        balancerMu sync.Mutex // synchronizes calls to the balancer
        balancer   balancer.Balancer
        scBuffer   *buffer.Unbounded
        done       *grpcsync.Event
        mu         sync.Mutex
        subConns   map[*acBalancerWrapper]struct{} // a map used as a set of live SubConns; balancers such as round_robin create more than one
    }

    func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
        ccb := &ccBalancerWrapper{
            cc:       cc,
            scBuffer: buffer.NewUnbounded(),
            done:     grpcsync.NewEvent(),
            subConns: make(map[*acBalancerWrapper]struct{}),
        }
        go ccb.watcher()
        ccb.balancer = b.Build(ccb, bopts)
        return ccb
    }

    // Because of the demo setup, the builder returned here is pickfirstBuilder
    func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
        return &pickfirstBalancer{cc: cc}
    }

The addrConn connection

Back to step 2, ccr.cc.updateResolverState.

7. bw.updateClientConnState

    func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
        ccb.balancerMu.Lock()
        defer ccb.balancerMu.Unlock()
        return ccb.balancer.UpdateClientConnState(*ccs)
    }

    func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
        .....
        if b.sc == nil { // on the first call b.sc (a balancer.SubConn) is certainly nil, so we take this branch
            var err error
            b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
            if err != nil {
                if logger.V(2) {
                    logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
                }
                b.state = connectivity.TransientFailure
                b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
                    Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
                })
                return balancer.ErrBadResolverState
            }
            b.state = connectivity.Idle
            b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
            b.sc.Connect()
        } else {
            b.sc.UpdateAddresses(cs.ResolverState.Addresses)
            b.sc.Connect()
        }
        return nil
    }

8. b.cc.NewSubConn

    func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
        if len(addrs) <= 0 {
            return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
        }
        ccb.mu.Lock()
        defer ccb.mu.Unlock()
        if ccb.subConns == nil {
            return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
        }
        // Initialize the addrConn
        ac, err := ccb.cc.newAddrConn(addrs, opts)
        if err != nil {
            return nil, err
        }
        // Wrap the addrConn one more time
        acbw := &acBalancerWrapper{ac: ac}
        acbw.ac.mu.Lock()
        ac.acbw = acbw
        acbw.ac.mu.Unlock()
        ccb.subConns[acbw] = struct{}{}
        return acbw, nil
    }

Why does addrConn need yet another wrapper? Well, it has to implement an interface, balancer.SubConn. You have to admit, gRPC's use of design patterns is pretty slick.

    // acBalancerWrapper is a wrapper on top of ac for balancers.
    // It implements balancer.SubConn interface.
    type acBalancerWrapper struct {
        mu sync.Mutex
        ac *addrConn
    }

9. ccb.cc.newAddrConn

    func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
        ac := &addrConn{
            state:        connectivity.Idle,
            cc:           cc,
            addrs:        addrs,
            scopts:       opts,
            dopts:        cc.dopts,
            czData:       new(channelzData),
            resetBackoff: make(chan struct{}),
        }
        ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
        // Track ac in cc. This needs to be done before any getTransport(...) is called.
        cc.mu.Lock()
        .....
        // cc's conns field is populated here,
        // so cc.conns tracks the same set of connections as ccb.subConns
        cc.conns[ac] = struct{}{}
        cc.mu.Unlock()
        return ac, nil
    }

Back to step 7, updateClientConnState.

10. b.cc.UpdateState

    func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
        ccb.mu.Lock()
        defer ccb.mu.Unlock()
        if ccb.subConns == nil {
            return
        }
        ccb.cc.blockingpicker.updatePicker(s.Picker)
        ccb.cc.csMgr.updateState(s.ConnectivityState)
    }

11. ccb.cc.blockingpicker.updatePicker

    // pickerWrapper is a wrapper of balancer.Picker. It blocks certain pick
    // actions and unblocks them when the picker is updated.
    // It is initialized in DialContext.
    type pickerWrapper struct {
        mu         sync.Mutex
        done       bool
        blockingCh chan struct{}
        picker     balancer.Picker
    }

    // updatePicker swaps the stored picker for the new balancer.Picker
    func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
        pw.mu.Lock()
        if pw.done {
            pw.mu.Unlock()
            return
        }
        pw.picker = p
        // pw.blockingCh should never be nil.
        close(pw.blockingCh)
        pw.blockingCh = make(chan struct{})
        pw.mu.Unlock()
    }

12. ccb.cc.csMgr.updateState

    // Per the flow above, state here is Idle.
    // This just keeps connectivityStateManager's state in sync with the state passed in.
    func (csm *connectivityStateManager) updateState(state connectivity.State) {
        csm.mu.Lock()
        defer csm.mu.Unlock()
        if csm.state == connectivity.Shutdown {
            return
        }
        if csm.state == state {
            return
        }
        csm.state = state
        channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
        if csm.notifyChan != nil {
            // There are other goroutines waiting on this channel.
            close(csm.notifyChan)
            csm.notifyChan = nil
        }
    }
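
This notifyChan is what backs the public connectivity API on *grpc.ClientConn. A hedged sketch of user code waiting for the channel to become Ready; waitReady is illustrative, while GetState and WaitForStateChange are real ClientConn methods:

    import (
        "context"

        "google.golang.org/grpc"
        "google.golang.org/grpc/connectivity"
    )

    func waitReady(ctx context.Context, conn *grpc.ClientConn) error {
        for {
            s := conn.GetState()
            if s == connectivity.Ready {
                return nil
            }
            // Blocks until the state changes away from s (woken by the
            // close(csm.notifyChan) above) or until ctx expires.
            if !conn.WaitForStateChange(ctx, s) {
                return ctx.Err()
            }
        }
    }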

Back to step 7, updateClientConnState.

13. b.sc.Connect()

    func (acbw *acBalancerWrapper) Connect() {
        acbw.mu.Lock()
        defer acbw.mu.Unlock()
        acbw.ac.connect()
    }

    func (ac *addrConn) connect() error {
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            ac.mu.Unlock()
            return errConnClosing
        }
        // Only an Idle connection may proceed to connect
        if ac.state != connectivity.Idle {
            ac.mu.Unlock()
            return nil
        }
        // Update the connectivity state within the lock, to prevent subsequent or
        // concurrent calls from resetting the transport more than once.
        // Clearly, ac's state becomes Connecting here
        ac.updateConnectivityState(connectivity.Connecting, nil)
        ac.mu.Unlock()
        // Start a goroutine connecting to the server asynchronously.
        go ac.resetTransport()
        return nil
    }

14. ac.resetTransport

    func (ac *addrConn) resetTransport() {
        for i := 0; ; i++ {
            if i > 0 {
                ac.cc.resolveNow(resolver.ResolveNowOptions{})
            }
            ac.mu.Lock()
            if ac.state == connectivity.Shutdown {
                ac.mu.Unlock()
                return
            }
            .......
            ac.updateConnectivityState(connectivity.Connecting, nil)
            ac.transport = nil
            ac.mu.Unlock()
            newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
            if err != nil {
                .....
            }
            ac.mu.Lock()
            if ac.state == connectivity.Shutdown {
                ac.mu.Unlock()
                newTr.Close()
                return
            }
            ac.curAddr = addr
            ac.transport = newTr
            ac.backoffIdx = 0
            hctx, hcancel := context.WithCancel(ac.ctx)
            ac.startHealthCheck(hctx)
            ac.mu.Unlock()
            // Block until the created transport is down. When this happens,
            // we restart from the top of the addr list.
            <-reconnect.Done()
            hcancel()
            // Restart connecting: the top of the loop will set the state to
            // Connecting. This is against the current connectivity semantics
            // doc, but it allows graceful behavior for RPCs not yet dispatched;
            // unfortunate timing would otherwise cause an RPC to fail even
            // though the TRANSIENT_FAILURE state (called for by the doc) would
            // be instantaneous.
            //
            // Ideally we should transition to Idle here and block until there
            // is RPC activity that leads to the balancer requesting a
            // reconnect of the associated SubConn.
        }
    }

15. ac.tryAllAddrs

tryAllAddrs tries to create a connection to each address in turn; as soon as one succeeds it returns the transport, the address, and an Event. The Event fires when the returned transport disconnects.
Let's park a question here: since it returns on the first success, how do connections to the remaining addresses ever get established?

    func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
        var firstConnErr error
        for _, addr := range addrs {
            ac.mu.Lock()
            if ac.state == connectivity.Shutdown {
                ac.mu.Unlock()
                return nil, resolver.Address{}, nil, errConnClosing
            }
            ac.cc.mu.RLock()
            ac.dopts.copts.KeepaliveParams = ac.cc.mkp
            ac.cc.mu.RUnlock()
            copts := ac.dopts.copts
            if ac.scopts.CredsBundle != nil {
                copts.CredsBundle = ac.scopts.CredsBundle
            }
            ac.mu.Unlock()
            channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
            newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
            if err == nil {
                return newTr, addr, reconnect, nil
            }
            if firstConnErr == nil {
                firstConnErr = err
            }
            ac.cc.updateConnectionError(err)
        }
        // Couldn't connect to any address.
        return nil, resolver.Address{}, nil, firstConnErr
    }

16. ac.createTransport

createTransport creates a connection to addr. On success it returns the transport together with an Event; the Event fires when the returned transport disconnects.

    func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
        prefaceReceived := make(chan struct{})
        onCloseCalled := make(chan struct{})
        reconnect := grpcsync.NewEvent()
        // addr.ServerName takes precedent over ClientConn authority, if present.
        if addr.ServerName == "" {
            addr.ServerName = ac.cc.authority
        }
        once := sync.Once{}
        onGoAway := func(r transport.GoAwayReason) {
            ac.mu.Lock()
            ac.adjustParams(r)
            once.Do(func() {
                if ac.state == connectivity.Ready {
                    // Prevent this SubConn from being used for new RPCs by setting its
                    // state to Connecting.
                    //
                    // TODO: this should be Idle when grpc-go properly supports it.
                    ac.updateConnectivityState(connectivity.Connecting, nil)
                }
            })
            ac.mu.Unlock()
            reconnect.Fire()
        }
        onClose := func() {
            ac.mu.Lock()
            once.Do(func() {
                if ac.state == connectivity.Ready {
                    // Prevent this SubConn from being used for new RPCs by setting its
                    // state to Connecting.
                    //
                    // TODO: this should be Idle when grpc-go properly supports it.
                    ac.updateConnectivityState(connectivity.Connecting, nil)
                }
            })
            ac.mu.Unlock()
            close(onCloseCalled)
            reconnect.Fire()
        }
        onPrefaceReceipt := func() {
            close(prefaceReceived)
        }
        connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
        defer cancel()
        if channelz.IsOn() {
            copts.ChannelzParentID = ac.channelzID
        }
        newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
        if err != nil {
            // newTr is either nil, or closed.
            channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
            return nil, nil, err
        }
        select {
        case <-time.After(time.Until(connectDeadline)):
            // We didn't get the preface in time.
            newTr.Close()
            channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
            return nil, nil, errors.New("timed out waiting for server handshake")
        case <-prefaceReceived:
            // We got the preface - huzzah! things are good.
        case <-onCloseCalled:
            // The transport has already closed - noop.
            return nil, nil, errors.New("connection closed")
            // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
        }
        return newTr, reconnect, nil
    }

17. transport.NewClientTransport

    func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
        return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
    }

newHTTP2Client constructs an HTTP/2-based, connected ClientTransport to addr and starts receiving messages on it. If the connection fails, it returns a non-nil error.

    func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
        scheme := "http"
        ...
        // After a long journey, we finally arrive at the dial operation
        conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
        ...
        // Keepalive setup
        kp := opts.KeepaliveParams
        ...
        var (
            isSecure bool
            authInfo credentials.AuthInfo
        )
        // transportCreds / perRPCCreds: TLS and credential handling
        transportCreds := opts.TransportCredentials
        perRPCCreds := opts.PerRPCCredentials
        if b := opts.CredsBundle; b != nil {
            if t := b.TransportCredentials(); t != nil {
                transportCreds = t
            }
            if t := b.PerRPCCredentials(); t != nil {
                perRPCCreds = append(perRPCCreds, t)
            }
        }
        .....
        // Flow-control window, writeBufSize, readBufSize and maxHeaderListSize setup
        dynamicWindow := true
        icwz := int32(initialWindowSize)
        if opts.InitialConnWindowSize >= defaultWindowSize {
            icwz = opts.InitialConnWindowSize
            dynamicWindow = false
        }
        writeBufSize := opts.WriteBufferSize
        readBufSize := opts.ReadBufferSize
        maxHeaderListSize := defaultClientMaxHeaderListSize
        if opts.MaxHeaderListSize != nil {
            maxHeaderListSize = *opts.MaxHeaderListSize
        }
        // Initialize the http2Client
        t := &http2Client{
            ctx:                   ctx,
            ctxDone:               ctx.Done(), // Cache Done chan.
            cancel:                cancel,
            userAgent:             opts.UserAgent,
            conn:                  conn,
            remoteAddr:            conn.RemoteAddr(),
            localAddr:             conn.LocalAddr(),
            authInfo:              authInfo,
            readerDone:            make(chan struct{}),
            writerDone:            make(chan struct{}),
            goAway:                make(chan struct{}),
            framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
            fc:                    &trInFlow{limit: uint32(icwz)},
            scheme:                scheme,
            activeStreams:         make(map[uint32]*Stream),
            isSecure:              isSecure,
            perRPCCreds:           perRPCCreds,
            kp:                    kp,
            statsHandler:          opts.StatsHandler,
            initialWindowSize:     initialWindowSize,
            onPrefaceReceipt:      onPrefaceReceipt,
            nextID:                1,
            maxConcurrentStreams:  defaultMaxStreamsClient,
            streamQuota:           defaultMaxStreamsClient,
            streamsQuotaAvailable: make(chan struct{}, 1),
            czData:                new(channelzData),
            onGoAway:              onGoAway,
            onClose:               onClose,
            keepaliveEnabled:      keepaliveEnabled,
            bufferPool:            newBufferPool(),
        }
        ....
        // Keepalive, same logic as on the server side
        if t.keepaliveEnabled {
            t.kpDormancyCond = sync.NewCond(&t.mu)
            go t.keepalive()
        }
        // Start the reader goroutine for incoming messages. Each transport has a
        // dedicated goroutine which reads HTTP/2 frames from the network and then
        // dispatches them to the corresponding stream entity.
        // This belongs to the data-exchange phase.
        go t.reader()
        // In the gRPC packet-capture analysis there is a "Magic" frame;
        // this is where it gets sent
        n, err := t.conn.Write(clientPreface)
        ...
        // Likewise, the SETTINGS frame seen in the capture is sent here
        ...
        err = t.framer.fr.WriteSettings(ss...)
        if err != nil {
            t.Close()
            return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
        }
        // The connection is up; if the window size needs adjusting,
        // send a WINDOW_UPDATE frame
        if delta := uint32(icwz - defaultWindowSize); delta > 0 {
            if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
                t.Close()
                return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
            }
        }
        // Assign an ID to this connection
        t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
        // Flush the framer's write buffer out to the net.Conn
        if err := t.framer.writer.Flush(); err != nil {
            return nil, err
        }
        go func() {
            t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
            err := t.loopy.run()
            if err != nil {
                if logger.V(logLevel) {
                    logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
                }
            }
            // If it's a connection error, let reader goroutine handle it
            // since there might be data in the buffers.
            if _, ok := err.(net.Error); !ok {
                t.conn.Close()
            }
            close(t.writerDone)
        }()
        return t, nil
    }
    const (
        // ClientPreface is the string that must be sent by new
        // connections from clients.
        ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
        // SETTINGS_MAX_FRAME_SIZE default
        // http://http2.github.io/http2-spec/#rfc.section.6.5.2
        initialMaxFrameSize = 16384
        // NextProtoTLS is the NPN/ALPN protocol negotiated during
        // HTTP/2's TLS setup.
        NextProtoTLS = "h2"
        // http://http2.github.io/http2-spec/#SettingValues
        initialHeaderTableSize  = 4096
        initialWindowSize       = 65535 // 6.9.2 Initial Flow Control Window Size
        defaultMaxReadFrameSize = 1 << 20
    )

    var (
        clientPreface = []byte(ClientPreface)
    )

    const (
        FrameData         FrameType = 0x0
        FrameHeaders      FrameType = 0x1
        FramePriority     FrameType = 0x2
        FrameRSTStream    FrameType = 0x3
        FrameSettings     FrameType = 0x4
        FramePushPromise  FrameType = 0x5
        FramePing         FrameType = 0x6
        FrameGoAway       FrameType = 0x7
        FrameWindowUpdate FrameType = 0x8
        FrameContinuation FrameType = 0x9
    )

    var frameName = map[FrameType]string{
        FrameData:         "DATA",
        FrameHeaders:      "HEADERS",
        FramePriority:     "PRIORITY",
        FrameRSTStream:    "RST_STREAM",
        FrameSettings:     "SETTINGS",
        FramePushPromise:  "PUSH_PROMISE",
        FramePing:         "PING",
        FrameGoAway:       "GOAWAY",
        FrameWindowUpdate: "WINDOW_UPDATE",
        FrameContinuation: "CONTINUATION",
    }
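
For completeness, the Magic + SETTINGS exchange described above can be reproduced against a raw TCP connection using golang.org/x/net/http2. A standalone sketch, not grpc code; the server address is an assumption:

    package main

    import (
        "log"
        "net"

        "golang.org/x/net/http2"
    )

    func main() {
        conn, err := net.Dial("tcp", "localhost:50051") // assumed gRPC server address
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        // 1. The connection preface (the "Magic" frame in captures).
        if _, err := conn.Write([]byte(http2.ClientPreface)); err != nil {
            log.Fatal(err)
        }
        // 2. An initial (empty) SETTINGS frame, as newHTTP2Client does.
        fr := http2.NewFramer(conn, conn)
        if err := fr.WriteSettings(); err != nil {
            log.Fatal(err)
        }
        // 3. The server should answer with its own SETTINGS frame.
        f, err := fr.ReadFrame()
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("got %T from server", f)
    }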

18. dial

Here we finally obtain the layer-4 (TCP) connection.

    func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
        address := addr.Addr
        networkType, ok := networktype.Get(addr)
        if fn != nil {
            if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
                // For backward compatibility, if the user dialed "unix:///path",
                // the passthrough resolver would be used and the user's custom
                // dialer would see "unix:///path". Since the unix resolver is used
                // and the address is now "/path", prepend "unix://" so the user's
                // custom dialer sees the same address.
                return fn(ctx, "unix://"+address)
            }
            return fn(ctx, address)
        }
        if !ok {
            networkType, address = parseDialTarget(address)
        }
        if networkType == "tcp" && useProxy {
            return proxyDial(ctx, address, grpcUA)
        }
        return (&net.Dialer{}).DialContext(ctx, networkType, address)
    }

    func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
        newAddr := addr
        proxyURL, err := mapAddress(ctx, addr)
        if err != nil {
            return nil, err
        }
        if proxyURL != nil {
            newAddr = proxyURL.Host
        }
        conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
        if err != nil {
            return
        }
        if proxyURL != nil {
            // proxy is disabled if proxyURL is nil.
            conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
        }
        return
    }
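
The fn checked at the top of dial is the user-supplied dialer, installed with grpc.WithContextDialer. A sketch; dialViaCustom is illustrative:

    import (
        "context"
        "net"

        "google.golang.org/grpc"
    )

    func dialViaCustom(ctx context.Context, target string) (*grpc.ClientConn, error) {
        return grpc.DialContext(ctx, target,
            grpc.WithInsecure(),
            // This function becomes opts.Dialer, i.e. the fn checked in dial().
            grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
                return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
            }),
        )
    }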

Back to step 14, ac.resetTransport.

    // Record the connection details on ac
    ac.curAddr = addr
    ac.transport = newTr
    ac.backoffIdx = 0
    hctx, hcancel := context.WithCancel(ac.ctx)
    // Kick off the health check
    ac.startHealthCheck(hctx)
    ac.mu.Unlock()

    // startHealthCheck starts the health checking stream (RPC) to watch the
    // health stats of this connection, if health checking is requested and
    // configured. LB channel health checking is enabled when all of the
    // following requirements are met:
    //   1. it is not disabled by the user with the WithDisableHealthCheck DialOption
    //   2. internal.HealthCheckFunc is set by importing the grpc/health package
    //   3. a service config with a non-empty healthCheckConfig field is provided
    //   4. the load balancer requests it
    // If no health check stream is started, it sets the addrConn to READY.
    // The caller must hold ac.mu.
    // Our direct-connection demo therefore performs no health check.
    func (ac *addrConn) startHealthCheck(ctx context.Context) {
        ...
    }

At this point, the connection is fully established.

Summary

The whole flow in one line: DialContext parses the target and selects a Resolver, the Resolver reports addresses via UpdateState, the Balancer (pick_first by default) wraps them in a SubConn/addrConn, and addrConn dials until an HTTP/2 transport is up.