参考 https://zhuanlan.zhihu.com/p/74613114

channel 实现原理

  • 对chan的接收和发送都会在编译期间转换成为底层的发送接收函数。
  • channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作“同步模式”,带缓冲的则称为“异步模式”。
  • 同步模式下,发送接收方都要都要同步就绪,只有在双方都处于ready状态,数据才能在两者之间传输(实际上是内存拷贝)。否则,任意一方都会被挂起,等待另一方来唤醒。
  • 异步模式下,在缓冲区有剩余槽位时(有剩余容量),发送接收操作能顺利执行,不会被阻塞以致挂起。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。

chan 数据结构

  1. type hchan struct {
  2. // chan 里元素数量
  3. qcount uint
  4. // chan 底层循环数组的长度
  5. dataqsiz uint
  6. // 指向底层循环数组的指针
  7. // 只针对有缓冲的 channel
  8. buf unsafe.Pointer
  9. // chan 中元素大小
  10. elemsize uint16
  11. // chan 是否被关闭的标志
  12. closed uint32
  13. // chan 中元素类型
  14. elemtype *_type // element type
  15. // 已发送元素在循环数组中的索引
  16. sendx uint // send index
  17. // 已接收元素在循环数组中的索引
  18. recvx uint // receive index
  19. // 等待接收的 goroutine 队列
  20. recvq waitq // list of recv waiters
  21. // 等待发送的 goroutine 队列
  22. sendq waitq // list of send waiters
  23. // 保护 hchan 中所有字段
  24. lock mutex
  25. }
  • buf 指向底层循环数组,只有缓冲型的 channel 才有。
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的。
  • sendxrecvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
  • sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
  1. type waitq struct {
  2. first *sudog
  3. last *sudog
  4. }

waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装。

1. Go内置数据结构 - 图1

操作方法实现

make

  1. const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
  2. func makechan(t *chantype, size int64) *hchan {
  3. elem := t.elem
  4. // 省略了检查 channel size,align 的代码
  5. // ……
  6. var c *hchan
  7. // 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
  8. // 只进行一次内存分配
  9. if elem.kind&kindNoPointers != 0 || size == 0 {
  10. // 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
  11. // 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
  12. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
  13. // 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
  14. if size > 0 && elem.size != 0 {
  15. c.buf = add(unsafe.Pointer(c), hchanSize)
  16. } else {
  17. // race detector uses this location for synchronization
  18. // Also prevents us from pointing beyond the allocation (see issue 9401).
  19. // 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
  20. // 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
  21. // 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
  22. c.buf = unsafe.Pointer(c)
  23. }
  24. } else {
  25. // 进行两次内存分配操作
  26. c = new(hchan)
  27. c.buf = newarray(elem, int(size))
  28. }
  29. c.elemsize = uint16(elem.size)
  30. c.elemtype = elem
  31. // 循环数组长度
  32. c.dataqsiz = uint(size)
  33. // 返回 hchan 指针
  34. return c
  35. }

recv

  1. // entry points for <- c from compiled code
  2. func chanrecv1(c *hchan, elem unsafe.Pointer) {
  3. chanrecv(c, elem, true)
  4. }
  5. func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
  6. _, received = chanrecv(c, elem, true)
  7. return
  8. }

chanrecv1 函数处理不带 “ok” 的情形,chanrecv2 则通过返回 “received” 这个字段来反应 channel 是否被关闭。接收值则比较特殊,会“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。

