不要通过共享内存来通信,而应该通过通信来共享内存 - Rob Pike,Go 语言之父

  • Go 并发模型指 Go 实现的 CSP
  • Go 并发模式指 Go 中的并发原语的常用组合模式

1. Go 并发模型

在前面的内容中,我们说过:传统的编程语言(比如:C++、Java、Python 等)并非面向并发而生,因此他们面对并发的逻辑多是基于操作系统的线程。并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如:共享内存、信号(signal)、管道(pipe)、消息队列、套接字(socket)等。在这些通信原语中,使用最多最广泛(也是最高效的)是结合了线程同步原语(比如:锁以及更为低级的原子操作)的共享内存方式,因此,我们可以说传统语言的并发模型是基于对内存的共享的。

image.png

Go 语言从设计伊始就将解决上述传统并发模型的问题作为 Go 的一个目标,并在新并发模型设计中借鉴了著名计算机科学家Tony Hoare提出的 CSP(Communicationing Sequential Processes,通信顺序进程) 并发模型。

这里要理解中文翻译: 按照通信顺序处理数据的程序 (我由下文得出).

Tony Hoare 的 CSP 模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。

  1. Tony Hoare 认为输入输出应该是基本的编程原语,数据处理逻辑(即 CSP 中的 P)仅需调用输入原语获取数据,顺序地处理数据,并将结果数据通过输出原语输出即可。
  2. 因此,在 Tony Hoare 眼中,一个符合 CSP 模型的并发程序应该是一组通过输入输出原语连接起来的 P 的集合。从这个角度来看,CSP 理论不仅是一个并发参考模型,也是一种并发程序的程序组织方法。
  3. 其组合思想与 Go 的设计哲学不谋而合。Tony Hoare 的 CSP 理论中的 P,即“Process(进程)”,是一个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他 P 的输出获取),并生产出可以被其他 P 消费的输出数据。

由上面一段得出 CSP 的组成有:

  • 输入原语
  • 输出原语
  • 顺序处理逻辑

image.png

P 并不一定与操作系统的进程或线程划等号。在 Go 中,与“Process”对应的是 goroutine,但 Go 语言中 goroutine 的执行逻辑并不一定是顺序的,goroutine 也可以创建其他 goroutine 以并发地完成工作

为了实现 CSP 并发模型中的输入和输出原语,Go 引入了 goroutine§之间的通信原语channel。goroutine 可以从 channel 获取输入数据,再将处理后得到的结果数据通过 channel 输出。通过 channel 将 goroutine§组合连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰,我们无需再为那些传统共享内存并发模型中的问题而伤脑筋了。

虽然 CSP 模型已经成为 Go 语言支持的主流并发模型,但 Go 也支持传统的基于共享内存的并发模型,并提供了基本的低级别同步原语(主要是 sync 包中的互斥锁、条件变量、读写锁、原子操作等)。

那么我们在实践中应该选择哪个模型的并发原语呢?是使用 channel 还是在低级同步原语保护下的共享内存呢?

  • 毫无疑问,从程序的整体结构来看,就像本节开头引述 Rob Pike 的那句话一样,Go 始终推荐以 CSP 并发模型风格构建并发程序,尤其是在复杂的业务层面,这将提升程序的逻辑清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;
  • 对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,可以使用更为高效的低级同步原语(如 mutex)保证 goroutine 对数据的同步访问。

2. Go 常见的并发模式

在语言层面,Go 针对 CSP 并发模型提供了三种并发原语:

  • goroutine:对应 CSP 模型中的P,封装了数据的处理逻辑,是 Go 运行时调度的基本执行单元;
  • channel:对应 CSP 模型中的输入/输出原语,用于 goroutine 之间的通信和同步;
  • select:用于应对多路输入/输出,可以让 goroutine 同时协调处理多个 channel 操作

接下来,我们就来深入了解一下实践中这些原语的常见组合方式,即并发模式

1) 创建模式

创建 goroutine:

  1. go fmt.Println("I am a goroutine")
  2. // $GOROOT/src/net/http/server.go
  3. c := srv.newConn(rw)
  4. go c.serve(connCtx)

但在稍复杂一些的并发程序中,我们需要考虑通过 CSP 模型输入/输出原语的承载体channel在 goroutine 之间建立联系。为了满足这一需求,我们通常使用下面的方式来创建一个 goroutine:

  1. type T struct {...}
  2. func spawn(f func()) chan T {
  3. c := make(chan T)
  4. go func() {
  5. // 使用channel变量c(通过闭包方式)与调用spawn的goroutine通信
  6. ... ...
  7. f()
  8. ... ...
  9. }()
  10. return c
  11. }
  12. func main() {
  13. c := spawn(func(){})
  14. // 使用channel变量c与新创建的goroutine通信
  15. }

