- DialContext
- Getting the Builder
- Creating the ResolverWrapper
- Creating the Balancer
- The addrConn connection
- Back to 2. ccr.cc.updateResolverState
- 7. bw.updateClientConnState
- 8. b.cc.NewSubConn
- 9. ccb.cc.newAddrConn
- Back to 7. updateClientConnState
- 10. b.cc.UpdateState
- 11. ccb.cc.blockingpicker.updatePicker
- 12. ccb.cc.csMgr.updateState
- Back to 7. updateClientConnState
- 13. b.sc.Connect()
- 14. ac.resetTransport
- 15. ac.tryAllAddrs
- 16. ac.createTransport
- 17. transport.NewClientTransport
- 18. dial
- Back to 14. ac.resetTransport
- Summary
DialContext
DialContext is the entry point for establishing a connection on the client side. Let's look at what this function does:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
// 1. Create the ClientConn struct
cc := &ClientConn{
target: target,
...
}
// 2. Parse the target
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
// 3. Find a resolver Builder that matches the parsed target
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
// 4. Create the Resolver
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
// 5. Done
return cc, nil
}
Clearly, after omitting quite a few details, the process of establishing a connection is actually simple. Let's walk through it once:
Because gRPC does not provide service registration or service discovery itself, developers have to write the service discovery logic on their own: that is the Resolver.
Once the resolution result is available, i.e. a list of IP addresses, one of those IPs has to be chosen; that is the Balancer's job.
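Before diving in, here is a minimal usage sketch of this entry point, assuming a server listening on localhost:50051 (the address and the generated client named in the comment are placeholders, not part of the original demo):
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// The target has no scheme, so the default passthrough resolver and the
	// pick_first balancer are used, which is exactly the path analyzed below.
	conn, err := grpc.DialContext(ctx, "localhost:50051",
		grpc.WithInsecure(), // plaintext, fine for a local demo
		grpc.WithBlock(),    // block until the connection is READY
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	// conn can now be handed to a generated client, e.g. pb.NewGreeterClient(conn).
}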
Getting the Builder
Let's start with the Builder.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
The logic of ParseTarget can be summed up in one sentence: determine the address type of the target parameter passed in by the developer, so that a Builder suited to that address type can be looked up afterwards.
Next, let's look at how the Builder is looked up. This code is fairly simple; I've added a few comments:
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
// First check whether a resolver was supplied via the dial options
for _, rb := range cc.dopts.resolvers {
if scheme == rb.Scheme() {
return rb
}
}
// If no matching resolver is configured, look among the registered resolvers
return resolver.Get(scheme)
}
// As we can see, the ResolverBuilder is found in the map m
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
From this we can infer that every **ResolverBuilder** needs to be registered ahead of time.
And indeed, looking at the Resolver's code, we find that it registers itself in init().
func init() {
resolver.Register(&passthroughBuilder{})
}
// Registering a Resolver just means adding itself to the map
func Register(b Builder) {
m[b.Scheme()] = b
}
At this point we have covered both the registration and the lookup of the Builder.
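For completeness, here is a hedged sketch of how a developer-written Builder would plug into the same mechanism: a hypothetical "static" scheme that always reports a fixed address list (the package name and addresses are made up for illustration):
package staticresolver // hypothetical package, for illustration only

import "google.golang.org/grpc/resolver"

const scheme = "static"

type staticBuilder struct{}

// Build is what newCCResolverWrapper will call; cc is the ccResolverWrapper.
func (*staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &staticResolver{cc: cc}
	// Push a fixed address list, just like passthrough does in start().
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{
		{Addr: "10.0.0.1:50051"}, // placeholder backends
		{Addr: "10.0.0.2:50051"},
	}})
	return r, nil
}

func (*staticBuilder) Scheme() string { return scheme }

type staticResolver struct {
	cc resolver.ClientConn
}

func (*staticResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*staticResolver) Close()                                {}

// Registering in init() adds the builder to the map m shown above, so a
// target such as "static:///whatever" will find it.
func init() {
	resolver.Register(&staticBuilder{})
}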
Creating the ResolverWrapper
Back in the creation of the ClientConn: after the ResolverBuilder has been obtained, the next step is:
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
...
cc.resolverWrapper = rWrapper
...
After the wrapper is created it is assigned to cc, and there isn't much more logic here. To make the Resolver pluggable, gRPC uses the decorator pattern and wraps it in a ResolverWrapper. Let's look at the details of creating the ResolverWrapper:
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
ccr := &ccResolverWrapper{
cc: cc,
done: grpcsync.NewEvent(),
}
// Build the resolver from the given Builder and store it in the wrapper
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
return ccr, nil
}
Good, this is a reasonable point to pause.
Let's stop and think about the functionality we need: to decouple the Resolver from the Balancer, we want some component in the middle that receives the addresses resolved by the Resolver and then load-balances across them. So as we keep reading the code, we can keep this question in mind: what does the communication between the **Resolver** and the **Balancer** look like?
Looking back at the code above, the creation of the ClientConn is already finished. So we can deduce that the remaining logic lives in the single line rb.Build(cc.parsedTarget, ccr, rbo).
Which Builder do we have at this point?
Since the demo does not use a custom resolver, the following branch is taken:
if resolverBuilder == nil {
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
}
cc.getResolver is called to obtain the resolverBuilder:
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
for _, rb := range cc.dopts.resolvers {
if scheme == rb.Scheme() {
return rb
}
}
return resolver.Get(scheme)
}
Note that the Scheme this time is the default one: passthrough.
Where does passthrough come from?
It ships with the framework and registers itself via init:
package passthrough
import "google.golang.org/grpc/resolver"
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &passthroughResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}
func (*passthroughBuilder) Scheme() string {
return scheme
}
type passthroughResolver struct {
target resolver.Target
cc resolver.ClientConn
}
func (r *passthroughResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*passthroughResolver) Close() {}
func init() {
resolver.Register(&passthroughBuilder{})
}
Build
The Build logic is simple: it calls start().
The start logic is also simple: it calls r.cc.UpdateState.
1. r.cc.UpdateState
Note that the cc here is ccr, whose type is *ccResolverWrapper.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
if ccr.done.HasFired() {
return
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.curState = s
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}
After assigning s to ccr.curState, it calls ccr.cc.updateResolverState().
2. ccr.cc.updateResolverState
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
.......
var ret error
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
// The first call into this function takes this branch
cc.maybeApplyDefaultServiceConfig(s.Addresses)
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
configSelector := iresolver.GetConfigSelector(s)
if configSelector != nil {
if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
}
} else {
configSelector = &defaultConfigSelector{sc}
}
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
var err error
if s.ServiceConfig.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
return ret
}
}
}
var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
}
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
return ret
}
Creating the Balancer
3. cc.maybeApplyDefaultServiceConfig
ccr.cc.updateResolverState above calls cc.maybeApplyDefaultServiceConfig:
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
}
4. cc.applyServiceConfigAndBalancer
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
.....
if cc.dopts.balancerBuilder == nil {
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}
}
As we can see, newBalancerName ends up being pick_first.
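If we wanted a policy other than pick_first, the usual way (a hedged sketch, not part of this demo) is to set it through the service config at dial time; newBalancerName would then come from the lbConfig/LB fields instead of falling through to PickFirstBalancerName:
// Hedged sketch: request round_robin instead of the default pick_first.
// "localhost:50051" is a placeholder target; round_robin is registered by
// the grpc package itself.
func dialRoundRobin(ctx context.Context) (*grpc.ClientConn, error) {
	return grpc.DialContext(ctx, "localhost:50051",
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
}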
5. cc.switchBalancer
func (cc *ClientConn) switchBalancer(name string) {
// The builder here is grpc.pickfirstBuilder
builder := balancer.Get(name)
......
cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
The default balancerBuilder is obtained the same way as the resolverBuilder: it is registered ahead of time.
func init() {
balancer.Register(newPickfirstBuilder())
}
6. newCCBalancerWrapper
As we can see, the Balancer has a Wrapper too:
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
scBuffer *buffer.Unbounded
done *grpcsync.Event
mu sync.Mutex
subConns map[*acBalancerWrapper]struct{} // not obvious at first glance why this needs to be a map
}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
scBuffer: buffer.NewUnbounded(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
return ccb
}
// Because this is the demo path, the builder returned here is pickfirstBuilder
func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &pickfirstBalancer{cc: cc}
}
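One detail worth pausing on: newCCBalancerWrapper starts go ccb.watcher() before building the balancer. That goroutine is what serializes SubConn state changes into the balancer. A simplified sketch of the idea (not the exact grpc-go source; scStateUpdate is an internal struct carrying the SubConn, its new state and error):
// Simplified sketch of the watcher loop: drain scBuffer and forward each
// SubConn state change to the balancer, so the balancer only ever runs on
// this single goroutine.
func (ccb *ccBalancerWrapper) watcherSketch() {
	for {
		select {
		case t := <-ccb.scBuffer.Get():
			ccb.scBuffer.Load() // allow the next buffered update to be delivered
			u := t.(*scStateUpdate)
			ccb.balancerMu.Lock()
			ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
			ccb.balancerMu.Unlock()
		case <-ccb.done.Done():
			ccb.balancer.Close()
			return
		}
	}
}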
The addrConn connection
Back to 2. ccr.cc.updateResolverState
7. bw.updateClientConnState
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
return ccb.balancer.UpdateClientConnState(*ccs)
}
func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
.....
if b.sc == nil { // On the first pass b.sc (a balancer.SubConn) is definitely nil, so we enter this branch
var err error
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
})
return balancer.ErrBadResolverState
}
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect()
} else {
b.sc.UpdateAddresses(cs.ResolverState.Addresses)
b.sc.Connect()
}
return nil
}
8. b.cc.NewSubConn
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) <= 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
}
// Initialize the addrConn
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
return nil, err
}
// Wrap the addrConn once more
acbw := &acBalancerWrapper{ac: ac}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
Why does addrConn need to be wrapped yet again? It's to implement an interface; you have to admit gRPC makes nice use of design patterns.
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
}
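The interface acBalancerWrapper has to satisfy is tiny; in the version of the balancer package this code comes from it looks roughly like this:
// balancer.SubConn (abridged): acBalancerWrapper implements it by delegating
// to the wrapped *addrConn.
type SubConn interface {
	// UpdateAddresses updates the addresses used in this SubConn.
	UpdateAddresses([]resolver.Address)
	// Connect starts connecting for this SubConn.
	Connect()
}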
9. ccb.cc.newAddrConn
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
.....
// The cc.conns field is set here,
// so cc.conns tracks the same connections as ccb.subConns
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
}
Back to 7. updateClientConnState
10. b.cc.UpdateState
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return
}
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}
11. ccb.cc.blockingpicker.updatePicker
// pickerWrapper is a wrapper around balancer.Picker. It blocks certain pick operations and unblocks them when the picker is updated.
// It is initialized in DialContext.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
}
// Update the pickerWrapper, setting picker to the given balancer.Picker
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
return
}
pw.picker = p
// pw.blockingCh should never be nil.
close(pw.blockingCh)
pw.blockingCh = make(chan struct{})
pw.mu.Unlock()
}
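To see why closing and re-creating blockingCh matters, here is a simplified sketch (not the exact source) of the consuming side: an RPC that arrives before any picker is installed parks on blockingCh, and every updatePicker call wakes it up to retry with the new picker.
// Simplified sketch of the pick side (pickerWrapper.pick in the real code):
// wait until a picker exists, then delegate the choice to it.
func (pw *pickerWrapper) pickSketch(ctx context.Context, info balancer.PickInfo) (balancer.PickResult, error) {
	for {
		pw.mu.Lock()
		p, ch := pw.picker, pw.blockingCh
		pw.mu.Unlock()
		if p == nil {
			// No picker yet: block until updatePicker closes blockingCh or
			// the RPC's context is cancelled.
			select {
			case <-ctx.Done():
				return balancer.PickResult{}, ctx.Err()
			case <-ch:
				continue
			}
		}
		return p.Pick(info)
	}
}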
12. ccb.cc.csMgr.updateState
// Based on the flow above, the state here is Idle
// This simply keeps the connectivityStateManager's state consistent with the state passed in
func (csm *connectivityStateManager) updateState(state connectivity.State) {
csm.mu.Lock()
defer csm.mu.Unlock()
if csm.state == connectivity.Shutdown {
return
}
if csm.state == state {
return
}
csm.state = state
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
close(csm.notifyChan)
csm.notifyChan = nil
}
}
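The notifyChan closed here is what eventually surfaces to user code through the ClientConn's connectivity API. A hedged usage sketch (packages assumed: context, google.golang.org/grpc, google.golang.org/grpc/connectivity):
// Hedged sketch: observing the transitions driven by csMgr.updateState.
func waitUntilReady(ctx context.Context, conn *grpc.ClientConn) error {
	for {
		s := conn.GetState() // Idle -> Connecting -> Ready in the flow above
		if s == connectivity.Ready {
			return nil
		}
		// Blocks until the state leaves s (csm.notifyChan is closed) or ctx ends.
		if !conn.WaitForStateChange(ctx, s) {
			return ctx.Err()
		}
	}
}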
Back to 7. updateClientConnState
13. b.sc.Connect()
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect()
}
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
// The state must be Idle to proceed with connecting
if ac.state != connectivity.Idle {
ac.mu.Unlock()
return nil
}
// Update the connectivity state inside the lock to prevent subsequent or concurrent calls from resetting the transport multiple times
// Clearly, ac's state is updated to Connecting here
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go ac.resetTransport()
return nil
}
14. ac.resetTransport
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
.......
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.transport = nil
ac.mu.Unlock()
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
if err != nil {
.....
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
return
}
ac.curAddr = addr
ac.transport = newTr
ac.backoffIdx = 0
hctx, hcancel := context.WithCancel(ac.ctx)
ac.startHealthCheck(hctx)
ac.mu.Unlock()
// Block until the created transport closes. When that happens, start over from the top of the addr list.
<-reconnect.Done()
hcancel()
// Restart connecting: the top of the loop will set the state to Connecting.
// This is against the current connectivity semantics doc, but it allows
// graceful behavior for RPCs that have not been dispatched yet; unfortunate
// timing would otherwise cause an RPC to fail even though the TRANSIENT_FAILURE
// state (called for by the doc) would only be instantaneous.
//
// Ideally we should transition to Idle here and block until there is RPC
// activity that leads to the balancer requesting a reconnect of the
// associated SubConn.
}
}
15. ac.tryAllAddrs
tryAllAddrs tries to create a connection to each address; as soon as one succeeds it returns the transport, the address, and an Event. The Event fires when the returned transport disconnects.
Let's park a question here: since it returns on the first success, how do the other connections ever get established?
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
var firstConnErr error
for _, addr := range addrs {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return nil, resolver.Address{}, nil, errConnClosing
}
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
}
ac.mu.Unlock()
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
if err == nil {
return newTr, addr, reconnect, nil
}
if firstConnErr == nil {
firstConnErr = err
}
ac.cc.updateConnectionError(err)
}
// Couldn't connect to any address.
return nil, resolver.Address{}, nil, firstConnErr
}
16. ac.createTransport
createTransport creates a connection to addr. On success it returns the transport and an Event; the Event fires when the returned transport disconnects.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})
reconnect := grpcsync.NewEvent()
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr.ServerName == "" {
addr.ServerName = ac.cc.authority
}
once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting, nil)
}
})
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
ac.mu.Lock()
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting, nil)
}
})
ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
return nil, nil, err
}
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
return newTr, reconnect, nil
}
17. transport.NewClientTransport
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}
newHTTP2Client constructs a connected ClientTransport to addr over HTTP/2 and starts receiving messages on it. If it fails, a non-nil error is returned.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
...
// After much effort we finally reach the dial call
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
...
// Keepalive-related settings
kp := opts.KeepaliveParams
...
var (
isSecure bool
authInfo credentials.AuthInfo
)
// transportCreds and perRPCCreds: HTTPS / credentials related
transportCreds := opts.TransportCredentials
perRPCCreds := opts.PerRPCCredentials
if b := opts.CredsBundle; b != nil {
if t := b.TransportCredentials(); t != nil {
transportCreds = t
}
if t := b.PerRPCCredentials(); t != nil {
perRPCCreds = append(perRPCCreds, t)
}
}
.....
// Window size, writeBufSize, readBufSize and maxHeaderListSize settings
dynamicWindow := true
icwz := int32(initialWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
maxHeaderListSize := defaultClientMaxHeaderListSize
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
// Initialize the http2Client
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel,
userAgent: opts.UserAgent,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*Stream),
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandler: opts.StatsHandler,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
....
// Keepalive, same idea as on the server side
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
}
// Start the reader goroutine for incoming messages. Each transport has a dedicated
// goroutine that reads HTTP/2 frames from the network and dispatches them to the
// corresponding stream entity. This already belongs to the data-exchange part.
go t.reader()
// In the gRPC packet-capture analysis there is a Magic frame; this is where it gets sent
n, err := t.conn.Write(clientPreface)
...
// In the packet capture there is also a SETTINGS frame; this is where it gets sent
...
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
}
// Connection established; if the window size needs adjusting, send a WINDOW_UPDATE frame
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
}
}
// Assign an ID to this connection
t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
// Flush the internal write buffer to the net.Conn
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
if _, ok := err.(net.Error); !ok {
t.conn.Close()
}
close(t.writerDone)
}()
return t, nil
}
const (
// ClientPreface is the string that must be sent by new
// connections from clients.
ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
// SETTINGS_MAX_FRAME_SIZE default
// http://http2.github.io/http2-spec/#rfc.section.6.5.2
initialMaxFrameSize = 16384
// NextProtoTLS is the NPN/ALPN protocol negotiated during
// HTTP/2's TLS setup.
NextProtoTLS = "h2"
// http://http2.github.io/http2-spec/#SettingValues
initialHeaderTableSize = 4096
initialWindowSize = 65535 // 6.9.2 Initial Flow Control Window Size
defaultMaxReadFrameSize = 1 << 20
)
var (
clientPreface = []byte(ClientPreface)
)
const (
FrameData FrameType = 0x0
FrameHeaders FrameType = 0x1
FramePriority FrameType = 0x2
FrameRSTStream FrameType = 0x3
FrameSettings FrameType = 0x4
FramePushPromise FrameType = 0x5
FramePing FrameType = 0x6
FrameGoAway FrameType = 0x7
FrameWindowUpdate FrameType = 0x8
FrameContinuation FrameType = 0x9
)
var frameName = map[FrameType]string{
FrameData: "DATA",
FrameHeaders: "HEADERS",
FramePriority: "PRIORITY",
FrameRSTStream: "RST_STREAM",
FrameSettings: "SETTINGS",
FramePushPromise: "PUSH_PROMISE",
FramePing: "PING",
FrameGoAway: "GOAWAY",
FrameWindowUpdate: "WINDOW_UPDATE",
FrameContinuation: "CONTINUATION",
}
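The opts.KeepaliveParams consumed in newHTTP2Client above come from a dial option. A hedged sketch of supplying them (values are illustrative; packages assumed: google.golang.org/grpc, google.golang.org/grpc/keepalive, time):
// Hedged sketch: client-side keepalive, which ends up as opts.KeepaliveParams.
func dialWithKeepalive(ctx context.Context) (*grpc.ClientConn, error) {
	return grpc.DialContext(ctx, "localhost:50051", // placeholder target
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second, // ping the server when idle this long
			Timeout:             10 * time.Second, // wait this long for the ping ack
			PermitWithoutStream: true,             // ping even with no active RPCs
		}),
	)
}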
18. dial
This is where the layer-4 (TCP) connection is obtained.
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
address := addr.Addr
networkType, ok := networktype.Get(addr)
if fn != nil {
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
// For backward compatibility, if the user dialed "unix:///path",
// the passthrough resolver would be used and the user's custom
// dialer would see "unix:///path". Since the unix resolver is used
// and the address is now "/path", prepend "unix://" so the user's
// custom dialer sees the same address.
return fn(ctx, "unix://"+address)
}
return fn(ctx, address)
}
if !ok {
networkType, address = parseDialTarget(address)
}
if networkType == "tcp" && useProxy {
return proxyDial(ctx, address, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, networkType, address)
}
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
newAddr := addr
proxyURL, err := mapAddress(ctx, addr)
if err != nil {
return nil, err
}
if proxyURL != nil {
newAddr = proxyURL.Host
}
conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
if err != nil {
return
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
return
}
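The fn parameter in dial above is the user-supplied dialer from the dial options; it can be provided with grpc.WithContextDialer. A hedged sketch (packages assumed: context, net, time, google.golang.org/grpc):
// Hedged sketch: a custom dialer ends up as opts.Dialer and is invoked by
// dial() above with the resolved address (e.g. "localhost:50051" for passthrough).
func dialWithCustomDialer(ctx context.Context) (*grpc.ClientConn, error) {
	dialer := func(ctx context.Context, addr string) (net.Conn, error) {
		return (&net.Dialer{Timeout: 2 * time.Second}).DialContext(ctx, "tcp", addr)
	}
	return grpc.DialContext(ctx, "localhost:50051", // placeholder target
		grpc.WithInsecure(),
		grpc.WithContextDialer(dialer),
	)
}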
Back to 14. ac.resetTransport
// Set the relevant fields on ac
ac.curAddr = addr
ac.transport = newTr
ac.backoffIdx = 0
hctx, hcancel := context.WithCancel(ac.ctx)
// Start the health check
ac.startHealthCheck(hctx)
ac.mu.Unlock()
// If health checking is requested and configured, startHealthCheck starts the
// health checking stream (RPC) to watch the health stats of this connection.
// LB channel health checking is enabled when all of the following hold:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with a non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
// If the health checking stream is not started, it sets the addrConn to READY.
// The caller must hold ac.mu.
// So our direct-connect demo does not perform a health check.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
...
}
At this point, the connection has been established.
Summary
To recap the whole flow: DialContext creates the ClientConn, parses the target and looks up the registered resolver Builder (passthrough in our demo); newCCResolverWrapper builds the resolver, whose UpdateState feeds the resolved addresses into cc.updateResolverState; the service config is applied and the pick_first balancer is created through the ccBalancerWrapper; the balancer creates a SubConn (addrConn), updates the picker and the connectivity state, and calls Connect; finally resetTransport, tryAllAddrs and createTransport build the HTTP/2 client transport on top of the TCP connection returned by dial.