本文主要分析 golang 实现并发基础组件 channel 的实现原理;
主要内容分为几个部分
Section1:channel 使用实例分析
Section2:源码分析

channel 主要是为了实现 go 的并发特性,用于并发通信的,也就是在不同的协程单元 goroutine 之间同步通信。

下面主要从三个方面来讲解:

  • make channel,主要也就是 hchan 的数据结构原型;
  • 发送和接收数据时,goroutine 会怎么调度;
  • 设计思考;

1.1 make channel

我们创建 channel 时候有两种,一种是带缓冲的 channel 一种是不带缓冲的 channel。创建方式分别如下:

  1. ch := make(chan Task, 3)
  2. ch := make(chan int)

buffered channel

如果我们创建一个带 buffer 的 channel,底层的数据模型如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图1

当我们向 channel 里面写入数据时候,会直接把数据存入 circular queue(send)。当 Queue 存满了之后就会是如下的状态:
Golang-Channel原理解析_惜暮-CSDN博客 - 图2

当 dequeue 一个元素时候,如下所示:
Golang-Channel原理解析_惜暮-CSDN博客 - 图3

从上图可知,recvx 自增加一,表示出队了一个元素,其实也就是循环数组实现 FIFO 语义。

那么还有一个问题,当我们新建 channel 的时候,底层创建的 hchan 数据结构是在哪里分配内存的呢?其实 Section2 里面源码分析时候已经做了分析,hchan 是在 heap 里面分配的。

如下图所示:
Golang-Channel原理解析_惜暮-CSDN博客 - 图4

当我们使用 make 去创建一个 channel 的时候,实际上返回的是一个指向 channel 的 pointer,所以我们能够在不同的 function 之间直接传递 channel 对象,而不用通过指向 channel 的指针。

1.2 sends and receives

不同 goroutine 在 channel 上面进行读写时,涉及到的过程比较复杂,比如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图5

上图中 G1 会往 channel 里面写入数据,G2 会从 channel 里面读取数据。

G1 作用于底层 hchan 的流程如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图6

  1. 先获取全局锁;
  2. 然后 enqueue 元素 (通过移动拷贝的方式);
  3. 释放锁;

G2 读取时候作用于底层数据结构流程如下图所示:
Golang-Channel原理解析_惜暮-CSDN博客 - 图7

  1. 先获取全局锁;
  2. 然后 dequeue 元素 (通过移动拷贝的方式);
  3. 释放锁;

上面的读写思路其实很简单,除了 hchan 数据结构外,不要通过共享内存去通信;而是通过通信 (复制) 实现共享内存。

写入满 channel 的场景

如下图所示:channel 写入 3 个 task 之后队列已经满了,这时候 G1 再写入第四个 task 的时候会发生什么呢?
Golang-Channel原理解析_惜暮-CSDN博客 - 图8

G1 这时候会暂停直到出现一个 receiver。

这个地方需要介绍一下 Golang 的 scheduler 的。我们知道 goroutine 是用户空间的线程,创建和管理协程都是通过 Go 的 runtime,而不是通过 OS 的 thread。

但是 Go 的 runtime 调度执行 goroutine 却是基于 OS thread 的。如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图9

具体关于 golang 的 scheduler 的原理,可以看前面的一篇博客,关于 go 的 scheduler 原理分析。

当向已经满的 channel 里面写入数据时候,会发生什么呢?如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图10

上图流程大概如下:

  1. 当前 goroutine(G1)会调用 gopark 函数,将当前协程置为 waiting 状态;
  2. 将 M 和 G1 绑定关系断开;
  3. scheduler 会调度另外一个就绪态的 goroutine 与 M 建立绑定关系,然后 M 会运行另外一个 G。

所以整个过程中,OS thread 会一直处于运行状态,不会因为协程 G1 的阻塞而阻塞。最后当前的 G1 的引用会存入 channel 的 sender 队列 (队列元素是持有 G1 的 sudog)。

那么 blocked 的 G1 怎么恢复呢?当有一个 receiver 接收 channel 数据的时候,会恢复 G1。

实际上 hchan 数据结构也存储了 channel 的 sender 和 receiver 的等待队列。数据原型如下:
Golang-Channel原理解析_惜暮-CSDN博客 - 图11

等待队列里面是 sudog 的单链表,sudog 持有一个 G 代表 goroutine 对象引用,elem 代表 channel 里面保存的元素。当 G1 执行ch<-task4的时候,G1 会创建一个 sudog 然后保存进入 sendq 队列,实际上 hchan 结构如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图12

这个时候,如果 G1 进行一个读取 channel 操作,读取前和读取后的变化图如下图:

Golang-Channel原理解析_惜暮-CSDN博客 - 图13

