本篇文章基于go1.18版本。 参考了halffrost的博客。https://halfrost.com/

简介

不要通过共享内存进行通信。建议,通过通信来共享内存。(Do not communicate by sharing memory; instead, share memory by communicating)这是 Go 语言并发的哲学座右铭。相比与其他语言,go实现了channel来进行通信,下面我们来看一下channel是如何实现的。

基本结构

channel的底层文件是 src/runtime/chan.go 。

  1. type hchan struct {
  2. qcount uint // 队列中所有数据总数
  3. dataqsiz uint // 环形队列的 size
  4. buf unsafe.Pointer // 指向 dataqsiz 长度的数组(环形队列)
  5. elemsize uint16 // 元素大小
  6. closed uint32
  7. elemtype *_type // 元素类型
  8. sendx uint // 已发送的元素在环形队列中的位置
  9. recvx uint // 已接收的元素在环形队列中的位置
  10. recvq waitq // 接收者的等待队列(双向链表)
  11. sendq waitq // 发送者的等待队列(双向链表)
  12. lock mutex //保护资源
  13. }

image.png
图1 channel基本结构

channel 最核心的数据结构是 sudog。sudog 代表了一个在等待队列中的 g。sudog 是 Go 中非常重要的数据结构,因为 g 与同步对象关系是多对多的。一个 g 可以出现在许多等待队列上,因此一个 g 可能有很多sudog。并且多个 g 可能正在等待同一个同步对象,因此一个对象可能有许多 sudog。sudog 是从特殊池中分配出来的。使用 acquireSudog 和 releaseSudog 分配和释放它们。

  1. type sudog struct {
  2. g *g
  3. next *sudog
  4. prev *sudog
  5. elem unsafe.Pointer // 指向数据 (可能指向栈)
  6. acquiretime int64
  7. releasetime int64
  8. ticket uint32
  9. isSelect bool
  10. success bool
  11. parent *sudog // semaRoot 二叉树
  12. waitlink *sudog // g.waiting 列表或者 semaRoot
  13. waittail *sudog // semaRoot
  14. c *hchan // channel
  15. }

创建channel

创建 channel 的主要实现在 makechan() 函数中:

  1. func makechan(t *chantype, size int) *hchan {
  2. elem := t.elem
  3. // 编译器检查数据项大小不能超过 64KB
  4. if elem.size >= 1<<16 {
  5. throw("makechan: invalid channel element type")
  6. }
  7. // 检查对齐是否正确
  8. if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  9. throw("makechan: bad alignment")
  10. }
  11. // 缓冲区大小检查,判断是否溢出
  12. mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  13. if overflow || mem > maxAlloc-hchanSize || size < 0 {
  14. panic(plainError("makechan: size out of range"))
  15. }
  16. var c *hchan
  17. switch {
  18. case mem == 0:
  19. // 队列或者元素大小为 zero 时
  20. c = (*hchan)(mallocgc(hchanSize, nil, true))
  21. // Race 竞争检查利用这个地址来进行同步操作
  22. c.buf = c.raceaddr()
  23. case elem.ptrdata == 0:
  24. // 元素不包含指针时。一次分配 hchan 和 buf 的内存。
  25. c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  26. c.buf = add(unsafe.Pointer(c), hchanSize)
  27. default:
  28. // 元素包含指针时
  29. c = new(hchan)
  30. c.buf = mallocgc(mem, elem, true)
  31. }
  32. // 设置属性
  33. c.elemsize = uint16(elem.size)
  34. c.elemtype = elem
  35. c.dataqsiz = uint(size)
  36. lockInit(&c.lock, lockRankHchan)
  37. if debugChan {
  38. print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  39. }
  40. return c
  41. }

发送数据

walkSend() 函数中主要逻辑调用了 chansend1(),而 chansend1() 只是 chansend() 的“外壳”。所以 channel 发送数据的核心实现在 chansend() 中。根据阻塞与否,chansend可以分为四个部分:异常检查、同步发送、异步发送、阻塞发送。

异常检查

  1. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  2. // 判断 channel 是否为 nil
  3. if c == nil {
  4. if !block {
  5. return false
  6. }
  7. gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  8. throw("unreachable")
  9. }
  10. if debugChan {
  11. print("chansend: chan=", c, "\n")
  12. }
  13. if raceenabled {
  14. racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
  15. }
  16. // 简易快速的检查
  17. if !block && c.closed == 0 && full(c) {
  18. return false
  19. }
  20. ......
  21. }

同步发送


异步发送



阻塞发送


接收数据



关闭数据