流量控制
流量控制代码实现在transport包,主要包括
1. 传输层的流量控制
- stream层流量控制
而流量控制则包含输入流量和输出流量
则整体结构为:
- 传输层的流量控制
- 传输层写入控制
- 传输层写出控制
- stream层流量控制
- stream层写入控制
- stream层写出控制
传输层流量控制:
传输层流量控制原理:
loopyWriter.SendQuota【return,就return,不会挂起!】
传输层流量控制原理为:
[processData]每次发送方写出时,将减少窗口额度值,当窗口额度不足时,就会不再向外发送数据,执行其他任务[ctlBuffer.get(),handle()],直到等待接收方传回 WindowUpdate 帧后,继续发送。
接收方每次收到数据时,判断当前接收的数据是否超过 limit/4,如果超过,则向外发送WindowUpdate 帧, 发送方收到WindowUpdate 帧后,会更新发送窗口额度,继续完成发送。
原理如下:

主要实现为;loopyWriter 和 trInFlow
代码实现:
发送方写出时:
loopyWriter.processData
if l.sendQuota == 0 {return true, nil}//...// 总发送配额小于流配额if l.sendQuota < uint32(size) { // connection-level flow control.size = int(l.sendQuota)}//...// 写出成功后,更新剩余配额l.sendQuota -= uint32(size)
发送方收到incomingWindowUpdate时
// tcp传输层的,streamId=0,增加SendQuotaif w.streamID == 0 {l.sendQuota += w.incrementreturn nil}
接收方:
http2Server.HandleStreams,当有Dataframe时,会调用handleData方法,首先会获取数据长度,然后调用
if w := t.fc.onData(size); w > 0 {t.controlBuf.put(&outgoingWindowUpdate{streamID: 0, // 注意,传输层 的StreamId为0increment: w,})}
终上,传输层的流量控制主要取决于接收方 处理数据的 大小。接收方收到数据后才会发送 windowUpdate 帧,更新发送方帧的大小。
Stream层流量控制
1. 写入写出控制原理:
stream层流量写入主要由Stream.inFlow 控制器来控制,而写出则由outStream【在activityStream】来做控制;
- 流入控制:
每次接收到data frame时,会调用fc.onData()增加pendingData的值,如果大于limit + f.delta,则报错;成功后后将数据写到 数据缓冲区【recvBuffer】中。 而每次读数据的时候,则会调用 inFlow的OnRead,减少 pendingData的值,然后判断是否发送stream 的 windowUpdate frame。
注:对于flag=FlagDataPadded 的数据帧,则会立即调用 onRead(size - uint32(len(f.Data())),并发送windowUpdate 帧。 因为 size - uint32(len(f.Data())) 表示 帧头的的大小,需要保证已接收到了帧头的数据。
- 写出控制:
每次写出数据后【l.framer.fr.WriteData】,就会累计输出流的str.bytesOutStanding += size,每次收到带 streamId的 windowUpdate frame,就会恢复【str.bytesOutStanding -= int(w.increment)】, 每次写之前,都会判断写额度是否充足,不足就挂起 outStream【str.state = waitingOnStreamQuota】,直到下次收到windowUpdate frame时,重新设置成activite状态,放入l.activeStreams。
原理基本与 传输层的流量控制一致,原理图:
2. outStream的代码实现如下:
// #loopyWriter.processData// ... 状态控制代码,累计的流配额比滑动窗口还大if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.// 改变状态,等待配额str.state = waitingOnStreamQuotareturn false, nil}// ... 写出过程if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {return false, err}// 更新buf,将已发送的去除buf = buf[size:]// 更新这个流已发送的字节str.bytesOutStanding += size// ...// windowUpdateFrame 的恢复代码 #loopyWriter.incomingWindowUpdateHandler// Find the stream and update it.if str, ok := l.estdStreams[w.streamID]; ok {str.bytesOutStanding -= int(w.increment)if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {str.state = activel.activeStreams.enqueue(str)return nil}}//
3. inFlow 流入控制代码实现如下:
// 控制器层 :据帧时调用 onData。它更新pendingData。func (f *inFlow) onData(n uint32) error {f.mu.Lock()f.pendingData += nif f.pendingData+f.pendingUpdate > f.limit+f.delta {limit := f.limitrcvd := f.pendingData + f.pendingUpdatef.mu.Unlock()return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)}f.mu.Unlock()return nil}// onRead is invoked when the application reads the data. It returns the window size// to be sent to the peer.//onRead 在应用程序读取数据时被调用。它返回要发送给对等方的窗口大小func (f *inFlow) onRead(n uint32) uint32 {f.mu.Lock()if f.pendingData == 0 {f.mu.Unlock()return 0}f.pendingData -= nif n > f.delta {n -= f.deltaf.delta = 0} else {f.delta -= nn = 0}f.pendingUpdate += n// update缓冲区if f.pendingUpdate >= f.limit/4 {wu := f.pendingUpdatef.pendingUpdate = 0f.mu.Unlock()return wu}f.mu.Unlock()return 0}### tramsport 层:func (t *http2Server) handleData(f *http2.DataFrame) {//...if err := s.fc.onData(size); err != nil {t.closeStream(s, true, http2.ErrCodeFlowControl, false)return}//...}// ... 发送窗口更新func (t *http2Server) updateWindow(s *Stream, n uint32) {if w := s.fc.onRead(n); w > 0 {t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,increment: w,})}}
4. 缓冲区层流量限制:writeQuota:
缓冲区写控制主要由WriteQuota控制的,WriteQuota控制的是 写入ControlBuffer和str.itemList的大小,控制缓冲区的使用,与传输层流量控制不一样。而bytesOutStanding 是outStream是上的一个属性,用于记录发送了多少字节大小的数据。
- WriteQuota的写控制原理为:
每次发送方写出时,将减少窗口额度值,然后把数据封装成dataFrame,放入到ControlBuffer 中,当窗口额度不足时,就会挂起协程!不再向外发送数据。 直到loopyWriter处理数据完成后【将数据write到frame后】,会把w.quota添加回去,如果加回去之后 quota小于零 && quota 大于 -sz,说明修复完quota后有可用的字节数,则唤醒get中等待的协程。
一 、WriteQuota代码实现如下:
http2Server.Write
// ...if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {select {case <-t.done:return ErrConnClosingdefault:}return ContextErr(s.ctx.Err())}return t.controlBuf.put(df)// -------------------- writeQuota代码--// get 的时候就减少 quotafunc (w *writeQuota) get(sz int32) error {for {if atomic.LoadInt32(&w.quota) > 0 {atomic.AddInt32(&w.quota, -sz)return nil}// 小于零,就switch ,阻塞!select {case <-w.ch:continuecase <-w.done:return errStreamDone}}}// ---- 数据恢复部分---(l *loopyWriter) processData()(bool, error){...str.wq.replenish(size)...}// 唤醒部分:func (w *writeQuota) realReplenish(n int) {sz := int32(n)a := atomic.AddInt32(&w.quota, sz)b := a - sz// b 小于零,即// quota小于零 && quota 大于 -sz,说明修复完quota后有可用的字节数,则唤醒get中等待的协程if b <= 0 && a > 0 {select {case w.ch <- struct{}{}:default:}}}
//原理图:

难点:限流器,滑动窗口,发送额度的限制
每次发送,都会减少额度,收到window update后,增加额度。
单个流流量控制
整体流程图如下:

流量控制总结:
传输层
传输层流量控制通过 loopyWriter中的SendQuota 控制,SendQuota的值是传输层初始化窗口大小(initialWindowSize),每次向外发送数据都会判断该值然后累减,然后loopy接收到streamId = 0 d的windowUpdate帧时,会累加SendQuota;接收端通过trInflow 组件,每次接受到DataFrame帧后,都会调用OnData,累加自身的unAcked,当unacked大于limit/4时,会回传一个 windowUpdate 帧给 发送方;【压缩整流作用】
stream层流量控制:
写出
每次写出数据后【l.framer.fr.WriteData】,就会累计输出流的str.bytesOutStanding += size,每次收到带 streamId的 windowUpdate frame,就会恢复【str.bytesOutStanding -= int(w.increment)】, 每次写之前,都会判断写额度是否充足【int(l.oiws) - str.bytesOutStanding <=0】,不足就挂起 outStream【str.state = waitingOnStreamQuota】,直到下次收到windowUpdate frame时,重新设置成activite状态,放入l.activeStreams。
流入
每次接收到data frame时,会调用fc.onData()增加pendingData的值,如果大于limit + f.delta,则报错;成功后后将数据写到 数据缓冲区【recvBuffer】中。 而每次读数据的时候,则会调用 inFlow的OnRead,减少 pendingData的值,然后判断是否发送stream 的 windowUpdate frame。