整个过程如下:

  1. G2 调用 t:=<-ch 获取一个元素;
  2. 从 channel 的 buffer 里面取出一个元素 task1;
  3. 从 sender 等待队列里面 pop 一个 sudog;
  4. 将 task4 复制 buffer 中 task1 的位置,然后更新 buffer 的 sendx 和 recvx 索引值;
  5. 这时候需要将 G1 置为 Runable 状态,表示 G1 可以恢复运行;

这个时候将 G1 恢复到可运行状态需要 scheduler 的参与。G2 会调用 goready(G1) 来唤醒 G1。流程如下图所示:
Golang-Channel原理解析_惜暮-CSDN博客 - 图14

  1. 首先 G2 会调用 goready(G1),唤起 scheduler 的调度;
  2. 将 G1 设置成 Runable 状态;
  3. G1 会加入到局部调度器 P 的 local queue 队列,等待运行。

读取空 channel 的场景

当 channel 的 buffer 里面为空时,这时候如果 G2 首先发起了读取操作。如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图15

会创建一个 sudog,将代表 G2 的 sudog 存入 recvq 等待队列。然后 G2 会调用 gopark 函数进入等待状态,让出 OS thread,然后 G2 进入阻塞态。

这个时候,如果有一个 G1 执行读取操作,最直观的流程就是:

  1. 将 recvq 中的 task 存入 buffer;
  2. goready(G2) 唤醒 G2;

但是我们有更加智能的方法:direct send; 其实也就是 G1 直接把数据写入到 G2 中的 elem 中,这样就不用走 G2 中的 elem 复制到 buffer 中,再从 buffer 复制给 G1。如下图:
Golang-Channel原理解析_惜暮-CSDN博客 - 图16

具体过程就是 G1 直接把数据写入到 G2 的栈中。这样 G2 不需要去获取 channel 的全局锁和操作缓冲。

1.3 channel 主要特性

(1)goroutine-safe
hchan mutex

(2)store values, pass in FIFO.
copying into and out of hchan buffer

(3)can cause goroutines to pause and resume.
a)hchan sudog queues
b)calls into the runtime scheduler (gopark, goready)

(4)channel 的高性能所在:
a)调用 runtime scheduler 实现,OS thread 不需要阻塞;
b)跨 goroutine 栈可以直接进行读写;

2.1 channel 数据存储结构

在源码runtime/chan.go 里面定义了 channel 的数据模型,channel 可以理解成一个缓冲队列,这个缓冲队列用来存储元素,并且提供 FIFO 的语义。源码如下:

  1. type hchan struct {
  2. qcount uint
  3. dataqsiz uint
  4. buf unsafe.Pointer
  5. elemsize uint16
  6. closed uint32
  7. elemtype *_type
  8. sendx uint
  9. recvx uint
  10. recvq waitq
  11. sendq waitq
  12. lock mutex
  13. }

channel 的数据结构相对比较简单,主要是两个结构:
1)一个数组实现的环形队列,数组有两个下标索引分别表示读写的索引,用于保存 channel 缓冲区数据。
2)channel 的 send 和 recv 队列,队列里面都是持有 goroutine 的 sudog 元素,队列都是双链表实现的。
3)channel 的全局锁。

2.2 make channel

我们新建一个 channel 的时候一般使用 make(chan, n) 语句,这个语句的执行编译器会重写然后执行 chan.go 里面的 makechan 函数。函数源码如下:

  1. func makechan(t *chantype, size int) *hchan {
  2. elem := t.elem
  3. if elem.size >= 1<<16 {
  4. throw("makechan: invalid channel element type")
  5. }
  6. if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  7. throw("makechan: bad alignment")
  8. }
  9. if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
  10. panic(plainError("makechan: size out of range"))
  11. }
  12. var c *hchan
  13. switch {
  14. case size == 0 || elem.size == 0:
  15. c = (*hchan)(mallocgc(hchanSize, nil, true))
  16. c.buf = unsafe.Pointer(c)
  17. case elem.kind&kindNoPointers != 0:
  18. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
  19. c.buf = add(unsafe.Pointer(c), hchanSize)
  20. default:
  21. c = new(hchan)
  22. c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
  23. }
  24. c.elemsize = uint16(elem.size)
  25. c.elemtype = elem
  26. c.dataqsiz = uint(size)
  27. if debugChan {
  28. print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  29. }
  30. return c
  31. }

函数接收两个参数,一个是 channel 里面保存的元素的数据类型,一个是缓冲的容量 (如果为 0 表示是非缓冲 buffer),创建流程如下:

  • 根据传递的缓冲大小 size 是否为零,分别创建不带 buffer 的 channel 或则带 size 大小的缓冲 channel:

    • 对于不带缓冲 channel,申请一个 hchan 数据结构的内存大小;
    • 对于带缓冲 channel,new 一个 hchan 对象,并初始化 buffer 内存;
  • 更新 chan 中循环队列的关键属性:elemsize、elemtype、dataqsiz。