无论如何,最终转向了 chanrecv 函数:

  1. // 位于 src/runtime/chan.go
  2. // chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
  3. // 如果 ep 是 nil,说明忽略了接收值。
  4. // 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
  5. // 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
  6. // 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
  7. // 如果 ep 非空,则应该指向堆或者函数调用者的栈
  8. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  9. // 省略 debug 内容 …………
  10. // 如果是一个 nil 的 channel
  11. if c == nil {
  12. // 如果不阻塞,直接返回 (false, false)
  13. if !block {
  14. return
  15. }
  16. // 否则,接收一个 nil 的 channel,goroutine 挂起
  17. gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
  18. throw("unreachable") // 不会执行到这
  19. }
  20. // 非阻塞,buf没元素,
  21. if !block && empty(c) {
  22. // 未关闭,返回 (false, false)
  23. if atomic.Load(&c.closed) == 0 {
  24. return
  25. }
  26. // 已关闭。 因为 channel 不可能被重复打开。
  27. // 它可能在上面的空检查和关闭检查之间到达,重新检查通道是否有待处理的数据接收,
  28. if empty(c) {
  29. if ep != nil {
  30. // 未忽略返回值,那么接收的值将是一个该类型的零值
  31. // typedmemclr 根据类型清理相应地址的内存,标为零值
  32. typedmemclr(c.elemtype, ep)
  33. }
  34. return true, false
  35. }
  36. }
  37. var t0 int64
  38. if blockprofilerate > 0 {
  39. t0 = cputicks()
  40. }
  41. // 加锁
  42. lock(&c.lock)
  43. // 已关闭 && 循环数组 buf 里没有元素
  44. // 这里可以处理 非缓冲型关闭 和 缓冲型 buf 无元素关闭 两种情况
  45. // 也就是说即使是关闭状态,但在缓冲型的 channel,buf 里有元素的情况下还能接收到元素
  46. if c.closed != 0 && c.qcount == 0 {
  47. if raceenabled {
  48. raceacquire(unsafe.Pointer(c))
  49. }
  50. // 解锁
  51. unlock(&c.lock)
  52. if ep != nil {
  53. // 从一个已关闭的并且空的 channel 执行接收操作,且未忽略返回值
  54. // 那么接收的值将是一个该类型的零值
  55. // typedmemclr 根据类型清理相应地址的内存,标为零值
  56. typedmemclr(c.elemtype, ep)
  57. }
  58. // 从一个已关闭的 channel 接收,selected 会返回true
  59. return true, false
  60. }
  61. // 等待发送队列里有 goroutine 存在,说明 buf 是满的
  62. // 这有可能是:
  63. // 1. 非缓冲型的 channel
  64. // 2. 缓冲型的 channel,但 buf 满了
  65. // 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
  66. // 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
  67. if sg := c.sendq.dequeue(); sg != nil {
  68. // Found a waiting sender. If buffer is size 0, receive value
  69. // directly from sender. Otherwise, receive from head of queue
  70. // and add sender's value to the tail of the queue (both map to
  71. // the same buffer slot because the queue is full).
  72. recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
  73. return true, true
  74. }
  75. // 下面逻辑是 没有阻塞在sendq 的 g
  76. // 缓冲型,buf 里有元素,可以正常接收
  77. if c.qcount > 0 {
  78. // 直接从循环数组里找到要接收的元素
  79. qp := chanbuf(c, c.recvx)
  80. // …………
  81. // 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
  82. if ep != nil {
  83. typedmemmove(c.elemtype, ep, qp)
  84. }
  85. // 清理掉循环数组里相应位置的值
  86. typedmemclr(c.elemtype, qp)
  87. // 接收游标向前移动
  88. c.recvx++
  89. // 接收游标归零
  90. if c.recvx == c.dataqsiz {
  91. c.recvx = 0
  92. }
  93. // buf 数组里的元素个数减 1
  94. c.qcount--
  95. // 解锁
  96. unlock(&c.lock)
  97. return true, true
  98. }
  99. if !block {
  100. // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
  101. unlock(&c.lock)
  102. return false, false
  103. }
  104. // 接下来就是要被阻塞接收的情况了
  105. // 构造一个 sudog
  106. gp := getg() // p上运行的g
  107. mysg := acquireSudog() // 新创建的g
  108. mysg.releasetime = 0
  109. if t0 != 0 {
  110. mysg.releasetime = -1
  111. }
  112. // 待接收数据的地址保存下来
  113. mysg.elem = ep
  114. mysg.waitlink = nil
  115. gp.waiting = mysg
  116. mysg.g = gp
  117. mysg.selectdone = nil
  118. mysg.c = c
  119. gp.param = nil
  120. // 进入channel 的等待接收队列
  121. c.recvq.enqueue(mysg)
  122. // 将当前 goroutine 挂起
  123. goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
  124. // 被唤醒了,接着从这里继续执行一些扫尾工作
  125. if mysg != gp.waiting {
  126. throw("G waiting list is corrupted")
  127. }
  128. gp.waiting = nil
  129. if mysg.releasetime > 0 {
  130. blockevent(mysg.releasetime-t0, 2)
  131. }
  132. closed := gp.param == nil
  133. gp.param = nil
  134. mysg.c = nil
  135. releaseSudog(mysg)
  136. return true, !closed
  137. }

