什么是缓冲 channels?

我们在前一个教程中讨论的所有 channel 基本上都是无缓冲的。我们在 channel 教程中详细讨论过,发送和接收消息到非缓冲 channel 是阻塞的。

可以使用缓冲区创建 channel 。只有当缓冲区已满时,发送到缓冲 channel 的消息才会被阻塞。类似地,只有当缓冲区为空时,来自缓冲 channel 的接收才会阻塞。

通过向指定缓冲区大小的 make 函数传递额外的容量参数,可以创建缓冲 channel 。


  1. ch := make(chan type, capacity)

对于具有缓冲区的 channel ,上述语法中的 capacity 应该大于 0。默认情况下,未缓冲 channel 的容量为 0,因此我们在前面的教程中创建 channel 时省略了容量参数。

让我们编写一些代码并创建一个缓冲 channel 。


Example


  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch := make(chan string, 2)
  7. ch <- "naveen"
  8. ch <- "paul"
  9. fmt.Println(<- ch)
  10. fmt.Println(<- ch)
  11. }

Run program in playground

在上面的程序中,在第 8 行我们创建一个容量为 2 的缓冲 channel 。由于 channel 的容量为 2,所以可以在不阻塞的情况下将两个字符串写入 channel 。我们在第 9 10 行中向 channel 写入两个字符串, channel 不会阻塞。我们在第 11 行和第 12 行分别读取写入的两个字符串。这个程序输出


  1. naveen
  2. paul

Another Example

让我们再看一个缓冲 channel 的例子,其中 channel 的值用并发的 Goroutine 编写,并从 main Goroutine 读取。这个例子将帮助我们更好地理解何时写入缓冲 channel 块。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func write(ch chan int) {
  7. for i := 0; i < 5; i++ {
  8. ch <- i
  9. fmt.Println("successfully wrote", i, "to ch")
  10. }
  11. close(ch)
  12. }
  13. func main() {
  14. ch := make(chan int, 2)
  15. go write(ch)
  16. time.Sleep(2 * time.Second)
  17. for v := range ch {
  18. fmt.Println("read value", v,"from ch")
  19. time.Sleep(2 * time.Second)
  20. }
  21. }

Run program in playground

在上面的程序中,在 main Goroutine 中第 17 行创建了一个容量为 2 的缓冲 channel ch,并在第 18 行传参给 write Goroutine。然后 main Goroutine 休眠 2 秒。在此期间,write Goroutine 将同时运行。write Goroutine 有一个 for 循环,它将从 0 到 4 写入 ch channel 。该缓冲 channel 的容量为 2,因此 write Goroutine 将能够立即将值 01 写入 ch channel ,然后阻塞,直到从 ch channel 读取至少一个值。所以这个程序会立即输出以下 2 行。

  1. successfully wrote 0 to ch
  2. successfully wrote 1 to ch

在输出上述两行之后,write Goroutine 中对 ch channel 的写入被阻止,直到从 ch channel 读取。由于 main Goroutine 开始从 channel 读取之前会休眠 2 秒,因此程序在接下来的 2 秒内不会输出任何内容。main Goroutine 在 2 秒后唤醒,并使用第 20 行中的 for range 循环开始从 ch channel 读取,然后输出读取值,然后再次休眠 2 秒,此循环一直运行,直到 ch 关闭。因此程序将在 2 秒后输出,

  1. read value 0 from ch
  2. successfully wrote 2 to ch

循环一直继续,直到所有值都写入 channel 并在 write Goroutine中关闭。最终的输出是,

  1. successfully wrote 0 to ch
  2. successfully wrote 1 to ch
  3. read value 0 from ch
  4. successfully wrote 2 to ch
  5. read value 1 from ch
  6. successfully wrote 3 to ch
  7. read value 2 from ch
  8. successfully wrote 4 to ch
  9. read value 3 from ch
  10. read value 4 from ch

死锁


  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch := make(chan string, 2)
  7. ch <- "naveen"
  8. ch <- "paul"
  9. ch <- "steve"
  10. fmt.Println(<-ch)
  11. fmt.Println(<-ch)
  12. }

Run program in playground

