维护Grpc连接,与数据平面通信,推送Xds信息

初始化DiscoveryServer(XdsServer)

poilet主动推送消息给envoy

当注册中心中配置信息发生变化时,poilet监控到变化后,主动推送变更信息到数据平面(envoy)

图示

包结构图

UML时序图

DiscoveryServer(XdsServer) - 图1

说明

  1. ConfigController/ServiceController.informer感知到注册中心的变化后调用DiscoveryServer.ConfigUpdate方法把消息放入DiscoveryServer.pushChannel中
  2. 在DiscoveryServer.Start中开启了goruntine分别执行handleUpdates,sendPush
  3. handlerUpdates最终调用了debounce方法,从DiscoveryServer.pushChannel中取出信息,做消息合并和驱动都处理,然后推送到DiscoveryServer.pushQueue中
  4. sendPush方法最终doSendPushes方法,从DiscoveryServer.pushQueue取出数据,对数据进行markdown,放入XdsConnection.pushChannel中,供最终发送使用
  5. 在DiscoveryServer.Register方法中把DiscoveryServer注册到GrpcServer上,当有连接调用的时候,调用DiscoveryServer.StreamAggregatedResources方法,StreamAggregatedResources方法中从XdsConnection.pushChannel取出数据,调用DiscoveryStream.send方法,发送给数据平面(envoy)

代码分析

1,ServiceController/ConfigController感知到注册中心信息发生变化

  1. // pilot/pkg/proxy/envoy/v2/discovery.go
  2. // informer感知到注册中心配置发生变化后,回调DiscoveryServer.ConfigUpdate方法把消息放入DiscoveryServer.pushChannel中
  3. func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
  4. inboundConfigUpdates.Increment()
  5. s.pushChannel <- req
  6. }

2,去抖动debounce设计

好处:避免集群过大的情况下,分发次数过多,导致的poilet过载及集群不稳定
原理:最小静默时间t1,距离上一次有更新的时间大于等于t1则推送一次
最大延迟时间t0,当一直有更新时,在达到最大延迟时间t0时强制推送一次

// pilot/pkg/proxy/envoy/v2/discovery.go
// 在DiscoveryServer.Start中启动协程运行handleUpdates方法进行防抖动处理
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
   // 防抖动处理
   go s.handleUpdates(stopCh)

   go s.periodicRefreshMetrics(stopCh)

   // 推送消息
   go s.sendPushes(stopCh)
}
// 调用debounce并把s.Push方法传入供debounce使用
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
   debounce(s.pushChannel, stopCh, s.Push)
}
// 防抖动设计的处理方法,
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
   var timeChan <-chan time.Time
   var startDebounce time.Time
   var lastConfigUpdateTime time.Time

   pushCounter := 0
   debouncedEvents := 0

   // 跟踪推送请求。 如果更新被反跳,它们将被合并。
   var req *model.PushRequest

   free := true
   freeCh := make(chan struct{}, 1)

   // 执行推送方法,即DiscoveryServer.Push 
   push := func(req *model.PushRequest) {
      pushFn(req)
      freeCh <- struct{}{}
   }

   pushWorker := func() {
      eventDelay := time.Since(startDebounce)
      quietTime := time.Since(lastConfigUpdateTime)
      //一段时间内没有收到新的PushRequest,再发起推送,即超过最大静默时间,强制推送
      if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
         if req != nil {
            pushCounter++
            adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
               pushCounter, debouncedEvents,
               quietTime, eventDelay, req.Full)

            free = false
            go push(req)
            req = nil
            debouncedEvents = 0
         }
      } else {
         timeChan = time.After(DebounceAfter - quietTime)
      }
   }

   for {
      select {
      ...
      case r := <-ch:
         if !features.EnableEDSDebounce.Get() && !r.Full {
            // trigger push now, just for EDS
            go pushFn(r)
            continue
         }

         lastConfigUpdateTime = time.Now()
         if debouncedEvents == 0 {
            timeChan = time.After(DebounceAfter)
            startDebounce = lastConfigUpdateTime
         }
         debouncedEvents++
         //合并连续发生的多个PushRequest 
         req = req.Merge(r)
      case <-timeChan:
         if free {
            pushWorker()
         }
      case <-stopCh:
         return
      }
   }
}

