Go 并发

goroutine

很多语言(如 Python, C#)都有协程概念,Go 的协程与它们都不同,为了区分以及体现它的独特性,将 Go 的协程称为 goroutine。
直观上它是通过 go 关键字的启动的一个函数。

特点

1. 可增长的栈空间

KLT (Kernel Level Thread,内核级线程,也称操作系统线程,即 OS 线程)有固定大小的栈内存,通常为 2 MB
栈内存空间用于保存那些在调用其他函数时,正在执行或临时暂停的函数中的局部变量。
固定大小的栈内存空间有两个缺点:

  1. 空间浪费。该 KLT 可能执行一些简单的操作就结束了。
  2. 空间不足。该 KLT 可能执行递归深度很深的递归函数,2 MB 完全不够用。

因此 goroutine 采取了一种动态增长的栈空间方式,一开始只有 2 KB 大小,然后按需增长或缩小,栈大小限制可以到达 1 GB。
当然 goroutine 的栈内存空间也是用于保存那些在调用其他函数时,正在执行或临时暂停的函数中的局部变量。

2. 调度代价小

KLT 由内核调度。每隔几毫秒,一个硬件时钟中断发到 CPU,CPU 调用一个称为调度器的内核函数来调度 KLT。
调度过程:调度器暂停当前正在运行的 KLT,并把它的寄存器信息保存到内存,查看线程列表并决定接下来运行哪一个 KLT,再从内存恢复该 KLT 的注册表信息,最后开始执行它。
上下文切换:保存一个 KLT 的状态到内存,恢复另一个 KLT 的状态,更新调度器的数据结构。
调度代价:上下文切换时发生用户态到内核态的转换,因为 KLT 是由内核调度的,要陷入内核态执行内核代码,这个代价是比较高的。
而 goroutine 拥有自己的调度器,不需要内核调度,因此 goroutine 的调度不需要发生用户态到内核态的转换,所以 goroutine 的调度代价非常小。
关于 goroutine 调度模型,详见 Go 并发模型

3. goroutine 没有标识

设计上有 gid,但是不公开。
在大部分支持多线程的操作系统和编程语言中,当前线程都有一个独特的表示,通常是一个整数或指针。利用它可以构建一个局部存储,本质是一个全局的 map,以线程标识为 key,每个线程都能访问这个 map 获取值而不受其他线程干扰。
这样的设计导致局部存储有被滥用的倾向。比如当一个 Web 服务器用一个支持线程局部存储的语言来实现时,很多函数都会通过访问这个存储来查找关于 HTTP 请求的信息,这是一种不健康的“超距作用”,使得函数的行为不仅取决于它的参数(HTTP 请求),还取决于运行它的线程。在线程的标识需要改变的场景下,这些函数的行为就会变得很诡异。

启动 Go 协程

使用关键字 go 调用或执行一个函数或来实现(也可以是匿名或者 lambda 函数)。
这样会在当前 goroutine 的地址空间中给子协程分配独立的栈空间。

Go 协程的两种同步方式

sync 包和 channel 通道。
Go 认为在并发编程中,对共享变量的访问需要精确的控制,这非常复杂且容易出错,因此 Go 把共享变量的值通过信道传递。实际上,多个独立执行的线程从不会主动共享。 在任意给定的时间点,只有一个 Go 协程能够访问该值,数据竞争从设计上就被杜绝了。
为了提倡这种思考方式,Go 把它简化成一句口号:

不要通过共享内存来通信,而应通过通信来共享内存。

sync 包

通过使用 sync 包实现协程同步,Go 不鼓励这么做,而提倡使用 channel。

等待组 sync.WaitGroup

通常用于协程总同步,由主协程来通过 Add 设置需要 Wait 的协程数量,子协程完成后自行调用 Done。
sync.WaitGroup 可以看作是一个任务队列结构,它只有 3 个方法:

  • Add 方法:添加一个任务
  • Done 方法:完成一个任务
  • Wait 方法:等待所有任务完成

使用 Wait 方法后,若有任务尚未完成,则阻塞。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var wg sync.WaitGroup
  7. func main() {
  8. wg.Add(3) // main 协程加入 3 个任务
  9. // 启动 3 个子协程
  10. for i := 0; i < 3; i++ {
  11. go task()
  12. fmt.Printf("%d task done!\n", i)
  13. }
  14. wg.Wait() // 等待所有子协程完成
  15. }
  16. // 模拟子协程执行的任务
  17. func task() {
  18. for i := 0; i < 100000; i++ {
  19. factorial(100)
  20. }
  21. wg.Done() // 子协程完成
  22. }
  23. func factorial(n int) int {
  24. if n <= 0 {
  25. return 1
  26. }
  27. return n * factorial(n-1)
  28. }
  29. // output
  30. 0 task done!
  31. 1 task done!
  32. 2 task done!

注:Done() 与 Add(-1) 效果相同。

乐观锁与悲观锁

一种锁的分类方法。
悲观锁:当一个线程访问共享资源时,总是认为会有其他线程也想访问它,所以会首先获取锁对资源进行上锁。

  • 总的来说,互斥锁,自旋锁,读写锁都是悲观锁。
  • Java 中的 synchronized 关键字。
  • Go 中的 sync.Mutex 和 sync.RWMutex。

乐观锁:当一个线程访问共享资源时,总是认为没有其他线程也想访问它,所以不对资源上锁。

  • 原子性操作实现对共享资源的访问,不需要加锁。

原子操作

原子操作具有一气呵成,不可中断的特性,由 CPU 指令实现。
Go语言的 sync/atomic 提供了对原子操作的支持,用于同步访问整数和指针。

  • Go语言提供的原子操作都是非入侵式的
  • 原子操作支持的类型包括 int32、int64、uint32、uint64、uintptr、unsafe.Pointer。 ```go // CompareAndSwap 系列 func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr int64, old, new int64) (swapped bool) func CompareAndSwapPointer(addr unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool)

解释: addr:内存中共享变量的地址 old:共享变量的副本,也叫预期值 new:需要将共享变量更新成的新值

// 功能:原子比较与交换 if addr == old { addr = new return true } else { return false }

// 通常来说,*addr 总是频繁变化的 // CAS 不能总是执行成功 // 因此需要通过 for 循环来执行 CAS // 不断地尝试原子地更新 value 的值,直到操作成功为止 func addValue(delta int32) { for { v := value // value是一个共享变量,频繁变化 if atomic.CompareAndSwapInt32(&value, v, v+delta) { break } } }

// Swap 系列 func SwapInt32(addr int32, new int32) (old int32) func SwapInt64(addr int64, new int64) (old int64) func SwapPointer(addr unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) func SwapUint32(addr uint32, new uint32) (old uint32) func SwapUint64(addr uint64, new uint64) (old uint64) func SwapUintptr(addr uintptr, new uintptr) (old uintptr)

// 功能:原子交换 tmp := addr addr = new return tmp

// Add 系列 func AddInt32(addr int32, delta int32) (new int32) func AddInt64(addr int64, delta int64) (new int64) func AddUint32(addr uint32, delta uint32) (new uint32) func AddUint64(addr uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

// 功能:原子加 return new = *addr + delta

// Load 系列 func LoadInt32(addr int32) (val int32) func LoadInt64(addr int64) (val int64) func LoadPointer(addr unsafe.Pointer) (val unsafe.Pointer) func LoadUint32(addr uint32) (val uint32) func LoadUint64(addr uint64) (val uint64) func LoadUintptr(addr uintptr) (val uintptr)

// 功能:原子读 return val = *addr

// Store 系列 func StoreInt32(addr int32, val int32) func StoreInt64(addr int64, val int64) func StorePointer(addr unsafe.Pointer, val unsafe.Pointer) func StoreUint32(addr uint32, val uint32) func StoreUint64(addr uint64, val uint64) func StoreUintptr(addr uintptr, val uintptr)

// 功能:原子写 *addr = val

// atomic.Value // A Value must not be copied after first use. type Value struct { v interface{} }

// 两个操作 Store(c):v = c Load(): c = v

  1. <a name="fffbc1fa"></a>
  2. #### 自旋锁
  3. 若一个线程尝试获取自旋锁失败了,那么它陷入**忙等(不释放 CPU)**,直至得到锁。
  4. - Go 没有提供自旋锁。
  5. - 单核 CPU 下,必须是**抢占式**的调度器,否则自旋锁无法使用。因为如果是非抢占式的调度器,当一个线程进入自旋(占着 CPU 不放,等待锁),而持有锁的线程由于得到 CPU 也执行不下去,因此出现死锁。
  6. <a name="fLClK"></a>
  7. #### 互斥锁 sync.Mutex
  8. 用于对共享资源并发访问时进行访问控制。<br />只有 2 个方法:
  9. - Lock:上锁。
  10. - Unlock:解锁。
  11. ```go
  12. package main
  13. import (
  14. "fmt"
  15. "sync"
  16. )
  17. var mutex sync.Mutex // 互斥锁
  18. var wg sync.WaitGroup
  19. var common int = 666 // 模拟共享资源
  20. func main() {
  21. wg.Add(4)
  22. for i := 0; i < 2; i++ {
  23. go write(i)
  24. go read(i)
  25. }
  26. wg.Wait()
  27. }
  28. // 尝试对共享资源进行写操作
  29. func write(x int) {
  30. // 获取锁
  31. fmt.Printf("W%d locking.\n", x)
  32. mutex.Lock()
  33. // 访问共享资源
  34. fmt.Printf("W%d locked\n", x)
  35. common = x
  36. fmt.Printf("W%d has written the common to %d.\n", x, x)
  37. // 解锁
  38. fmt.Printf("W%d unloking.\n", x)
  39. mutex.Unlock()
  40. fmt.Printf("W%d unlocked.\n", x)
  41. wg.Done()
  42. }
  43. // 尝试对共享资源进行读操作
  44. func read(x int) {
  45. // 获取锁
  46. fmt.Printf("R%d locking.\n", x)
  47. mutex.Lock()
  48. // 访问共享资源
  49. fmt.Printf("R%d locked.\n", x)
  50. fmt.Printf("R%d has read the common which is %d.\n", x, common)
  51. fmt.Printf("R%d has read the common.\n", x)
  52. // 解锁
  53. fmt.Printf("R%d unloking.\n", x)
  54. mutex.Unlock()
  55. fmt.Printf("R%d unlocked.\n", x)
  56. wg.Done()
  57. }
  58. // output
  59. R1 locking. // R1 尝试获取锁
  60. R1 locked. // R1 获取锁成功
  61. W1 locking. // W1 尝试获取锁,但由于已被 R1 获取,W1 阻塞
  62. R1 has read the common which is 666. // R1 读共享资源
  63. R0 locking. // R0 尝试获取锁,但由于已被 R1 获取,R0 阻塞
  64. W0 locking. // W0 尝试获取锁,但由于已被 R1 获取,W0 阻塞
  65. R1 unloking.
  66. R1 unlocked.
  67. W1 locked
  68. W1 has written the common to 1.
  69. W1 unloking.
  70. W1 unlocked.
  71. R0 locked.
  72. R0 has read the common which is 1.
  73. R0 unloking.
  74. R0 unlocked.
  75. W0 locked
  76. W0 has written the common to 0.
  77. W0 unloking.
  78. W0 unlocked.

以上是模拟 4 个协程对共享资源 common 进行读或写的过程。
使用互斥锁的经典模式为:

  1. var mutex sync.Mutex
  2. func f() {
  3. mutex.Lock()
  4. defer mutex.Unlock()
  5. // visit
  6. }

上述代码没有采用是为了方便打印执行流程。

读写锁 sync.RWMutex

本质是多读单写互斥锁,适用于读频率高于写频率的场景,它有 4 个方法:

  • Lock:写锁
  • Unlock:写解锁
  • RLock:读锁
  • RUnlock:读解锁

规则:

  1. 写锁定时,尝试写锁定或读锁定都将阻塞。
  2. 读锁定时,尝试写锁定将阻塞,尝试读锁定能够成功,因为可以多读。
  3. 未锁定时,尝试写锁定或读锁定都将引发异常。
  4. 写解锁时,尝试唤醒所有因读锁定而阻塞的协程(保证读优先)。
  5. 读解锁时,尝试唤醒一个因写锁定而阻塞的协程(保证写协程不会饿死)。 ```go package main

import ( “fmt” “sync” “time” )

var rwMutext sync.RWMutex // 读写锁 var wg sync.WaitGroup // 等待组

var data int = 666 // 共享资源

func main() { wg.Add(10) for i := 0; i < 5; i++ { go func(t int) { rwMutext.RLock()
defer rwMutext.RUnlock() fmt.Printf(“R%d read data %d\n”, t, data) wg.Done() time.Sleep(2 * time.Second) }(i)

  1. go func(t int) {
  2. rwMutext.Lock()
  3. defer rwMutext.Unlock()
  4. fmt.Printf("W%d writen data to %d\n", t, t)
  5. wg.Done()
  6. time.Sleep(10 * time.Second)
  7. }(i)
  8. }
  9. wg.Wait()

} // output R1 read data 666 W4 writen data to 4 R2 read data 666 // 支持 R0 read data 666 // 多个 R3 read data 666 // 读 R4 read data 666 // 协程 W1 writen data to 1 W2 writen data to 2 W0 writen data to 0 W3 writen data to 3

  1. 代码模拟了 10 个协程对共享资源 data 进行读写的执行流程,其中读的频率是写频率的 5 倍。
  2. <a name="5Iv0w"></a>
  3. #### sync.Map
  4. [sync.Map 源码分析](https://www.yuque.com/zwjason/golang/sync-map)<br />原生支持并发安全的字典(并发安全,线程安全,通常是一样意思)。<br />**它的 key 和 value 都是 interface{} 类型的。**<br />**用法如下:**
  5. ```go
  6. package main
  7. import (
  8. "fmt"
  9. "sync"
  10. )
  11. func run() {
  12. var m sync.Map
  13. // 保存数据
  14. m.Store(0, 'a')
  15. m.Store("hello", 1)
  16. // 读取数据
  17. if v, ok := m.Load(0); ok {
  18. fmt.Printf("Load successfully, is %v\n", v)
  19. } else {
  20. fmt.Printf("Load failed.\n")
  21. }
  22. // 遍历
  23. f := func(k, v interface{}) bool {
  24. fmt.Printf("key: %v, value: %v\n", v, v)
  25. return true
  26. }
  27. m.Range(f)
  28. // 读取或保存数据
  29. // 存在则读取,不存在则存储
  30. if v, ok := m.LoadOrStore("world", 2); !ok {
  31. fmt.Printf("Load failed, store %v\n", v)
  32. } else {
  33. fmt.Printf("Load successfully, is %v\n", v)
  34. }
  35. // 读取后删除数据
  36. if v, ok := m.LoadAndDelete(0); !ok {
  37. fmt.Printf("Load failed, store %v\n", v)
  38. } else {
  39. fmt.Printf("Load and delete successfully, is %v\n", v)
  40. }
  41. }

为什么说普通的 map 是并发不安全的呢?因为虽然并发读是安全的,但是并发写是不安全的。
尝试开启 100 个协程读,如何运行多少次下面的代码都不会出错。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. var m map[int]struct{}
  7. func run() {
  8. m = make(map[int]struct{})
  9. m[1] = struct{}{}
  10. m[2] = struct{}{}
  11. for i := 0; i < 100; i++ {
  12. go read(i)
  13. //go write(i)
  14. }
  15. time.Sleep(2 * time.Second)
  16. }
  17. func read(key int) {
  18. if v, ok := m[key]; !ok {
  19. fmt.Printf("No value for key %d\n", key)
  20. } else {
  21. fmt.Printf("The value for key %d is %v", key, v)
  22. }
  23. }
  24. /*
  25. func write(key int) {
  26. m[key] = struct{}{}
  27. }
  28. */

加入 100 个协程进行写呢?(去掉上面的注释)
多执行几次就会报错!当读写操作异步执行时,将会出错。
解决方案:

  • 使用互斥锁或读写锁对读写操作进行同步
  • 不用锁,用 sync.Map 替代 map

sync.Once

sync.Once.Do(f func()) 能保证 Do 方法只执行一次。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var once sync.Once
  8. func run() {
  9. for i := 0; i < 3; i++ {
  10. go func() {
  11. once.Do(Say)
  12. fmt.Printf("%d\n", i)
  13. }()
  14. }
  15. time.Sleep(2 * time.Second)
  16. }
  17. func Say() {
  18. fmt.Printf("hello\n")
  19. }
  20. // output
  21. hello
  22. 3
  23. 3
  24. 3

即使启动了三个协程,却只输出一个 hello。
好奇为什么输出的数字都是 3 吗? 因为 main 协程启动三个协程速度太快了,子协程还没来得及打印 i,main 协程就已经执行到 time.Sleep 了,此时 i 已经为 3,你大可以尝试循环 1000 次,你就会发现不全都是 1000。
这里让 main 协程睡眠等待子协程执行完毕。

channel 通道

类型与定义方式

channel 是一种 FIFO 的通道,具有特定类型,只能传送相同类型的数据,用于连接 goroutine

  • var name chan type,如 var ich chan int,其中 type 可以是任意类型,默认值为 nil
  • make 方法,如 ch := make(chan string)
  • 只读通道。 ch := make(<-chan int)
  • 只写通道。 ch := make(chan<- int)

image.png

关闭通道

使用内置的 close 函数。

  1. ch := make(chan int)
  2. // ...
  3. close(ch)

一个通道关闭后,不能再往其中发数据,但是可以从其中接收数据。

无缓冲通道

可以理解为同步通道,不能理解为容量为 1 的通道。它要求发送者和接受者都做好了准备,否则任何一方想要从通道中接收或发送数据都将阻塞。

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch := make(chan int)
  7. // ch := make(chan int, 1)
  8. ch <- 1
  9. fmt.Println(<-ch)
  10. }