在上面的程序中,我们将 3 个字符串写入容量为 2 的缓冲 channel 。当代码执行到第 11 行时,由于 channel 已超过其容量,因此写入被阻塞。现在 Goroutine 必须从 channel 中读取才能继续写入,但在这种情况下,没有 goroutine 从该 channel 同时读取。因此会出现 deadlock(**死锁)**,程序将在运行时出现以下消息,

  1. fatal error: all goroutines are asleep - deadlock!
  2. goroutine 1 [chan send]:
  3. main.main()
  4. /tmp/sandbox274756028/main.go:11 +0x100

长度 vs 容量

缓冲 channel 的容量是 channel 可以容纳的值的数量。这是我们使用 make 函数创建缓冲 channel 时指定的值。

缓冲 channel 的长度是当前在其中排队的元素个数。

让我们通过代码来更直观一点😀

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch := make(chan string, 3)
  7. ch <- "naveen"
  8. ch <- "paul"
  9. fmt.Println("capacity is", cap(ch))
  10. fmt.Println("length is", len(ch))
  11. fmt.Println("read value", <-ch)
  12. fmt.Println("new length is", len(ch))
  13. }

Run program in playground

在上面的程序中,创建的 channel 的容量为 3,即可以容纳 3 个字符串。然后,我们分别在第 9 行和第 10 行向 channel 写入两个字符串。现在 channel 中有两个字符串排队,因此它的长度是 2。在第 13 行代码中我们从 channel 中读取一个字符串。现在 channel 中只有一个字符串排队,因此它的长度变为 1。这个程序将输出

  1. capacity is 3
  2. length is 2
  3. read value naveen
  4. new length is 1

WaitGroup

本教程的下一部分是关于 Worker Pools。要了解工作池,我们首先需要了解 WaitGroup ,因为它是工作池的实现。

WaitGroup 用于等待所有 Goroutines 完成执行。程序一直被阻塞直到所有 Goroutines 完成执行。打个比方我们有 3 个并发执行的 Goroutines 派生自 main Goroutine。main Goroutines 需要等待其他 3 个 Goroutines 完成才能终止。这可以使用 WaitGroup 来完成。

让我们停止理论开始编写一些代码😀

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func process(i int, wg *sync.WaitGroup) {
  8. fmt.Println("started Goroutine ", i)
  9. time.Sleep(2 * time.Second)
  10. fmt.Printf("Goroutine %d ended\n", i)
  11. wg.Done()
  12. }
  13. func main() {
  14. no := 3
  15. var wg sync.WaitGroup
  16. for i := 0; i < no; i++ {
  17. wg.Add(1)
  18. go process(i, &wg)
  19. }
  20. wg.Wait()
  21. fmt.Println("All go routines finished executing")
  22. }

Run in playground

WaitGroup 是一种结构类型,我们在第 18 行创建一个 WaitGroup 类型的零值变量。 WaitGroup 的工作方式是使用计数器。当我们在 WaitGroup 上调用 Add 并将其传递给 int 时,WaitGroup 的计数器会被传递给 Add 的值递增。递减计数器的方法是在 WaitGroup 上调用 Done() 方法。Wait() 方法阻塞调用它的 Goroutine ,直到计数器变为零。

在上面的程序中,我们在第 20 行中调用 wg.Add(1)for 循环中迭代 3 次。所以计数器现在变成 3。for 循环也产生 3 个 process Goroutines,然后在第 23 行中调用wg.Wait()main Goroutine 等到计数器变为零。在第 13 行的 process Goroutine中,通过调用 wg.Done() 来减少计数器。 一旦所有 3 个派生的 Goroutines 完成执行,那就是 wg.Done() 被调用三次,计数器将变为零,main Goroutine 将被解锁。

在第 21 行传递 wg 的地址是很重要的。如果没有传递地址,那么每个 Goroutine 将拥有自己的 WaitGroup 副本,并且当它们执行完毕时,main 将不会收到通知。

这个程序输出

  1. started Goroutine 2
  2. started Goroutine 0
  3. started Goroutine 1
  4. Goroutine 0 ended
  5. Goroutine 2 ended
  6. Goroutine 1 ended
  7. All go routines finished executing

你的输出可能与我的不同,因为 goroutine 的执行顺序可能不同:)。

实现工作池

缓冲 channel 的一个重要用途就是 worker pool 的实现。

通常工作池是一组线程,它们正等待分配给它们的任务。一旦完成分配的任务,他们就会再次为下一个任务提供服务。

