Go语言在设计上对同步(Synchronization,数据同步和线程同步)提供大量的支持,比如 goroutine和channel同步原语,库层面有
- sync:提供基本的同步原语(比如Mutex、RWMutex、Locker)和 工具类(Once、WaitGroup、Cond、Pool、Map)
- sync/atomic:提供变量的原子操作(基于硬件指令 compare-and-swap)
WaitGroup 是什么?
WaitGroup 是 Go 内置的 sync 包解决任务编排的并发原语。WaitGroup 直译是“等待组”,翻译成大白话就是等待一组协程完成任务。如果没有完成,就阻塞。
举个例子,我们要计算100万个数的和,并对这个和求根号。常规的思路肯定是先一个 for 循环计算总和,再开根号,但是这样效率很低。我们可以起1000个协程,每个协程计算1000个数的和,然后再对这些和求和,最后再开个根号。
这里有一个问题,计算根号的时候,需要等所有并发的协程都计算完才行,WaitGroup 就是解决等所有并发协程完成计算的问题的。
WaitGroup 用法
WaitGroup 的用法很简单。标准库中的 WaitGroup 只有三个方法,分别是:
type WaitGroup struct {
// Has unexported fields.
}
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
- Add:用来设置 WaitGroup 的计数值,delta 可正可负。
- Done:用来将 WaitGroup 的计数值减一,其实就是调用了 Add(-1)。
- Wait:阻塞等待,直到 WaitGroup 的计数值变成0,进入下一步。
三个方法:
- Add:在每次调用 goroutine 前调用,比如有两个 goroutine,Add(2)或者两次 Add(1)都可以,设置计数器为 2,表示等待这两个 goroutine 执行结束。
- Done:每个 goroutine 执行完之后,需要调用 Done 来表示真正执行完成,计数器减 1.
- Wait:和 join 类似,阻塞主进程,等待所有的 goroutine 执行完成时,即计数器减为 0 时,取消阻塞。
通俗来说,Add()用来增加要等待的 goroutine 的数量,Done()用来减少计数,表示执行完成,Wait()表示等待所有需要等待的 goroutine 全部执行完成。
为什么要用WaitGroup?
WaitGroup解决了并发等待的问题。在使用groutine执行任务时,经常需要等待goroutine全部执行完成后再执行下一步。WaitGroup并发原语非常容易的解决了这个问题。
启动1个 goroutine
package main
import (
"fmt"
"sync"
"time"
)
func p() {
for {
fmt.Println("I am p method")
}
}
func main1() {
// 子协程
go p()
// 主协程
// 主死随从
fmt.Println("I am main method")
}
以上代码只会打印main 方法中的 I am main method,打印完后,程序结束。
go p() 还没有来得及执行。也叫主死随从,主线程结束,子线程跟着结束。
启动N个 goroutine
func print4(j int) {
fmt.Println("I am p method", j)
}
func main() {
for i:=0; i< 10; i++ {
go print4(i)
}
fmt.Println("I am main method")
}
/**
I am p method 5
I am p method 4
I am p method 8
I am p method 0
I am p method 2
I am p method 3
I am p method 1
I am p method 6
I am main method
*/
启动了10个线程,但并不是每次的执行结果都会打印10次。那怎么保证每次执行所有的goroutine都会被执行到呢?
解决方法, 在主协程里:
- 加定时器,但是不能确定需要等多长时间,只能把时间设置很长
- 利用 for{} 无限循环 ```go func print4(j int) { fmt.Println(“I am p method”, j) }
func main() { for i:=0; i< 10; i++ { go print4(i) }
fmt.Println("I am main method")
// 10 s
time.Sleep(time.Second * 10)
// 或者
// 无限循环
for{}
}
但这种写法简单的业务逻辑还好,复杂场景呢?性能消耗也大,而且也并优雅。
<a name="sMIIx"></a>
### 使用 WaitGroup
在文章开头我们已经说到,一个 WaitGroup 对象可以等待一组协程结束。代码如下
```go
func print(n int, wg *sync.WaitGroup) {
defer wg.Done() // 减1
fmt.Println("I am p method", n)
}
func main() {
var wgg sync.WaitGroup
wgg.Add(10) // 总共有多少个
for i:=0; i< 10; i++ {
go print(i, &wgg)
}
// 阻塞主协程
wgg.Wait()
fmt.Println("I am main method")
}
/**
I am p method 6
I am p method 0
I am p method 7
I am p method 1
I am p method 8
I am p method 9
I am p method 4
I am p method 5
I am p method 3
I am p method 2
I am main method
*/
上面10个 goroutine 启动前调用的 Add()方法增加一个计数,在每个 goroutine 内,当函数执行完后调用 Done()表示此 goroutine 已执行完成,计数减1。
在 main 函数里调用 Wait()来进行阻塞。而且10个 goroutine 执行没有先后顺序,是完全随机的,当所有 goroutine 执行完成后,main 主进程才开始继续打印。
注意的是再给 print() 函数传参的时候,使用的是执行类型 *sync.WaitGroup,而不是值类型 sync.WaitGroup,因为所有的 goroutine 使用的是同一个 wg,如果使用的是值类型,是 wg 的拷贝,那么相当于所有的 goroutine 在运行自己的 wg,并不是共享,这时候会造成永远阻塞,也就是死锁。
100万个数之和再开根号
package main
import (
"fmt"
"math"
"sync"
)
// 计算1000个数的和
func compute(m *sync.Mutex, wg *sync.WaitGroup, s, e int, count *int) {
sum := 0
for i := s; i < e; i++ {
sum += i
}
m.Lock()
*count += sum
m.Unlock()
wg.Done()
}
func main() {
var mutex sync.Mutex
var wg sync.WaitGroup
var n int = 1000
var count int
wg.Add(n)
for i := 0; i < n; i++ {
go compute(&mutex, &wg, i*n+1, (i+1)*n+1, &count)
}
wg.Wait()
fmt.Println(math.Sqrt(float64(count))) // 707107.1347398497
return
}
别踩坑
记数器设置为负值
WaitGroup计数值必须大于等于0,否则会panic
func main() {
wg := sync.WaitGroup{}
wg.Add(-1)
}
调用Done的次数过多,导致计数值小于0,引发panic
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
wg.Done()
wg.Done()
}
不期望的Add的时机
使用WaitGroup应该在所有的Add方法后调用Wait,否则会有panic或者不期望的结果
func main() {
wg := sync.WaitGroup{}
go doSomething(&wg)
go doSomething(&wg)
wg.Wait()
}
func doSomething(wg *sync.WaitGroup) {
wg.Add(1)
fmt.Println("do something")
wg.Done()
}
上述情况,期望时两个doSomething之后再结束,但是Add是在goroutine中去add的,没有得到想要的结果。一般使用WaitGroup先Add再启动groutine
重用WaitGroup
WaitGroup是允许重用的,但是第二次使用的时候要确保计数值恢复到零值.
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done()
wg.Add(1)
}()
wg.Wait()
}
总结
避免错误使用 WaitGroup 的情况,只需要尽量保证下面 5 点就可以了:
- 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错。
- 保证所有的 Add 方法调用都在 Wait 之前。
- 不传递负数给 Add 方法,只通过 Done 来给计数值减 1。
- 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的。
- 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回。