以上代码会发生死锁:fatal error: all goroutines are asleep - deadlock!
因为这里就一个 main goroutine,由于它自己发送数据时 ch <- 1,没有对应的接受者,所以它会阻塞。
如果 channel 容量为 1,如注释部分,就不会发生死锁,程序正常输出 1,因为有缓冲的通道在没有满之前,往里面发送数据是不会发生阻塞的。
当然启动一个子协程发送也可以解决问题:

  1. go func() {
  2. ch <- 1
  3. }()

由此可见,无缓冲通道天生具有同步协程的作用,发送者和接受者必须都准备好,且一定是发送者先传送,接收者才能接收。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. ch := make(chan int)
  8. go useSum(ch)
  9. go getSum(ch)
  10. time.Sleep(time.Second)
  11. }
  12. func getSum(ch chan int) {
  13. sum := 0
  14. for i := 1; i <= 100000000; i++ {
  15. sum += i
  16. }
  17. ch <- sum
  18. }
  19. func useSum(ch chan int) {
  20. fmt.Println("waiting")
  21. x := <- ch
  22. fmt.Println(x)
  23. fmt.Println("done")
  24. }
  25. // 输出
  26. waiting
  27. 5000000050000000
  28. done

main 协程启动了两个 goroutine,getSum 用来计算 [1, 100000000] 的和,useSum 用来输出这个结果。虽然 useSum 先启动了,但是由于 getSum 还没计算完,useSum 阻塞等待 getSum。