我们将使用缓冲 channel 实现工作池。 我们的工作池将执行查找输入数字的数字之和的任务。 例如,如果传递 234,则输出将为 9 (2 + 3 + 4)。 工作池的输入将是伪随机整数列表。

以下是我们工作池的核心功能

  • 创建一个goroutine池,该池在等待分配任务的输入缓冲 channel 上监听

  • 向输入缓冲 channel 添加任务

  • 任务完成后将结果写入输出缓冲 channel

  • 从输出缓冲 channel 读取和输出结果

我们将一步一步地编写这个程序,使它更容易理解。

第一步是创建表示任务和结果的结构。

  1. type Job struct {
  2. id int
  3. randomno int
  4. }
  5. type Result struct {
  6. job Job
  7. sumofdigits int
  8. }

每个 Job 结构都有一个 id 和一个 randomno 是为计算单个数字的和。

Result 结构有一个 job 字段,它在 sumofdigits 字段中保存结果(单个数字的和)。

下一步是创建用于接收任务和写入输出的缓冲 channel 。

  1. var jobs = make(chan Job, 10)
  2. var results = make(chan Result, 10)

Worker Goroutines在 jobs 缓冲 channel 上监听新任务。任务完成后,将结果写入results 缓冲 channel 。

下面的 digits 函数的实际作用是查找整数的个位数的和并返回它。我们将为这个函数添加一个 2 秒的睡眠时间,只是为了模拟这个函数需要一些时间来计算结果。

  1. func digits(number int) int {
  2. sum := 0
  3. no := number
  4. for no != 0 {
  5. digit := no % 10
  6. sum += digit
  7. no /= 10
  8. }
  9. time.Sleep(2 * time.Second)
  10. return sum
  11. }

接下来,我们将编写一个函数来创建 worker Goroutine。


  1. func worker(wg *sync.WaitGroup) {
  2. for job := range jobs {
  3. output := Result{job, digits(job.randomno)}
  4. results <- output
  5. }
  6. wg.Done()
  7. }

上面的函数创建一个worker,它从 jobs channel 读取数据,使用jobdigits 函数的返回值创建一个 Result 结构,然后将结果写入 results 缓冲 channel 。函数将 WaitGroup wg 作为参数,当所有 jobs 都完成时,它将调用 Done() 方法。

createWorkerPool 函数将创建一个线程池


  1. func createWorkerPool(noOfWorkers int) {
  2. var wg sync.WaitGroup
  3. for i := 0; i < noOfWorkers; i++ {
  4. wg.Add(1)
  5. go worker(&wg)
  6. }
  7. wg.Wait()
  8. close(results)
  9. }

上面的函数将要创建的 worker 数作为参数。它在创建 Goroutine 之前调用 wg.Add(1)来增加 WaitGroup 计数器。然后它通过将 WaitGroupwg 的地址传递给 worker 函数来创建 worker Goroutines。在创建了需要的 worker Goroutines 之后,它等待所有 Goroutines 通过调用 wg.Wait() 来完成执行。在所有 Goroutines 完成执行后,它关闭了 results channel ,因为所有 Goroutines 都已完成执行,没有其他人会进一步写入 results channel 。

现在我们已准备好工作池,让我们继续编写将分配工作给 worker 的函数。


  1. func allocate(noOfJobs int) {
  2. for i := 0; i < noOfJobs; i++ {
  3. randomno := rand.Intn(999)
  4. job := Job{i, randomno}
  5. jobs <- job
  6. }
  7. close(jobs)
  8. }

上面的 allocate 函数将创建的 jobs 数量作为输入参数,生成最大值为 998 的伪随机数,使用随机数和 for 循环计数器 i 作为 id 创建 Job 结构,然后将它们写入 jobs channel 。它在写入所有 jobs 之后关闭 jobs channel 。

下一步将创建读取 results channel 并输出

  1. func result(done chan bool) {
  2. for result := range results {
  3. fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
  4. }
  5. done <- true
  6. }

result 函数读取 results channel 并输出 job id、输入的随机编号和随机编号的数字和。result 函数还接受 done channel 作为参数,在输出完所有结果后,它将写入该 channel 。