所以,整个创建 channel 的过程还是比较简单的。

2.3 协程从 channel 接收数据 (goroutine receive data)

所有执行 ep < c 使用 ep 接收 channel 数据的代码,最后都会调用到 chan.go 里面的 chanrecv 函数

函数的定义如下:

  1. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  2. ......
  3. }

从源码注释就可以知道,该函数从 channel 里面接收数据,然后将接收到的数据写入到 ep 指针指向的对象里面。

还有一个参数 block,表示当 channel 无法返回数据时是否阻塞等待。当 block=false 并且 channel 里面没有数据时候,函数直接返回 (false,false)。

接收 channel 的数据的流程如下:

  • CASE1:前置 channel 为 nil 的场景:

    • 如果 block 为非阻塞,直接 return;
    • 如果 block 为阻塞,就调用 gopark() 阻塞当前 goroutine,并抛出异常。
  • 前置场景,block 为非阻塞,且 channel 为非缓冲队列且 sender 等待队列为空 或则 channel 为有缓冲队列但是队列里面元素数量为 0,且 channel 未关闭,这个时候直接 return;
  • 调用 lock(&c.lock) 锁住 channel 的全局锁;
  • CASE2:channel 已经被关闭且 channel 缓冲中没有数据了,这时直接返回 success 和空值;
  • CASE3:sender 队列非空,调用func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) 函数处理:

    • channel 是非缓冲 channel,直接调用 recvDirect 函数直接从 sender recv 元素到 ep 对象,这样就只用复制一次;
    • 对于 sender 队列非空情况下, 有缓冲的 channel 的缓冲队列一定是满的:

        1. 先取 channel 缓冲队列的对头元素复制给 receiver(也就是 ep);
        1. 将 sender 队列的对头元素里面的数据复制到 channel 缓冲队列刚刚弹出的元素的位置,这样缓冲队列就不用移动数据了。
    • 释放 channel 的全局锁;
    • 调用 goready 函数标记当前 goroutine 处于 ready,可以运行的状态;
  • CASE4:sender 队列为空,缓冲队列非空,直接取队列元素,移动头索引;
  • CASE5:sender 队列为空、缓冲队列也没有元素且不阻塞协程,直接 return (false,false);
  • CASE6:sender 队列为空且 channel 的缓存队列为空,将 goroutine 加入 recv 队列,并阻塞。

2.4 协程向 channel 写入数据 (goroutine sender data)

所有执行 c < ep 将 ep 发送到 channel 的代码,最后都会调用到 chan.go 里面的 chansend 函数

函数的定义如下:

  1. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  2. ......
  3. }

函数有三个参数,第一个代表 channel 的数据结构,第二个是要指向写入的数据的指针,第三个 block 代表写入操作是否阻塞。

向 channel 写入数据主要流程如下:

  • CASE1:当 channel 为空或者未初始化,如果 block 表示阻塞那么向其中发送数据将会永久阻塞;如果 block 表示非阻塞就会直接 return;
  • CASE2:前置场景,block 为非阻塞,且 channel 没有关闭 (已关闭的 channel 不能写入数据) 且(channel 为非缓冲队列且 receiver 等待队列为空)或则( channel 为有缓冲队列但是队列已满),这个时候直接 return;
  • 调用 lock(&c.lock) 锁住 channel 的全局锁;
  • CASE3:不能向已经关闭的 channel send 数据,会导致 panic。
  • CASE4:如果 channel 上的 recv 队列非空,则跳过 channel 的缓存队列,直接向消息发送给接收的 goroutine:

    • 调用 sendDirect 方法,将待写入的消息发送给接收的 goroutine;
    • 释放 channel 的全局锁;
    • 调用 goready 函数,将接收消息的 goroutine 设置成就绪状态,等待调度。
  • CASE5:缓存队列未满,则将消息复制到缓存队列上,然后释放全局锁;
  • CASE6:缓存队列已满且接收消息队列 recv 为空,则将当前的 goroutine 加入到 send 队列;

    • 获取当前 goroutine 的 sudog,然后入 channel 的 send 队列;
    • 将当前 goroutine 休眠

2.5 channel close 关闭 channel 源码分析

当我们执行 channel 的 close 操作的时候会关闭 channel。

关闭的主要流程如下所示:

  • 获取全局锁;
  • 设置 channel 数据结构 chan 的关闭标志位;
  • 获取当前 channel 上面的读 goroutine 并链接成链表;
  • 获取当前 channel 上面的写 goroutine 然后拼接到前面的读链表后面;
  • 释放全局锁;
  • 唤醒所有的读写 goroutine。
    https://blog.csdn.net/u010853261/article/details/85231944/