理解: spawn 同时创建 chan 与 goroutine, 就是说旧 goroutine 创建新 goroutine 会伴随着创建 chan, 否则之后如何通信?

这个在内部创建一个 goroutine 并返回一个 channel 类型变量的函数就是 Go 中最常见的 goroutine 创建模式。spawn 函数创建的新 goroutine 与调用 spawn 函数的 goroutine 之间通过一个 channel 建立起了联系:两个 goroutine 可以通过这个 channel 进行通信。spawn 函数的实现也益于 channel 作为 Go 语言一等公民(first-class citizen)的存在:channel 可以像变量一样被初始化、传递和赋值。上面例子中的 spawn 只返回了一个 channel 变量,大家可以根据需要自行定义返回的 channel 个数和用途。

2) 退出模式

a) 分离(detached)模式

这里借鉴了一些线程模型中的术语,比如分离(detached)模式。分离模式是使用最为广泛的 goroutine 退出方式。所谓分离的 goroutine,即创建它的 goroutine 不需要关心它的退出,这类 goroutine 启动后与其创建者彻底分离(detached),其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。通常,这类 goroutine 有两个常见用途:

  • 一次性任务:顾名思义,新创建的 goroutine 用来执行一个简单的任务,执行后即退出。比如下面标准库中的代码:
  1. // $GOROOT/src/net/dial.go
  2. func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
  3. ... ...
  4. if oldCancel := d.Cancel; oldCancel != nil {
  5. subCtx, cancel := context.WithCancel(ctx)
  6. defer cancel()
  7. go func() {
  8. select {
  9. case <-oldCancel:
  10. cancel()
  11. case <-subCtx.Done():
  12. }
  13. }()
  14. ctx = subCtx
  15. }
  16. ... ...
  17. }
  • 常驻后台执行一些特定任务,如:监视(monitor)、观察(watcher)等。其实现通常采用for {…}或for { select{… } }代码段形式,并多以定时器(timer)或事件(event)驱动执行。

Go 为每个 P 内置的 GC goroutine 就是这种类型的:

  1. // $GOROOT/src/runtime/mgc.go
  2. func gcBgMarkStartWorkers() {
  3. // Background marking is performed by per-P G's. Ensure that
  4. // each P has a background GC G.
  5. for _, p := range allp {
  6. if p.gcBgMarkWorker == 0 {
  7. go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorker
  8. notetsleepg(&work.bgMarkReady, -1)
  9. noteclear(&work.bgMarkReady)
  10. }
  11. }
  12. }
  13. func gcBgMarkWorker(_p_ *p) {
  14. gp := getg()
  15. ... ...
  16. for { // 常驻后台处理GC事宜
  17. ... ...
  18. }
  19. }

b) join 模式

在线程模型中,父线程可以通过 pthread_join 来等待子线程结束并获取子线程的结束状态。在 Go 中,我们有时候也有类似的需求:goroutine 的创建者需要等待新 goroutine 的结束。笔者为这样的 goroutine 退出模式起名为“join 模式”

  • 等待一个 goroutine 退出

我们从一个简单的场景开始,先来看看如何等待一个 goroutine 结束。下面是模拟该场景的一段示例代码:

  1. // go-concurrency-pattern-1.go
  2. package main
  3. import "time"
  4. func worker(args ...interface{}) {
  5. if len(args) == 0 {
  6. return
  7. }
  8. interval, ok := args[0].(int)
  9. if !ok {
  10. return
  11. }
  12. time.Sleep(time.Second * (time.Duration(interval)))
  13. }
  14. func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
  15. c := make(chan struct{})
  16. go func() {
  17. f(args...)
  18. c <- struct{}{} // f 运行结束后发送通知
  19. }()
  20. return c
  21. }
  22. func main() {
  23. done := spawn(worker, 5)
  24. println("spawn a worker goroutine")
  25. <-done
  26. println("worker done")
  27. }

运行该示例:

  1. $ go run go-concurrency-pattern-1.go
  2. spawn a worker goroutine
  3. worker done
  • 获取 goroutine 的退出状态