有缓冲通道

ch := make(chan int, capacity)
cap(ch) = capacity
一种 FIFO 管道,从队首取,从队尾放。
当通道已满时,发送者阻塞;当通道已空时,接受者阻塞;不空也不满,都不阻塞。
image.png

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var wg sync.WaitGroup
  8. func main() {
  9. wg.Add(2)
  10. ch := make(chan int, 3)
  11. go useNum(ch)
  12. go getNum(ch)
  13. wg.Wait()
  14. }
  15. func getNum(ch chan int) {
  16. for i := 1; i <= 3; i++ {
  17. ch <- i
  18. ch <- i
  19. ch <- i
  20. time.Sleep(time.Second)
  21. }
  22. close(ch) // 记得关闭
  23. wg.Done()
  24. }
  25. func useNum(ch chan int) {
  26. for x := range ch {
  27. fmt.Println(x)
  28. }
  29. wg.Done()
  30. }
  31. /*
  32. 代码将间隔性地输出 3个数:
  33. 1
  34. 1
  35. 1
  36. // 等待一下
  37. 2
  38. 2
  39. 2
  40. // 等待一下
  41. 3
  42. 3
  43. 3
  44. */

虽然 channel 在垃圾回收机制下会自动回收,并非一定要主动使用 close 关闭,而且很多时候不需要手动关闭。
是否手动关闭 channel 这个问题比较复杂,因为关闭与否都容易出现并发问题。
但是使用 for range 迭代管道获取数据时,一定要记得用 close 关闭 channel,否则将导致死锁,因为 for range 迭代 channel 即 channel 道为空也只是阻塞等待!

