前言

参考 陈剑煜的博客,纯笔记,转载请尊重原创。

为何要控制

Go 最大的优势就是超强的并发能力,但是一次性开启过多 goroutine 也会带来严重的问题。
在个人电脑上运行下面的程序就会导致程序卡死。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "math"
  6. )
  7. func main() {
  8. userCount := math.MaxInt64
  9. for i := 0; i < userCount; i++ {
  10. go func(i int){
  11. time.Sleep(time.Second) // 睡眠模拟处理逻辑
  12. fmt.Printf("goroutine %d done\n", i) // 处理结束
  13. }(i)
  14. }
  15. }

假设 userCount 是传入的参数,大小不可预知,同时开启 userCount 个 goroutine 处理同一件事,看起来效率会很高,但是当出现极端情况时(userCount 很大时,如 math.MaxInt64)会出现如下问题:

  • CPU 使用率上涨
  • 内存 / 虚拟内存 占用上涨
  • 主协程崩溃。

如何控制

控制细分两种:

  1. 控制同时产生 goroutine 的数量
  2. 控制同时运行的 goroutine 的数量

两者只有细微的差别,下面将重点探讨第 2 种情况。

channel 方案

利用 channel 的阻塞

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. userCount := 10
  8. ch := make(chan bool, 2) // 管道容量为 2
  9. for i := 0; i < userCount; i++ {
  10. // ch <- true // place 1
  11. go work(ch, i)
  12. }
  13. // time.Sleep(time.Second) // 主协程不等待
  14. }
  15. func work(ch chan bool, i int) {
  16. ch <- true // place 2
  17. fmt.Printf("goroutine %d done\n", i) // 处理结束
  18. <- ch
  19. }

代码分析:

  • 主协程一次性开启了 10 个子协程,每个子协程运行前都会往管道里发送一个数据,结束后接收一个数据,因此任意时刻最多只有两个子协程在运行。
  • 注意到代码中往管道发送数据的两个 place
    • 若放在 place 1,则主协程任意时刻只能开启两个子协程,并且这两个子协程都在运行。
    • 若放在 place 2,则主协程一次性开启了 10 个子协程,但是任意时刻最多只有两个子协程在运行。

理论上会输出 10 行,但是运行该程序可能没有输出,也可能输出不足 10 行,原因是主协程在子协程执行完之前就终止了,子协程也随之终止(在这里让主协程等待 1 秒可以让全部子协程执行完毕)。
由此可见,纯 channel 不能很好地控制 goroutine 的并发数量,主要缺点是主协程不知道最后一个子协程什么时候结束,这时候我们可以引入 sync.WaitGroup 来完善。

channel + sync.WaitGroup 方案

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var wg sync.WaitGroup
  8. func main() {
  9. userCount := 10
  10. ch := make(chan bool, 2)
  11. for i := 0; i < userCount; i++ {
  12. wg.Add(1)
  13. go work(ch, i)
  14. }
  15. wg.Wait()
  16. }
  17. func work(ch chan bool, i int) {
  18. ch <- true
  19. time.Sleep(time.Second) // 睡眠模拟处理逻辑
  20. fmt.Printf("goroutine %d done\n", i) // 处理结束
  21. wg.Done()
  22. <- ch
  23. }

运行程序输出

  1. goroutine 0 done
  2. goroutine 4 done
  3. // 等待一下
  4. goroutine 5 done
  5. goroutine 9 done
  6. // 等待一下
  7. goroutine 6 done
  8. goroutine 7 done
  9. // 等待一下
  10. goroutine 2 done
  11. goroutine 8 done
  12. // 等待一下
  13. goroutine 3 done
  14. goroutine 1 done

确实做到了任意时刻只有两个 goroutine 在处理业务逻辑。
但是很多业务场景需要更好的灵活性,至少允许改变并发运行 goroutine 的数量,通常灵活是相对的,一般会设置并发量的最大。

灵活的 channel + sync.WaitGroup 方案

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. // "time"
  7. )
  8. var wg sync.WaitGroup
  9. func main() {
  10. userCount := 10
  11. ch := make(chan int, 5) // 最多支持 5 个 goroutine 并发运行
  12. for i := 0; i < userCount; i++ {
  13. wg.Add(1)
  14. go work(ch, i)
  15. }
  16. for i := 0; i < 10; i++ {
  17. // 任意时刻最多有 2 个 goroutine 并发运行
  18. ch <- i
  19. ch <- i
  20. // ch <- i
  21. time.Sleep(time.Second)
  22. }
  23. close(ch) // 主协程主动关闭管道
  24. wg.Wait()
  25. }
  26. func work(ch chan int, i int) {
  27. defer wg.Done()
  28. for data := range ch {
  29. fmt.Printf("goroutine %d get data %d\n", i, data)
  30. }
  31. }

代码分析:

  • 管道的容量为 5 可以认为最多支持 5 个 goroutine 并发运行,控制一次往管道里发送数据的数量,就可以控制并发量。
  • 主协程必须主动关闭管道,否则将导致子协程一直阻塞等待从管道接收数据,进而导致死锁。

这只是一个最简单的灵活控制并发量的方案,有一点 goroutine pool(协程池)的雏形,目前有很多第三方库实现了很完善的 goroutine pool

这些都是目前比较成熟的第三方库,是生成和管理 goroutine pool 的工具。