Go语言在设计上对同步(Synchronization,数据同步和线程同步)提供大量的支持,比如 goroutine和channel同步原语,库层面有

  • sync:提供基本的同步原语(比如Mutex、RWMutex、Locker)和 工具类(Once、WaitGroup、Cond、Pool、Map)
  • sync/atomic:提供变量的原子操作(基于硬件指令 compare-and-swap)

WaitGroup 是什么?

WaitGroup 是 Go 内置的 sync 包解决任务编排的并发原语。WaitGroup 直译是“等待组”,翻译成大白话就是等待一组协程完成任务。如果没有完成,就阻塞。
举个例子,我们要计算100万个数的和,并对这个和求根号。常规的思路肯定是先一个 for 循环计算总和,再开根号,但是这样效率很低。我们可以起1000个协程,每个协程计算1000个数的和,然后再对这些和求和,最后再开个根号。
这里有一个问题,计算根号的时候,需要等所有并发的协程都计算完才行,WaitGroup 就是解决等所有并发协程完成计算的问题的。

WaitGroup 用法

WaitGroup 的用法很简单。标准库中的 WaitGroup 只有三个方法,分别是:

  1. type WaitGroup struct {
  2. // Has unexported fields.
  3. }
  4. func (wg *WaitGroup) Add(delta int)
  5. func (wg *WaitGroup) Done()
  6. func (wg *WaitGroup) Wait()
  • Add:用来设置 WaitGroup 的计数值,delta 可正可负
  • Done:用来将 WaitGroup 的计数值减一,其实就是调用了 Add(-1)。
  • Wait:阻塞等待,直到 WaitGroup 的计数值变成0,进入下一步。

三个方法:

  • Add:在每次调用 goroutine 前调用,比如有两个 goroutine,Add(2)或者两次 Add(1)都可以,设置计数器为 2,表示等待这两个 goroutine 执行结束。
  • Done:每个 goroutine 执行完之后,需要调用 Done 来表示真正执行完成,计数器减 1.
  • Wait:和 join 类似,阻塞主进程,等待所有的 goroutine 执行完成时,即计数器减为 0 时,取消阻塞。

通俗来说,Add()用来增加要等待的 goroutine 的数量,Done()用来减少计数,表示执行完成,Wait()表示等待所有需要等待的 goroutine 全部执行完成。


为什么要用WaitGroup?

WaitGroup解决了并发等待的问题。在使用groutine执行任务时,经常需要等待goroutine全部执行完成后再执行下一步。WaitGroup并发原语非常容易的解决了这个问题。

启动1个 goroutine

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func p() {
  8. for {
  9. fmt.Println("I am p method")
  10. }
  11. }
  12. func main1() {
  13. // 子协程
  14. go p()
  15. // 主协程
  16. // 主死随从
  17. fmt.Println("I am main method")
  18. }

以上代码只会打印main 方法中的 I am main method,打印完后,程序结束。
go p() 还没有来得及执行。也叫主死随从,主线程结束,子线程跟着结束。

启动N个 goroutine

  1. func print4(j int) {
  2. fmt.Println("I am p method", j)
  3. }
  4. func main() {
  5. for i:=0; i< 10; i++ {
  6. go print4(i)
  7. }
  8. fmt.Println("I am main method")
  9. }
  10. /**
  11. I am p method 5
  12. I am p method 4
  13. I am p method 8
  14. I am p method 0
  15. I am p method 2
  16. I am p method 3
  17. I am p method 1
  18. I am p method 6
  19. I am main method
  20. */