select 通道选择器

select 可以等待多个管道操作,基本上哪个管道有数据,就先处理哪个管道的操作。

  • 当没有一个 case 可以运行时,select 将阻塞等待,直到有一个 case 可以运行,运行它并退出,但是可以用 default 来执行默认操作。
  • 若 for 循环中使用 select,则在 select 中使用的 break 只能跳出 select 而不能跳出循环。 ```go // 基本方式 select { case xx: // do something case yy: // do something default: // do something }

// select 执行后退出 for 循环? for { select { case xxx: // …. case yyy: // … break // 只能跳出 select 而不能跳出 for default: // … } }

// 解决方法 1:使用带标签的 for 循环

LOOP: for { select { case xxx: // …. case yyy: // … break LOOP default: // … } }

// 解决方法2:直接 return 适合退出 goroutine 的场景 for { select { case xxx: // …. case yyy: // … return default: // … } }

// 解决方法3:使用标志 haveGetData := false for { select { case xxx: // …. case yyy: // … default: // … } if haveGetData { break } }

  1. 下面启动两个协程 f1 计算 [1, 100] 的和,f2 计算 [1, 100] 的乘积。
  2. ```go
  3. package main
  4. import (
  5. "fmt"
  6. "sync"
  7. )
  8. var wg sync.WaitGroup
  9. func main() {
  10. wg.Add(2)
  11. ch1 := make(chan int)
  12. ch2 := make(chan int)
  13. go f1(ch1)
  14. go f2(ch2)
  15. for i := 0; i < 2; i++ {
  16. select {
  17. case x1 := <- ch1:
  18. fmt.Printf("add for [1, 100] first done, %d\n", x1)
  19. case x2 := <- ch2:
  20. fmt.Printf("times for [1, 100] first done, %d\n", x2)
  21. default:
  22. fmt.Printf("no channel is ready.")
  23. }
  24. }
  25. wg.Wait()
  26. }
  27. func f1(ch chan int) {
  28. sum := 0
  29. for i := 1; i <= 100; i++ {
  30. sum += i
  31. }
  32. ch <- sum
  33. wg.Done()
  34. }
  35. func f2(ch chan int) {
  36. sum := 0
  37. for i := 1; i <= 100; i++ {
  38. sum *= i
  39. }
  40. ch <- sum
  41. wg.Done()
  42. }

