Go的channel是Go的一大特性,相对于其他编程而言,Go的channel提供了进程内消息通信,数据同步的重要功能,因为是内置数据结构,因此使用起来非常方便,因此,针对这个常用的数据结构,我们需要探究下其实现原理以及设计思想。

https://github.com/golang/go/blob/go1.12.7/src/runtime/chan.go

Go管理channel结构体如下,我们使用make创建channel时,就是初始化hchan结构体,hchan的定义如下:

  1. type hchan struct {
  2. qcount uint // 当前环形队列内的数据个数
  3. dataqsiz uint // 环形队列总长度
  4. buf unsafe.Pointer // 数据缓冲区指针,指向一个dataqsiz大小的数组,环形队列由该数组实现
  5. elemsize uint16 // 单个数据的大小
  6. closed uint32 // channel是否关闭
  7. elemtype *_type // element type // 环形队列内的数据类型
  8. sendx uint // send index // 队列下标,元素写入队列时的队列的位置,即队列尾
  9. recvx uint // receive index // 队列下标,下一个被读取的元素在队列的位置,即队列头
  10. recvq waitq // 等待读消息的goroutine队列,环形队列空时,recvq会堆积
  11. sendq waitq // 等待写消息的goroutine队列,环形队列满时,sendq会堆积
  12. lock mutex // 互斥锁,保证channel并发读写安全
  13. }
  14. type waitq struct {
  15. first *sudog // 队列头指针
  16. last *sudog // 队列尾指针
  17. }

由channel的结构体可以看出,组合成channel的几大部分是:

  1. 环形队列,用于缓存数据
  2. groutine等待队列
  3. 互斥锁

channel的创建

我们使用

  1. c := make(chan int, 6)

创建了一个channel,其缓冲区大小为6,元素类型为int。我们通过源码makeChan可以看出,我们调用make创建channel时返回的返回值的类型值hchan指针。

  1. func makechan(t *chantype, size int) *hchan {
  2. elem := t.elem
  3. // 单个元素size过大
  4. if elem.size >= 1<<16 {
  5. throw("makechan: invalid channel element type")
  6. }
  7. // 字节对齐
  8. if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  9. throw("makechan: bad alignment")
  10. }
  11. // 计算需要分配的内存大小,如果申请的内存过大,就panic
  12. mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  13. if overflow || mem > maxAlloc-hchanSize || size < 0 {
  14. panic(plainError("makechan: size out of range"))
  15. }
  16. var c *hchan
  17. switch {
  18. case mem == 0:
  19. // 申请的是0缓冲队列
  20. c = (*hchan)(mallocgc(hchanSize, nil, true))
  21. // Race detector uses this location for synchronization.
  22. c.buf = c.raceaddr()
  23. case elem.kind&kindNoPointers != 0:
  24. // 元素不包含指针
  25. // Allocate hchan and buf in one call.
  26. c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  27. c.buf = add(unsafe.Pointer(c), hchanSize)
  28. default:
  29. // 元素包含指针
  30. c = new(hchan)
  31. c.buf = mallocgc(mem, elem, true)
  32. }
  33. c.elemsize = uint16(elem.size)
  34. c.elemtype = elem
  35. c.dataqsiz = uint(size)
  36. ...
  37. return c
  38. }

根据makechan的源码,我们了解到创建一个channel的本质是创建一个hchan结构体,我们通过makechan拿到的就是这个hchan结构体的指针。makechan具体做了以下几个事情:

  1. 前置检查,检查元素大小,以及申请分配的内存空间是否合理。
  2. 根据channel类型触发相应的内存分配,无缓冲长度的channel不分配内存给环形缓冲区;带缓冲区长度的channel调用不同的方法创建缓冲区,分配相应内存。
  3. hchan结构体赋值,返回hchan指针。

channel的数据发送

