概念

ARQ: 自动重传请求 (Automatic Repeat-reQuest,ARQ) 是 OSI 模型中数据链路层的错误纠正协议之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correction

kcp 简介

kcp 是一个基于 udp 实现快速、可靠、向前纠错的的协议,能以比 TCP 浪费 10%-20% 的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如 UDP)的收发。查看官方文档kcp

kcp-go 是用 go 实现了 kcp 协议的一个库,其实 kcp 类似 tcp,协议的实现也很多参考 tcp 协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
kcp 和 tcp 一样,也分客户端和监听端。

  1. +-+-+-+-+-+ +-+-+-+-+-+
  2. | Client | | Server |
  3. +-+-+-+-+-+ +-+-+-+-+-+
  4. |------ kcp data ------>|
  5. |<----- kcp data -------|

kcp 协议

layer model

  1. +----------------------+
  2. | Session |
  3. +----------------------+
  4. | KCP(ARQ) |
  5. +----------------------+
  6. | FEC(OPTIONAL) |
  7. +----------------------+
  8. | CRYPTO(OPTIONAL)|
  9. +----------------------+
  10. | UDP(Packet) |
  11. +----------------------+

KCP Header Format

  1. 4 1 1 2 (Byte)
  2. +---+---+---+---+---+---+---+---+
  3. | conv |cmd|frg| wnd |
  4. +---+---+---+---+---+---+---+---+
  5. | ts | sn |
  6. +---+---+---+---+---+---+---+---+
  7. | una | len |
  8. +---+---+---+---+---+---+---+---+
  9. | |
  10. + DATA +
  11. | |
  12. +---+---+---+---+---+---+---+---+

代码结构

  1. src/vendor/github.com/xtaci/kcp-go/
  2. ├── LICENSE
  3. ├── README.md
  4. ├── crypt.go 加解密实现
  5. ├── crypt_test.go
  6. ├── donate.png
  7. ├── fec.go 向前纠错实现
  8. ├── frame.png
  9. ├── kcp-go.png
  10. ├── kcp.go kcp协议实现
  11. ├── kcp_test.go
  12. ├── sess.go 会话管理实现
  13. ├── sess_test.go
  14. ├── snmp.go 数据统计实现
  15. ├── updater.go 任务调度实现
  16. ├── xor.go xor封装
  17. └── xor_test.go

着重研究两个文件kcp.gosess.go

kcp 浅析

kcp 是基于 udp 实现的,所有 udp 的实现这里不做介绍,kcp 做的事情就是怎么封装 udp 的数据和怎么解析 udp 的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍 kcp 客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。

kcp client 整体函数流

和 tcp 一样,kcp 要连接服务端需要先拨号,但是和 tcp 有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而 tcp 在这里需要三次握手。

  1. DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
  2. V
  3. net.DialUDP("udp", nil, udpaddr)
  4. V
  5. NewConn()
  6. V
  7. newUDPSession() {初始化UDPSession}
  8. V
  9. NewKCP() {初始化kcp}
  10. V
  11. updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
  12. V
  13. go sess.readLoop()
  14. V
  15. go s.receiver(chPacket)
  16. V
  17. s.kcpInput(data)
  18. V
  19. s.fecDecoder.decodeBytes(data)
  20. V
  21. s.kcp.Input(data, true, s.ackNoDelay)
  22. V
  23. kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
  24. V
  25. notifyReadEvent()

客户端大体的流程如上面所示,先Dial,建立 udp 连接,将这个连接封装成一个会话,然后启动一个 go 程,接收 udp 的消息。