如果新 goroutine 的创建者不仅仅要等待 goroutine 的退出,还要精准获取其结束状态,我们可以同样可以通过自定义类型的 channel 来实现这一场景需求。下面是基于上面代码改造后的示例:

  1. // go-concurrency-pattern-2.go
  2. package main
  3. import (
  4. "errors"
  5. "fmt"
  6. "time"
  7. )
  8. var OK = errors.New("ok")
  9. func worker(args ...interface{}) error {
  10. if len(args) == 0 {
  11. return errors.New("invalid args")
  12. }
  13. interval, ok := args[0].(int)
  14. if !ok {
  15. return errors.New("invalid interval arg")
  16. }
  17. time.Sleep(time.Second * (time.Duration(interval)))
  18. return OK
  19. }
  20. func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
  21. c := make(chan error)
  22. go func() {
  23. c <- f(args...)
  24. }()
  25. return c
  26. }
  27. func main() {
  28. done := spawn(worker, 5)
  29. println("spawn worker1")
  30. err := <-done
  31. fmt.Println("worker1 done:", err)
  32. done = spawn(worker)
  33. println("spawn worker2")
  34. err = <-done
  35. fmt.Println("worker2 done:", err)
  36. }

我们将 channel 中承载的类型由struct{}改为了error,这样 channel 承载的信息就不仅仅是一个“信号”了,还携带了“有价值”的信息:新 goroutine 的结束状态。运行上述示例:

  1. $go run go-concurrency-pattern-2.go
  2. spawn worker1
  3. worker1 done: ok
  4. spawn worker2
  5. worker2 done: invalid args
  • 等待多个 goroutine 退出

有些场景中,goroutine 的创建者可能会创建不止一个 goroutine,并且需要等待全部新 goroutine 退出。我们可以通过 Go 语言提供的sync.WaitGroup实现等待多个 goroutine 退出的模式:

  1. // go-concurrency-pattern-3.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. func worker(args ...interface{}) {
  9. if len(args) == 0 {
  10. return
  11. }
  12. interval, ok := args[0].(int)
  13. if !ok {
  14. return
  15. }
  16. time.Sleep(time.Second * (time.Duration(interval)))
  17. }
  18. func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
  19. c := make(chan struct{})
  20. var wg sync.WaitGroup
  21. for i := 0; i < n; i++ {
  22. wg.Add(1)
  23. go func(i int) {
  24. name := fmt.Sprintf("worker-%d:", i)
  25. f(args...)
  26. println(name, "done")
  27. wg.Done() // worker done!
  28. }(i)
  29. }
  30. // 避免阻塞 spawnGroup 所在 goroutine
  31. go func() {
  32. wg.Wait()
  33. c <- struct{}{}
  34. }()
  35. return c
  36. }
  37. func main() {
  38. done := spawnGroup(5, worker, 3)
  39. println("spawn a group of workers")
  40. <-done
  41. println("group workers done")
  42. }

运行上述示例代码:

  1. $go run go-concurrency-pattern-3.go
  2. spawn a group of workers
  3. worker-2: done
  4. worker-1: done
  5. worker-0: done
  6. worker-4: done
  7. worker-3: done
  8. group workers done
  • 支持超时机制的等待

有时候,我们不想无限阻塞等待所有新创建 goroutine 的退出,而是仅等待一个合理的时间。如果在这个时间范围内 goroutine 没有退出,则创建者会继续向下执行或主动退出。下面的示例代码是在等待多个 goroutine 退出的例子之上增加了超时机制:

  1. // go-concurrency-pattern-4.go
  2. ... ...
  3. func main() {
  4. done := spawnGroup(5, worker, 30)
  5. println("spawn a group of workers")
  6. timer := time.NewTimer(time.Second * 5)
  7. defer timer.Stop()
  8. select {
  9. case <-timer.C:
  10. println("wait group workers exit timeout!")
  11. case <-done:
  12. println("group workers done")
  13. }
  14. }

运行上述示例代码:

  1. $ go run go-concurrency-pattern-4.go
  2. spawn a group of workers
  3. wait group workers exit timeout!

c) “notify-and-wait”模式

前面的几个场景中,goroutine 的创建者都是在被动地等待着新 goroutine 的退出。但很多时候,goroutine 创建者需要主动通知那些新 goroutine 退出,尤其是当 main goroutine 作为创建者时。main goroutine 退出意味着 Go 程序的终止,而粗暴地直接让 main goroutine 退出的方式可能会导致业务数据的损坏、不完整或丢失。我们可以通过“notify-and-wait(通知并等待)”模式来满足这一场景的要求。虽然这一模式也不能完全避免“损失”,但是它给了各个 goroutine 一个“挽救数据”的机会,可以尽可能地减少损失的程度。

  • 通知并等待一个 goroutine 退出