一切都 ok 了。让我们继续完成从 main() 函数调用所有这些函数的最后一步。

  1. func main() {
  2. startTime := time.Now()
  3. noOfJobs := 100
  4. go allocate(noOfJobs)
  5. done := make(chan bool)
  6. go result(done)
  7. noOfWorkers := 10
  8. createWorkerPool(noOfWorkers)
  9. <-done
  10. endTime := time.Now()
  11. diff := endTime.Sub(startTime)
  12. fmt.Println("total time taken ", diff.Seconds(), "seconds")
  13. }

我们首先将程序的执行开始时间存储在主函数的第 2 行中,在最后一行(第 12 行)中,我们计算 endTime 和 startTime 之间的时间差,并显示程序运行花费的总时间。这是必要的,因为我们将通过改变 Goroutines 的数量来做一些基准测试。
noOfJobs 设置为100,然后调用 allocate 以将作业添加到 jobs channel 。

然后创建 done 的 channel 并将其传递给 result Goroutine,以便它可以开始打印输出并在打印完所有内容后通知。

最后,通过调用 createWorkerPool 函数创建一个包含 10 个worker goroutine线程池,然后 main 函数在 done channel 上等待输出所有结果。

这是完整的程序供你参考。我也导入了必要的包。

  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. )
  8. type Job struct {
  9. id int
  10. randomno int
  11. }
  12. type Result struct {
  13. job Job
  14. sumofdigits int
  15. }
  16. var jobs = make(chan Job, 10)
  17. var results = make(chan Result, 10)
  18. func digits(number int) int {
  19. sum := 0
  20. no := number
  21. for no != 0 {
  22. digit := no % 10
  23. sum += digit
  24. no /= 10
  25. }
  26. time.Sleep(2 * time.Second)
  27. return sum
  28. }
  29. func worker(wg *sync.WaitGroup) {
  30. for job := range jobs {
  31. output := Result{job, digits(job.randomno)}
  32. results <- output
  33. }
  34. wg.Done()
  35. }
  36. func createWorkerPool(noOfWorkers int) {
  37. var wg sync.WaitGroup
  38. for i := 0; i < noOfWorkers; i++ {
  39. wg.Add(1)
  40. go worker(&wg)
  41. }
  42. wg.Wait()
  43. close(results)
  44. }
  45. func allocate(noOfJobs int) {
  46. for i := 0; i < noOfJobs; i++ {
  47. randomno := rand.Intn(999)
  48. job := Job{i, randomno}
  49. jobs <- job
  50. }
  51. close(jobs)
  52. }
  53. func result(done chan bool) {
  54. for result := range results {
  55. fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
  56. }
  57. done <- true
  58. }
  59. func main() {
  60. startTime := time.Now()
  61. noOfJobs := 100
  62. go allocate(noOfJobs)
  63. done := make(chan bool)
  64. go result(done)
  65. noOfWorkers := 10
  66. createWorkerPool(noOfWorkers)
  67. <-done
  68. endTime := time.Now()
  69. diff := endTime.Sub(startTime)
  70. fmt.Println("total time taken ", diff.Seconds(), "seconds")
  71. }


Run in playground

最好在自己电脑上运行这个程序,以便在计算总时间上更准确。

程序将输出

  1. Job id 1, input random no 636, sum of digits 15
  2. Job id 0, input random no 878, sum of digits 23
  3. Job id 9, input random no 150, sum of digits 6
  4. ...
  5. total time taken 20.01081009 seconds

总共打印 100 行对应于 100 个 jobs,最后一行打印程序运行所需的总时间。你的输出不一定和我的输出一样,因为 goroutine 可以以任何顺序运行,总时间也会根据硬件的不同而变化。在我的机器上,程序完成大约需要 20 秒。

现在让我们将 main 函数里的 noOfWorkers 增加到 20。我们把工人的 workers 增加了一倍。由于 worker Goroutines 增加了(精确地说是增加了一倍),程序完成所需的总时间应该减少(精确地说是减少了一半)。它现在变成了 10.004364685 秒,程序打印出来


  1. ...
  2. total time taken 10.004364685 seconds

现在我们可以理解,随着 worker Goroutines 数量的增加,完成工作所需的总时间减少了。我把它留作练习,让你在 main 函数中使用 noOfJobsnoOfJobs 来代入不同的值并分析结果。

原文链接

https://golangbot.com/buffered-channels-worker-pools/