流量控制

流量控制代码实现在transport包,主要包括
1. 传输层的流量控制

  1. stream层流量控制

而流量控制则包含输入流量和输出流量
则整体结构为:

  • 传输层的流量控制
    • 传输层写入控制
    • 传输层写出控制
  • stream层流量控制
    • stream层写入控制
    • stream层写出控制

传输层流量控制:

传输层流量控制原理:

loopyWriter.SendQuota【return,就return,不会挂起!】
传输层流量控制原理为:
[processData]每次发送方写出时,将减少窗口额度值,当窗口额度不足时,就会不再向外发送数据,执行其他任务[ctlBuffer.get(),handle()],直到等待接收方传回 WindowUpdate 帧后,继续发送。
接收方每次收到数据时,判断当前接收的数据是否超过 limit/4,如果超过,则向外发送WindowUpdate 帧, 发送方收到WindowUpdate 帧后,会更新发送窗口额度,继续完成发送。

原理如下:

流量控制 - 图1
主要实现为;loopyWriter 和 trInFlow
代码实现:
发送方写出时:
loopyWriter.processData

  1. if l.sendQuota == 0 {
  2. return true, nil
  3. }
  4. //...
  5. // 总发送配额小于流配额
  6. if l.sendQuota < uint32(size) { // connection-level flow control.
  7. size = int(l.sendQuota)
  8. }
  9. //...
  10. // 写出成功后,更新剩余配额
  11. l.sendQuota -= uint32(size)

发送方收到incomingWindowUpdate时

  1. // tcp传输层的,streamId=0,增加SendQuota
  2. if w.streamID == 0 {
  3. l.sendQuota += w.increment
  4. return nil
  5. }

接收方:
http2Server.HandleStreams,当有Dataframe时,会调用handleData方法,首先会获取数据长度,然后调用

  1. if w := t.fc.onData(size); w > 0 {
  2. t.controlBuf.put(&outgoingWindowUpdate{
  3. streamID: 0, // 注意,传输层 的StreamId为0
  4. increment: w,
  5. })
  6. }

终上,传输层的流量控制主要取决于接收方 处理数据的 大小。接收方收到数据后才会发送 windowUpdate 帧,更新发送方帧的大小。

Stream层流量控制

1. 写入写出控制原理:

stream层流量写入主要由Stream.inFlow 控制器来控制,而写出则由outStream【在activityStream】来做控制;

  • 流入控制:

    每次接收到data frame时,会调用fc.onData()增加pendingData的值,如果大于limit + f.delta,则报错;成功后后将数据写到 数据缓冲区【recvBuffer】中。 而每次读数据的时候,则会调用 inFlow的OnRead,减少 pendingData的值,然后判断是否发送stream 的 windowUpdate frame。

注:对于flag=FlagDataPadded 的数据帧,则会立即调用 onRead(size - uint32(len(f.Data())),并发送windowUpdate 帧。 因为 size - uint32(len(f.Data())) 表示 帧头的的大小,需要保证已接收到了帧头的数据。

  • 写出控制:

    每次写出数据后【l.framer.fr.WriteData】,就会累计输出流的str.bytesOutStanding += size,每次收到带 streamId的 windowUpdate frame,就会恢复【str.bytesOutStanding -= int(w.increment)】, 每次写之前,都会判断写额度是否充足,不足就挂起 outStream【str.state = waitingOnStreamQuota】,直到下次收到windowUpdate frame时,重新设置成activite状态,放入l.activeStreams。

原理基本与 传输层的流量控制一致,原理图:

流量控制 - 图2

2. outStream的代码实现如下:

  1. // #loopyWriter.processData
  2. // ... 状态控制代码,累计的流配额比滑动窗口还大
  3. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
  4. // 改变状态,等待配额
  5. str.state = waitingOnStreamQuota
  6. return false, nil
  7. }
  8. // ... 写出过程
  9. if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
  10. return false, err
  11. }
  12. // 更新buf,将已发送的去除
  13. buf = buf[size:]
  14. // 更新这个流已发送的字节
  15. str.bytesOutStanding += size
  16. // ...
  17. // windowUpdateFrame 的恢复代码 #loopyWriter.incomingWindowUpdateHandler
  18. // Find the stream and update it.
  19. if str, ok := l.estdStreams[w.streamID]; ok {
  20. str.bytesOutStanding -= int(w.increment)
  21. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  22. str.state = active
  23. l.activeStreams.enqueue(str)
  24. return nil
  25. }
  26. }
  27. //