我们先从一个简单的“通知并等待一个 goroutine 退出”场景入手,下面是满足该场景要求的示例代码:

  1. // go-concurrency-pattern-5.go
  2. package main
  3. import "time"
  4. func worker(j int) {
  5. time.Sleep(time.Second * (time.Duration(j)))
  6. }
  7. func spawn(f func(int)) chan string {
  8. quit := make(chan string)
  9. go func() {
  10. var job chan int // 模拟job channel
  11. for {
  12. select {
  13. case j := <-job:
  14. f(j)
  15. case <-quit:
  16. quit <- "ok" // 只用一个 chan 就完成停止通知和结束返回
  17. }
  18. }
  19. }()
  20. return quit
  21. }
  22. func main() {
  23. quit := spawn(worker)
  24. println("spawn a worker goroutine")
  25. time.Sleep(5 * time.Second)
  26. // 通知新创建的goroutine退出
  27. println("notify the worker to exit...")
  28. quit <- "exit"
  29. timer := time.NewTimer(time.Second * 10)
  30. defer timer.Stop()
  31. select {
  32. case status := <-quit:
  33. println("worker done:", status)
  34. case <-timer.C:
  35. println("wait worker exit timeout")
  36. }
  37. }

运行上述示例代码:

  1. $go run go-concurrency-pattern-5.go
  2. spawn a worker goroutine
  3. notify the worker to exit...
  4. worker done: ok
  • 通知并等待多个 goroutine 退出

下面是“通知并等待多个 goroutine 退出”的场景。Go 语言的 channel 有一个特性,那就是当使用 close 函数关于 (闭) channel 时,所有阻塞到该 channel 上的 goroutine 都会得到“通知”,我们就利用这一特性实现满足这一场景的模式:

  1. // go-concurrency-pattern-6.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. func worker(j int) {
  9. time.Sleep(time.Second * (time.Duration(j)))
  10. }
  11. func spawnGroup(n int, f func(int)) chan struct{} {
  12. quit := make(chan struct{})
  13. job := make(chan int)
  14. var wg sync.WaitGroup
  15. for i := 0; i < n; i++ {
  16. wg.Add(1)
  17. go func(i int) {
  18. defer wg.Done() // 保证wg.Done在goroutine退出前被执行
  19. name := fmt.Sprintf("worker-%d:", i)
  20. for {
  21. j, ok := <-job
  22. if !ok {
  23. println(name, "done")
  24. return
  25. }
  26. // do the job
  27. worker(j)
  28. }
  29. }(i)
  30. }
  31. go func() {
  32. <-quit // 接收停止信号
  33. close(job) // 广播给所有新goroutine
  34. wg.Wait()
  35. quit <- struct{}{} // 返回结束信号
  36. }()
  37. return quit
  38. }
  39. func main() {
  40. quit := spawnGroup(5, worker)
  41. println("spawn a group of workers")
  42. time.Sleep(5 * time.Second)
  43. // notify the worker goroutine group to exit
  44. println("notify the worker group to exit...")
  45. quit <- struct{}{}
  46. timer := time.NewTimer(time.Second * 5)
  47. defer timer.Stop()
  48. select {
  49. case <-timer.C:
  50. println("wait group workers exit timeout!")
  51. case <-quit:
  52. println("group workers done")
  53. }
  54. }

运行上述示例代码:

  1. $go run go-concurrency-pattern-6.go
  2. spawn a group of workers
  3. notify the worker group to exit...
  4. worker-3: done
  5. worker-0: done
  6. worker-4: done
  7. worker-2: done
  8. worker-1: done
  9. group workers done

d) 退出模式的应用

我们尝试将问题范围缩小,聚焦在实现一个“超时等待退出”框架以统一解决各种运行形态 goroutine 的优雅退出问题。

  1. // go-concurrency-pattern-7.go
  2. type GracefullyShutdowner interface {
  3. Shutdown(waitTimeout time.Duration) error
  4. }

这样,凡是实现了该接口的类型均可在程序退出时得到退出的通知和调用,从而有机会做退出前的最后清理工作。这里还提供了一个类似http.HandlerFunc的类型ShutdownerFunc,用于将普通函数转化为实现了 GracefullyShutdowner 接口的类型实例(得益于函数在 Go 中为“一等公民”的特质):

  1. // go-concurrency-pattern-7.go
  2. type ShutdownerFunc func(time.Duration) error
  3. func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
  4. return f(waitTimeout)
  5. }

