不要通过共享内存来通信,而应该通过通信来共享内存 - Rob Pike,Go 语言之父
- Go 并发模型指 Go 实现的 CSP
- Go 并发模式指 Go 中的并发原语的常用组合模式
1. Go 并发模型
在前面的内容中,我们说过:传统的编程语言(比如:C++、Java、Python 等)并非面向并发而生,因此他们面对并发的逻辑多是基于操作系统的线程。并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如:共享内存、信号(signal)、管道(pipe)、消息队列、套接字(socket)等。在这些通信原语中,使用最多最广泛(也是最高效的)是结合了线程同步原语(比如:锁以及更为低级的原子操作)的共享内存方式,因此,我们可以说传统语言的并发模型是基于对内存的共享的。

Go 语言从设计伊始就将解决上述传统并发模型的问题作为 Go 的一个目标,并在新并发模型设计中借鉴了著名计算机科学家Tony Hoare提出的 CSP(Communicationing Sequential Processes,通信顺序进程) 并发模型。
这里要理解中文翻译: 按照通信顺序处理数据的程序 (我由下文得出).
Tony Hoare 的 CSP 模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。
- Tony Hoare 认为输入输出应该是基本的编程原语,数据处理逻辑(即 CSP 中的 P)仅需调用输入原语获取数据,顺序地处理数据,并将结果数据通过输出原语输出即可。
- 因此,在 Tony Hoare 眼中,一个符合 CSP 模型的并发程序应该是一组通过输入输出原语连接起来的 P 的集合。从这个角度来看,CSP 理论不仅是一个并发参考模型,也是一种并发程序的程序组织方法。
- 其组合思想与 Go 的设计哲学不谋而合。Tony Hoare 的 CSP 理论中的 P,即“Process(进程)”,是一个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他 P 的输出获取),并生产出可以被其他 P 消费的输出数据。
由上面一段得出 CSP 的组成有:
- 输入原语
- 输出原语
- 顺序处理逻辑