send

  1. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  2. if c == nil {
  3. // 不能阻塞,直接返回 false,表示未发送成功
  4. if !block {
  5. return false
  6. }
  7. // 当前 goroutine 被挂起
  8. gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
  9. throw("unreachable")
  10. }
  11. // 省略 debug 相关……
  12. // 对于不阻塞的 send,快速检测失败场景
  13. //
  14. // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
  15. // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
  16. // 2. channel 是缓冲型的,但循环数组已经装满了元素
  17. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
  18. (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
  19. return false
  20. }
  21. var t0 int64
  22. if blockprofilerate > 0 {
  23. t0 = cputicks()
  24. }
  25. // 锁住 channel,并发安全
  26. lock(&c.lock)
  27. // 如果 channel 关闭了
  28. if c.closed != 0 {
  29. unlock(&c.lock)
  30. panic(plainError("send on closed channel"))
  31. }
  32. // 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
  33. if sg := c.recvq.dequeue(); sg != nil {
  34. send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  35. return true
  36. }
  37. // 对于缓冲型的 channel,如果还有缓冲空间
  38. if c.qcount < c.dataqsiz {
  39. // qp 指向 buf 的 sendx 位置
  40. qp := chanbuf(c, c.sendx)
  41. // ……
  42. // 将数据从 ep 处拷贝到 qp
  43. typedmemmove(c.elemtype, qp, ep)
  44. // 发送游标值加 1
  45. c.sendx++
  46. // 如果发送游标值等于容量值,游标值归 0
  47. if c.sendx == c.dataqsiz {
  48. c.sendx = 0
  49. }
  50. // 缓冲区的元素数量加一
  51. c.qcount++
  52. unlock(&c.lock)
  53. return true
  54. }
  55. // 如果不需要阻塞,则直接返回错误
  56. if !block {
  57. unlock(&c.lock)
  58. return false
  59. }
  60. // channel 满了,发送方会被阻塞。接下来会构造一个 sudog
  61. // 获取当前 goroutine 的指针
  62. gp := getg()
  63. mysg := acquireSudog()
  64. mysg.releasetime = 0
  65. if t0 != 0 {
  66. mysg.releasetime = -1
  67. }
  68. mysg.elem = ep
  69. mysg.waitlink = nil
  70. mysg.g = gp
  71. mysg.selectdone = nil
  72. mysg.c = c
  73. gp.waiting = mysg
  74. gp.param = nil
  75. // 当前 goroutine 进入发送等待队列
  76. c.sendq.enqueue(mysg)
  77. // 当前 goroutine 被挂起
  78. goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
  79. // 从这里开始被唤醒了(channel 有机会可以发送了)
  80. if mysg != gp.waiting {
  81. throw("G waiting list is corrupted")
  82. }
  83. gp.waiting = nil
  84. if gp.param == nil {
  85. if c.closed == 0 {
  86. throw("chansend: spurious wakeup")
  87. }
  88. // 被唤醒后,channel 关闭了。坑爹啊,panic
  89. panic(plainError("send on closed channel"))
  90. }
  91. gp.param = nil
  92. if mysg.releasetime > 0 {
  93. blockevent(mysg.releasetime-t0, 2)
  94. }
  95. // 去掉 mysg 上绑定的 channel
  96. mysg.c = nil
  97. releaseSudog(mysg)
  98. return true
  99. }
  • 如果检测到 channel 是nil的,当前 goroutine 会被挂起。
  • 对于不阻塞的发送操作,如果 channel 未关闭并且没有多余的缓冲空间(说明:a. channel 是非缓冲型的,且等待接收队列里没有 goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素),会返回false

close

  1. func closechan(c *hchan) {
  2. // 关闭一个 nil channel,panic
  3. if c == nil {
  4. panic(plainError("close of nil channel"))
  5. }
  6. // 上锁
  7. lock(&c.lock)
  8. // 如果 channel 已经关闭
  9. if c.closed != 0 {
  10. unlock(&c.lock)
  11. // panic
  12. panic(plainError("close of closed channel"))
  13. }
  14. // …………
  15. // 修改关闭状态
  16. c.closed = 1
  17. var glist *g
  18. // 将 channel 所有等待接收队列的里 sudog 释放
  19. for {
  20. // 从接收队列里出队一个 sudog
  21. sg := c.recvq.dequeue()
  22. // 出队完毕,跳出循环
  23. if sg == nil {
  24. break
  25. }
  26. // 如果 elem 不为空,说明此 receiver 未忽略接收数据
  27. // 给它赋一个相应类型的零值
  28. if sg.elem != nil {
  29. typedmemclr(c.elemtype, sg.elem)
  30. sg.elem = nil
  31. }
  32. if sg.releasetime != 0 {
  33. sg.releasetime = cputicks()
  34. }
  35. // 取出 goroutine
  36. gp := sg.g
  37. gp.param = nil
  38. if raceenabled {
  39. raceacquireg(gp, unsafe.Pointer(c))
  40. }
  41. // 相连,形成链表
  42. gp.schedlink.set(glist)
  43. glist = gp
  44. }
  45. // 将 channel 等待发送队列里的 sudog 释放
  46. // 如果存在,这些 goroutine 将会 panic
  47. for {
  48. // 从发送队列里出队一个 sudog
  49. sg := c.sendq.dequeue()
  50. if sg == nil {
  51. break
  52. }
  53. // 发送者会 panic
  54. sg.elem = nil
  55. if sg.releasetime != 0 {
  56. sg.releasetime = cputicks()
  57. }
  58. gp := sg.g
  59. gp.param = nil
  60. if raceenabled {
  61. raceacquireg(gp, unsafe.Pointer(c))
  62. }
  63. // 形成链表
  64. gp.schedlink.set(glist)
  65. glist = gp
  66. }
  67. // 解锁
  68. unlock(&c.lock)
  69. // Ready all Gs now that we've dropped the channel lock.
  70. // 遍历链表
  71. for glist != nil {
  72. // 取最后一个
  73. gp := glist
  74. // 向前走一步,下一个唤醒的 g
  75. glist = glist.schedlink.ptr()
  76. gp.schedlink = 0
  77. // 唤醒相应 goroutine
  78. goready(gp, 3)
  79. }
  80. }