一组 goroutine 的退出总体上有两种情况。一种是并发退出,在这类退出方式下,各个 goroutine 的退出先后次序对数据处理无影响,因此各个 goroutine 可以并发执行退出逻辑;另外一种则是串行退出,即各个 goroutine 之间的退出是按照一定次序逐个进行的。次序若错了可能会导致程序的状态混乱和错误。

并发退出:

  1. // go-concurrency-pattern-7.go
  2. func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
  3. c := make(chan struct{})
  4. go func() {
  5. var wg sync.WaitGroup
  6. for _, g := range shutdowners {
  7. wg.Add(1)
  8. go func(shutdowner GracefullyShutdowner) {
  9. defer wg.Done()
  10. shutdowner.Shutdown(waitTimeout)
  11. }(g)
  12. }
  13. wg.Wait()
  14. c <- struct{}{}
  15. }()
  16. timer := time.NewTimer(waitTimeout)
  17. defer timer.Stop()
  18. select {
  19. case <-c:
  20. return nil
  21. case <-timer.C:
  22. return errors.New("wait timeout")
  23. }
  24. }

下面是该并发退出函数对应的测试用例,通过该用例我们也可以直观了解到该函数的使用方法:

  1. // go-concurrency-pattern-7_test.go
  2. package main
  3. import (
  4. "testing"
  5. "time"
  6. )
  7. func shutdownMaker(processTm int) func(time.Duration) error {
  8. return func(time.Duration) error { // 参数没有使用
  9. time.Sleep(time.Second * time.Duration(processTm))
  10. return nil
  11. }
  12. }
  13. func TestConcurrentShutdown(t *testing.T) {
  14. f1 := shutdownMaker(2)
  15. f2 := shutdownMaker(6)
  16. err := ConcurrentShutdown(10*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
  17. if err != nil {
  18. t.Errorf("want nil, actual: %s", err)
  19. return
  20. }
  21. err = ConcurrentShutdown(4*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))
  22. if err == nil { // 注意这里是 "=="
  23. t.Error("want timeout, actual nil")
  24. return
  25. }
  26. }

在上面测试中,我们通过一个工具函数shutdownMaker“制作”出通过ShutdownerFunc转型即可满足接口 GracefullyShutdowner 的类型实例,并分别测试了ConcurrentShutdown函数的正常和等待超时两种状况。运行上面测试用例:

  1. $ go test -v ./go-concurrency-pattern-7_test.go ./go-concurrency-pattern-7.go
  2. === RUN TestConcurrentShutdown
  3. --- PASS: TestConcurrentShutdown (10.00s)
  4. PASS
  5. ok command-line-arguments 10.001s

串行退出:

  1. // go-concurrency-pattern-7.go
  2. func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
  3. start := time.Now()
  4. var left time.Duration
  5. timer := time.NewTimer(waitTimeout)
  6. for _, g := range shutdowners {
  7. elapsed := time.Since(start)
  8. left = waitTimeout - elapsed
  9. c := make(chan struct{})
  10. go func(shutdowner GracefullyShutdowner) {
  11. shutdowner.Shutdown(left)
  12. c <- struct{}{}
  13. }(g)
  14. timer.Reset(left)
  15. select {
  16. case <-c:
  17. //continue
  18. case <-timer.C:
  19. return errors.New("wait timeout")
  20. }
  21. }
  22. return nil
  23. }

停止顺序是传入的顺序.

3) 管道(pipeline)模式

管道是 Unix/Linux 上一种典型的并发程序设计模式,也是 Unix 崇尚“组合”设计哲学的具体体现。Go 中没有定义管道,但是具有深厚 Unix 文化背景的 Go 语言缔造者们显然借鉴了 Unix 的设计哲学,在 Go 中引入了channel这种并发原语,而channel原语使构建管道并发模式变得容易且自然。

image.png