启动了10个线程,但并不是每次的执行结果都会打印10次。那怎么保证每次执行所有的goroutine都会被执行到呢?
解决方法, 在主协程里:

  1. 加定时器,但是不能确定需要等多长时间,只能把时间设置很长
  2. 利用 for{} 无限循环 ```go func print4(j int) { fmt.Println(“I am p method”, j) }

func main() { for i:=0; i< 10; i++ { go print4(i) }

  1. fmt.Println("I am main method")
  2. // 10 s
  3. time.Sleep(time.Second * 10)
  4. // 或者
  5. // 无限循环
  6. for{}

}

  1. 但这种写法简单的业务逻辑还好,复杂场景呢?性能消耗也大,而且也并优雅。
  2. <a name="sMIIx"></a>
  3. ### 使用 WaitGroup
  4. 在文章开头我们已经说到,一个 WaitGroup 对象可以等待一组协程结束。代码如下
  5. ```go
  6. func print(n int, wg *sync.WaitGroup) {
  7. defer wg.Done() // 减1
  8. fmt.Println("I am p method", n)
  9. }
  10. func main() {
  11. var wgg sync.WaitGroup
  12. wgg.Add(10) // 总共有多少个
  13. for i:=0; i< 10; i++ {
  14. go print(i, &wgg)
  15. }
  16. // 阻塞主协程
  17. wgg.Wait()
  18. fmt.Println("I am main method")
  19. }
  20. /**
  21. I am p method 6
  22. I am p method 0
  23. I am p method 7
  24. I am p method 1
  25. I am p method 8
  26. I am p method 9
  27. I am p method 4
  28. I am p method 5
  29. I am p method 3
  30. I am p method 2
  31. I am main method
  32. */

上面10个 goroutine 启动前调用的 Add()方法增加一个计数,在每个 goroutine 内,当函数执行完后调用 Done()表示此 goroutine 已执行完成,计数减1。

在 main 函数里调用 Wait()来进行阻塞。而且10个 goroutine 执行没有先后顺序,是完全随机的,当所有 goroutine 执行完成后,main 主进程才开始继续打印。

注意的是再给 print() 函数传参的时候,使用的是执行类型 *sync.WaitGroup,而不是值类型 sync.WaitGroup,因为所有的 goroutine 使用的是同一个 wg,如果使用的是值类型,是 wg 的拷贝,那么相当于所有的 goroutine 在运行自己的 wg,并不是共享,这时候会造成永远阻塞,也就是死锁。

100万个数之和再开根号

  1. package main
  2. import (
  3. "fmt"
  4. "math"
  5. "sync"
  6. )
  7. // 计算1000个数的和
  8. func compute(m *sync.Mutex, wg *sync.WaitGroup, s, e int, count *int) {
  9. sum := 0
  10. for i := s; i < e; i++ {
  11. sum += i
  12. }
  13. m.Lock()
  14. *count += sum
  15. m.Unlock()
  16. wg.Done()
  17. }
  18. func main() {
  19. var mutex sync.Mutex
  20. var wg sync.WaitGroup
  21. var n int = 1000
  22. var count int
  23. wg.Add(n)
  24. for i := 0; i < n; i++ {
  25. go compute(&mutex, &wg, i*n+1, (i+1)*n+1, &count)
  26. }
  27. wg.Wait()
  28. fmt.Println(math.Sqrt(float64(count))) // 707107.1347398497
  29. return
  30. }

别踩坑

记数器设置为负值

WaitGroup计数值必须大于等于0,否则会panic

  1. func main() {
  2. wg := sync.WaitGroup{}
  3. wg.Add(-1)
  4. }

调用Done的次数过多,导致计数值小于0,引发panic

  1. func main() {
  2. wg := sync.WaitGroup{}
  3. wg.Add(1)
  4. wg.Done()
  5. wg.Done()
  6. }

不期望的Add的时机

使用WaitGroup应该在所有的Add方法后调用Wait,否则会有panic或者不期望的结果

  1. func main() {
  2. wg := sync.WaitGroup{}
  3. go doSomething(&wg)
  4. go doSomething(&wg)
  5. wg.Wait()
  6. }
  7. func doSomething(wg *sync.WaitGroup) {
  8. wg.Add(1)
  9. fmt.Println("do something")
  10. wg.Done()
  11. }

上述情况,期望时两个doSomething之后再结束,但是Add是在goroutine中去add的,没有得到想要的结果。一般使用WaitGroup先Add再启动groutine

重用WaitGroup

WaitGroup是允许重用的,但是第二次使用的时候要确保计数值恢复到零值.

  1. func main() {
  2. wg := sync.WaitGroup{}
  3. wg.Add(1)
  4. go func() {
  5. time.Sleep(time.Millisecond)
  6. wg.Done()
  7. wg.Add(1)
  8. }()
  9. wg.Wait()
  10. }

总结

避免错误使用 WaitGroup 的情况,只需要尽量保证下面 5 点就可以了:

  • 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错。
  • 保证所有的 Add 方法调用都在 Wait 之前。
  • 不传递负数给 Add 方法,只通过 Done 来给计数值减 1。
  • 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的。
  • 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回。

参考文章: