前言
参考 陈剑煜的博客,纯笔记,转载请尊重原创。
为何要控制
Go 最大的优势就是超强的并发能力,但是一次性开启过多 goroutine 也会带来严重的问题。
在个人电脑上运行下面的程序就会导致程序卡死。
package main
import (
"fmt"
"time"
"math"
)
func main() {
userCount := math.MaxInt64
for i := 0; i < userCount; i++ {
go func(i int){
time.Sleep(time.Second) // 睡眠模拟处理逻辑
fmt.Printf("goroutine %d done\n", i) // 处理结束
}(i)
}
}
假设 userCount 是传入的参数,大小不可预知,同时开启 userCount 个 goroutine 处理同一件事,看起来效率会很高,但是当出现极端情况时(userCount 很大时,如 math.MaxInt64)会出现如下问题:
- CPU 使用率上涨
- 内存 / 虚拟内存 占用上涨
- 主协程崩溃。
如何控制
控制细分两种:
- 控制同时产生 goroutine 的数量
- 控制同时运行的 goroutine 的数量
channel 方案
利用 channel 的阻塞
package main
import (
"fmt"
"time"
)
func main() {
userCount := 10
ch := make(chan bool, 2) // 管道容量为 2
for i := 0; i < userCount; i++ {
// ch <- true // place 1
go work(ch, i)
}
// time.Sleep(time.Second) // 主协程不等待
}
func work(ch chan bool, i int) {
ch <- true // place 2
fmt.Printf("goroutine %d done\n", i) // 处理结束
<- ch
}
代码分析:
- 主协程一次性开启了 10 个子协程,每个子协程运行前都会往管道里发送一个数据,结束后接收一个数据,因此任意时刻最多只有两个子协程在运行。
- 注意到代码中往管道发送数据的两个 place
- 若放在 place 1,则主协程任意时刻只能开启两个子协程,并且这两个子协程都在运行。
- 若放在 place 2,则主协程一次性开启了 10 个子协程,但是任意时刻最多只有两个子协程在运行。
理论上会输出 10 行,但是运行该程序可能没有输出,也可能输出不足 10 行,原因是主协程在子协程执行完之前就终止了,子协程也随之终止(在这里让主协程等待 1 秒可以让全部子协程执行完毕)。
由此可见,纯 channel 不能很好地控制 goroutine 的并发数量,主要缺点是主协程不知道最后一个子协程什么时候结束,这时候我们可以引入 sync.WaitGroup 来完善。
channel + sync.WaitGroup 方案
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
userCount := 10
ch := make(chan bool, 2)
for i := 0; i < userCount; i++ {
wg.Add(1)
go work(ch, i)
}
wg.Wait()
}
func work(ch chan bool, i int) {
ch <- true
time.Sleep(time.Second) // 睡眠模拟处理逻辑
fmt.Printf("goroutine %d done\n", i) // 处理结束
wg.Done()
<- ch
}
运行程序输出
goroutine 0 done
goroutine 4 done
// 等待一下
goroutine 5 done
goroutine 9 done
// 等待一下
goroutine 6 done
goroutine 7 done
// 等待一下
goroutine 2 done
goroutine 8 done
// 等待一下
goroutine 3 done
goroutine 1 done
确实做到了任意时刻只有两个 goroutine 在处理业务逻辑。
但是很多业务场景需要更好的灵活性,至少允许改变并发运行 goroutine 的数量,通常灵活是相对的,一般会设置并发量的最大。
灵活的 channel + sync.WaitGroup 方案
package main
import (
"fmt"
"sync"
"time"
// "time"
)
var wg sync.WaitGroup
func main() {
userCount := 10
ch := make(chan int, 5) // 最多支持 5 个 goroutine 并发运行
for i := 0; i < userCount; i++ {
wg.Add(1)
go work(ch, i)
}
for i := 0; i < 10; i++ {
// 任意时刻最多有 2 个 goroutine 并发运行
ch <- i
ch <- i
// ch <- i
time.Sleep(time.Second)
}
close(ch) // 主协程主动关闭管道
wg.Wait()
}
func work(ch chan int, i int) {
defer wg.Done()
for data := range ch {
fmt.Printf("goroutine %d get data %d\n", i, data)
}
}
代码分析:
- 管道的容量为 5 可以认为最多支持 5 个 goroutine 并发运行,控制一次往管道里发送数据的数量,就可以控制并发量。
- 主协程必须主动关闭管道,否则将导致子协程一直阻塞等待从管道接收数据,进而导致死锁。
这只是一个最简单的灵活控制并发量的方案,有一点 goroutine pool(协程池)的雏形,目前有很多第三方库实现了很完善的 goroutine pool
这些都是目前比较成熟的第三方库,是生成和管理 goroutine pool 的工具。