P 并不一定与操作系统的进程或线程划等号。在 Go 中,与“Process”对应的是 goroutine,但 Go 语言中 goroutine 的执行逻辑并不一定是顺序的,goroutine 也可以创建其他 goroutine 以并发地完成工作。
为了实现 CSP 并发模型中的输入和输出原语,Go 引入了 goroutine§之间的通信原语channel。goroutine 可以从 channel 获取输入数据,再将处理后得到的结果数据通过 channel 输出。通过 channel 将 goroutine§组合连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰,我们无需再为那些传统共享内存并发模型中的问题而伤脑筋了。
虽然 CSP 模型已经成为 Go 语言支持的主流并发模型,但 Go 也支持传统的基于共享内存的并发模型,并提供了基本的低级别同步原语(主要是 sync 包中的互斥锁、条件变量、读写锁、原子操作等)。
那么我们在实践中应该选择哪个模型的并发原语呢?是使用 channel 还是在低级同步原语保护下的共享内存呢?
- 毫无疑问,从程序的整体结构来看,就像本节开头引述 Rob Pike 的那句话一样,Go 始终推荐以 CSP 并发模型风格构建并发程序,尤其是在复杂的业务层面,这将提升程序的逻辑清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;
- 对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,可以使用更为高效的低级同步原语(如 mutex)保证 goroutine 对数据的同步访问。
2. Go 常见的并发模式
在语言层面,Go 针对 CSP 并发模型提供了三种并发原语:
- goroutine:对应 CSP 模型中的P,封装了数据的处理逻辑,是 Go 运行时调度的基本执行单元;
- channel:对应 CSP 模型中的输入/输出原语,用于 goroutine 之间的通信和同步;
- select:用于应对多路输入/输出,可以让 goroutine 同时协调处理多个 channel 操作。
接下来,我们就来深入了解一下实践中这些原语的常见组合方式,即并发模式。
1) 创建模式
创建 goroutine:
go fmt.Println("I am a goroutine")// $GOROOT/src/net/http/server.goc := srv.newConn(rw)go c.serve(connCtx)
但在稍复杂一些的并发程序中,我们需要考虑通过 CSP 模型输入/输出原语的承载体channel在 goroutine 之间建立联系。为了满足这一需求,我们通常使用下面的方式来创建一个 goroutine:
type T struct {...}func spawn(f func()) chan T {c := make(chan T)go func() {// 使用channel变量c(通过闭包方式)与调用spawn的goroutine通信... ...f()... ...}()return c}func main() {c := spawn(func(){})// 使用channel变量c与新创建的goroutine通信}
理解: spawn 同时创建 chan 与 goroutine, 就是说旧 goroutine 创建新 goroutine 会伴随着创建 chan, 否则之后如何通信?
这个在内部创建一个 goroutine 并返回一个 channel 类型变量的函数就是 Go 中最常见的 goroutine 创建模式。spawn 函数创建的新 goroutine 与调用 spawn 函数的 goroutine 之间通过一个 channel 建立起了联系:两个 goroutine 可以通过这个 channel 进行通信。spawn 函数的实现也益于 channel 作为 Go 语言一等公民(first-class citizen)的存在:channel 可以像变量一样被初始化、传递和赋值。上面例子中的 spawn 只返回了一个 channel 变量,大家可以根据需要自行定义返回的 channel 个数和用途。
2) 退出模式
a) 分离(detached)模式
这里借鉴了一些线程模型中的术语,比如分离(detached)模式。分离模式是使用最为广泛的 goroutine 退出方式。所谓分离的 goroutine,即创建它的 goroutine 不需要关心它的退出,这类 goroutine 启动后与其创建者彻底分离(detached),其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。通常,这类 goroutine 有两个常见用途:
- 一次性任务:顾名思义,新创建的 goroutine 用来执行一个简单的任务,执行后即退出。比如下面标准库中的代码:
// $GOROOT/src/net/dial.gofunc (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {... ...if oldCancel := d.Cancel; oldCancel != nil {subCtx, cancel := context.WithCancel(ctx)defer cancel()go func() {select {case <-oldCancel:cancel()case <-subCtx.Done():}}()ctx = subCtx}... ...}
- 常驻后台执行一些特定任务,如:监视(monitor)、观察(watcher)等。其实现通常采用for {…}或for { select{… } }代码段形式,并多以定时器(timer)或事件(event)驱动执行。
Go 为每个 P 内置的 GC goroutine 就是这种类型的:
// $GOROOT/src/runtime/mgc.gofunc gcBgMarkStartWorkers() {// Background marking is performed by per-P G's. Ensure that// each P has a background GC G.for _, p := range allp {if p.gcBgMarkWorker == 0 {go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorkernotetsleepg(&work.bgMarkReady, -1)noteclear(&work.bgMarkReady)}}}func gcBgMarkWorker(_p_ *p) {gp := getg()... ...for { // 常驻后台处理GC事宜... ...}}
b) join 模式
在线程模型中,父线程可以通过 pthread_join 来等待子线程结束并获取子线程的结束状态。在 Go 中,我们有时候也有类似的需求:goroutine 的创建者需要等待新 goroutine 的结束。笔者为这样的 goroutine 退出模式起名为“join 模式”。
- 等待一个 goroutine 退出
我们从一个简单的场景开始,先来看看如何等待一个 goroutine 结束。下面是模拟该场景的一段示例代码:
// go-concurrency-pattern-1.gopackage mainimport "time"func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))}func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})go func() {f(args...)c <- struct{}{} // f 运行结束后发送通知}()return c}func main() {done := spawn(worker, 5)println("spawn a worker goroutine")<-doneprintln("worker done")}
运行该示例:
$ go run go-concurrency-pattern-1.gospawn a worker goroutineworker done
- 获取 goroutine 的退出状态
如果新 goroutine 的创建者不仅仅要等待 goroutine 的退出,还要精准获取其结束状态,我们可以同样可以通过自定义类型的 channel 来实现这一场景需求。下面是基于上面代码改造后的示例:
// go-concurrency-pattern-2.gopackage mainimport ("errors""fmt""time")var OK = errors.New("ok")func worker(args ...interface{}) error {if len(args) == 0 {return errors.New("invalid args")}interval, ok := args[0].(int)if !ok {return errors.New("invalid interval arg")}time.Sleep(time.Second * (time.Duration(interval)))return OK}func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {c := make(chan error)go func() {c <- f(args...)}()return c}func main() {done := spawn(worker, 5)println("spawn worker1")err := <-donefmt.Println("worker1 done:", err)done = spawn(worker)println("spawn worker2")err = <-donefmt.Println("worker2 done:", err)}
我们将 channel 中承载的类型由struct{}改为了error,这样 channel 承载的信息就不仅仅是一个“信号”了,还携带了“有价值”的信息:新 goroutine 的结束状态。运行上述示例:
$go run go-concurrency-pattern-2.gospawn worker1worker1 done: okspawn worker2worker2 done: invalid args
- 等待多个 goroutine 退出
有些场景中,goroutine 的创建者可能会创建不止一个 goroutine,并且需要等待全部新 goroutine 退出。我们可以通过 Go 语言提供的sync.WaitGroup实现等待多个 goroutine 退出的模式:
// go-concurrency-pattern-3.gopackage mainimport ("fmt""sync""time")func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))}func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {name := fmt.Sprintf("worker-%d:", i)f(args...)println(name, "done")wg.Done() // worker done!}(i)}// 避免阻塞 spawnGroup 所在 goroutinego func() {wg.Wait()c <- struct{}{}}()return c}func main() {done := spawnGroup(5, worker, 3)println("spawn a group of workers")<-doneprintln("group workers done")}
运行上述示例代码:
$go run go-concurrency-pattern-3.gospawn a group of workersworker-2: doneworker-1: doneworker-0: doneworker-4: doneworker-3: donegroup workers done
- 支持超时机制的等待
有时候,我们不想无限阻塞等待所有新创建 goroutine 的退出,而是仅等待一个合理的时间。如果在这个时间范围内 goroutine 没有退出,则创建者会继续向下执行或主动退出。下面的示例代码是在等待多个 goroutine 退出的例子之上增加了超时机制:
// go-concurrency-pattern-4.go... ...func main() {done := spawnGroup(5, worker, 30)println("spawn a group of workers")timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-done:println("group workers done")}}
运行上述示例代码:
$ go run go-concurrency-pattern-4.gospawn a group of workerswait group workers exit timeout!
c) “notify-and-wait”模式
前面的几个场景中,goroutine 的创建者都是在被动地等待着新 goroutine 的退出。但很多时候,goroutine 创建者需要主动通知那些新 goroutine 退出,尤其是当 main goroutine 作为创建者时。main goroutine 退出意味着 Go 程序的终止,而粗暴地直接让 main goroutine 退出的方式可能会导致业务数据的损坏、不完整或丢失。我们可以通过“notify-and-wait(通知并等待)”模式来满足这一场景的要求。虽然这一模式也不能完全避免“损失”,但是它给了各个 goroutine 一个“挽救数据”的机会,可以尽可能地减少损失的程度。
- 通知并等待一个 goroutine 退出
我们先从一个简单的“通知并等待一个 goroutine 退出”场景入手,下面是满足该场景要求的示例代码:
// go-concurrency-pattern-5.gopackage mainimport "time"func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))}func spawn(f func(int)) chan string {quit := make(chan string)go func() {var job chan int // 模拟job channelfor {select {case j := <-job:f(j)case <-quit:quit <- "ok" // 只用一个 chan 就完成停止通知和结束返回}}}()return quit}func main() {quit := spawn(worker)println("spawn a worker goroutine")time.Sleep(5 * time.Second)// 通知新创建的goroutine退出println("notify the worker to exit...")quit <- "exit"timer := time.NewTimer(time.Second * 10)defer timer.Stop()select {case status := <-quit:println("worker done:", status)case <-timer.C:println("wait worker exit timeout")}}
运行上述示例代码:
$go run go-concurrency-pattern-5.gospawn a worker goroutinenotify the worker to exit...worker done: ok
- 通知并等待多个 goroutine 退出
下面是“通知并等待多个 goroutine 退出”的场景。Go 语言的 channel 有一个特性,那就是当使用 close 函数关于 (闭) channel 时,所有阻塞到该 channel 上的 goroutine 都会得到“通知”,我们就利用这一特性实现满足这一场景的模式:
// go-concurrency-pattern-6.gopackage mainimport ("fmt""sync""time")func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))}func spawnGroup(n int, f func(int)) chan struct{} {quit := make(chan struct{})job := make(chan int)var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {defer wg.Done() // 保证wg.Done在goroutine退出前被执行name := fmt.Sprintf("worker-%d:", i)for {j, ok := <-jobif !ok {println(name, "done")return}// do the jobworker(j)}}(i)}go func() {<-quit // 接收停止信号close(job) // 广播给所有新goroutinewg.Wait()quit <- struct{}{} // 返回结束信号}()return quit}func main() {quit := spawnGroup(5, worker)println("spawn a group of workers")time.Sleep(5 * time.Second)// notify the worker goroutine group to exitprintln("notify the worker group to exit...")quit <- struct{}{}timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-quit:println("group workers done")}}
运行上述示例代码:
$go run go-concurrency-pattern-6.gospawn a group of workersnotify the worker group to exit...worker-3: doneworker-0: doneworker-4: doneworker-2: doneworker-1: donegroup workers done
d) 退出模式的应用
我们尝试将问题范围缩小,聚焦在实现一个“超时等待退出”框架以统一解决各种运行形态 goroutine 的优雅退出问题。
// go-concurrency-pattern-7.gotype GracefullyShutdowner interface {Shutdown(waitTimeout time.Duration) error}
这样,凡是实现了该接口的类型均可在程序退出时得到退出的通知和调用,从而有机会做退出前的最后清理工作。这里还提供了一个类似http.HandlerFunc的类型ShutdownerFunc,用于将普通函数转化为实现了 GracefullyShutdowner 接口的类型实例(得益于函数在 Go 中为“一等公民”的特质):
// go-concurrency-pattern-7.gotype ShutdownerFunc func(time.Duration) errorfunc (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {return f(waitTimeout)}
一组 goroutine 的退出总体上有两种情况。一种是并发退出,在这类退出方式下,各个 goroutine 的退出先后次序对数据处理无影响,因此各个 goroutine 可以并发执行退出逻辑;另外一种则是串行退出,即各个 goroutine 之间的退出是按照一定次序逐个进行的。次序若错了可能会导致程序的状态混乱和错误。
并发退出:
// go-concurrency-pattern-7.gofunc ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {c := make(chan struct{})go func() {var wg sync.WaitGroupfor _, g := range shutdowners {wg.Add(1)go func(shutdowner GracefullyShutdowner) {defer wg.Done()shutdowner.Shutdown(waitTimeout)}(g)}wg.Wait()c <- struct{}{}}()timer := time.NewTimer(waitTimeout)defer timer.Stop()select {case <-c:return nilcase <-timer.C:return errors.New("wait timeout")}}
下面是该并发退出函数对应的测试用例,通过该用例我们也可以直观了解到该函数的使用方法:
// go-concurrency-pattern-7_test.gopackage mainimport ("testing""time")func shutdownMaker(processTm int) func(time.Duration) error {return func(time.Duration) error { // 参数没有使用time.Sleep(time.Second * time.Duration(processTm))return nil}}func TestConcurrentShutdown(t *testing.T) {f1 := shutdownMaker(2)f2 := shutdownMaker(6)err := ConcurrentShutdown(10*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))if err != nil {t.Errorf("want nil, actual: %s", err)return}err = ConcurrentShutdown(4*time.Second, ShutdownerFunc(f1), ShutdownerFunc(f2))if err == nil { // 注意这里是 "=="t.Error("want timeout, actual nil")return}}
在上面测试中,我们通过一个工具函数shutdownMaker“制作”出通过ShutdownerFunc转型即可满足接口 GracefullyShutdowner 的类型实例,并分别测试了ConcurrentShutdown函数的正常和等待超时两种状况。运行上面测试用例:
$ go test -v ./go-concurrency-pattern-7_test.go ./go-concurrency-pattern-7.go=== RUN TestConcurrentShutdown--- PASS: TestConcurrentShutdown (10.00s)PASSok command-line-arguments 10.001s
串行退出:
// go-concurrency-pattern-7.gofunc SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {start := time.Now()var left time.Durationtimer := time.NewTimer(waitTimeout)for _, g := range shutdowners {elapsed := time.Since(start)left = waitTimeout - elapsedc := make(chan struct{})go func(shutdowner GracefullyShutdowner) {shutdowner.Shutdown(left)c <- struct{}{}}(g)timer.Reset(left)select {case <-c://continuecase <-timer.C:return errors.New("wait timeout")}}return nil}
停止顺序是传入的顺序.
3) 管道(pipeline)模式
管道是 Unix/Linux 上一种典型的并发程序设计模式,也是 Unix 崇尚“组合”设计哲学的具体体现。Go 中没有定义管道,但是具有深厚 Unix 文化背景的 Go 语言缔造者们显然借鉴了 Unix 的设计哲学,在 Go 中引入了channel这种并发原语,而channel原语使构建管道并发模式变得容易且自然。