多执行几次观察输出,加和未必比乘积快。

goroutine 泄漏

goroutine leak
goroutine 泄漏指的是有些 goroutine 开启后没有正常退出,死循环或永久阻塞了,又不能被垃圾回收机制回收。
这种情况下,若导致了死锁,程序自然试运行不下去的;若没导致死锁,死循环或永久阻塞的 goroutine 必然占用越来越多的资源,只有当主协程结束了,它们才会结束。

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "runtime"
  6. "time"
  7. )
  8. func query() int {
  9. n := rand.Intn(100)
  10. time.Sleep(time.Duration(n) * time.Millisecond)
  11. return n
  12. }
  13. // 每次执行此函数,都会导致有两个goroutine处于阻塞状态
  14. func queryAll() int {
  15. ch := make(chan int)
  16. go func() { ch <- query() }()
  17. go func() { ch <- query() }()
  18. go func() { ch <- query() }()
  19. // <-ch
  20. // <-ch
  21. return <-ch
  22. }
  23. func main() {
  24. // 每次循环都会泄漏两个goroutine
  25. for i := 0; i < 3; i++ {
  26. queryAll()
  27. // main()也是一个主groutine
  28. fmt.Printf("#goroutines: %d\n", runtime.NumGoroutine())
  29. }
  30. }
  31. /* 输出
  32. #goroutines: 3
  33. #goroutines: 5
  34. #goroutines: 7
  35. */