我们看到在 Go 中管道模式被实现成了由channel连接的一条“数据流水线”。该流水线中,每个数据处理环节都由一组相同功能的 goroutine完成。在每个数据处理环节,goroutine 都要从数据输入 channel 获取前一个环节生产的数据,然后对这些数据进行处理,并将处理后的结果数据通过数据输出 channel 发往下一个环节。

  1. // go-concurrency-pattern-8.go
  2. package main
  3. func newNumGenerator(start, count int) <-chan int {
  4. c := make(chan int)
  5. go func() {
  6. for i := start; i < start+count; i++ {
  7. c <- i
  8. }
  9. close(c) // 塞完数据后才关闭
  10. }()
  11. return c
  12. }
  13. func filterOdd(in int) (int, bool) {
  14. if in%2 != 0 {
  15. return 0, false
  16. }
  17. return in, true
  18. }
  19. func square(in int) (int, bool) {
  20. return in * in, true
  21. }
  22. // 原数据源 + 数据处理函数 -> 新的数据源
  23. func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
  24. out := make(chan int)
  25. go func() {
  26. for v := range in {
  27. r, ok := f(v)
  28. if ok {
  29. out <- r
  30. }
  31. }
  32. close(out)
  33. }()
  34. return out
  35. }
  36. func main() {
  37. in := newNumGenerator(1, 20)
  38. out := spawn(square, spawn(filterOdd, in))
  39. for v := range out {
  40. println(v)
  41. }
  42. }

运行上述示例代码:

  1. $go run go-concurrency-pattern-8.go
  2. 4
  3. 16
  4. 36
  5. 64
  6. 100
  7. 144
  8. 196
  9. 256
  10. 324
  11. 400

管道模式具有良好的可扩展性。如果我们要在上面示例代码的基础上在最开始处新增一个处理环节,比如过滤掉所有大于 100 的数(filterNumOver100,我们可以像下面代码这样扩展我们的管道流水线:

  1. in := newNumGenerator(1, 20)
  2. out := spawn(square, spawn(filterOdd, spawn(filterNumOver10, in))

下面我们再来了解两种基于管道模式的扩展模式。

  • 扇出模式(fan-out)

在某一处理环节中,多个功能相同的 goroutine 从同一个 channel 读取数据并处理,直到该 channel 关闭,这种情况被称为扇出(fan-out)。使用扇出模式可以在一组 goroutine 中均衡分配工作量,从而可以更好地并行化对 CPU 和 I/O 的使用。

  • 扇入模式(fan-in)

在某个处理环节,处理程序面对不止一个输入 channel。我们把所有输入 channel 的数据汇聚到一个统一的输入 channel,然后处理程序再从这个汇聚后的 channel 读取数据并处理,直到该汇聚 channel 因所有输入 channel 关闭而关闭。这种情况被称为扇入(fan-in)。

下图直观的展示了扇出和扇入模式:

image.png

下面我们来看看扇出和扇入模式的实现示例:

  1. // go-concurrency-pattern-9.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. func newNumGenerator(start, count int) <-chan int {
  9. c := make(chan int)
  10. go func() {
  11. for i := start; i < start+count; i++ {
  12. c <- i
  13. }
  14. close(c)
  15. }()
  16. return c
  17. }
  18. func filterOdd(in int) (int, bool) {
  19. if in%2 != 0 {
  20. return 0, false
  21. }
  22. return in, true
  23. }
  24. func square(in int) (int, bool) {
  25. return in * in, true
  26. }
  27. func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
  28. groupOut := make(chan int)
  29. var outSlice []chan int
  30. for i := 0; i < num; i++ {
  31. out := make(chan int)
  32. go func(i int) {
  33. name := fmt.Sprintf("%s-%d:", name, i)
  34. fmt.Printf("%s begin to work...\n", name)
  35. for v := range in {
  36. r, ok := f(v)
  37. if ok {
  38. out <- r
  39. }
  40. }
  41. close(out)
  42. fmt.Printf("%s work done\n", name)
  43. }(i)
  44. outSlice = append(outSlice, out)
  45. }
  46. // Fan-in pattern
  47. //
  48. // out --\
  49. // \
  50. // out ---- --> groupOut
  51. // /
  52. // out --/
  53. //
  54. go func() {
  55. var wg sync.WaitGroup
  56. for _, out := range outSlice {
  57. wg.Add(1)
  58. go func(out <-chan int) {
  59. for v := range out {
  60. groupOut <- v
  61. }
  62. wg.Done()
  63. }(out)
  64. }
  65. wg.Wait()
  66. close(groupOut)
  67. }()
  68. return groupOut
  69. }
  70. func main() {
  71. in := newNumGenerator(1, 20)
  72. out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))
  73. time.Sleep(3 * time.Second) //为了输出更直观的结果,这里等上面的goroutine都就绪
  74. for v := range out {
  75. fmt.Println(v)
  76. }
  77. }