我们看到在 Go 中管道模式被实现成了由channel连接的一条“数据流水线”。该流水线中,每个数据处理环节都由一组相同功能的 goroutine完成。在每个数据处理环节,goroutine 都要从数据输入 channel 获取前一个环节生产的数据,然后对这些数据进行处理,并将处理后的结果数据通过数据输出 channel 发往下一个环节。
// go-concurrency-pattern-8.gopackage mainfunc newNumGenerator(start, count int) <-chan int {c := make(chan int)go func() {for i := start; i < start+count; i++ {c <- i}close(c) // 塞完数据后才关闭}()return c}func filterOdd(in int) (int, bool) {if in%2 != 0 {return 0, false}return in, true}func square(in int) (int, bool) {return in * in, true}// 原数据源 + 数据处理函数 -> 新的数据源func spawn(f func(int) (int, bool), in <-chan int) <-chan int {out := make(chan int)go func() {for v := range in {r, ok := f(v)if ok {out <- r}}close(out)}()return out}func main() {in := newNumGenerator(1, 20)out := spawn(square, spawn(filterOdd, in))for v := range out {println(v)}}
运行上述示例代码:
$go run go-concurrency-pattern-8.go4163664100144196256324400
管道模式具有良好的可扩展性。如果我们要在上面示例代码的基础上在最开始处新增一个处理环节,比如过滤掉所有大于 100 的数(filterNumOver100,我们可以像下面代码这样扩展我们的管道流水线:
in := newNumGenerator(1, 20)out := spawn(square, spawn(filterOdd, spawn(filterNumOver10, in))
下面我们再来了解两种基于管道模式的扩展模式。
- 扇出模式(fan-out)
在某一处理环节中,多个功能相同的 goroutine 从同一个 channel 读取数据并处理,直到该 channel 关闭,这种情况被称为扇出(fan-out)。使用扇出模式可以在一组 goroutine 中均衡分配工作量,从而可以更好地并行化对 CPU 和 I/O 的使用。
- 扇入模式(fan-in)
在某个处理环节,处理程序面对不止一个输入 channel。我们把所有输入 channel 的数据汇聚到一个统一的输入 channel,然后处理程序再从这个汇聚后的 channel 读取数据并处理,直到该汇聚 channel 因所有输入 channel 关闭而关闭。这种情况被称为扇入(fan-in)。
下图直观的展示了扇出和扇入模式:

下面我们来看看扇出和扇入模式的实现示例:
// go-concurrency-pattern-9.gopackage mainimport ("fmt""sync""time")func newNumGenerator(start, count int) <-chan int {c := make(chan int)go func() {for i := start; i < start+count; i++ {c <- i}close(c)}()return c}func filterOdd(in int) (int, bool) {if in%2 != 0 {return 0, false}return in, true}func square(in int) (int, bool) {return in * in, true}func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {groupOut := make(chan int)var outSlice []chan intfor i := 0; i < num; i++ {out := make(chan int)go func(i int) {name := fmt.Sprintf("%s-%d:", name, i)fmt.Printf("%s begin to work...\n", name)for v := range in {r, ok := f(v)if ok {out <- r}}close(out)fmt.Printf("%s work done\n", name)}(i)outSlice = append(outSlice, out)}// Fan-in pattern//// out --\// \// out ---- --> groupOut// /// out --///go func() {var wg sync.WaitGroupfor _, out := range outSlice {wg.Add(1)go func(out <-chan int) {for v := range out {groupOut <- v}wg.Done()}(out)}wg.Wait()close(groupOut)}()return groupOut}func main() {in := newNumGenerator(1, 20)out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))time.Sleep(3 * time.Second) //为了输出更直观的结果,这里等上面的goroutine都就绪for v := range out {fmt.Println(v)}}
运行上述示例代码:
$ go run go-concurrency-pattern-9.gosquare-1: begin to work...filterOdd-1: begin to work...square-0: begin to work...filterOdd-2: begin to work...filterOdd-0: begin to work...filterOdd-1: work done4163610064144324400256196filterOdd-2: work donefilterOdd-0: work donesquare-0: work donesquare-1: work done
4) 超时(timeout)与取消(cancel)模式
我们经常会使用 Go 编写向服务发起请求并获取应答结果的客户端应用。这里我们就来看一个这样的例子:我们要编写一个从气象数据服务中心获取气象信息的客户端。该客户端每次会并发向从三个气象数据服务中心发起数据查询请求,并以返回最快的那个响应信息作为此次请求的应答返回值。下面的代码是这个例子的第一版实现:
// go-concurrency-pattern-10.gopackage mainimport ("io/ioutil""log""net/http""net/http/httptest""time")type result struct {value string}func first(servers ...*httptest.Server) (result, error) {c := make(chan result, len(servers))queryFunc := func(server *httptest.Server) {url := server.URLresp, err := http.Get(url)if err != nil {log.Printf("http get error: %s\n", err)return}defer resp.Body.Close()body, _ := ioutil.ReadAll(resp.Body)c <- result{value: string(body),}}for _, serv := range servers {go queryFunc(serv)}return <-c, nil}func fakeWeatherServer(name string) *httptest.Server {return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {log.Printf("%s receive a http request\n", name)time.Sleep(1 * time.Second)w.Write([]byte(name + ":ok"))}))}func main() {result, err := first(fakeWeatherServer("open-weather-1"),fakeWeatherServer("open-weather-2"),fakeWeatherServer("open-weather-3"))if err != nil {log.Println("invoke first error:", err)return}log.Println(result)}
我们运行一下这段示例代码:
$go run go-concurrency-pattern-10.go2020/01/21 21:57:04 open-weather-3 receive a http request2020/01/21 21:57:04 open-weather-1 receive a http request2020/01/21 21:57:04 open-weather-2 receive a http request2020/01/21 21:57:05 {open-weather-3:ok}
上述的例子运行在一种较为“理想”的情况下,但现实中网络情况错综复杂,远程服务的状态也不甚明朗,很可能出现服务端长时间没有响应的情况,这时为了保证良好的用户体验,我们需要对客户端的行为进行精细化的控制,比如:我们只等待 500ms,超过 500ms 仍然没有收到哪怕是一个“气象数据服务中心”的响应,我们的 first 函数就返回失败,以保证等待的时间在人类的忍耐力承受范围之内。我们在上述例子的基础上对 first 函数做的调整如下:
// go-concurrency-pattern-11.gofunc first(servers ...*httptest.Server) (result, error) {c := make(chan result, len(servers))queryFunc := func(server *httptest.Server) {url := server.URLresp, err := http.Get(url)if err != nil {log.Printf("http get error: %s\n", err)return}defer resp.Body.Close()body, _ := ioutil.ReadAll(resp.Body)c <- result{value: string(body),}}for _, serv := range servers {go queryFunc(serv)}select {case r := <-c:return r, nilcase <-time.After(500 * time.Millisecond):return result{}, errors.New("timeout")}}
我们通过增加一个定时器,并通过 select 原语监视该定时器事件和响应 channel 上的事件。如果响应 channel 上长时间没有数据返回,则当定时器事件触发后,first 函数返回:
$ go run go-concurrency-pattern-11.go2020/01/21 22:41:02 open-weather-1 receive a http request2020/01/21 22:41:02 open-weather-2 receive a http request2020/01/21 22:41:02 open-weather-3 receive a http request2020/01/21 22:41:02 invoke first error: timeout
加上了 “超时模式” 的版本依然有一个明显的问题,那就是即便 first 函数因超时返回,三个已经创建的 goroutine 可能依然处在向“气象数据服务中心”请求或等待应答中,没有返回,也没有被回收,资源仍然在占用,即使它们的存在已经没有了任何意义。一种合理的解决思路是让这三个 goroutine 支持“取消”操作。这种情况下,我们一般使用 Go 的 context 包来实现“取消”模式。context 包是谷歌内部关于 Go 的一个最佳实践,Go 在 1.7 版本将 context 包引入到标准库中。下面是利用 context 包实现“取消模式”的代码:
// go-concurrency-pattern-12.gopackage mainimport ("context""errors""fmt""io/ioutil""log""net/http""net/http/httptest""time")type result struct {value string}func first(servers ...*httptest.Server) (result, error) {c := make(chan result)ctx, cancel := context.WithCancel(context.Background())defer cancel()queryFunc := func(i int, server *httptest.Server) {url := server.URLreq, err := http.NewRequest("GET", url, nil)if err != nil {log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)return}req = req.WithContext(ctx)log.Printf("query goroutine-%d: send request...\n", i)resp, err := http.DefaultClient.Do(req)if err != nil {log.Printf("query goroutine-%d: get return error: %s\n", i, err)return}log.Printf("query goroutine-%d: get response\n", i)defer resp.Body.Close()body, _ := ioutil.ReadAll(resp.Body)c <- result{value: string(body),}return}for i, serv := range servers {go queryFunc(i, serv)}select {case r := <-c:return r, nilcase <-time.After(500 * time.Millisecond):return result{}, errors.New("timeout")}}func fakeWeatherServer(name string, interval int) *httptest.Server {return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {log.Printf("%s receive a http request\n", name)time.Sleep(time.Duration(interval) * time.Millisecond)w.Write([]byte(name + ":ok"))}))}func main() {result, err := first(fakeWeatherServer("open-weather-1", 200),fakeWeatherServer("open-weather-2", 1000),fakeWeatherServer("open-weather-3", 600))if err != nil {log.Println("invoke first error:", err)return}fmt.Println(result)time.Sleep(10 * time.Second) // 不要立即结束}
在这版实现中,我们利用context.WithCancel创建了一个可以取消的 context.Context 变量,在每个发起查询请求的 goroutine 中,我们用该变量更新了 request 中的 ctx 变量,使其支持“被取消”。这样在 first 函数中,无论是成功得到某个查询 goroutine 的返回结果,还是超时失败返回,通过defer cancel()设定 cancel 函数在 first 函数返回前被执行,那些尚未返回的在途(on-flight)查询的 goroutine 都将收到 cancel 事件并退出(http 包支持利用 context.Context 的超时和 cancel 机制)。下面是运行该示例的结果:
$go run go-concurrency-pattern-12.go2020/01/21 23:20:32 query goroutine-1: send request...2020/01/21 23:20:32 query goroutine-0: send request...2020/01/21 23:20:32 query goroutine-2: send request...2020/01/21 23:20:32 open-weather-3 receive a http request2020/01/21 23:20:32 open-weather-2 receive a http request2020/01/21 23:20:32 open-weather-1 receive a http request2020/01/21 23:20:32 query goroutine-0: get response{open-weather-1:ok}2020/01/21 23:20:32 query goroutine-1: get return error: Get http://127.0.0.1:56437: context canceled2020/01/21 23:20:32 query goroutine-2: get return error: Get http://127.0.0.1:56438: context canceled