按道理来说,每次执行完 queryAll 返回后,只剩下一个主协程,那为什么会依次打印 3,5,7 呢?
原因是 queryAll 内部同时开启了 3 个协程模拟查询请求,而只使用一个无缓冲 channel 来接收请求,当其中一个查询到后就返回,这将导致另外两个在以后查询到之后,没有 channel 可以发送,永久阻塞。
因此在主协程中每循环一次,就会产生两个永久阻塞的子协程。

解决方案(待完善)
context 包

并发的安全退出

是否手动关闭 channel 是一个比较复杂的问题。

  • 向已关闭的 channel 发送数据会导致 panic;关闭一个已关闭的 channel 会导致 panic。
  • 关闭 channel 后可以继续从其中接收但不能发送数据。
  • 对 nil 值的 channel,发送或接收数据将导致永久阻塞。

关闭时机:发送方发送完了所有数据时,由发送方关闭!

安全退出有 2 种方法

  1. 一对一通知:利用无缓冲 channel + select 实现。
  2. 一对多通知:也称广播通知,利用 close 方法实现。

channel + select 实现

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. quit := make(chan bool)
  8. go func() {
  9. for {
  10. select {
  11. default:
  12. fmt.Printf("working...\n")
  13. case <- quit:
  14. return
  15. }
  16. }
  17. }()
  18. time.Sleep(2 * time.Second)
  19. quit <- true
  20. }