运行上述示例代码:

  1. $ go run go-concurrency-pattern-9.go
  2. square-1: begin to work...
  3. filterOdd-1: begin to work...
  4. square-0: begin to work...
  5. filterOdd-2: begin to work...
  6. filterOdd-0: begin to work...
  7. filterOdd-1: work done
  8. 4
  9. 16
  10. 36
  11. 100
  12. 64
  13. 144
  14. 324
  15. 400
  16. 256
  17. 196
  18. filterOdd-2: work done
  19. filterOdd-0: work done
  20. square-0: work done
  21. square-1: work done

4) 超时(timeout)与取消(cancel)模式

我们经常会使用 Go 编写向服务发起请求并获取应答结果的客户端应用。这里我们就来看一个这样的例子:我们要编写一个从气象数据服务中心获取气象信息的客户端。该客户端每次会并发向从三个气象数据服务中心发起数据查询请求,并以返回最快的那个响应信息作为此次请求的应答返回值。下面的代码是这个例子的第一版实现:

  1. // go-concurrency-pattern-10.go
  2. package main
  3. import (
  4. "io/ioutil"
  5. "log"
  6. "net/http"
  7. "net/http/httptest"
  8. "time"
  9. )
  10. type result struct {
  11. value string
  12. }
  13. func first(servers ...*httptest.Server) (result, error) {
  14. c := make(chan result, len(servers))
  15. queryFunc := func(server *httptest.Server) {
  16. url := server.URL
  17. resp, err := http.Get(url)
  18. if err != nil {
  19. log.Printf("http get error: %s\n", err)
  20. return
  21. }
  22. defer resp.Body.Close()
  23. body, _ := ioutil.ReadAll(resp.Body)
  24. c <- result{
  25. value: string(body),
  26. }
  27. }
  28. for _, serv := range servers {
  29. go queryFunc(serv)
  30. }
  31. return <-c, nil
  32. }
  33. func fakeWeatherServer(name string) *httptest.Server {
  34. return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  35. log.Printf("%s receive a http request\n", name)
  36. time.Sleep(1 * time.Second)
  37. w.Write([]byte(name + ":ok"))
  38. }))
  39. }
  40. func main() {
  41. result, err := first(fakeWeatherServer("open-weather-1"),
  42. fakeWeatherServer("open-weather-2"),
  43. fakeWeatherServer("open-weather-3"))
  44. if err != nil {
  45. log.Println("invoke first error:", err)
  46. return
  47. }
  48. log.Println(result)
  49. }

我们运行一下这段示例代码:

  1. $go run go-concurrency-pattern-10.go
  2. 2020/01/21 21:57:04 open-weather-3 receive a http request
  3. 2020/01/21 21:57:04 open-weather-1 receive a http request
  4. 2020/01/21 21:57:04 open-weather-2 receive a http request
  5. 2020/01/21 21:57:05 {open-weather-3:ok}

上述的例子运行在一种较为“理想”的情况下,但现实中网络情况错综复杂,远程服务的状态也不甚明朗,很可能出现服务端长时间没有响应的情况,这时为了保证良好的用户体验,我们需要对客户端的行为进行精细化的控制,比如:我们只等待 500ms,超过 500ms 仍然没有收到哪怕是一个“气象数据服务中心”的响应,我们的 first 函数就返回失败,以保证等待的时间在人类的忍耐力承受范围之内。我们在上述例子的基础上对 first 函数做的调整如下:

  1. // go-concurrency-pattern-11.go
  2. func first(servers ...*httptest.Server) (result, error) {
  3. c := make(chan result, len(servers))
  4. queryFunc := func(server *httptest.Server) {
  5. url := server.URL
  6. resp, err := http.Get(url)
  7. if err != nil {
  8. log.Printf("http get error: %s\n", err)
  9. return
  10. }
  11. defer resp.Body.Close()
  12. body, _ := ioutil.ReadAll(resp.Body)
  13. c <- result{
  14. value: string(body),
  15. }
  16. }
  17. for _, serv := range servers {
  18. go queryFunc(serv)
  19. }
  20. select {
  21. case r := <-c:
  22. return r, nil
  23. case <-time.After(500 * time.Millisecond):
  24. return result{}, errors.New("timeout")
  25. }
  26. }

我们通过增加一个定时器,并通过 select 原语监视该定时器事件和响应 channel 上的事件。如果响应 channel 上长时间没有数据返回,则当定时器事件触发后,first 函数返回:

  1. $ go run go-concurrency-pattern-11.go
  2. 2020/01/21 22:41:02 open-weather-1 receive a http request
  3. 2020/01/21 22:41:02 open-weather-2 receive a http request
  4. 2020/01/21 22:41:02 open-weather-3 receive a http request
  5. 2020/01/21 22:41:02 invoke first error: timeout