当执行c <- 1这样的代码时,即向channel发送数据,实际调用的函数是chansend。这里的chansend做了以下几件事情:

  1. 先加锁,保证并发读写安全
  2. 检查channel是否关闭,如果向已关闭的channel发送数据,触发panic
  3. 检查recvq是否有等待数据的goroutine,如果有,则取出recvq头部的goroutine,调用send,把发送方的数据直接拷贝给接收方,而无需拷贝到环形缓冲区。
  4. 如果环形缓冲队列未满,则把要发送的数据拷贝到环形缓冲区,更新环形队列的sendx值。
  5. 环形队列已满时,如果是非阻塞模式,则立即返回;如果是阻塞模式,那就把这个goroutine先挂起放到sendq等待数据。
  1. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  2. if c == nil {
  3. if !block {
  4. return false
  5. }
  6. gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  7. throw("unreachable")
  8. }
  9. if debugChan {
  10. print("chansend: chan=", c, "\n")
  11. }
  12. if raceenabled {
  13. racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
  14. }
  15. // ready for sending and then observe that it is not closed, that implies that the
  16. // channel wasn't closed during the first observation.
  17. // 这里的判断没有加锁,而是使用了
  18. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
  19. (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
  20. return false
  21. }
  22. var t0 int64
  23. if blockprofilerate > 0 {
  24. t0 = cputicks()
  25. }
  26. // chansend操作全程加锁
  27. lock(&c.lock)
  28. // channel已关闭,继续往里send会panic
  29. if c.closed != 0 {
  30. unlock(&c.lock)
  31. panic(plainError("send on closed channel"))
  32. }
  33. // 检查recvq是否有等待数据的goroutine,如果有,取出recvq头部的goroutine,调用send,把发送方的数据直接拷贝给接收方,而无需拷贝到环形缓冲区
  34. if sg := c.recvq.dequeue(); sg != nil {
  35. // Found a waiting receiver. We pass the value we want to send
  36. // directly to the receiver, bypassing the channel buffer (if any).
  37. send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  38. return true
  39. }
  40. // 环形队列未满,则将要发送的数据放入环形队列,更新sendx值。
  41. if c.qcount < c.dataqsiz {
  42. // Space is available in the channel buffer. Enqueue the element to send.
  43. qp := chanbuf(c, c.sendx)
  44. if raceenabled {
  45. raceacquire(qp)
  46. racerelease(qp)
  47. }
  48. typedmemmove(c.elemtype, qp, ep)
  49. c.sendx++
  50. if c.sendx == c.dataqsiz {
  51. c.sendx = 0
  52. }
  53. c.qcount++
  54. unlock(&c.lock)
  55. return true
  56. }
  57. // 下面就是环形队列已满的情况
  58. // 如果是非阻塞模式,此时就可以返回了
  59. if !block {
  60. unlock(&c.lock)
  61. return false
  62. }
  63. // 如果是阻塞模式,因为队列已满,但是此时又需要发送数据到该队列,那就把这个goroutine先挂起放到sendq
  64. gp := getg()
  65. mysg := acquireSudog()
  66. mysg.releasetime = 0
  67. if t0 != 0 {
  68. mysg.releasetime = -1
  69. }
  70. // No stack splits between assigning elem and enqueuing mysg
  71. // on gp.waiting where copystack can find it.
  72. mysg.elem = ep
  73. mysg.waitlink = nil
  74. mysg.g = gp
  75. mysg.isSelect = false
  76. mysg.c = c
  77. gp.waiting = mysg
  78. gp.param = nil
  79. c.sendq.enqueue(mysg) // 将当前要发送数据的goroutine挂起放到sendq
  80. goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  81. KeepAlive(ep)
  82. // someone woke us up.
  83. if mysg != gp.waiting {
  84. throw("G waiting list is corrupted")
  85. }
  86. gp.waiting = nil
  87. if gp.param == nil {
  88. if c.closed == 0 {
  89. throw("chansend: spurious wakeup")
  90. }
  91. panic(plainError("send on closed channel"))
  92. }
  93. gp.param = nil
  94. if mysg.releasetime > 0 {
  95. blockevent(mysg.releasetime-t0, 2)
  96. }
  97. mysg.c = nil
  98. releaseSudog(mysg)
  99. return true
  100. }

channel的数据接收

