前言

由于其自以为是的性质和直观的并发代码方法,Golang 变得越来越流行。 并发的最基本用例之一是能够处理值不相互依赖且执行顺序无关的值列表。我们将在下面查看该用例。

问题

让我们设置一个场景。假设我们有一个需要处理的字符串列表。处理将涉及在 0 到 10 微秒之间随机休眠一段时间,然后打印出当前字符串。

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "time"
  6. )
  7. func main() {
  8. list := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
  9. for i := 0; i < len(list); i++ {
  10. d := time.Duration(rand.Intn(10)) * time.Microsecond
  11. time.Sleep(d)
  12. fmt.Printf("Current value is : %s \n", list[i])
  13. }
  14. fmt.Println("Program finished executing")
  15. }

当我们运行这个函数时,输出如下所示:

  1. 当前值为:a
  2. 当前值为:b
  3. 当前值为:c
  4. 当前值为:d
  5. 当前值为:e
  6. 当前值为:f
  7. 当前值为:g
  8. 当前值为:h
  9. 程序执行完毕

这当然是意料之中的。现在让我们看看如何并行化它并将列表中每个元素的执行传递给 goroutine

在 goroutine 中实现函数

A function can be executed in a separate goroutine by using the keyword go . Let us modify our for loop to start a seperate goroutine for each element of the list, and do the same processing inside the goroutine.

  1. for i := 0; i < len(list); i++ {
  2. go func(val string) {
  3. d := time.Duration(rand.Intn(10)) * time.Microsecond
  4. time.Sleep(d)
  5. fmt.Printf("Current value is : %s \n", val)
  6. }(list[i])
  7. }

As you can see, we moved the execution logic insidefunc(val string) , and are calling that function in a separate goroutine for each element in the list.

Upon running this, we expect to get the 8 lines of output for the eight elements in the list, followed by the end of function print. But instead, we observe that the program finishes execution with the following output:

  1. Program finished executing
None of the elements of the list were processed, and yet the program finished executing. This is because when we started goroutines for the elements of the list, we never told the goroutine running the **main** function to wait for them. Golang has provided a solution for this with WaitGroup . WaitGroup is available in the sync package provided by golang. It has functionalities that allow blocking and waiting for any number of goroutines to finish executing.

:::info <font style="color:rgb(18, 18, 18);">WaitGroup</font> 用来阻塞主协程,可以等待所有协程执行完。

:::

Using WaitGroup

我们想要等待的 goroutine 的数量可以使用 WaitGroup 实例上的 Add 函数来设置。这必须在执行 goroutine 之前设置。在我们的例子中,我们将 goroutine 的数量设置为列表的长度,因为我们为列表中的每个元素启动一个 goroutine

  1. var wg sync.WaitGroup
  2. wg.Add(len(list))

要发出 goroutine 执行完成的信号,它必须在执行结束时调用 WaitGroup 对象上的 Done 方法。一种惯用的方法是使用 **defer** 语句。我们将在 goroutine 中正在执行的函数的开头添加 defer 语句。因此,我们的 for 循环现在更改为:

  1. for i := 0; i < len(list); i++ {
  2. go func(val string) {
  3. defer wg.Done()
  4. d := time.Duration(rand.Intn(10)) * time.Microsecond
  5. time.Sleep(d)
  6. fmt.Printf("Current value is : %s \n", val)
  7. }(list[i])
  8. }

请注意, wg 实例在函数内部可用,因为我们使用的是匿名函数。如果您更喜欢使用命名函数,则可以向 wg 实例传递一个参数。

现在,我们只需要通知我们当前的 goroutine,即执行 main 方法的 goroutine 等待所有使用 Add 添加的 goroutine 执行完毕。这是通过从当前 **goroutine** 调用 **WaitGroup** 实例上的 **Wait** 方法来完成的。

  1. wg.Wait()

With all these changes made, our code looks like this:

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. )
  8. func main() {
  9. list := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
  10. var wg sync.WaitGroup
  11. wg.Add(len(list))
  12. for i := 0; i < len(list); i++ {
  13. go func(val string) {
  14. defer wg.Done()
  15. d := time.Duration(rand.Intn(10)) * time.Microsecond
  16. time.Sleep(d)
  17. fmt.Printf("Current value is : %s \n", val)
  18. }(list[i])
  19. }
  20. wg.Wait()
  21. fmt.Println("Program finished executing")
  22. }

执行此代码后,我们会看到以下输出:

  1. Current value is : h
  2. Current value is : d
  3. Current value is : f
  4. Current value is : b
  5. Current value is : c
  6. Current value is : g
  7. Current value is : e
  8. Current value is : a
  9. Program finished executing

goroutines 按照预期以随机顺序执行,我们的 main goroutine 等待所有其他 goroutines 完成,然后继续执行自己直到完成。

总结

WaitGroup的作用

:::info <font style="color:rgb(18, 18, 18);">WaitGroup</font> 用来阻塞主协程,可以等待所有协程执行完。

:::

wg.Wait()

:::info

main协程调用 wg.Wait() 且被block,直到所有worker协程全部执行结束后返回。

:::

wg.Add(delta int)

:::info

main协程通过调用 wg.Add(delta int) 设置worker协程的个数,然后创建worker协程;

:::

wg.Done()

:::info

worker协程执行结束以后,都要调用 wg.Done()。

等待组的计数器 -1

:::

官方文档对 WaitGroup 的描述是:一个 WaitGroup 对象可以等待一组协程结束。使用方法是:
  1. main协程通过调用 wg.Add(delta int) 设置worker协程的个数,然后创建worker协程;
  2. worker协程执行结束以后,都要调用 wg.Done()
  3. main协程调用 wg.Wait() 且被block,直到所有worker协程全部执行结束后返回。

参考资料