主协程让子协程执行两秒后通过无缓冲通道 quit 发送一个布尔值通知子协程停止。 该方法的缺点是一对一,如果有很多 goroutine 要终止,需要同样数量的 channel 来通知,这不现实。

close 实现
当一个 channel 被关闭后,所有从该 channel 中接收的操作都会收到一个零值和 false 标志。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. quit := make(chan bool)
  8. for i := 0; i < 10; i++ {
  9. go worker(quit)
  10. }
  11. time.Sleep(2 * time.Second)
  12. close(quit)
  13. }
  14. func worker(quit chan bool) {
  15. for {
  16. select {
  17. default:
  18. fmt.Printf("working...\n")
  19. case <- quit:
  20. return
  21. }
  22. }
  23. }

主协程启动了 10 个子协程,并让它们执行两秒后,通过 close 方法关闭了通道,广播式地告诉 10 个子协程应该停止执行。但是子协程收到广播通知关闭后,会进行清理工作,主协程并没有等待清理工作的完成,可以使用 sync.WaitGroup 来改进。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var wg sync.WaitGroup
  8. func main() {
  9. quit := make(chan bool)
  10. for i := 0; i < 10; i++ {
  11. wg.Add(1)
  12. go worker(quit)
  13. }
  14. time.Sleep(2 * time.Second)
  15. close(quit)
  16. wg.Wait()
  17. }
  18. func worker(quit chan bool) {
  19. defer wg.Done()
  20. for {
  21. select {
  22. default:
  23. fmt.Printf("working...\n")
  24. case <- quit:
  25. return
  26. }
  27. }
  28. }

