前言
参考 陈剑煜的博客,纯笔记,转载请尊重原创。
为何要控制
Go 最大的优势就是超强的并发能力,但是一次性开启过多 goroutine 也会带来严重的问题。
在个人电脑上运行下面的程序就会导致程序卡死。
package mainimport ("fmt""time""math")func main() {userCount := math.MaxInt64for i := 0; i < userCount; i++ {go func(i int){time.Sleep(time.Second) // 睡眠模拟处理逻辑fmt.Printf("goroutine %d done\n", i) // 处理结束}(i)}}
假设 userCount 是传入的参数,大小不可预知,同时开启 userCount 个 goroutine 处理同一件事,看起来效率会很高,但是当出现极端情况时(userCount 很大时,如 math.MaxInt64)会出现如下问题:
- CPU 使用率上涨
- 内存 / 虚拟内存 占用上涨
- 主协程崩溃。
如何控制
控制细分两种:
- 控制同时产生 goroutine 的数量
- 控制同时运行的 goroutine 的数量
channel 方案
利用 channel 的阻塞
package mainimport ("fmt""time")func main() {userCount := 10ch := make(chan bool, 2) // 管道容量为 2for i := 0; i < userCount; i++ {// ch <- true // place 1go work(ch, i)}// time.Sleep(time.Second) // 主协程不等待}func work(ch chan bool, i int) {ch <- true // place 2fmt.Printf("goroutine %d done\n", i) // 处理结束<- ch}
代码分析:
- 主协程一次性开启了 10 个子协程,每个子协程运行前都会往管道里发送一个数据,结束后接收一个数据,因此任意时刻最多只有两个子协程在运行。
- 注意到代码中往管道发送数据的两个 place
- 若放在 place 1,则主协程任意时刻只能开启两个子协程,并且这两个子协程都在运行。
- 若放在 place 2,则主协程一次性开启了 10 个子协程,但是任意时刻最多只有两个子协程在运行。
理论上会输出 10 行,但是运行该程序可能没有输出,也可能输出不足 10 行,原因是主协程在子协程执行完之前就终止了,子协程也随之终止(在这里让主协程等待 1 秒可以让全部子协程执行完毕)。
由此可见,纯 channel 不能很好地控制 goroutine 的并发数量,主要缺点是主协程不知道最后一个子协程什么时候结束,这时候我们可以引入 sync.WaitGroup 来完善。
channel + sync.WaitGroup 方案
package mainimport ("fmt""sync""time")var wg sync.WaitGroupfunc main() {userCount := 10ch := make(chan bool, 2)for i := 0; i < userCount; i++ {wg.Add(1)go work(ch, i)}wg.Wait()}func work(ch chan bool, i int) {ch <- truetime.Sleep(time.Second) // 睡眠模拟处理逻辑fmt.Printf("goroutine %d done\n", i) // 处理结束wg.Done()<- ch}
运行程序输出
goroutine 0 donegoroutine 4 done// 等待一下goroutine 5 donegoroutine 9 done// 等待一下goroutine 6 donegoroutine 7 done// 等待一下goroutine 2 donegoroutine 8 done// 等待一下goroutine 3 donegoroutine 1 done
确实做到了任意时刻只有两个 goroutine 在处理业务逻辑。
但是很多业务场景需要更好的灵活性,至少允许改变并发运行 goroutine 的数量,通常灵活是相对的,一般会设置并发量的最大。
灵活的 channel + sync.WaitGroup 方案
package mainimport ("fmt""sync""time"// "time")var wg sync.WaitGroupfunc main() {userCount := 10ch := make(chan int, 5) // 最多支持 5 个 goroutine 并发运行for i := 0; i < userCount; i++ {wg.Add(1)go work(ch, i)}for i := 0; i < 10; i++ {// 任意时刻最多有 2 个 goroutine 并发运行ch <- ich <- i// ch <- itime.Sleep(time.Second)}close(ch) // 主协程主动关闭管道wg.Wait()}func work(ch chan int, i int) {defer wg.Done()for data := range ch {fmt.Printf("goroutine %d get data %d\n", i, data)}}
代码分析:
- 管道的容量为 5 可以认为最多支持 5 个 goroutine 并发运行,控制一次往管道里发送数据的数量,就可以控制并发量。
- 主协程必须主动关闭管道,否则将导致子协程一直阻塞等待从管道接收数据,进而导致死锁。
这只是一个最简单的灵活控制并发量的方案,有一点 goroutine pool(协程池)的雏形,目前有很多第三方库实现了很完善的 goroutine pool
这些都是目前比较成熟的第三方库,是生成和管理 goroutine pool 的工具。
