维护Grpc连接,与数据平面通信,推送Xds信息
初始化DiscoveryServer(XdsServer)
poilet主动推送消息给envoy
当注册中心中配置信息发生变化时,poilet监控到变化后,主动推送变更信息到数据平面(envoy)
图示
包结构图
UML时序图
说明
- ConfigController/ServiceController.informer感知到注册中心的变化后调用DiscoveryServer.ConfigUpdate方法把消息放入DiscoveryServer.pushChannel中
- 在DiscoveryServer.Start中开启了goruntine分别执行handleUpdates,sendPush
- handlerUpdates最终调用了debounce方法,从DiscoveryServer.pushChannel中取出信息,做消息合并和驱动都处理,然后推送到DiscoveryServer.pushQueue中
- sendPush方法最终doSendPushes方法,从DiscoveryServer.pushQueue取出数据,对数据进行markdown,放入XdsConnection.pushChannel中,供最终发送使用
- 在DiscoveryServer.Register方法中把DiscoveryServer注册到GrpcServer上,当有连接调用的时候,调用DiscoveryServer.StreamAggregatedResources方法,StreamAggregatedResources方法中从XdsConnection.pushChannel取出数据,调用DiscoveryStream.send方法,发送给数据平面(envoy)
代码分析
1,ServiceController/ConfigController感知到注册中心信息发生变化
// pilot/pkg/proxy/envoy/v2/discovery.go
// informer感知到注册中心配置发生变化后,回调DiscoveryServer.ConfigUpdate方法把消息放入DiscoveryServer.pushChannel中
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
inboundConfigUpdates.Increment()
s.pushChannel <- req
}
2,去抖动debounce设计
好处:避免集群过大的情况下,分发次数过多,导致的poilet过载及集群不稳定
原理:最小静默时间t1,距离上一次有更新的时间大于等于t1则推送一次
最大延迟时间t0,当一直有更新时,在达到最大延迟时间t0时强制推送一次
// pilot/pkg/proxy/envoy/v2/discovery.go
// 在DiscoveryServer.Start中启动协程运行handleUpdates方法进行防抖动处理
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
// 防抖动处理
go s.handleUpdates(stopCh)
go s.periodicRefreshMetrics(stopCh)
// 推送消息
go s.sendPushes(stopCh)
}
// 调用debounce并把s.Push方法传入供debounce使用
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
debounce(s.pushChannel, stopCh, s.Push)
}
// 防抖动设计的处理方法,
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
var timeChan <-chan time.Time
var startDebounce time.Time
var lastConfigUpdateTime time.Time
pushCounter := 0
debouncedEvents := 0
// 跟踪推送请求。 如果更新被反跳,它们将被合并。
var req *model.PushRequest
free := true
freeCh := make(chan struct{}, 1)
// 执行推送方法,即DiscoveryServer.Push
push := func(req *model.PushRequest) {
pushFn(req)
freeCh <- struct{}{}
}
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
//一段时间内没有收到新的PushRequest,再发起推送,即超过最大静默时间,强制推送
if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
if req != nil {
pushCounter++
adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
quietTime, eventDelay, req.Full)
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(DebounceAfter - quietTime)
}
}
for {
select {
...
case r := <-ch:
if !features.EnableEDSDebounce.Get() && !r.Full {
// trigger push now, just for EDS
go pushFn(r)
continue
}
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(DebounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合并连续发生的多个PushRequest
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}
3,推送消息到envoy
每个连接都有属于自己的StreamAggregatedResources方法,StreamAggregatedResources方法中从XdsConnection.PushChannel取出数据,调用DiscoveryServer.pushConnection方法,最终使用Discovery.DiscoveryStream.Send方法把数据发送到数据平面(envoy)
// pilot/pkg/proxy/envoy/v2/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
...
con := newXdsConnection(peerAddr, stream)
...
for {
...
select {
...
// 从XdsConnection.PushChannel接收Service或者Config变化后的通知
case pushEv := <-con.pushChannel:
...
// 将变化内容推送到Envoy端
err := s.pushConnection(con, pushEv)
...
}
}
}
// pushConnection: 根据类型不同,按照顺序推送信息
func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
...
// 推送 cluster信息
if con.CDSWatch {
err := s.pushCds(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
// 推送Endpoint信息
if len(con.Clusters) > 0 {
err := s.pushEds(pushEv.push, con, currentVersion, nil)
if err != nil {
return err
}
}
// 推送listener信息
if con.LDSWatch {
err := s.pushLds(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
// 推送route信息
if len(con.Routes) > 0 {
err := s.pushRoute(con, pushEv.push, currentVersion)
if err != nil {
return err
}
}
...
}
// pushCds,pushEds,pushLds,pushRoute推送逻辑基本一致,这里以pushCds为例
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
...
// 推送开始时间
pushStart := time.Now()
rawClusters := s.generateRawClusters(con.node, push)
...
response := con.clusters(rawClusters, push.Version)
// 调用XdsConnection.send方法推送数据
err := con.send(response)
// 计算推送使用时间
cdsPushTime.Record(time.Since(pushStart).Seconds())
...
return nil
}
// 开启goruntine执行XdsConnection.stream.Send方法操作流直接发送DiscoveryResponse给数据平面(envoy)
func (conn *XdsConnection) send(res *xdsapi.DiscoveryResponse) error {
...
go func() {
// 发送数据
err := conn.stream.Send(res)
...
select {
case <-t.C:
// TODO: wait for ACK
adsLog.Infof("Timeout writing %s", conn.ConID)
xdsResponseWriteTimeouts.Increment()
return errors.New("timeout sending")
case err := <-done:
t.Stop()
return err
}
}
poilet接受envoy的请求
启动一个goruntine接受数据平面(envoy)的请求,根据请求类型的不通,构造不同类型的Xds Response返回给envoy
UML时序图
代码分析
根据请求类型推送对应类型消息
// pilot/pkg/proxy/envoy/v2/ads.go
// 创建一个XdsConnection连接对象
con := newXdsConnection(peerAddr, stream)
// 创建一个通道用以存放envoy发送的请求
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// 启动一个goruntine,用con接受envoy的请求并放入reChannel中
go receiveThread(con, reqChannel, &receiveError)
for {
...
select {
//从reqChannel接收Envoy端主动发起的xDS请求
case discReq, ok := <-reqChannel:
...
//根据请求的类型构造相应的xDS Response并发送到Envoy端
switch discReq.TypeUrl {
case ClusterType:
...
err := s.pushCds(con, s.globalPushContext(), versionInfo())
...
case ListenerType:
...
err := s.pushLds(con, s.globalPushContext(), versionInfo())
...
case RouteType:
...
err := s.pushRoute(con, s.globalPushContext(), versionInfo())
...
case EndpointType:
...
err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
...
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
...
}