Channel and Goroutine are a symbiotic pair for building powerful concurrent applications. A Channel is goroutine-safe: it can pass data between multiple Goroutines, and it can park and wake Goroutines.
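To make the goroutine-safe hand-off concrete, here is a minimal sketch. The names partialSum and parallelSum are illustrative and do not come from the original. Two worker Goroutines each send a partial sum on the same unbuffered Channel, and the caller parks on the receive until both values arrive. No mutex is needed because the Channel itself serializes the transfer.

```go
package main

import "fmt"

// partialSum (illustrative helper) adds up nums and sends the total on out.
// The send blocks until a receiver takes the value, since out is unbuffered.
func partialSum(nums []int, out chan<- int) {
	total := 0
	for _, n := range nums {
		total += n
	}
	out <- total
}

// parallelSum (illustrative helper) splits nums in half, sums each half in
// its own Goroutine and collects both partial results over one Channel.
func parallelSum(nums []int) int {
	out := make(chan int) // unbuffered: each send waits for a matching receive
	mid := len(nums) / 2
	go partialSum(nums[:mid], out)
	go partialSum(nums[mid:], out)
	return <-out + <-out // the caller parks here until each worker sends
}

func main() {
	fmt.Println(parallelSum([]int{1, 2, 3, 4, 5, 6})) // 21
}
```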
Creating a Channel

Sending and Receiving Data
Let's look at the Channel struct and its related structs:
```go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

type sudog struct {
	// goroutine
	g *g

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
```
Using a buffered Channel as an example, here is a brief look at how data is sent and received.
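The buffer of a buffered Channel is the circular queue described by qcount, dataqsiz, buf, sendx and recvx in hchan. The sketch below is a hypothetical, single-goroutine model of just that ring: the type ring and its methods are illustrative names, and there is no lock and no wait queue. It shows how sendx and recvx advance and wrap around to 0.

```go
package main

import "fmt"

// ring is a hypothetical single-goroutine model of hchan's circular buffer.
// qcount, dataqsiz, sendx and recvx play the same roles as in hchan;
// there is no lock and no recvq/sendq here.
type ring struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity of buf
	sendx    int // next slot to write
	recvx    int // next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// trySend mirrors the "space is available" branch of chansend.
func (r *ring) trySend(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // full: the real chansend would block or fail here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// tryRecv mirrors the "c.qcount > 0" branch of chanrecv.
func (r *ring) tryRecv() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty: the real chanrecv would block or fail here
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot, like typedmemclr
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.trySend(1), r.trySend(2), r.trySend(3)) // true true false
	v, _ := r.tryRecv()
	fmt.Println(v, r.trySend(3))  // 1 true
	fmt.Println(r.sendx, r.recvx) // 1 1
}
```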
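Next, let's look more closely at how data is sent and received. Before that, we need to understand how Goroutines are scheduled. Goroutines are user-space threads: they are managed by the Go runtime itself rather than by the OS, and they are much lighter-weight than OS threads.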

Some of the Goroutine states:
```go
const (
	// _Gidle means this goroutine was just allocated and has not
	// yet been initialized.
	_Gidle = iota // 0

	// _Grunnable means this goroutine is on a run queue. It is
	// not currently executing user code. The stack is not owned.
	_Grunnable // 1

	// _Grunning means this goroutine may execute user code. The
	// stack is owned by this goroutine. It is not on a run queue.
	// It is assigned an M and a P (g.m and g.m.p are valid).
	_Grunning // 2

	// _Gsyscall means this goroutine is executing a system call.
	// It is not executing user code. The stack is owned by this
	// goroutine. It is not on a run queue. It is assigned an M.
	_Gsyscall // 3

	// _Gwaiting means this goroutine is blocked in the runtime.
	// It is not executing user code. It is not on a run queue,
	// but should be recorded somewhere (e.g., a channel wait
	// queue) so it can be ready()d when necessary. The stack is
	// not owned *except* that a channel operation may read or
	// write parts of the stack under the appropriate channel
	// lock. Otherwise, it is not safe to access the stack after a
	// goroutine enters _Gwaiting (e.g., it may get moved).
	_Gwaiting // 4
)
```
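- gopark changes a Goroutine's state from _Grunning to _Gwaiting, taking it out of scheduling until someone wakes it.
- goready changes a Goroutine's state from _Gwaiting to _Grunnable, putting it back on a run queue so it runs again in a later scheduling round.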

Every Goroutine has an instance of the following struct:
```go
type g struct {
	stack       stack   // offset known to runtime/cgo
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic       *_panic // innermost panic - offset known to liblink
	_defer       *_defer // innermost defer
	m            *m      // current m; offset known to arm liblink
	sched        gobuf
	syscallsp    uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc    uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp     uintptr        // expected sp at top of stack, to check in traceback
	param        unsafe.Pointer // passed parameter on wakeup
	atomicstatus uint32
	stackLock    uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid         int64
	...
	waiting    *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt    []uintptr      // cgo traceback context
	labels     unsafe.Pointer // profiler labels
	timer      *timer         // cached timer for time.Sleep
	selectDone uint32         // are we participating in a select and did someone win the race?
	...
	gcAssistBytes int64
}
```
Let's revisit the sudog and waitq structs:
```go
type sudog struct {
	g *g // current goroutine
	...
	elem unsafe.Pointer // data element (may point to stack)
	...
	c *hchan // channel
}

type waitq struct {
	first *sudog
	last  *sudog
}
```
In other words, a sudog instance represents a Goroutine waiting in a Channel's send or receive queue.
Here is the code that sends data:
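waitq is a FIFO doubly linked list of sudogs. As a rough sketch, the following follows the shape of the runtime's waitq enqueue and dequeue, but uses a simplified, hypothetical sudog that stores a goroutine name and an int instead of *g and unsafe.Pointer. The real dequeue also skips sudogs that already lost a select race; that part is omitted here.

```go
package main

import "fmt"

// sudog is a simplified, hypothetical stand-in for runtime.sudog:
// gname replaces the *g pointer and elem holds the value directly.
type sudog struct {
	gname string
	elem  int
	next  *sudog
	prev  *sudog
}

// waitq follows the shape of runtime.waitq: a FIFO list of sudogs.
type waitq struct {
	first *sudog
	last  *sudog
}

// enqueue appends sgp at the tail of the queue.
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil { // empty queue: sgp becomes both head and tail
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// dequeue removes and returns the head, or nil if the queue is empty.
// (The runtime version additionally skips sudogs that lost a select race.)
func (q *waitq) dequeue() *sudog {
	sgp := q.first
	if sgp == nil {
		return nil
	}
	y := sgp.next
	if y == nil { // sgp was the only element
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		sgp.next = nil
	}
	return sgp
}

func main() {
	var sendq waitq
	sendq.enqueue(&sudog{gname: "G1", elem: 10})
	sendq.enqueue(&sudog{gname: "G2", elem: 20})
	for sg := sendq.dequeue(); sg != nil; sg = sendq.dequeue() {
		fmt.Println(sg.gname, sg.elem) // G1 10, then G2 20
	}
}
```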
```go
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// A Goroutine is waiting to receive: copy the value directly to it.
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```
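The block == false branches are what a select statement with a default case reaches. As an illustration (trySend is a hypothetical helper), a single-case select with default is lowered by the compiler into a non-blocking chansend call, via runtime.selectnbsend in the Go versions whose source is quoted here. A full buffer or a nil Channel therefore makes it return false immediately instead of parking.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send.
// The compiler turns this select into chansend(c, ep, false, ...),
// so it never parks the calling Goroutine.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: a buffer slot was free
	fmt.Println(trySend(buffered, 2)) // false: buffer full and no receiver
	var nilCh chan int
	fmt.Println(trySend(nilCh, 3)) // false: c == nil && !block
}
```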
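The code above covers several scenarios. Some of them are analyzed below.
When a receiver is already waiting on recvq, the sender copies the value directly to it, bypassing the buffer, and then wakes it. The relevant code is: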
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	...
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // copy directly into the receiver
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // make the receiving Goroutine runnable
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}
```
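send and sendDirect copy the element (memmove) into the receiver's slot, so the receiver ends up with an independent copy. A small sketch with an illustrative point type and a hypothetical handoff function: after the send completes, changing the sender's variable does not affect what the receiver got. This holds whether the runtime takes the direct path shown above or goes through the buffer.

```go
package main

import "fmt"

// point is an illustrative element type.
type point struct{ X, Y int }

// handoff (hypothetical helper) sends p to a receiving Goroutine over an
// unbuffered Channel, then modifies its own copy. It returns
// (the sender's value after the change, the value the receiver got).
func handoff() (point, point) {
	ch := make(chan point)     // unbuffered, so dataqsiz == 0
	got := make(chan point, 1) // reports what the receiver saw
	go func() {
		got <- <-ch // receive from ch, then forward the received value
	}()
	p := point{X: 1, Y: 2}
	ch <- p   // the value is copied into the receiver's variable
	p.X = 100 // only the sender's copy changes
	return p, <-got
}

func main() {
	mine, theirs := handoff()
	fmt.Println(mine, theirs) // {100 2} {1 2}
}
```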

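When no receiver is waiting and there is no free buffer slot, the sender blocks: it wraps itself in a new sudog, enqueues it on sendq and parks with gopark. The relevant code is: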
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	// Enter blocking mode: the current Goroutine is stored in a new sudog instance.
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```
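To observe this parking from user code, the following sketch (blockedSend is a hypothetical name) starts a sender on an unbuffered Channel and checks a flag that the sender sets only after its send returns. With no receiver waiting, the send cannot complete, so the flag is still unset until main performs the receive. The sleep only makes it likely that the sender has actually parked on sendq; the result does not depend on it.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// blockedSend (hypothetical helper) reports whether the send completed
// before any receive happened, and the value that was finally received.
func blockedSend() (completedEarly bool, v int) {
	ch := make(chan int) // unbuffered: no buffer slot to fall back on
	var done int32
	go func() {
		ch <- 42 // no receiver yet: the sender parks (gopark) on c.sendq
		atomic.StoreInt32(&done, 1)
	}()
	time.Sleep(20 * time.Millisecond) // give the sender time to park
	completedEarly = atomic.LoadInt32(&done) == 1
	// Receive 42; if the sender is parked, this dequeues its sudog
	// and makes it runnable again via goready.
	v = <-ch
	return completedEarly, v
}

func main() {
	early, v := blockedSend()
	fmt.Println(early, v) // false 42
}
```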
Now let's look at the code that receives data:
```go
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
```
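The c.closed != 0 && c.qcount == 0 check means that closing a buffered Channel does not discard elements still in the buffer: receives keep succeeding until the buffer is drained, and only then return the zero value with ok == false. A small sketch (recvAll is an illustrative name):

```go
package main

import "fmt"

// recvAll (illustrative helper) closes a buffered Channel that still holds
// two values, then records the (value, ok) pair of three receives.
func recvAll() []string {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch) // the buffer still holds 1 and 2
	var out []string
	for i := 0; i < 3; i++ {
		v, ok := <-ch // buffered data first, then zero value and ok == false
		out = append(out, fmt.Sprintf("%d/%t", v, ok))
	}
	return out
}

func main() {
	fmt.Println(recvAll()) // [1/true 2/true 0/false]
}
```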
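The code above covers several scenarios. Some of them are analyzed below.
When a sender is already waiting on sendq, the receiver takes the value through recv: directly from the sender for an unbuffered Channel, or from the head of the (necessarily full) buffer for a buffered one, with the sender's value moving into the freed slot. The relevant code is: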
```go
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		// unbuffered channel
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// the buffer is full
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
```
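The head/tail trick in recv is what keeps a buffered Channel FIFO even when senders are blocked: the receiver takes the oldest element from the head, and the parked sender's value is written into the slot that has just become the tail. In the sketch below (fifoWithBlockedSender is a hypothetical name), a third sender blocks on a full buffer of capacity 2. The received order is 1, 2, 3 whether or not that sender has parked by the time main starts receiving; the sleep only makes the parked case likely.

```go
package main

import (
	"fmt"
	"time"
)

// fifoWithBlockedSender (hypothetical helper) fills a buffer of 2, starts a
// third sender that must block, then receives three values in order.
func fifoWithBlockedSender() []int {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // the buffer is now full (qcount == dataqsiz)
	go func() {
		ch <- 3 // no free slot: this sender parks on c.sendq
	}()
	time.Sleep(10 * time.Millisecond) // make it likely the sender has parked
	var out []int
	for i := 0; i < 3; i++ {
		out = append(out, <-ch) // recv takes the head; 3 moves into the tail slot
	}
	return out
}

func main() {
	fmt.Println(fifoWithBlockedSender()) // [1 2 3]
}
```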

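When no sender is waiting and the buffer is empty, the receiver blocks: it wraps itself in a new sudog, enqueues it on recvq and parks with gopark. The relevant code is: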
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	// Enter blocking mode: the current Goroutine is stored in a new sudog instance.
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
```
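A receiver parked here can be woken in two ways: by a sender, which sets gp.param to the sudog, or by closechan, which in the version quoted here leaves gp.param nil so that closed is true and chanrecv returns received == false. A sketch of the second case, with an illustrative function name: a receiver blocked on an empty unbuffered Channel gets the zero value and ok == false once the Channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// result is an illustrative holder for a (value, ok) receive.
type result struct {
	v  int
	ok bool
}

// recvThenClose (illustrative helper) parks a receiver on an empty
// Channel, then closes the Channel and reports what the receiver got.
func recvThenClose() result {
	ch := make(chan int)
	res := make(chan result, 1)
	go func() {
		v, ok := <-ch // parks on c.recvq until something wakes it
		res <- result{v, ok}
	}()
	time.Sleep(10 * time.Millisecond) // likely parked by now
	close(ch)                         // closechan makes the parked receiver runnable
	return <-res
}

func main() {
	fmt.Println(recvThenClose()) // {0 false}
}
```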
Closing a Channel
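Before that, we need to understand garbage collection (GC) of Channels and Goroutines. The memory allocated for a Goroutine can be reclaimed only after the Goroutine exits. A Channel whose send or receive wait queue is not empty cannot be garbage collected either, because the blocked Goroutines still reference it. So blocking keeps both the Channel and its Goroutines from being collected, and a good design has to account for their reclamation. A Goroutine can only be reclaimed once it exits, and there is no way to forcibly destroy a Goroutine: it has to return on its own. In many cases, closing a Channel wakes the parked Goroutines and lets them finish the rest of their work. The question then becomes: how should a Channel be closed? (Whether a Channel is closed has nothing to do with whether the Channel itself can be garbage collected; being closed is just one state of the Channel.)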
Let's start with a counterexample:
```go
func leak() {
	ch := make(chan struct{})
	go func() {
		ch <- struct{}{}
		fmt.Println("never gets here")
	}()
}
```
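The function leak creates a Channel with element type struct{} and starts a new Goroutine that sends on it. Even after leak returns, that Goroutine blocks forever on ch <- struct{}{} because nothing ever receives, so the fmt.Println after the send is never executed. We say this Goroutine leaks. In everyday code, a small oversight can easily leave many forgotten sending or receiving Goroutines blocked forever. There is no silver bullet that prevents this completely; only careful design and thorough code review help. One quick and simple fix: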
```go
// Use a buffered channel: with capacity 1 the send completes without a receiver
ch := make(chan struct{}, 1)
```
Here is the code that closes a Channel:
```go
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		// A channel cannot be closed twice; doing so panics.
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
```
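- Closing a nil Channel panics, and closing an already-closed Channel panics.
- Walk the receive wait queue (recvq): clear each waiting receiver's data slot so it receives the zero value, set gp.param to nil, and push the sudog's Goroutine onto glist with glist.push(gp).
- Walk the send wait queue (sendq) the same way: clear the data and information, and push the sudog's Goroutine onto glist with glist.push(gp).
- After releasing the lock, goready sets every Goroutine in glist to runnable so the scheduler can run it.
- Each rescheduled Goroutine continues its send or receive: because gp.param is nil, a receiver returns with received == false, while a sender panics with "send on closed channel".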
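Because closechan wakes every Goroutine on recvq and sendq, a close acts as a broadcast. The sketch below uses illustrative names: broadcast parks n receivers on one Channel and counts how many come back with ok == false after a single close, and blockedSenderPanics shows that a sender woken by close panics with "send on closed channel". In both helpers the outcome is the same even if a Goroutine has not parked yet when close runs, since it then hits the c.closed checks directly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// broadcast (illustrative helper) parks n receivers on ch, closes ch once,
// and counts how many receivers observed ok == false.
func broadcast(n int) int {
	ch := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok { // each receiver is released by the close
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	time.Sleep(10 * time.Millisecond) // let the receivers park on recvq
	close(ch)
	wg.Wait()
	return woken
}

// blockedSenderPanics (illustrative helper) parks a sender, closes the
// Channel and returns the panic message the sender recovers.
func blockedSenderPanics() string {
	ch := make(chan int)
	msg := make(chan string, 1)
	go func() {
		defer func() { msg <- fmt.Sprint(recover()) }()
		ch <- 1 // parks on sendq; woken by close, then panics
	}()
	time.Sleep(10 * time.Millisecond)
	close(ch)
	return <-msg
}

func main() {
	fmt.Println(broadcast(5))          // 5
	fmt.Println(blockedSenderPanics()) // send on closed channel
}
```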
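Roughly speaking, closing a Channel involves two parts:
- How one Goroutine closes the Channel.
- How the other Goroutines respond to that event, including those currently sending or receiving and those about to send or receive.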
Safely Closing a Channel
Closing an already-closed Channel panics, and many techniques have grown up around this problem.
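One widely used approach, shown here as a sketch rather than as the specific method this article goes on to describe, is to route every close through sync.Once. SafeCloser, NewSafeCloser and Done are illustrative names. This only protects against a double close; it does not make sending on the Channel safe, so it fits signal Channels that are closed but never sent on.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeCloser (illustrative type) wraps a signal Channel so that Close may
// be called any number of times, from any Goroutine; only the first call
// actually closes the Channel.
type SafeCloser struct {
	ch   chan struct{}
	once sync.Once
}

func NewSafeCloser() *SafeCloser {
	return &SafeCloser{ch: make(chan struct{})}
}

// Close closes the Channel exactly once; later calls are no-ops.
func (s *SafeCloser) Close() {
	s.once.Do(func() { close(s.ch) })
}

// Done returns a receive-only view, so callers can neither send nor close.
func (s *SafeCloser) Done() <-chan struct{} {
	return s.ch
}

func main() {
	s := NewSafeCloser()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Close() // concurrent, repeated closes do not panic
		}()
	}
	wg.Wait()
	<-s.Done() // does not block: the Channel is closed
	fmt.Println("closed once")
}
```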
Responding to a Closed Channel
There are three scenarios:



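In code, a Goroutine usually learns about a close in one of a few ways: a for range loop over the Channel ends, a comma-ok receive returns ok == false, or a receive case in select becomes permanently ready. A sketch with illustrative names drain and waitQuit:

```go
package main

import "fmt"

// drain (illustrative helper) receives until ch is closed and emptied;
// range stops as soon as a receive reports ok == false.
func drain(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

// waitQuit (illustrative helper) counts jobs until quit is closed. A
// receive on a closed Channel never blocks, so once quit is closed its
// case is always ready.
func waitQuit(jobs <-chan int, quit <-chan struct{}) int {
	handled := 0
	for {
		select {
		case _, ok := <-jobs:
			if !ok {
				jobs = nil // a nil Channel is never ready in select
				continue
			}
			handled++
		case <-quit:
			return handled
		}
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch)
	fmt.Println(drain(ch)) // [1 2]

	quit := make(chan struct{})
	close(quit)
	fmt.Println(waitQuit(nil, quit)) // 0: a nil jobs Channel never fires
}
```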
A brief summary of Channel operations and their effects:
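The behavior on nil and closed Channels follows from the checks in chansend, chanrecv and closechan quoted above. The sketch below uses illustrative helpers tryOp, sendWouldBlock and recvWouldBlock to probe each operation, using select with default to detect blocking without hanging, and recover to capture panics.

```go
package main

import "fmt"

// tryOp (illustrative helper) runs op and returns the panic message,
// or "ok" if op returned normally.
func tryOp(op func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	op()
	return "ok"
}

// sendWouldBlock reports whether a send on ch cannot proceed right now.
func sendWouldBlock(ch chan int) bool {
	select {
	case ch <- 0:
		return false
	default:
		return true
	}
}

// recvWouldBlock reports whether a receive on ch cannot proceed right now.
func recvWouldBlock(ch chan int) bool {
	select {
	case <-ch:
		return false
	default:
		return true
	}
}

func main() {
	var nilCh chan int
	closed := make(chan int)
	close(closed)

	fmt.Println("nil send blocks:", sendWouldBlock(nilCh))       // true
	fmt.Println("nil recv blocks:", recvWouldBlock(nilCh))       // true
	fmt.Println("nil close:", tryOp(func() { close(nilCh) }))    // close of nil channel
	fmt.Println("closed send:", tryOp(func() { closed <- 1 }))   // send on closed channel
	fmt.Println("closed recv blocks:", recvWouldBlock(closed))   // false
	fmt.Println("closed close:", tryOp(func() { close(closed) })) // close of closed channel
}
```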