加上了 “超时模式” 的版本依然有一个明显的问题,那就是即便 first 函数因超时返回,三个已经创建的 goroutine 可能依然处在向“气象数据服务中心”请求或等待应答中,没有返回,也没有被回收,资源仍然在占用,即使它们的存在已经没有了任何意义。一种合理的解决思路是让这三个 goroutine 支持“取消”操作。这种情况下,我们一般使用 Go 的 context 包来实现“取消”模式。context 包是谷歌内部关于 Go 的一个最佳实践,Go 在 1.7 版本将 context 包引入到标准库中。下面是利用 context 包实现“取消模式”的代码:

  1. // go-concurrency-pattern-12.go
  2. package main
  3. import (
  4. "context"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "net/http/httptest"
  11. "time"
  12. )
  13. type result struct {
  14. value string
  15. }
  16. func first(servers ...*httptest.Server) (result, error) {
  17. c := make(chan result)
  18. ctx, cancel := context.WithCancel(context.Background())
  19. defer cancel()
  20. queryFunc := func(i int, server *httptest.Server) {
  21. url := server.URL
  22. req, err := http.NewRequest("GET", url, nil)
  23. if err != nil {
  24. log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
  25. return
  26. }
  27. req = req.WithContext(ctx)
  28. log.Printf("query goroutine-%d: send request...\n", i)
  29. resp, err := http.DefaultClient.Do(req)
  30. if err != nil {
  31. log.Printf("query goroutine-%d: get return error: %s\n", i, err)
  32. return
  33. }
  34. log.Printf("query goroutine-%d: get response\n", i)
  35. defer resp.Body.Close()
  36. body, _ := ioutil.ReadAll(resp.Body)
  37. c <- result{
  38. value: string(body),
  39. }
  40. return
  41. }
  42. for i, serv := range servers {
  43. go queryFunc(i, serv)
  44. }
  45. select {
  46. case r := <-c:
  47. return r, nil
  48. case <-time.After(500 * time.Millisecond):
  49. return result{}, errors.New("timeout")
  50. }
  51. }
  52. func fakeWeatherServer(name string, interval int) *httptest.Server {
  53. return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  54. log.Printf("%s receive a http request\n", name)
  55. time.Sleep(time.Duration(interval) * time.Millisecond)
  56. w.Write([]byte(name + ":ok"))
  57. }))
  58. }
  59. func main() {
  60. result, err := first(fakeWeatherServer("open-weather-1", 200),
  61. fakeWeatherServer("open-weather-2", 1000),
  62. fakeWeatherServer("open-weather-3", 600))
  63. if err != nil {
  64. log.Println("invoke first error:", err)
  65. return
  66. }
  67. fmt.Println(result)
  68. time.Sleep(10 * time.Second) // 不要立即结束
  69. }

在这版实现中,我们利用context.WithCancel创建了一个可以取消的 context.Context 变量,在每个发起查询请求的 goroutine 中,我们用该变量更新了 request 中的 ctx 变量,使其支持“被取消”。这样在 first 函数中,无论是成功得到某个查询 goroutine 的返回结果,还是超时失败返回,通过defer cancel()设定 cancel 函数在 first 函数返回前被执行,那些尚未返回的在途(on-flight)查询的 goroutine 都将收到 cancel 事件并退出(http 包支持利用 context.Context 的超时和 cancel 机制)。下面是运行该示例的结果:

  1. $go run go-concurrency-pattern-12.go
  2. 2020/01/21 23:20:32 query goroutine-1: send request...
  3. 2020/01/21 23:20:32 query goroutine-0: send request...
  4. 2020/01/21 23:20:32 query goroutine-2: send request...
  5. 2020/01/21 23:20:32 open-weather-3 receive a http request
  6. 2020/01/21 23:20:32 open-weather-2 receive a http request
  7. 2020/01/21 23:20:32 open-weather-1 receive a http request
  8. 2020/01/21 23:20:32 query goroutine-0: get response
  9. {open-weather-1:ok}
  10. 2020/01/21 23:20:32 query goroutine-1: get return error: Get http://127.0.0.1:56437: context canceled
  11. 2020/01/21 23:20:32 query goroutine-2: get return error: Get http://127.0.0.1:56438: context canceled