3,推送消息到envoy

每个连接都有属于自己的StreamAggregatedResources方法,StreamAggregatedResources方法中从XdsConnection.PushChannel取出数据,调用DiscoveryServer.pushConnection方法,最终使用Discovery.DiscoveryStream.Send方法把数据发送到数据平面(envoy)

// pilot/pkg/proxy/envoy/v2/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    ...
    con := newXdsConnection(peerAddr, stream)
    ...
    for {
        ...
        select {
        ...
        // 从XdsConnection.PushChannel接收Service或者Config变化后的通知
        case pushEv := <-con.pushChannel:
            ...
            // 将变化内容推送到Envoy端
            err := s.pushConnection(con, pushEv)
            ...
        }
    }
}
// pushConnection: 根据类型不同,按照顺序推送信息
func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
    ...
    // 推送 cluster信息
    if con.CDSWatch {
        err := s.pushCds(con, pushEv.push, currentVersion)
        if err != nil {
            return err
        }
    }
    // 推送Endpoint信息
    if len(con.Clusters) > 0 {
        err := s.pushEds(pushEv.push, con, currentVersion, nil)
        if err != nil {
            return err
        }
    }
    // 推送listener信息
    if con.LDSWatch {
        err := s.pushLds(con, pushEv.push, currentVersion)
        if err != nil {
            return err
        }
    }
    // 推送route信息
    if len(con.Routes) > 0 {
        err := s.pushRoute(con, pushEv.push, currentVersion)
        if err != nil {
            return err
        }
    }
    ...
}
// pushCds,pushEds,pushLds,pushRoute推送逻辑基本一致,这里以pushCds为例
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    ...
    // 推送开始时间
    pushStart := time.Now()
    rawClusters := s.generateRawClusters(con.node, push)
    ...
    response := con.clusters(rawClusters, push.Version)
    // 调用XdsConnection.send方法推送数据
    err := con.send(response)
    // 计算推送使用时间
    cdsPushTime.Record(time.Since(pushStart).Seconds())
    ...
    return nil
}
// 开启goruntine执行XdsConnection.stream.Send方法操作流直接发送DiscoveryResponse给数据平面(envoy)
func (conn *XdsConnection) send(res *xdsapi.DiscoveryResponse) error {
    ...
    go func() {
        // 发送数据  
        err := conn.stream.Send(res)
        ...
    select {
    case <-t.C:
        // TODO: wait for ACK
        adsLog.Infof("Timeout writing %s", conn.ConID)
        xdsResponseWriteTimeouts.Increment()
        return errors.New("timeout sending")
    case err := <-done:
        t.Stop()
        return err
    }
}

poilet接受envoy的请求

启动一个goruntine接受数据平面(envoy)的请求,根据请求类型的不通,构造不同类型的Xds Response返回给envoy

UML时序图

DiscoveryServer(XdsServer) - 图2

代码分析

根据请求类型推送对应类型消息

// pilot/pkg/proxy/envoy/v2/ads.go    

    // 创建一个XdsConnection连接对象
    con := newXdsConnection(peerAddr, stream)
      // 创建一个通道用以存放envoy发送的请求
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // 启动一个goruntine,用con接受envoy的请求并放入reChannel中
    go receiveThread(con, reqChannel, &receiveError)

    for {
        ...
        select {
         //从reqChannel接收Envoy端主动发起的xDS请求  
        case discReq, ok := <-reqChannel:
          ...
            //根据请求的类型构造相应的xDS Response并发送到Envoy端            
            switch discReq.TypeUrl {
            case ClusterType:
                ...
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                ...
            case ListenerType:
                ...
                err := s.pushLds(con, s.globalPushContext(), versionInfo())
                ...
            case RouteType:
                ...
                err := s.pushRoute(con, s.globalPushContext(), versionInfo())
                ...
            case EndpointType:
                ...
                err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
                ...
            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }

        ...
}