当执行a <- c这样的代码时,即从channel读取数据时,实际调用的函数是chanrecv。这里的chanrecv做了以下几件事情:

  1. 首先加锁
  2. 如果环形缓冲区已满,且sendq有goroutine在的等待发送数据,直接调用recv去取sendq头部goroutine,将其要发送的数据拷贝给自己,不走环形缓冲区;
  3. 环形缓冲区有数据但队列未满,就消费环形队列里的头部元素;
  4. 环形缓冲区没有数据,非阻塞模式下就直接返回;阻塞模式下,此时需要把goroutine放入recvq等待数据。
  1. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  2. // raceenabled: don't need to check ep, as it is always on the stack
  3. // or is new memory allocated by reflect.
  4. if debugChan {
  5. print("chanrecv: chan=", c, "\n")
  6. }
  7. if c == nil {
  8. if !block {
  9. return
  10. }
  11. gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  12. throw("unreachable")
  13. }
  14. // The order of operations is important here: reversing the operations can lead to
  15. // incorrect behavior when racing with a close.
  16. if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
  17. c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
  18. atomic.Load(&c.closed) == 0 {
  19. return
  20. }
  21. var t0 int64
  22. if blockprofilerate > 0 {
  23. t0 = cputicks()
  24. }
  25. lock(&c.lock)
  26. if c.closed != 0 && c.qcount == 0 {
  27. if raceenabled {
  28. raceacquire(c.raceaddr())
  29. }
  30. unlock(&c.lock)
  31. if ep != nil {
  32. typedmemclr(c.elemtype, ep)
  33. }
  34. return true, false
  35. }
  36. // 值得注意,如果channel已经关闭了,我们还是可以走下面的逻辑,即数据读取
  37. // 此时环形缓冲区已满,sendq有goroutine在的等待发送数据,直接调用recv去取sendq头部goroutine,将其要发送的数据拷贝给自己,不走环形缓冲区
  38. if sg := c.sendq.dequeue(); sg != nil {
  39. recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
  40. return true, true
  41. }
  42. // 环形缓冲区有数据但队列未满,就消费环形队列里的头部元素
  43. if c.qcount > 0 {
  44. // Receive directly from queue
  45. qp := chanbuf(c, c.recvx)
  46. if raceenabled {
  47. raceacquire(qp)
  48. racerelease(qp)
  49. }
  50. if ep != nil {
  51. typedmemmove(c.elemtype, ep, qp) //数据拷贝,sender->reciever
  52. }
  53. typedmemclr(c.elemtype, qp) // 清理sender的发送数据内存
  54. c.recvx++
  55. if c.recvx == c.dataqsiz {
  56. c.recvx = 0
  57. }
  58. c.qcount--
  59. unlock(&c.lock)
  60. return true, true
  61. }
  62. if !block {
  63. unlock(&c.lock)
  64. return false, false
  65. }
  66. // 环形缓冲区没有数据,此时需要把goroutine放入recvq等待数据
  67. // no sender available: block on this channel.
  68. gp := getg()
  69. mysg := acquireSudog()
  70. mysg.releasetime = 0
  71. if t0 != 0 {
  72. mysg.releasetime = -1
  73. }
  74. // No stack splits between assigning elem and enqueuing mysg
  75. // on gp.waiting where copystack can find it.
  76. mysg.elem = ep
  77. mysg.waitlink = nil
  78. gp.waiting = mysg
  79. mysg.g = gp
  80. mysg.isSelect = false
  81. mysg.c = c
  82. gp.param = nil
  83. c.recvq.enqueue(mysg)
  84. goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
  85. // someone woke us up
  86. if mysg != gp.waiting {
  87. throw("G waiting list is corrupted")
  88. }
  89. gp.waiting = nil
  90. if mysg.releasetime > 0 {
  91. blockevent(mysg.releasetime-t0, 2)
  92. }
  93. closed := gp.param == nil
  94. gp.param = nil
  95. mysg.c = nil
  96. releaseSudog(mysg)
  97. return true, !closed
  98. }

channel的关闭

close(c)用于关闭channel。channel的关闭实际调用的是closechan,注意以下几点:

  1. 关闭已关闭的channel,触发panic
  2. channel关闭时会通知该recvq队列中的所有Reader当前channel已关闭,这些recvq中的g会被释放。
  3. channel关闭时会通知该sendq队列中的所有writer当前channel已关闭,这会触发panic。这些sendq中的g会被释放。
  4. channel关闭操作也是全程加锁。
  1. func closechan(c *hchan) {
  2. if c == nil {
  3. panic(plainError("close of nil channel"))
  4. }
  5. lock(&c.lock)
  6. // 关闭已关闭的channel,触发panic
  7. if c.closed != 0 {
  8. unlock(&c.lock)
  9. panic(plainError("close of closed channel"))
  10. }
  11. if raceenabled {
  12. callerpc := getcallerpc()
  13. racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
  14. racerelease(c.raceaddr())
  15. }
  16. c.closed = 1
  17. var glist gList
  18. // 通知所有该channel的Reader这个channel关闭了,释放recvq中的所有g
  19. // release all readers
  20. for {
  21. sg := c.recvq.dequeue()
  22. if sg == nil {
  23. break
  24. }
  25. if sg.elem != nil {
  26. typedmemclr(c.elemtype, sg.elem)
  27. sg.elem = nil
  28. }
  29. if sg.releasetime != 0 {
  30. sg.releasetime = cputicks()
  31. }
  32. gp := sg.g
  33. gp.param = nil
  34. if raceenabled {
  35. raceacquireg(gp, c.raceaddr())
  36. }
  37. glist.push(gp)
  38. }
  39. // release all writers (they will panic)
  40. // 释放sendq中的所有g
  41. for {
  42. sg := c.sendq.dequeue()
  43. if sg == nil {
  44. break
  45. }
  46. sg.elem = nil
  47. if sg.releasetime != 0 {
  48. sg.releasetime = cputicks()
  49. }
  50. gp := sg.g
  51. gp.param = nil
  52. if raceenabled {
  53. raceacquireg(gp, c.raceaddr())
  54. }
  55. glist.push(gp)
  56. }
  57. unlock(&c.lock)
  58. // 释放这些挂起的g
  59. for !glist.empty() {
  60. gp := glist.pop()
  61. gp.schedlink = 0
  62. goready(gp, 3)
  63. }
  64. }

如果channel已经关闭,但此时还有goroutine去接收该channel数据,如果channel的环形缓冲队列里已无数据,则x,ok := <- ch中,x为nil,ok为false;如果channel的环形缓冲队列还有数据,此时去接收数据时可以收到数据的,x为接收到的数据,ok为true。只有当channel关闭且无数据时,x才为nil,ok为false。