- DialContext
- Obtaining the Builder
- Creating the ResolverWrapper
- Creating the Balancer
- The addrConn connection
- Back to 2. ccr.cc.updateResolverState
- 7. bw.updateClientConnState
- 8. b.cc.NewSubConn
- 9. ccb.cc.newAddrConn
- Back to 7. updateClientConnState
- 10. b.cc.UpdateState
- 11. ccb.cc.blockingpicker.updatePicker
- 12. ccb.cc.csMgr.updateState
- Back to 7. updateClientConnState
- 13. b.sc.Connect()
- 14. ac.resetTransport
- 15. ac.tryAllAddrs
- 16. ac.createTransport
- 17. transport.NewClientTransport
- 18. dial
- Back to 14. ac.resetTransport
- Summary
DialContext
DialContext is the entry function for establishing a client connection. Let's see what happens inside it:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// 1. Create the ClientConn struct
	cc := &ClientConn{
		target: target,
		...
	}
	// 2. Parse the target
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	// 3. Find a resolver Builder that fits the parsed target
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	// 4. Create the Resolver
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	// 5. Done
	return cc, nil
}
Clearly, once we omit more than a few details, establishing a connection turns out to be fairly simple. Let's walk through it:
Because gRPC provides no service registration or service discovery of its own, developers have to write the discovery logic themselves: that is the Resolver.
Once the resolution result, a list of IP addresses, is in hand, one of those IPs has to be selected: that is the Balancer's job.
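To ground this, here is a minimal sketch of the caller's side, using the same era's dial options as this post (WithInsecure, WithBlock); the address localhost:50051 is illustrative, not from the original demo:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// WithInsecure: plaintext; WithBlock: wait until the connection is READY,
	// so all the machinery described below runs before DialContext returns.
	conn, err := grpc.DialContext(ctx, "localhost:50051",
		grpc.WithInsecure(),
		grpc.WithBlock(),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}

Without WithBlock, DialContext returns immediately and the connection is established asynchronously in the background.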
Obtaining the Builder
Let's start with the Builder.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
The logic of ParseTarget can be summed up in one sentence: determine the address scheme of the target the developer passed in, so that a Builder suited to that scheme can be looked up later.
Next, let's look at how the Builder is looked up. The code is fairly simple; I've added some comments:
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)

func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	// First, check whether a resolver was supplied via the dial options
	for _, rb := range cc.dopts.resolvers {
		if scheme == rb.Scheme() {
			return rb
		}
	}
	// If the options contain no matching resolver, look among the registered ones
	return resolver.Get(scheme)
}

// As we can see, the ResolverBuilder is looked up in the map m
func Get(scheme string) Builder {
	if b, ok := m[scheme]; ok {
		return b
	}
	return nil
}
From this we can infer that every **ResolverBuilder** has to be registered in advance.
Looking at the Resolver's code, we indeed find that it registers itself in init().
func init() {
	resolver.Register(&passthroughBuilder{})
}

// Registering a Resolver simply adds it to the map
func Register(b Builder) {
	m[b.Scheme()] = b
}
At this point, we've covered both the registration and the lookup of a Builder.
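To make the registration mechanism concrete, here is a minimal sketch of a custom resolver. The scheme "example" and the fixed address list are assumptions for illustration, not part of gRPC:

package exampleresolver

import "google.golang.org/grpc/resolver"

const scheme = "example" // hypothetical scheme, registered the same way as passthrough

type exampleBuilder struct{}

func (*exampleBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &exampleResolver{cc: cc}
	// Push a fixed address list to the ClientConn, just like passthrough's start()
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{
		{Addr: "10.0.0.1:50051"},
		{Addr: "10.0.0.2:50051"},
	}})
	return r, nil
}

func (*exampleBuilder) Scheme() string { return scheme }

type exampleResolver struct{ cc resolver.ClientConn }

func (*exampleResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                {}

func init() {
	// Adds the builder to the same map m that resolver.Get reads from
	resolver.Register(&exampleBuilder{})
}

With this registered (for example via a blank import), a target like example:///whatever would select this builder in cc.getResolver.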
Creating the ResolverWrapper
Back in the ClientConn creation flow, once the ResolverBuilder is obtained, the next step is:
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
...
cc.resolverWrapper = rWrapper
...
After the wrapper is created it is simply assigned to cc; there isn't much logic here. To support pluggable Resolvers, gRPC applies the decorator pattern and wraps the Resolver in a ResolverWrapper.
Let's look at the details of creating the ResolverWrapper:
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
	ccr := &ccResolverWrapper{
		cc:   cc,
		done: grpcsync.NewEvent(),
	}
	// Build the resolver from the given Builder and store it in the wrapper
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
	return ccr, nil
}
Alright, let's pause here for a moment.
Let's think about the feature we need: to decouple the Resolver from the Balancer, we want an intermediate component that receives the addresses the Resolver produced and then load-balances across them. So as we read on, keep this question in mind: **what does the communication between the Resolver and the Balancer look like?**
Looking at the code above again, the creation of the ClientConn is already finished. So we can infer that the remaining logic all lives in the single line rb.Build(cc.parsedTarget, ccr, rbo).
What is the Builder at this point?
Since the demo doesn't register a custom resolver, the following fallback logic is taken:
if resolverBuilder == nil {
	channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
	cc.parsedTarget = resolver.Target{
		Scheme:   resolver.GetDefaultScheme(),
		Endpoint: target,
	}
	resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
	}
}
cc.getResolver is called to obtain the resolverBuilder:
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	for _, rb := range cc.dopts.resolvers {
		if scheme == rb.Scheme() {
			return rb
		}
	}
	return resolver.Get(scheme)
}
Note that the Scheme this time is the default: passthrough.
Where does passthrough come from?
It ships with the framework and registers itself via init():
package passthrough

import "google.golang.org/grpc/resolver"

const scheme = "passthrough"

type passthroughBuilder struct{}

func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &passthroughResolver{
		target: target,
		cc:     cc,
	}
	r.start()
	return r, nil
}

func (*passthroughBuilder) Scheme() string {
	return scheme
}

type passthroughResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
}

func (r *passthroughResolver) start() {
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (*passthroughResolver) Close() {}

func init() {
	resolver.Register(&passthroughBuilder{})
}
Build
Build's logic is simple: it calls start().
start is equally simple: it calls r.cc.UpdateState.
1. r.cc.UpdateState
Note that here cc is ccr, of type *ccResolverWrapper.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
	if ccr.done.HasFired() {
		return
	}
	channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
	if channelz.IsOn() {
		ccr.addChannelzTraceEvent(s)
	}
	ccr.curState = s
	ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
After assigning s to ccr.curState, it calls ccr.cc.updateResolverState().
2. ccr.cc.updateResolverState
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	...
	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		// The first call into this function takes this branch
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolverState since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
Creating the Balancer
3. cc.maybeApplyDefaultServiceConfig
As shown above, ccr.cc.updateResolverState calls cc.maybeApplyDefaultServiceConfig:
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
	if cc.sc != nil {
		cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
		return
	}
	if cc.dopts.defaultServiceConfig != nil {
		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
	} else {
		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
	}
}
4. cc.applyServiceConfigAndBalancer
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
	...
	if cc.dopts.balancerBuilder == nil {
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
		} else {
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}

As we can see, newBalancerName is pick_first.
5. cc.switchBalancer
func (cc *ClientConn) switchBalancer(name string) {
	// The builder here is grpc.pickfirstBuilder
	builder := balancer.Get(name)
	...
	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
Like the resolverBuilder, the default balancerBuilder is registered ahead of time:
func init() {
	balancer.Register(newPickfirstBuilder())
}
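Since balancers are also looked up by their registered name, the default pick_first can be swapped for another registered balancer. A minimal sketch; round_robin ships with grpc-go and registers itself in init(), and the target is illustrative:

package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// The service config selects the balancer by its registered name,
	// which feeds the newBalancerName logic shown above.
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}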
6. newCCBalancerWrapper
As we can see, the Balancer also has a Wrapper:
type ccBalancerWrapper struct {
	cc         *ClientConn
	balancerMu sync.Mutex // synchronizes calls to the balancer
	balancer   balancer.Balancer
	scBuffer   *buffer.Unbounded
	done       *grpcsync.Event

	mu       sync.Mutex
	subConns map[*acBalancerWrapper]struct{} // not clear why a map is needed here
}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
	ccb := &ccBalancerWrapper{
		cc:       cc,
		scBuffer: buffer.NewUnbounded(),
		done:     grpcsync.NewEvent(),
		subConns: make(map[*acBalancerWrapper]struct{}),
	}
	go ccb.watcher()
	ccb.balancer = b.Build(ccb, bopts)
	return ccb
}

// Because of the demo setup, the builder returned here is pickfirstBuilder
func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	return &pickfirstBalancer{cc: cc}
}
The addrConn connection
Back to 2. ccr.cc.updateResolverState
7. bw.updateClientConnState
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	ccb.balancerMu.Lock()
	defer ccb.balancerMu.Unlock()
	return ccb.balancer.UpdateClientConnState(*ccs)
}

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
	...
	if b.sc == nil { // on the first call, b.sc (a balancer.SubConn) is definitely nil, so we enter this branch
		var err error
		b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
		if err != nil {
			if logger.V(2) {
				logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
			}
			b.state = connectivity.TransientFailure
			b.cc.UpdateState(balancer.State{
				ConnectivityState: connectivity.TransientFailure,
				Picker:            &picker{err: fmt.Errorf("error creating connection: %v", err)},
			})
			return balancer.ErrBadResolverState
		}
		b.state = connectivity.Idle
		b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
		b.sc.Connect()
	} else {
		b.sc.UpdateAddresses(cs.ResolverState.Addresses)
		b.sc.Connect()
	}
	return nil
}
8. b.cc.NewSubConn
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	if len(addrs) <= 0 {
		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
	}
	ccb.mu.Lock()
	defer ccb.mu.Unlock()
	if ccb.subConns == nil {
		return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
	}
	// Initialize the addrConn
	ac, err := ccb.cc.newAddrConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	// Wrap the addrConn once more
	acbw := &acBalancerWrapper{ac: ac}
	acbw.ac.mu.Lock()
	ac.acbw = acbw
	acbw.ac.mu.Unlock()
	ccb.subConns[acbw] = struct{}{}
	return acbw, nil
}
Why does addrConn need yet another wrapper? To implement an interface. gRPC's use of design patterns really is impressive.
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
	mu sync.Mutex
	ac *addrConn
}
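The same adapter idea in miniature (all names here are hypothetical, for illustration only): the inner type keeps its unexported method set, while a thin wrapper satisfies the interface the balancer layer expects.

package main

import "fmt"

type SubConnLike interface {
	Connect()
}

type rawConn struct{} // stands in for addrConn

func (c *rawConn) connect() { fmt.Println("dialing...") } // internal, unexported

type rawConnWrapper struct{ c *rawConn } // stands in for acBalancerWrapper

func (w *rawConnWrapper) Connect() { w.c.connect() } // adapts connect() to the interface

func main() {
	var sc SubConnLike = &rawConnWrapper{c: &rawConn{}}
	sc.Connect()
}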
9. ccb.cc.newAddrConn
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	...
	// The cc.conns field is set here, which tells us that
	// cc.conns mirrors ccb.subConns
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}
Back to 7. updateClientConnState
10. b.cc.UpdateState
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	ccb.mu.Lock()
	defer ccb.mu.Unlock()
	if ccb.subConns == nil {
		return
	}
	ccb.cc.blockingpicker.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
}
11. ccb.cc.blockingpicker.updatePicker
// pickerWrapper is a wrapper around balancer.Picker. It blocks certain
// pick actions and unblocks them when the picker is updated.
// It is initialized in DialContext.
type pickerWrapper struct {
	mu         sync.Mutex
	done       bool
	blockingCh chan struct{}
	picker     balancer.Picker
}
// Update the pickerWrapper, setting picker to the balancer's picker
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
	pw.mu.Lock()
	if pw.done {
		pw.mu.Unlock()
		return
	}
	pw.picker = p
	// pw.blockingCh should never be nil.
	close(pw.blockingCh)
	pw.blockingCh = make(chan struct{})
	pw.mu.Unlock()
}
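The close-and-replace trick above is worth a closer look: closing the channel wakes every goroutine blocked on it at once, and a fresh channel is installed for the next round of waiters. A standalone sketch of the same pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync"
	"time"
)

type valueWrapper struct {
	mu         sync.Mutex
	blockingCh chan struct{}
	value      string
}

func newValueWrapper() *valueWrapper {
	return &valueWrapper{blockingCh: make(chan struct{})}
}

func (w *valueWrapper) update(v string) {
	w.mu.Lock()
	w.value = v
	close(w.blockingCh)                // wake everyone waiting on the old channel
	w.blockingCh = make(chan struct{}) // new channel for future waiters
	w.mu.Unlock()
}

func (w *valueWrapper) wait() string {
	w.mu.Lock()
	ch := w.blockingCh
	w.mu.Unlock()
	<-ch // blocks until the next update
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.value
}

func main() {
	w := newValueWrapper()
	go func() {
		time.Sleep(100 * time.Millisecond)
		w.update("picker-v1")
	}()
	fmt.Println("got:", w.wait()) // prints "got: picker-v1"
}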
12. ccb.cc.csMgr.updateState
// Per the flow above, state here is Idle.
// This just keeps the connectivityStateManager's state consistent
// with the state passed in.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}
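These state transitions are observable from user code through the public ClientConn API. A small sketch, assuming a plaintext server at an illustrative address:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Follow the csMgr state machine from the outside:
	// typically Idle -> Connecting -> Ready (or TransientFailure).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for state := conn.GetState(); state != connectivity.Ready; state = conn.GetState() {
		log.Printf("state: %v", state)
		if !conn.WaitForStateChange(ctx, state) {
			log.Fatalf("timed out in state %v", state)
		}
	}
	log.Println("state: READY")
}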
Back to 7. updateClientConnState
13. b.sc.Connect()
func (acbw *acBalancerWrapper) Connect() {
	acbw.mu.Lock()
	defer acbw.mu.Unlock()
	acbw.ac.connect()
}

func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	// The state must be Idle to proceed with connecting
	if ac.state != connectivity.Idle {
		ac.mu.Unlock()
		return nil
	}
	// Update the connectivity state inside the lock to prevent subsequent
	// or concurrent calls from resetting the transport multiple times.
	// Clearly, ac's state is now updated to Connecting.
	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}
14. ac.resetTransport
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		...
		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			...
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. When this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// Restart connecting: the top of the loop will set the state to
		// Connecting. This is against the current connectivity semantics doc,
		// but it allows graceful behavior for RPCs not yet dispatched;
		// unfortunate timing would otherwise cause the RPC to fail even
		// though the TRANSIENT_FAILURE state (called for by the doc) would
		// be instantaneous. Ideally we should transition to Idle here and
		// block until there is RPC activity that leads to the balancer
		// requesting a reconnect of the associated SubConn.
	}
}
15. ac.tryAllAddrs
tryAllAddrs tries to create a connection to the addresses; as soon as one succeeds, it returns the transport, the address, and an Event. The event fires when the returned transport disconnects.
Here's a question to keep in mind: since it returns on the first success, how do connections to the other addresses ever get established?
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
	var firstConnErr error
	for _, addr := range addrs {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return nil, resolver.Address{}, nil, errConnClosing
		}
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()
		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()
		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
		if err == nil {
			return newTr, addr, reconnect, nil
		}
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.updateConnectionError(err)
	}
	// Couldn't connect to any address.
	return nil, resolver.Address{}, nil, firstConnErr
}
16. ac.createTransport
createTransport creates a connection to addr. On success it returns the transport and an event; the event fires when the returned transport disconnects.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	prefaceReceived := make(chan struct{})
	onCloseCalled := make(chan struct{})
	reconnect := grpcsync.NewEvent()

	// addr.ServerName takes precedent over ClientConn authority, if present.
	if addr.ServerName == "" {
		addr.ServerName = ac.cc.authority
	}

	once := sync.Once{}
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		reconnect.Fire()
	}

	onClose := func() {
		ac.mu.Lock()
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		close(onCloseCalled)
		reconnect.Fire()
	}

	onPrefaceReceipt := func() {
		close(prefaceReceived)
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
		return nil, nil, err
	}

	select {
	case <-time.After(time.Until(connectDeadline)):
		// We didn't get the preface in time.
		newTr.Close()
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
		return nil, nil, errors.New("timed out waiting for server handshake")
	case <-prefaceReceived:
		// We got the preface - huzzah! things are good.
	case <-onCloseCalled:
		// The transport has already closed - noop.
		return nil, nil, errors.New("connection closed")
		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
	}
	return newTr, reconnect, nil
}
17. transport.NewClientTransport
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}
newHTTP2Client constructs a connected, HTTP/2-based ClientTransport to addr and begins receiving messages on it. On failure it returns a non-nil error.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	...
	// After a long journey, we finally reach the dial operation
	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	...
	// Keepalive parameters
	kp := opts.KeepaliveParams
	...
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	// transportCreds / perRPCCreds: HTTPS and credential handling
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials
	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	...
	// Flow-control window, writeBufSize, readBufSize, maxHeaderListSize
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	// Initialize the http2Client
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      onPrefaceReceipt,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
	}
	...
	// Health checking, same logic as on the server side
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}
	// Start the reader goroutine for incoming messages. Every transport has
	// a dedicated goroutine that reads HTTP/2 frames from the network and
	// dispatches them to the corresponding stream entity.
	// This belongs to the data-exchange phase.
	go t.reader()
	// In the gRPC packet-capture analysis there was a Magic frame;
	// this is where it gets sent
	n, err := t.conn.Write(clientPreface)
	...
	// Likewise, the capture showed a SETTINGS frame; this is where it gets sent
	...
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
	}
	// Once connected, send a WINDOW_UPDATE frame if the window size needs adjusting
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
		}
	}
	// Assign an ID to this connection
	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
	// Flush the framer's internal buffer into the net.Conn
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		// If it's a connection error, let reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}
const (
	// ClientPreface is the string that must be sent by new
	// connections from clients.
	ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

	// SETTINGS_MAX_FRAME_SIZE default
	// http://http2.github.io/http2-spec/#rfc.section.6.5.2
	initialMaxFrameSize = 16384

	// NextProtoTLS is the NPN/ALPN protocol negotiated during
	// HTTP/2's TLS setup.
	NextProtoTLS = "h2"

	// http://http2.github.io/http2-spec/#SettingValues
	initialHeaderTableSize = 4096

	initialWindowSize       = 65535 // 6.9.2 Initial Flow Control Window Size
	defaultMaxReadFrameSize = 1 << 20
)

var (
	clientPreface = []byte(ClientPreface)
)

const (
	FrameData         FrameType = 0x0
	FrameHeaders      FrameType = 0x1
	FramePriority     FrameType = 0x2
	FrameRSTStream    FrameType = 0x3
	FrameSettings     FrameType = 0x4
	FramePushPromise  FrameType = 0x5
	FramePing         FrameType = 0x6
	FrameGoAway       FrameType = 0x7
	FrameWindowUpdate FrameType = 0x8
	FrameContinuation FrameType = 0x9
)

var frameName = map[FrameType]string{
	FrameData:         "DATA",
	FrameHeaders:      "HEADERS",
	FramePriority:     "PRIORITY",
	FrameRSTStream:    "RST_STREAM",
	FrameSettings:     "SETTINGS",
	FramePushPromise:  "PUSH_PROMISE",
	FramePing:         "PING",
	FrameGoAway:       "GOAWAY",
	FrameWindowUpdate: "WINDOW_UPDATE",
	FrameContinuation: "CONTINUATION",
}
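The preface-plus-SETTINGS handshake is plain HTTP/2, so it is easy to reproduce with golang.org/x/net/http2; a standalone sketch, independent of grpc internals:

package main

import (
	"bytes"
	"fmt"

	"golang.org/x/net/http2"
)

func main() {
	var buf bytes.Buffer
	// Same first bytes a gRPC client writes after dialing:
	// the client preface (the "Magic" frame in the packet capture)...
	buf.WriteString(http2.ClientPreface)
	// ...followed by a SETTINGS frame.
	fr := http2.NewFramer(&buf, nil)
	if err := fr.WriteSettings(http2.Setting{ID: http2.SettingInitialWindowSize, Val: 65535}); err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes on the wire, starting with %q\n", buf.Len(), http2.ClientPreface)
}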
18. dial
This obtains the layer-4 (TCP) connection:
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
	address := addr.Addr
	networkType, ok := networktype.Get(addr)
	if fn != nil {
		if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
			// For backward compatibility, if the user dialed "unix:///path",
			// the passthrough resolver would be used and the user's custom
			// dialer would see "unix:///path". Since the unix resolver is used
			// and the address is now "/path", prepend "unix://" so the user's
			// custom dialer sees the same address.
			return fn(ctx, "unix://"+address)
		}
		return fn(ctx, address)
	}
	if !ok {
		networkType, address = parseDialTarget(address)
	}
	if networkType == "tcp" && useProxy {
		return proxyDial(ctx, address, grpcUA)
	}
	return (&net.Dialer{}).DialContext(ctx, networkType, address)
}

func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
	newAddr := addr
	proxyURL, err := mapAddress(ctx, addr)
	if err != nil {
		return nil, err
	}
	if proxyURL != nil {
		newAddr = proxyURL.Host
	}
	conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
	if err != nil {
		return
	}
	if proxyURL != nil {
		// proxy is disabled if proxyURL is nil.
		conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
	}
	return
}
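The fn parameter above is the custom dialer that a user can install via a dial option. A minimal sketch of supplying one, with an illustrative target:

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	// WithContextDialer supplies the fn seen in dial() above; gRPC then
	// skips its own net.Dialer and calls ours with the resolved address.
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithInsecure(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}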
Back to 14. ac.resetTransport
// Set ac's fields
ac.curAddr = addr
ac.transport = newTr
ac.backoffIdx = 0

hctx, hcancel := context.WithCancel(ac.ctx)
// Start the health check
ac.startHealthCheck(hctx)
ac.mu.Unlock()

// startHealthCheck starts the health checking stream (RPC) to watch the
// health stats of this connection, if health checking is requested and
// configured. LB channel health checking is enabled when all of the
// following requirements are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with a non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// If the health checking stream is not started, it sets the addrConn to READY.
// The caller must hold ac.mu.
//
// Our direct-connection demo therefore performs no health check.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	...
}
At this point, the connection is fully established.
Summary

The end-to-end flow of DialContext: parse the target and pick a registered ResolverBuilder (passthrough by default); wrap the resolver in a ccResolverWrapper, whose UpdateState hands the resolved addresses to the ClientConn; apply the service config and build the Balancer (pick_first by default) inside a ccBalancerWrapper; the balancer creates a SubConn (an addrConn wrapped in an acBalancerWrapper), updates the picker and the connectivity state, and calls Connect; the addrConn then resets its transport, tries each address in turn, and establishes an HTTP/2 connection by dialing TCP, sending the client preface and SETTINGS frames, and starting the reader and loopy-writer goroutines.