3. inFlow 流入控制代码实现如下:

  1. // 控制器层 :据帧时调用 onData。它更新pendingData。
  2. func (f *inFlow) onData(n uint32) error {
  3. f.mu.Lock()
  4. f.pendingData += n
  5. if f.pendingData+f.pendingUpdate > f.limit+f.delta {
  6. limit := f.limit
  7. rcvd := f.pendingData + f.pendingUpdate
  8. f.mu.Unlock()
  9. return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
  10. }
  11. f.mu.Unlock()
  12. return nil
  13. }
  14. // onRead is invoked when the application reads the data. It returns the window size
  15. // to be sent to the peer.
  16. //onRead 在应用程序读取数据时被调用。它返回要发送给对等方的窗口大小
  17. func (f *inFlow) onRead(n uint32) uint32 {
  18. f.mu.Lock()
  19. if f.pendingData == 0 {
  20. f.mu.Unlock()
  21. return 0
  22. }
  23. f.pendingData -= n
  24. if n > f.delta {
  25. n -= f.delta
  26. f.delta = 0
  27. } else {
  28. f.delta -= n
  29. n = 0
  30. }
  31. f.pendingUpdate += n
  32. // update缓冲区
  33. if f.pendingUpdate >= f.limit/4 {
  34. wu := f.pendingUpdate
  35. f.pendingUpdate = 0
  36. f.mu.Unlock()
  37. return wu
  38. }
  39. f.mu.Unlock()
  40. return 0
  41. }
  42. ### tramsport 层:
  43. func (t *http2Server) handleData(f *http2.DataFrame) {
  44. //...
  45. if err := s.fc.onData(size); err != nil {
  46. t.closeStream(s, true, http2.ErrCodeFlowControl, false)
  47. return
  48. }
  49. //...
  50. }
  51. // ... 发送窗口更新
  52. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  53. if w := s.fc.onRead(n); w > 0 {
  54. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  55. increment: w,
  56. })
  57. }
  58. }

4. 缓冲区层流量限制:writeQuota:

缓冲区写控制主要由WriteQuota控制的,WriteQuota控制的是 写入ControlBuffer和str.itemList的大小,控制缓冲区的使用,与传输层流量控制不一样。而bytesOutStanding 是outStream是上的一个属性,用于记录发送了多少字节大小的数据。

  1. WriteQuota的写控制原理为:

    每次发送方写出时,将减少窗口额度值,然后把数据封装成dataFrame,放入到ControlBuffer 中,当窗口额度不足时,就会挂起协程!不再向外发送数据。 直到loopyWriter处理数据完成后【将数据write到frame后】,会把w.quota添加回去,如果加回去之后 quota小于零 && quota 大于 -sz,说明修复完quota后有可用的字节数,则唤醒get中等待的协程。

一 、WriteQuota代码实现如下:

http2Server.Write

  1. // ...
  2. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  3. select {
  4. case <-t.done:
  5. return ErrConnClosing
  6. default:
  7. }
  8. return ContextErr(s.ctx.Err())
  9. }
  10. return t.controlBuf.put(df)
  11. // -------------------- writeQuota代码--
  12. // get 的时候就减少 quota
  13. func (w *writeQuota) get(sz int32) error {
  14. for {
  15. if atomic.LoadInt32(&w.quota) > 0 {
  16. atomic.AddInt32(&w.quota, -sz)
  17. return nil
  18. }
  19. // 小于零,就switch ,阻塞!
  20. select {
  21. case <-w.ch:
  22. continue
  23. case <-w.done:
  24. return errStreamDone
  25. }
  26. }
  27. }
  28. // ---- 数据恢复部分---
  29. (l *loopyWriter) processData()(bool, error){
  30. ...
  31. str.wq.replenish(size)
  32. ...
  33. }
  34. // 唤醒部分:
  35. func (w *writeQuota) realReplenish(n int) {
  36. sz := int32(n)
  37. a := atomic.AddInt32(&w.quota, sz)
  38. b := a - sz
  39. // b 小于零,即
  40. // quota小于零 && quota 大于 -sz,说明修复完quota后有可用的字节数,则唤醒get中等待的协程
  41. if b <= 0 && a > 0 {
  42. select {
  43. case w.ch <- struct{}{}:
  44. default:
  45. }
  46. }
  47. }

//原理图:

流量控制 - 图3

难点:限流器,滑动窗口,发送额度的限制

每次发送,都会减少额度,收到window update后,增加额度。
单个流流量控制

整体流程图如下:

流量控制 - 图4

流量控制总结:

传输层

传输层流量控制通过 loopyWriter中的SendQuota 控制,SendQuota的值是传输层初始化窗口大小(initialWindowSize),每次向外发送数据都会判断该值然后累减,然后loopy接收到streamId = 0 d的windowUpdate帧时,会累加SendQuota;接收端通过trInflow 组件,每次接受到DataFrame帧后,都会调用OnData,累加自身的unAcked,当unacked大于limit/4时,会回传一个 windowUpdate 帧给 发送方;【压缩整流作用】

stream层流量控制:

写出

每次写出数据后【l.framer.fr.WriteData】,就会累计输出流的str.bytesOutStanding += size,每次收到带 streamId的 windowUpdate frame,就会恢复【str.bytesOutStanding -= int(w.increment)】, 每次写之前,都会判断写额度是否充足【int(l.oiws) - str.bytesOutStanding <=0】,不足就挂起 outStream【str.state = waitingOnStreamQuota】,直到下次收到windowUpdate frame时,重新设置成activite状态,放入l.activeStreams。

流入

每次接收到data frame时,会调用fc.onData()增加pendingData的值,如果大于limit + f.delta,则报错;成功后后将数据写到 数据缓冲区【recvBuffer】中。 而每次读数据的时候,则会调用 inFlow的OnRead,减少 pendingData的值,然后判断是否发送stream 的 windowUpdate frame。