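Channels and goroutines go hand in hand, and together they make it possible to build powerful concurrent applications. A channel is goroutine-safe: it can carry data between multiple goroutines, and it can suspend and wake goroutines.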

Creating a Channel

A Brief Analysis of Channel - Figure 1
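
As a minimal sketch of what creation looks like from user code: `make(chan T)` yields an unbuffered channel and `make(chan T, n)` a buffered one. The built-ins `cap` and `len` report the buffer size and the number of queued elements, which correspond to the `dataqsiz` and `qcount` fields of the runtime struct. The helper `describe` is a hypothetical name used only for illustration.

```go
package main

import "fmt"

// describe is a hypothetical helper that reports a channel's buffer state.
func describe(ch chan int) string {
	return fmt.Sprintf("len=%d cap=%d", len(ch), cap(ch))
}

func main() {
	unbuffered := make(chan int)  // no buffer: a send waits for a receiver
	buffered := make(chan int, 3) // room for three queued elements

	buffered <- 1
	buffered <- 2

	fmt.Println(describe(unbuffered)) // len=0 cap=0
	fmt.Println(describe(buffered))   // len=2 cap=3
}
```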

Sending and Receiving Data

Let's look at the channel and its related structs:

    type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters

        // lock protects all fields in hchan, as well as several
        // fields in sudogs blocked on this channel.
        //
        // Do not change another G's status while holding this lock
        // (in particular, do not ready a G), as this can deadlock
        // with stack shrinking.
        lock mutex
    }

    type waitq struct {
        first *sudog
        last  *sudog
    }

    type sudog struct {
        // goroutine
        g *g

        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
        next     *sudog
        prev     *sudog
        elem     unsafe.Pointer // data element (may point to stack)

        // The following fields are never accessed concurrently.
        // For channels, waitlink is only accessed by g.
        // For semaphores, all fields (including the ones above)
        // are only accessed when holding a semaRoot lock.
        acquiretime int64
        releasetime int64
        ticket      uint32
        parent      *sudog // semaRoot binary tree
        waitlink    *sudog // g.waiting list or semaRoot
        waittail    *sudog // semaRoot
        c           *hchan // channel
    }

Using a buffered channel as the example, the figure sketches how data is sent and received.
A Brief Analysis of Channel - Figure 2
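
The figure is not reproduced here, so the following is a simplified sketch of the circular-queue bookkeeping, not the runtime's code. The type `ring` and its methods are hypothetical; only the field names `buf`, `sendx`, `recvx`, and `qcount` are borrowed from `hchan`, and locking and the wait queues are deliberately left out.

```go
package main

import "fmt"

// ring is a hypothetical, single-goroutine model of hchan's buffer fields.
type ring struct {
	buf    []int
	sendx  int // next slot to write
	recvx  int // next slot to read
	qcount int // number of queued elements
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// send enqueues v and reports false when the buffer is full.
func (r *ring) send(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around to the start of the array
	}
	r.qcount++
	return true
}

// recv dequeues the oldest element and reports false when the buffer is empty.
func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.send(10), r.send(20), r.send(30)) // true true false

	v, _ := r.recv()
	ok := r.send(30)
	fmt.Println(v, ok, r.sendx, r.recvx) // 10 true 1 1
}
```

In the real channel a failed `send` on a full buffer does not simply return false for a blocking operation; the goroutine is queued on `sendq` and parked instead.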

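Next, let's look more closely at how data is sent and received. Before that, we need to understand how goroutines are scheduled. Goroutines are user-space threads: they are managed by the Go runtime rather than by the OS, and they are much lighter than OS threads.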

A Brief Analysis of Channel - Figure 3
Some of the goroutine states:

    // _Gidle means this goroutine was just allocated and has not
    // yet been initialized.
    _Gidle = iota // 0

    // _Grunnable means this goroutine is on a run queue. It is
    // not currently executing user code. The stack is not owned.
    _Grunnable // 1

    // _Grunning means this goroutine may execute user code. The
    // stack is owned by this goroutine. It is not on a run queue.
    // It is assigned an M and a P (g.m and g.m.p are valid).
    _Grunning // 2

    // _Gsyscall means this goroutine is executing a system call.
    // It is not executing user code. The stack is owned by this
    // goroutine. It is not on a run queue. It is assigned an M.
    _Gsyscall // 3

    // _Gwaiting means this goroutine is blocked in the runtime.
    // It is not executing user code. It is not on a run queue,
    // but should be recorded somewhere (e.g., a channel wait
    // queue) so it can be ready()d when necessary. The stack is
    // not owned *except* that a channel operation may read or
    // write parts of the stack under the appropriate channel
    // lock. Otherwise, it is not safe to access the stack after a
    // goroutine enters _Gwaiting (e.g., it may get moved).
    _Gwaiting // 4

  • gopark changes a goroutine's status from _Grunning to _Gwaiting and takes it out of scheduling.
  • goready changes a goroutine's status from _Gwaiting to _Grunnable, so that it waits to be scheduled and run again.
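
From user code these two transitions are only visible indirectly, as a goroutine that stops at a channel operation and later continues. The sketch below shows that effect; `waitForValue` is a hypothetical helper. Whichever side reaches the unbuffered channel first is the one that gets parked, and the other side makes it runnable again.

```go
package main

import "fmt"

// waitForValue is a hypothetical helper: it starts a receiver that waits on ch
// and reports what it received on the returned channel once it continues.
func waitForValue(ch <-chan int) <-chan int {
	out := make(chan int, 1)
	go func() {
		v := <-ch // if no sender is ready yet, the goroutine is parked here
		out <- v  // runs only after the value has been handed over
	}()
	return out
}

func main() {
	ch := make(chan int)
	out := waitForValue(ch)
	ch <- 42           // completes the rendezvous with the receiver
	fmt.Println(<-out) // 42
}
```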

A Brief Analysis of Channel - Figure 4

Every goroutine has an instance of the following struct:

    type g struct {
        stack        stack          // offset known to runtime/cgo
        stackguard0  uintptr        // offset known to liblink
        stackguard1  uintptr        // offset known to liblink
        _panic       *_panic        // innermost panic - offset known to liblink
        _defer       *_defer        // innermost defer
        m            *m             // current m; offset known to arm liblink
        sched        gobuf
        syscallsp    uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
        syscallpc    uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
        stktopsp     uintptr        // expected sp at top of stack, to check in traceback
        param        unsafe.Pointer // passed parameter on wakeup
        atomicstatus uint32
        stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
        goid         int64
        ...
        waiting    *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
        cgoCtxt    []uintptr      // cgo traceback context
        labels     unsafe.Pointer // profiler labels
        timer      *timer         // cached timer for time.Sleep
        selectDone uint32         // are we participating in a select and did someone win the race?
        ...
        gcAssistBytes int64
    }

Let's revisit the sudog and waitq structs:

    type sudog struct {
        g *g // current goroutine
        ...
        elem unsafe.Pointer // data element (may point to stack)
        ...
        c *hchan // channel
    }

    type waitq struct {
        first *sudog
        last  *sudog
    }

In other words, a sudog instance represents one goroutine waiting in a send queue or a receive queue.

Let's look at the code that sends data:

    /*
     * generic single channel send/recv
     * If block is not nil,
     * then the protocol will not
     * sleep but return if it could
     * not complete.
     *
     * sleep can wake up with g.param == nil
     * when a channel involved in the sleep has
     * been closed. it is easiest to loop and re-run
     * the operation; we'll see that it's now closed.
     */
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {
            if !block {
                return false
            }
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }

        if debugChan {
            print("chansend: chan=", c, "\n")
        }

        if raceenabled {
            racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
        }

        // Fast path: check for failed non-blocking operation without acquiring the lock.
        //
        // After observing that the channel is not closed, we observe that the channel is
        // not ready for sending. Each of these observations is a single word-sized read
        // (first c.closed and second full()).
        // Because a closed channel cannot transition from 'ready for sending' to
        // 'not ready for sending', even if the channel is closed between the two observations,
        // they imply a moment between the two when the channel was both not yet closed
        // and not ready for sending. We behave as if we observed the channel at that moment,
        // and report that the send cannot proceed.
        //
        // It is okay if the reads are reordered here: if we observe that the channel is not
        // ready for sending and then observe that it is not closed, that implies that the
        // channel wasn't closed during the first observation. However, nothing here
        // guarantees forward progress. We rely on the side effects of lock release in
        // chanrecv() and closechan() to update this thread's view of c.closed and full().
        if !block && c.closed == 0 && full(c) {
            return false
        }

        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }

        lock(&c.lock)

        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }

        if sg := c.recvq.dequeue(); sg != nil {
            // A receiver goroutine is already waiting, so copy the value to it directly.
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }

        if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }

        if !block {
            unlock(&c.lock)
            return false
        }

        // Block on the channel. Some receiver will complete our operation for us.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
        // Ensure the value being sent is kept alive until the
        // receiver copies it out. The sudog has a pointer to the
        // stack object, but sudogs aren't considered as roots of the
        // stack tracer.
        KeepAlive(ep)

        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }

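The code above covers several scenarios; some of them are analyzed below:
A Brief Analysis of Channel - Figure 5

The relevant code is as follows: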

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
        ...
    }

    // send processes a send operation on an empty channel c.
    // The value ep sent by the sender is copied to the receiver sg.
    // The receiver is then woken up to go on its merry way.
    // Channel c must be empty and locked. send unlocks c with unlockf.
    // sg must already be dequeued from c.
    // ep must be non-nil and point to the heap or the caller's stack.
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if raceenabled {
            if c.dataqsiz == 0 {
                racesync(c, sg)
            } else {
                // Pretend we go through the buffer, even though
                // we copy directly. Note that we need to increment
                // the head/tail locations only when raceenabled.
                qp := chanbuf(c, c.recvx)
                raceacquire(qp)
                racerelease(qp)
                raceacquireg(sg.g, qp)
                racereleaseg(sg.g, qp)
                c.recvx++
                if c.recvx == c.dataqsiz {
                    c.recvx = 0
                }
                c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
            }
        }
        if sg.elem != nil {
            sendDirect(c.elemtype, sg, ep) // copy directly to the receiver
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1) // mark the receiving goroutine runnable
    }

    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
        // src is on our stack, dst is a slot on another stack.

        // Once we read sg.elem out of sg, it will no longer
        // be updated if the destination's stack gets copied (shrunk).
        // So make sure that no preemption points can happen between read & use.
        dst := sg.elem
        typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        // No need for cgo write barrier checks because dst is always
        // Go memory.
        memmove(dst, src, t.size)
    }

sendwithbufffull.png

The relevant code is as follows:

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
        // Enter blocking mode: the goroutine is recorded in a new sudog instance.
        // Block on the channel. Some receiver will complete our operation for us.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
        // Ensure the value being sent is kept alive until the
        // receiver copies it out. The sudog has a pointer to the
        // stack object, but sudogs aren't considered as roots of the
        // stack tracer.
        KeepAlive(ep)

        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }
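
The send paths can be observed from ordinary Go code. The sketch below is an illustration under assumptions rather than a trace of the runtime: `trySend` is a hypothetical helper, and the claim that a `select` with a `default` case ends up in `chansend` with `block` set to false reflects my understanding of the runtime, not something shown in the listing above.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a send without blocking.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: there is a free slot in the buffer
	fmt.Println(trySend(ch, 2)) // false: buffer full and no receiver waiting

	// A blocking send on the full buffer waits until a receiver makes room.
	done := make(chan struct{})
	go func() {
		ch <- 3
		close(done)
	}()
	fmt.Println(<-ch) // 1
	<-done            // the blocked send has completed
	fmt.Println(<-ch) // 3
}
```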

Let's look at the code that receives data:

    // chanrecv receives on channel c and writes the received data to ep.
    // ep may be nil, in which case received data is ignored.
    // If block == false and no elements are available, returns (false, false).
    // Otherwise, if c is closed, zeros *ep and returns (true, false).
    // Otherwise, fills in *ep with an element and returns (true, true).
    // A non-nil ep must point to the heap or the caller's stack.
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // raceenabled: don't need to check ep, as it is always on the stack
        // or is new memory allocated by reflect.

        if debugChan {
            print("chanrecv: chan=", c, "\n")
        }

        if c == nil {
            if !block {
                return
            }
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }

        // Fast path: check for failed non-blocking operation without acquiring the lock.
        if !block && empty(c) {
            // After observing that the channel is not ready for receiving, we observe whether the
            // channel is closed.
            //
            // Reordering of these checks could lead to incorrect behavior when racing with a close.
            // For example, if the channel was open and not empty, was closed, and then drained,
            // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
            // we use atomic loads for both checks, and rely on emptying and closing to happen in
            // separate critical sections under the same lock. This assumption fails when closing
            // an unbuffered channel with a blocked send, but that is an error condition anyway.
            if atomic.Load(&c.closed) == 0 {
                // Because a channel cannot be reopened, the later observation of the channel
                // being not closed implies that it was also not closed at the moment of the
                // first observation. We behave as if we observed the channel at that moment
                // and report that the receive cannot proceed.
                return
            }
            // The channel is irreversibly closed. Re-check whether the channel has any pending data
            // to receive, which could have arrived between the empty and closed checks above.
            // Sequential consistency is also required here, when racing with such a send.
            if empty(c) {
                // The channel is irreversibly closed and empty.
                if raceenabled {
                    raceacquire(c.raceaddr())
                }
                if ep != nil {
                    typedmemclr(c.elemtype, ep)
                }
                return true, false
            }
        }

        var t0 int64
        if blockprofilerate > 0 {
            t0 = cputicks()
        }

        lock(&c.lock)

        if c.closed != 0 && c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }

        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }

        if c.qcount > 0 {
            // Receive directly from queue
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
            }
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            typedmemclr(c.elemtype, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            return true, true
        }

        if !block {
            unlock(&c.lock)
            return false, false
        }

        // no sender available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

        // someone woke us up
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed
    }

The code above covers several scenarios; some of them are analyzed below:
recvNoBlock.png

The relevant code is as follows:

    // recv processes a receive operation on a full channel c.
    // There are 2 parts:
    // 1) The value sent by the sender sg is put into the channel
    //    and the sender is woken up to go on its merry way.
    // 2) The value received by the receiver (the current G) is
    //    written to ep.
    // For synchronous channels, both values are the same.
    // For asynchronous channels, the receiver gets its data from
    // the channel buffer and the sender's data is put in the
    // channel buffer.
    // Channel c must be full and locked. recv unlocks c with unlockf.
    // sg must already be dequeued from c.
    // A non-nil ep must point to the heap or the caller's stack.
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if c.dataqsiz == 0 {
            // unbuffered channel
            if raceenabled {
                racesync(c, sg)
            }
            if ep != nil {
                // copy data from sender
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // the buffer is full
            // Queue is full. Take the item at the
            // head of the queue. Make the sender enqueue
            // its item at the tail of the queue. Since the
            // queue is full, those are both the same slot.
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                raceacquire(qp)
                racerelease(qp)
                raceacquireg(sg.g, qp)
                racereleaseg(sg.g, qp)
            }
            // copy data from queue to receiver
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // copy data from sender to queue
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        sg.elem = nil
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1)
    }

recvBlock.png

The relevant code is as follows:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        ...
        // Enter blocking mode: the goroutine is recorded in a new sudog instance.
        // no sender available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        c.recvq.enqueue(mysg)
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

        // someone woke us up
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed
    }
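
The `(selected, received)` result pair documented on `chanrecv` has a visible counterpart in user code. The sketch below uses a hypothetical helper, `tryRecv`, whose result names are chosen to mirror that pair; the mapping is an illustration of the documented return values, not a claim about how the compiler lowers this particular `select`.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper that attempts a receive without blocking.
// selected is false when the receive could not proceed right now;
// received is false when the channel is closed and drained.
func tryRecv(ch <-chan int) (v int, selected, received bool) {
	select {
	case v, received = <-ch:
		return v, true, received
	default:
		return 0, false, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false false: open and empty
	ch <- 5
	fmt.Println(tryRecv(ch)) // 5 true true: taken from the buffer
	close(ch)
	fmt.Println(tryRecv(ch)) // 0 true false: closed and drained
}
```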

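Closing a Channel

Before that, we need to understand how garbage collection (GC) treats channels and goroutines. The memory held by a goroutine is released automatically only after the goroutine exits; only then can the goroutine be garbage collected. A channel whose send or receive wait queue is not empty cannot be garbage collected either. In other words, blocking keeps both the channel and the goroutine from being collected, so a good design has to take their collection into account. For a goroutine, the precondition for collection is that it exits: there is no way to destroy a goroutine by force, it can only exit on its own. In many scenarios, closing a channel wakes the goroutines suspended on it and lets them run to completion. The question therefore becomes how to close a channel. (Whether a channel is closed has nothing to do with garbage collection of the channel itself; being closed is merely one of the channel's states.)

Let's start with a counterexample: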

    func leak() {
        ch := make(chan struct{})
        go func() {
            ch <- struct{}{}
            fmt.Println("never reached")
        }()
    }

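The function leak creates a channel of type struct{} and starts a new goroutine that sends on it. Even after leak returns, as long as no goroutine receives from the channel, the goroutine started inside leak stays blocked on the send forever, and its fmt.Println call never runs. We say that this goroutine has leaked. In everyday design, a little carelessness can leave behind many forgotten goroutines that stay blocked forever on a send or a receive. Nothing prevents this completely; the only remedy is careful design and plenty of code review. A quick and simple fix for this example is: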

    // use a buffered channel
    ch := make(chan struct{}, 1)
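
To see why the buffer helps: with a capacity of 1 the single send can complete without a receiver, so the goroutine runs to its end and exits. The sketch below makes that observable. `senderExits` and its fixed timeout are hypothetical, and the fix only covers the case of one send; a second send on the same channel would block again.

```go
package main

import (
	"fmt"
	"time"
)

// senderExits is a hypothetical helper. It starts a goroutine that sends once
// on a channel with the given buffer size that nobody ever receives from, and
// reports whether that goroutine finished within a short timeout.
func senderExits(bufSize int) bool {
	ch := make(chan struct{}, bufSize)
	done := make(chan struct{})
	go func() {
		ch <- struct{}{} // blocks forever when bufSize is 0
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(200 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println(senderExits(0)) // false: the sender is stuck (and leaked)
	fmt.Println(senderExits(1)) // true: the send fits in the buffer
}
```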

Let's look at the code that closes a channel:

    func closechan(c *hchan) {
        if c == nil {
            panic(plainError("close of nil channel"))
        }

        lock(&c.lock)
        if c.closed != 0 {
            // closing an already closed channel panics
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }

        if raceenabled {
            callerpc := getcallerpc()
            racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
            racerelease(c.raceaddr())
        }

        c.closed = 1

        var glist gList

        // release all readers
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }

        // release all writers (they will panic)
        for {
            sg := c.sendq.dequeue()
            if sg == nil {
                break
            }
            sg.elem = nil
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            glist.push(gp)
        }
        unlock(&c.lock)

        // Ready all Gs now that we've dropped the channel lock.
        for !glist.empty() {
            gp := glist.pop()
            gp.schedlink = 0
            goready(gp, 3)
        }
    }

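  • Closing an already closed channel panics.
  • The receive wait queue is traversed: each sudog's data and bookkeeping are cleared, and its goroutine is added to the list with glist.push(gp).
  • The send wait queue is traversed in the same way, and each sudog's goroutine is added to the list with glist.push(gp).
  • goready sets every goroutine in glist to runnable so that the scheduler can schedule it.
  • Each goroutine that is scheduled then continues its send or receive: a woken receiver gets the zero value, and a woken sender panics.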
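
The receiver side of these steps can be observed from user code. In the sketch below, `closeWakesReceivers` is a hypothetical demo function: several goroutines receive from an unbuffered channel that nobody sends on, and a single close releases all of them with the zero value and `ok == false`. A receiver that has not yet blocked when close runs simply sees the closed channel at once, so the count is the same either way.

```go
package main

import (
	"fmt"
	"sync"
)

// closeWakesReceivers is a hypothetical demo: n goroutines receive on an
// unbuffered channel, close releases them, and the function returns how many
// of them observed the zero value together with ok == false.
func closeWakesReceivers(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	sawClosed := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, ok := <-ch // waits until the channel is closed
			if v == 0 && !ok {
				mu.Lock()
				sawClosed++
				mu.Unlock()
			}
		}()
	}
	close(ch) // wakes every receiver waiting on ch
	wg.Wait()
	return sawClosed
}

func main() {
	fmt.Println(closeWakesReceivers(3)) // 3
}
```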

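Closing a channel can be roughly divided into two parts:

  1. How a goroutine closes the channel.
  2. How the other goroutines respond to that event, including those that are currently sending or receiving and those that are about to.

Safely Closing a Channel
Closing an already closed channel panics, and many techniques have grown up around that constraint.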
safeclosed.png
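
The figure is not reproduced here. As one commonly used technique, and not necessarily the one the figure shows, the close can be guarded with `sync.Once` so that repeated calls are harmless. `SafeChan` and `NewSafeChan` are hypothetical names. Note that this only protects against a double close; a goroutine that sends after the close would still panic.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeChan is a hypothetical wrapper whose Close may be called many times.
type SafeChan struct {
	C    chan int
	once sync.Once
}

// NewSafeChan returns a SafeChan around a fresh unbuffered channel.
func NewSafeChan() *SafeChan { return &SafeChan{C: make(chan int)} }

// Close closes the underlying channel exactly once; later calls are no-ops.
func (s *SafeChan) Close() { s.once.Do(func() { close(s.C) }) }

func main() {
	s := NewSafeChan()
	s.Close()
	s.Close() // no panic: the second call does nothing
	_, ok := <-s.C
	fmt.Println(ok) // false
}
```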

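Responding to a Closed Channel
There are three scenarios:

A Brief Analysis of Channel - Figure 10

A Brief Analysis of Channel - Figure 11

A Brief Analysis of Channel - Figure 12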
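
The three figures are not reproduced here, so the following sketch shows three common ways for a goroutine to notice a close; they may or may not be the same three scenarios the figures illustrate. All function names are hypothetical.

```go
package main

import "fmt"

// drain receives with for-range; the loop ends once ch is closed and empty.
func drain(ch <-chan int) (sum int) {
	for v := range ch {
		sum += v
	}
	return sum
}

// recvOrClosed uses the comma-ok form to tell a value from a close.
func recvOrClosed(ch <-chan int) string {
	if v, ok := <-ch; ok {
		return fmt.Sprintf("value %d", v)
	}
	return "closed"
}

// worker sends on out until done is closed, then returns how many it sent.
func worker(out chan<- int, done <-chan struct{}) int {
	sent := 0
	for {
		select {
		case out <- sent:
			sent++
		case <-done:
			return sent
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)
	fmt.Println(drain(ch))        // 6
	fmt.Println(recvOrClosed(ch)) // closed

	out := make(chan int)
	done := make(chan struct{})
	result := make(chan int)
	go func() { result <- worker(out, done) }()
	fmt.Println(<-out, <-out) // 0 1
	close(done)               // tells the worker to stop
	fmt.Println(<-result)     // 2
}
```

Closing a separate `done` channel, as in `worker`, lets the sending side learn that it should stop without anyone closing the data channel underneath it.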

A brief summary of channel operations and their effects:

opt.png
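
Since the summary image is not reproduced here, the sketch below checks a few of the well-known combinations of operation and channel state directly. It is not a reconstruction of the image; `panics` and `wouldBlock` are hypothetical helpers.

```go
package main

import "fmt"

// panics is a hypothetical helper that reports whether f panicked.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

// wouldBlock reports whether a receive on ch cannot proceed right now.
func wouldBlock(ch chan int) bool {
	select {
	case <-ch:
		return false
	default:
		return true
	}
}

func main() {
	var nilCh chan int
	closed := make(chan int)
	close(closed)

	fmt.Println(wouldBlock(nilCh))                // true: a nil channel is never ready
	fmt.Println(wouldBlock(closed))               // false: receive yields the zero value
	fmt.Println(panics(func() { closed <- 1 }))   // true: send on closed channel
	fmt.Println(panics(func() { close(closed) })) // true: close of closed channel
	fmt.Println(panics(func() { close(nilCh) }))  // true: close of nil channel
}
```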