kcp server 整体函数流

  1. ListenWithOptions()
  2. V
  3. net.ListenUDP()
  4. V
  5. ServerConn()
  6. V
  7. newFECDecoder()
  8. V
  9. go l.monitor() {从chPacket接收udp数据,写入kcp}
  10. V
  11. go l.receiver(chPacket) {从upd接收数据,并入队列}
  12. V
  13. newUDPSession()
  14. V
  15. updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
  16. V
  17. s.kcpInput(data)`
  18. V
  19. s.fecDecoder.decodeBytes(data)
  20. V
  21. s.kcp.Input(data, true, s.ackNoDelay)
  22. V
  23. kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
  24. V
  25. notifyReadEvent()

服务端的大体流程如上图所示,先Listen,启动 udp 监听,接着用一个 go 程监控 udp 的数据包,负责将不同 session 的数据写入不同的 udp 连接,然后解析封装将数据交给上层。

kcp 数据流详细解析

不管是 kcp 的客户端还是服务端,他们都有 io 行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。

kcp client 发送消息

  1. s.Write(b []byte)
  2. V
  3. s.kcp.WaitSnd() {}
  4. V
  5. s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
  6. V
  7. s.kcp.flush(false) [flush data to output] {
  8. if writeDelay==true {
  9. flush
  10. }else{
  11. 每隔`interval`时间flush一次
  12. }
  13. }
  14. V
  15. kcp.output(buffer, size)
  16. V
  17. s.output(buf)
  18. V
  19. s.conn.WriteTo(ext, s.remote)
  20. V
  21. s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write 方法:

  1. func (s *UDPSession) Write(b []byte) (n int, err error) {
  2. for {
  3. ...
  4. if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
  5. n = len(b)
  6. for {
  7. if len(b) <= int(s.kcp.mss) {
  8. s.kcp.Send(b)
  9. break
  10. } else {
  11. s.kcp.Send(b[:s.kcp.mss])
  12. b = b[s.kcp.mss:]
  13. }
  14. }
  15. if !s.writeDelay {
  16. s.kcp.flush(false)
  17. }
  18. s.mu.Unlock()
  19. atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
  20. return n, nil
  21. }
  22. ...
  23. select {
  24. case <-s.chWriteEvent:
  25. case <-c:
  26. case <-s.die:
  27. }
  28. if timeout != nil {
  29. timeout.Stop()
  30. }
  31. }
  32. }

假设发送一个 hello 消息,Write 方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则 kcp.Send(“hello”), 而 Send 函数实现根据 mss 的值对数据分段,当然这里的发送的 hello,长度太短,只分了一个段,并把它们插入发送的队列里。

  1. func (kcp *KCP) Send(buffer []byte) int {
  2. ...
  3. for i := 0; i < count; i++ {
  4. var size int
  5. if len(buffer) > int(kcp.mss) {
  6. size = int(kcp.mss)
  7. } else {
  8. size = len(buffer)
  9. }
  10. seg := kcp.newSegment(size)
  11. copy(seg.data, buffer[:size])
  12. if kcp.stream == 0 {
  13. seg.frg = uint8(count - i - 1)
  14. } else {
  15. seg.frg = 0
  16. }
  17. kcp.snd_queue = append(kcp.snd_queue, seg)
  18. buffer = buffer[size:]
  19. }
  20. return 0
  21. }

接着判断参数writeDelay,如果参数设置为 false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由 flush 函数实现的。

  1. func (kcp *KCP) flush(ackOnly bool) {
  2. var seg Segment
  3. seg.conv = kcp.conv
  4. seg.cmd = IKCP_CMD_ACK
  5. seg.wnd = kcp.wnd_unused()
  6. seg.una = kcp.rcv_nxt
  7. buffer := kcp.buffer
  8. ptr := buffer
  9. for i, ack := range kcp.acklist {
  10. size := len(buffer) - len(ptr)
  11. if size+IKCP_OVERHEAD > int(kcp.mtu) {
  12. kcp.output(buffer, size)
  13. ptr = buffer
  14. }
  15. if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
  16. seg.sn, seg.ts = ack.sn, ack.ts
  17. ptr = seg.encode(ptr)
  18. }
  19. }
  20. kcp.acklist = kcp.acklist[0:0]
  21. if ackOnly {
  22. size := len(buffer) - len(ptr)
  23. if size > 0 {
  24. kcp.output(buffer, size)
  25. }
  26. return
  27. }
  28. if kcp.rmt_wnd == 0 {
  29. current := currentMs()
  30. if kcp.probe_wait == 0 {
  31. kcp.probe_wait = IKCP_PROBE_INIT
  32. kcp.ts_probe = current + kcp.probe_wait
  33. } else {
  34. if _itimediff(current, kcp.ts_probe) >= 0 {
  35. if kcp.probe_wait < IKCP_PROBE_INIT {
  36. kcp.probe_wait = IKCP_PROBE_INIT
  37. }
  38. kcp.probe_wait += kcp.probe_wait / 2
  39. if kcp.probe_wait > IKCP_PROBE_LIMIT {
  40. kcp.probe_wait = IKCP_PROBE_LIMIT
  41. }
  42. kcp.ts_probe = current + kcp.probe_wait
  43. kcp.probe |= IKCP_ASK_SEND
  44. }
  45. }
  46. } else {
  47. kcp.ts_probe = 0
  48. kcp.probe_wait = 0
  49. }
  50. if (kcp.probe & IKCP_ASK_SEND) != 0 {
  51. seg.cmd = IKCP_CMD_WASK
  52. size := len(buffer) - len(ptr)
  53. if size+IKCP_OVERHEAD > int(kcp.mtu) {
  54. kcp.output(buffer, size)
  55. ptr = buffer
  56. }
  57. ptr = seg.encode(ptr)
  58. }
  59. if (kcp.probe & IKCP_ASK_TELL) != 0 {
  60. seg.cmd = IKCP_CMD_WINS
  61. size := len(buffer) - len(ptr)
  62. if size+IKCP_OVERHEAD > int(kcp.mtu) {
  63. kcp.output(buffer, size)
  64. ptr = buffer
  65. }
  66. ptr = seg.encode(ptr)
  67. }
  68. kcp.probe = 0
  69. cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
  70. if kcp.nocwnd == 0 {
  71. cwnd = _imin_(kcp.cwnd, cwnd)
  72. }
  73. newSegsCount := 0
  74. for k := range kcp.snd_queue {
  75. if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
  76. break
  77. }
  78. newseg := kcp.snd_queue[k]
  79. newseg.conv = kcp.conv
  80. newseg.cmd = IKCP_CMD_PUSH
  81. newseg.sn = kcp.snd_nxt
  82. kcp.snd_buf = append(kcp.snd_buf, newseg)
  83. kcp.snd_nxt++
  84. newSegsCount++
  85. kcp.snd_queue[k].data = nil
  86. }
  87. if newSegsCount > 0 {
  88. kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
  89. }
  90. resent := uint32(kcp.fastresend)
  91. if kcp.fastresend <= 0 {
  92. resent = 0xffffffff
  93. }
  94. current := currentMs()
  95. var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
  96. for k := range kcp.snd_buf {
  97. segment := &kcp.snd_buf[k]
  98. needsend := false
  99. if segment.xmit == 0 {
  100. needsend = true
  101. segment.rto = kcp.rx_rto
  102. segment.resendts = current + segment.rto
  103. } else if _itimediff(current, segment.resendts) >= 0 {
  104. needsend = true
  105. if kcp.nodelay == 0 {
  106. segment.rto += kcp.rx_rto
  107. } else {
  108. segment.rto += kcp.rx_rto / 2
  109. }
  110. segment.resendts = current + segment.rto
  111. lost++
  112. lostSegs++
  113. } else if segment.fastack >= resent {
  114. needsend = true
  115. segment.fastack = 0
  116. segment.rto = kcp.rx_rto
  117. segment.resendts = current + segment.rto
  118. change++
  119. fastRetransSegs++
  120. } else if segment.fastack > 0 && newSegsCount == 0 {
  121. needsend = true
  122. segment.fastack = 0
  123. segment.rto = kcp.rx_rto
  124. segment.resendts = current + segment.rto
  125. change++
  126. earlyRetransSegs++
  127. }
  128. if needsend {
  129. segment.xmit++
  130. segment.ts = current
  131. segment.wnd = seg.wnd
  132. segment.una = seg.una
  133. size := len(buffer) - len(ptr)
  134. need := IKCP_OVERHEAD + len(segment.data)
  135. if size+need > int(kcp.mtu) {
  136. kcp.output(buffer, size)
  137. current = currentMs()
  138. ptr = buffer
  139. }
  140. ptr = segment.encode(ptr)
  141. copy(ptr, segment.data)
  142. ptr = ptr[len(segment.data):]
  143. if segment.xmit >= kcp.dead_link {
  144. kcp.state = 0xFFFFFFFF
  145. }
  146. }
  147. }
  148. size := len(buffer) - len(ptr)
  149. if size > 0 {
  150. kcp.output(buffer, size)
  151. }
  152. sum := lostSegs
  153. if lostSegs > 0 {
  154. atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
  155. }
  156. if fastRetransSegs > 0 {
  157. atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
  158. sum += fastRetransSegs
  159. }
  160. if earlyRetransSegs > 0 {
  161. atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
  162. sum += earlyRetransSegs
  163. }
  164. if sum > 0 {
  165. atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
  166. }
  167. if change > 0 {
  168. inflight := kcp.snd_nxt - kcp.snd_una
  169. kcp.ssthresh = inflight / 2
  170. if kcp.ssthresh < IKCP_THRESH_MIN {
  171. kcp.ssthresh = IKCP_THRESH_MIN
  172. }
  173. kcp.cwnd = kcp.ssthresh + resent
  174. kcp.incr = kcp.cwnd * kcp.mss
  175. }
  176. if lost > 0 {
  177. kcp.ssthresh = cwnd / 2
  178. if kcp.ssthresh < IKCP_THRESH_MIN {
  179. kcp.ssthresh = IKCP_THRESH_MIN
  180. }
  181. kcp.cwnd = 1
  182. kcp.incr = kcp.mss
  183. }
  184. if kcp.cwnd < 1 {
  185. kcp.cwnd = 1
  186. kcp.incr = kcp.mss
  187. }
  188. }

flush 函数非常的重要,kcp 的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送 ack,如果ackOnly为 true 的话,该函数只遍历 ack 列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行 windSize 探测,如果开启了拥塞控制 nc=0,则每次发送前检测服务端的 winsize,如果服务端的 winsize 变小了,自身的 winsize 也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的 winsize 进行数据发送。
接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:

  1. 如果这个段数据首次发送,则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间,也就是 RTO,则重传消息。 3. 如果这个段数据的 ack 丢失累计超过 resent 次数,则重传,也就是快速重传机制。这个 resent 参数由resend参数决定。 4. 如果这个段数据的 ack 有丢失且没有新的数据段,则触发 ER,ER 相关信息ER

最后通过 kcp.output 发送消息 hello,output 是个回调函数,函数的实体是sess.go的:

  1. func (s *UDPSession) output(buf []byte) {
  2. var ecc [][]byte
  3. ext := buf
  4. if s.headerSize > 0 {
  5. ext = s.ext[:s.headerSize+len(buf)]
  6. copy(ext[s.headerSize:], buf)
  7. }
  8. if s.fecEncoder != nil {
  9. ecc = s.fecEncoder.Encode(ext)
  10. }
  11. if s.block != nil {
  12. io.ReadFull(rand.Reader, ext[:nonceSize])
  13. checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
  14. binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
  15. s.block.Encrypt(ext, ext)
  16. if ecc != nil {
  17. for k := range ecc {
  18. io.ReadFull(rand.Reader, ecc[k][:nonceSize])
  19. checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
  20. binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
  21. s.block.Encrypt(ecc[k], ecc[k])
  22. }
  23. }
  24. }
  25. nbytes := 0
  26. npkts := 0
  27. for i := 0; i < s.dup+1; i++ {
  28. if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
  29. nbytes += n
  30. npkts++
  31. }
  32. }
  33. if ecc != nil {
  34. for k := range ecc {
  35. if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
  36. nbytes += n
  37. npkts++
  38. }
  39. }
  40. }
  41. atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
  42. atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
  43. }

output 函数才是真正的将数据写入内核中,在写入之前先进行了 fec 编码,fec 编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的 hello 就不是和原来的 hello 一样了,至少多了几个字节。 fec 编码器有两个重要的参数 reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了 fec 的冗余度,冗余度越大抗丢包性就越强。

kcp 的任务调度器

其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理 session,代码文件为updater.go。其中最主要的函数

  1. func (h *updateHeap) updateTask() {
  2. var timer <-chan time.Time
  3. for {
  4. select {
  5. case <-timer:
  6. case <-h.chWakeUp:
  7. }
  8. h.mu.Lock()
  9. hlen := h.Len()
  10. now := time.Now()
  11. if hlen > 0 && now.After(h.entries[0].ts) {
  12. for i := 0; i < hlen; i++ {
  13. entry := heap.Pop(h).(entry)
  14. if now.After(entry.ts) {
  15. entry.ts = now.Add(entry.s.update())
  16. heap.Push(h, entry)
  17. } else {
  18. heap.Push(h, entry)
  19. break
  20. }
  21. }
  22. }
  23. if hlen > 0 {
  24. timer = time.After(h.entries[0].ts.Sub(now))
  25. }
  26. h.mu.Unlock()
  27. }
  28. }

任务调度器实现了一个堆结构,每当有新的连接,session 都会插入到这个堆里,接着 for 循环每隔 interval 时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。

总结

这里简单介绍了 kcp 的整体流程,详细介绍了发送数据的流程,但未介绍 kcp 接收数据的流程,其实在客户端发送数据后,服务端是需要返回 ack 的,而客户端也需要根据返回的 ack 来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的 ack 是在函数 kcp.Input() 函数实现的。具体详细流程下次再介绍。
https://www.cnblogs.com/zhangboyu/p/34c07c3577c85e9ae5c3477d7cab5f52.html