经典用法

for + if 循环从 channel 中接收值

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var wg sync.WaitGroup
  7. func main() {
  8. wg.Add(2)
  9. ch := make(chan int)
  10. go func() {
  11. for i := 0; i < 10; i++ {
  12. ch <- i
  13. }
  14. wg.Done()
  15. close(ch) // 由发送方关闭
  16. }()
  17. go func() {
  18. // 从 channel 中接收值的经典方法
  19. for {
  20. x, ok := <- ch
  21. if !ok {
  22. break
  23. }
  24. fmt.Printf("%d ", x)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }

但是上面的 for + if 方法太过笨拙。

for range 方法迭代 channel
将上面的 for + if 换成以下代码

  1. for x := range ch {
  2. fmt.Printf("%d ", x)
  3. }

注意 for range 迭代 channel,即使 channel 为空,也不会结束,因此需要手动关闭

常见并发模式

单生产者/消费者模型

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. ch := make(chan int)
  8. go producer(3, ch)
  9. go consumer(ch)
  10. time.Sleep(10 * time.Second)
  11. }
  12. func producer(factor int, ch chan int) {
  13. for i := 1; ; i++ {
  14. product := i * factor
  15. fmt.Printf("produce %d\n", product)
  16. ch <- product
  17. time.Sleep(time.Second)
  18. }
  19. }
  20. func consumer(ch chan int) {
  21. for product := range ch {
  22. fmt.Printf("consume %d\n", product)
  23. time.Sleep(time.Second)
  24. }
  25. }

生产者和消费者每生产/消费一个产品,都需要 1 秒时间,主协程等待 10 秒后结束。

多生产者/消费者模型

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. ch1 := make(chan int)
  8. ch2 := make(chan byte)
  9. go producer1(3, ch1)
  10. go producer2(ch2)
  11. go consumer1(ch1, ch2)
  12. go consumer2(ch1, ch2)
  13. time.Sleep(10 * time.Second)
  14. }
  15. func producer1(factor int, ch chan int) {
  16. for i := 1; ; i++ {
  17. product := i * factor
  18. fmt.Printf("1 produce %d\n", product)
  19. ch <- product
  20. time.Sleep(time.Second)
  21. }
  22. }
  23. func producer2(ch chan byte) {
  24. for i := 0; ; i++{
  25. product := byte('a' + i)
  26. fmt.Printf("2 produce %c\n", product)
  27. ch <- product
  28. time.Sleep(time.Second)
  29. }
  30. }
  31. func consumer1(ch1 chan int, ch2 chan byte) {
  32. for {
  33. select {
  34. case num := <- ch1:
  35. fmt.Printf("1 consume %d\n", num)
  36. case charac := <- ch2:
  37. fmt.Printf("1 consume %c\n", charac)
  38. }
  39. }
  40. }
  41. func consumer2(ch1 chan int, ch2 chan byte) {
  42. for {
  43. select {
  44. case num := <- ch1:
  45. fmt.Printf("2 consume %d\n", num)
  46. case charac := <- ch2:
  47. fmt.Printf("2 consume %c\n", charac)
  48. }
  49. }
  50. }

消费者使用 select 通道选择器获取生产者生产的产品。