协程(goroutine)与通道(channel)

Go 语言为构建并发程序的基本代码块是协程 (goroutine) 与通道 (channel) 。他们需要语言,编译器,和 runtime 的支持。Go 语言提供的垃圾回收器对并发编程至关重要。

不要通过共享内存来通信,而通过通信来共享内存。

并发、并行和协程

1.什么是协程

一个应用程序是运行在机器上的一个进程;进程是一个运行在自己内存地址空间里的独立执行体。一个进程由一个或多个操作系统线程组成,这些线程其实是共享同一内存地址空间的一起工作的执行体。几乎所有’正式’的程序都是多线程的,以便让用户或计算机不必等待,或者能够同时服务多个请求(如Web服务器),或增加性能和吞吐量(例如,通过对不同的数据集并行执行代码)。一个并发程序可以在一个处理器或者内核上使用多个线程来执行任务,但是只有同一个程序在某个时间点同时运行在多核或者多处理器上才是真正的并行。

并行是一种通过使用多处理器以提高速度的能力。所以并发程序可以是并行的,也可以不是。

使用多线程的应用难以做到准确,最主要的问题是内存中的数据共享,它们会被多线程以无法预知的方式进行操作,导致一些无法重现或随机的结果(称作竟态)。

不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。

解决之道在于同步不同的线程,对数据加锁,这样同时就只有一个线程可以变更数据。在Go的标准库sync中有一些工具用来在低级别的代码中实现加锁;但是这个会带来更高的复杂度,更容易使代码出错以及更低的性能,所以这个经典的方法明显不再适合现代多核/多处理编程:thread-per-connection模型不够有效。

在Go中,应用程序并发处理的部分被称作goroutines(协程),它可以进行更有效的并发运算。在协程和操作系统线程之间并无一对一的关系:协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们只上的;协程调度器在Go运行时很好的完成了这个工作。

协程工作在相同的地址空间中,所以共享内存的方式一定是同步的;这个可以使用sync包来实现,不过很不鼓励这样做,Go使用channels来同步协程。

当系统调用(如等待I/O)阻塞协程时,其他协程会继续在其他线程上工作。协程的设计隐藏了许多线程创建和管理方面的复杂工作。

协程是轻量的,比线程更轻。它们痕迹非常不明显(使用少量的内存和资源):使用4K的栈内存就可以在堆中创建它们。因为创建非常廉价,必要的时候可以轻松创建并运行大量的协程(在同一个地址空间中100000个连续的协程)。并且它们对栈进行了分割,从而动态的增加(或缩减)内存的使用;栈的管理是自动的,但不是由垃圾回收器管理的,而是在协程退出后自动释放。

协程可以运行在多个操作系统线程之间,也可以运行在线程之内,让很小的内存占用就可以处理大量的任务。由于操作系统线程上的协程时间片,可以使用少量的操作系统线程就能拥有任意多个提供服务的协程,而且Go运行时可以聪明的意识到哪些协程被阻塞了,暂时搁置它们并处理其他协程。

存在两种并发方式:确定性的(明确定义排序)和非确定性的(加锁/互斥从而为定义排序)。Go的协程和通道理所当然的支持确定性的并发方式(例如通道具有一个sender和一个receiver)。

协程是通过使用关键字go调用(执行)一个函数或者方法来实现的(也可以是匿名或lambda函数)。这样会在当前的计算过程中开始一个同时进行的函数,在相同的地址空间中并且分配了独立的栈,例如:go sum(bigArray),在后台计算总和。

协程的栈会根据需要进行伸缩,不出现栈溢出;当协程结束的时候,它会静默退出:用来启动这个协程的函数不会得到任何的返回值。

任何Go程序都必须有main()函数也可以看做是一个协程,尽管它并没有通过go来启动。协程可以在程序初始化的过程中运行(在init()函数中)。

在一个协程中,比如它需要进行非常密集的运算,你可以在运算循环中周期的使用runtime.Gosched():这会让出处理器,允许运行其他协程;它并不会是当前协程挂起,所以它会自动恢复执行。使用Gosched()可以使计算均匀分布,是通信不至于迟迟得不到响应。

在Go 1.5以前调度器仅使用单线程,也就是说只实现了并发。想要发挥多核处理器的并行,需要在我们的程序中显式调用 runtime.GOMAXPROCS(n) 告诉调度器同时使用多个线程。GOMAXPROCS 设置了同时运行逻辑代码的系统线程的最大数量,并返回之前的设置。如果n < 1,不会改变当前设置。

默认情况下,在Go 1.5将标识并发系统线程个数的runtime.GOMAXPROCS的初始值由1改为了运行环境的CPU核数。

goroutine1.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. fmt.Println("In main()")
  8. go longWait()
  9. go shortWait()
  10. fmt.Println("sleep in main()")
  11. time.Sleep(10 * 1e9)
  12. fmt.Println("end of main()")
  13. }
  14. func longWait() {
  15. fmt.Println("Beging longWait()")
  16. time.Sleep(5 * 1e9)
  17. fmt.Println("End of longWait()")
  18. }
  19. func shortWait() {
  20. fmt.Println("Beging shortWait()")
  21. time.Sleep(5 * 1e9)
  22. fmt.Println("End of shortWait()")
  23. }

main(),longWait() 和 shortWait() 三个函数作为独立的处理单元按顺序启动,然后开始并行运行。每一个函数都在运行的开始和结束阶段输出了消息。为了模拟他们运算的时间消耗,我们使用了 time 包中的 Sleep 函数。Sleep() 可以按照指定的时间来暂停函数或协程的执行,这里使用了纳秒(ns,符号 1e9 表示 1 乘 10 的 9 次方,e=指数)。

他们按照我们期望的顺序打印出了消息,几乎都一样,可是我们明白这是模拟出来的,以并行的方式。我们让 main() 函数暂停 10 秒从而确定它会在另外两个协程之后结束。如果不这样(如果我们让 main() 函数停止 4 秒),main() 会提前结束,longWait() 则无法完成。如果我们不在 main() 中等待,协程会随着程序的结束而消亡。

当 main() 函数返回的时候,程序退出:它不会等待任何其他非 main 协程的结束。这就是为什么在服务器程序中,每一个请求都会启动一个协程来处理,server() 函数必须保持运行状态。通常使用一个无限循环来达到这样的目的。

另外,协程是独立的处理单元,一旦陆续启动一些协程,你无法确定他们是什么时候真正开始执行的。你的代码逻辑必须独立于协程调用的顺序。

协程间的信道

在上边例子中,协程是独立执行的,它们之间没有通信。它们必须通信才会变得更有用:彼此之间发送和接收信息并且协调/同步它们的工作。协程可以使用共享变量来通信,但是很不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。

Go有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。数据的所有权(可以读写数据的能力)也因此被传递。

通道服务于通信的两个目的:值的交换,同步的,保证了两个计算(协程)任何时候都是可知状态。

声明通道:var identifier chan datatype,为初始化的通道的值是nil。

所以通道只能传输一个类型的数据,比如chan int或者chan string,所有的类型都可以用于通道,空接口interface{}也可以,甚至可以创建通道的通道。

通道实际上是类型化消息的队列:使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给它们的元素的顺序。通道也是引用类型,所以使用make()函数来分配内存。

  1. // var ch1 chan string
  2. // ch1 = make(chan string)
  3. ch1 := make(chan string)

通信操作符 <-

这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。

流向通道(发送):ch <- int,用通道ch发送变量int1

从通道流出(接收),三种方式:int2 = <- ch,变量int2从通道ch接收数据;假设int2已经声明过了,如果没有的话可以写成int2 := <- ch

<- ch可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:

  1. if <- ch != 1000 {
  2. ...
  3. }

同一个操作符<-既用于发送也用于接收,但Go会根据操作对象弄明白该干什么。虽非强制要求,但为了可读性通道的命名通常以ch开头或者包含chan。通道的发送和接收都是原子操作:它们总是互不干扰地完成。

goroutine2.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func sendData(ch chan string) {
  7. ch <- "test1"
  8. ch <- "test2"
  9. ch <- "test3"
  10. ch <- "test4"
  11. ch <- "test5"
  12. }
  13. func getData(ch chan string) {
  14. var input string
  15. for {
  16. input = <-ch
  17. fmt.Printf("%s ", input)
  18. }
  19. }
  20. func main() {
  21. ch := make(chan string)
  22. go sendData(ch)
  23. go getData(ch)
  24. time.Sleep(1e9)
  25. }

main() 函数中启动了两个协程:sendData() 通过通道 ch 发送了5个字符串,getData() 按顺序接收它们并打印出来。如果2个协程需要通信,必须给它们同一个通道作为参数才醒。

尝试一下如果注释掉time.Sleep(1e9)会如何。

会发现协程之间的同步非常重要:

  • main()等待了1秒让两个协程完成,如果不这样,sendData()就没有机会输出;
  • getData()使用了无限循环:它随着sendData()的发送完成和ch变空也结束了;
  • 如果移除一个或所有go关键字,程序无法运行,Go运行时会抛出panic;

运行时(runtime)会检查所有的协程(本例中只有一个)是否在等待着什么东西(可从某个通道读取或写入某个通道),意味着程序将无法继续执行。这是死锁(deadlock)的一种形式,而运行时(runtime)可以为我们检测到这种情况。

注意:不要使用打印状态来表明通道的发送和接收顺序:由于打印状态和通道实际发生读写的时间延迟会导致和真实发生的顺序不同。

通道阻塞

默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:

  1. 对于同一个通道,发送操作(协程或函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待ch再次变为可用状态:就是通道值被接收时。
  2. 对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。

以下示例验证了上述理论,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字0.

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. ch1 := make(chan int)
  7. go pump(ch1)
  8. fmt.Println(<-ch1)
  9. }
  10. func pump(ch chan int) {
  11. for i := 0; ; i++ {
  12. ch <- i
  13. }
  14. }

输出:0
为通道解除阻塞定义了suck函数来在无限循环中读取通道,

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. ch1 := make(chan int)
  8. go pump(ch1)
  9. go suck(ch1)
  10. fmt.Println(<-ch1)
  11. time.Sleep(1e9)
  12. }
  13. func pump(ch chan int) {
  14. for i := 0; ; i++ {
  15. ch <- i
  16. }
  17. }
  18. func suck(ch chan int) {
  19. for {
  20. fmt.Println(<-ch)
  21. }
  22. }

通过一个(或多个)通道交换数据进行协程同步

通信是一种同步形式:通过通道,两个协程在通信中某刻同步交换数据。无缓冲通道成为了多个协程同步的完美工具。甚至可以在通道两端互相阻塞对方,形成死锁。Go运行时会检查并panic,停止程序。无缓冲通道会被阻塞。设计无阻塞的程序可以避免这种情况,或者使用带缓冲的通道。

下面示例会导致panic: fatal error: all goroutines are asleep - deadlock!

  1. package main
  2. import "fmt"
  3. func f1(in chan int) {
  4. fmt.Println(<-in)
  5. }
  6. func main() {
  7. out := make(chan int)
  8. out <- 2
  9. go f1(out)
  10. }

同步通道-使用带缓冲的通道

一个无缓冲通道只能包含1个元素,有时显得很局限。给通道提供一个缓存,可以在扩展的make命令中设置它的容量:

  1. // buf := 100
  2. // ch1 := make(chan string, buf)
  3. ch1 := make(chan string, 100)

在缓冲被全部使用之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。

缓冲容量和类型无关,所以可以(尽管可能导致危险)给一些通道设置不同的容量,只要它们拥有同样的元素类型,内置的cap函数可以返回缓冲区的容量。

如果容量大于0,通道就是异步的了:缓冲满载或变空之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双发准备好的情况下才可以成功。

协程中用通道输出结果

为了知道计算何时完成,可以通过信道回报。例如:

  1. ch := make(chan int)
  2. go sum(bigArray, ch)
  3. ...
  4. sum := <- ch

信号量模式

下边的片段阐明:协程通过在通道ch中放置一个值来处理结束的信号。main协程等待<-ch直到从中获取的值。

  1. func compute(ch chan int) {
  2. ch <- someCoputation()
  3. }
  4. func main() {
  5. ch := make(chan int)
  6. go compute(ch)
  7. doSomethingElseForAWhile()
  8. result := <- ch
  9. }

这个信号也可以是其他的,不返回结果,比如下面这个协程中的匿名函数(lambda)协程:

  1. ch := make(chan int)
  2. go func() {
  3. // doSomething
  4. ch <- 1 // Send a signal
  5. }()
  6. doSomething
  7. <- ch // Wait for goroutine to finish; discard sent value

或者等待两个协程完成,每一个都会对切片s的一部分进行排序:

  1. done := make(chan bool)
  2. doSort := func(s []int){
  3. sort(s)
  4. done <- true
  5. }
  6. i := privot(s)
  7. go doSort(s[:i])
  8. go doSort(s[i:])
  9. <- done
  10. <- done

下边代码用完整的信号量模式对长度为N的float64切片进行了N个doSomething()计算并同时完成,通道sem分配了相同的长度(且包含空接口类型的元素),待所有的计算都完成后,发送信号(通过放入值)。在循环中从通道sem不停的接收数据来等待所有的协程完成。

  1. type Empty interface {}
  2. var empty Empty
  3. data := make([]float64, N)
  4. res := make([]float64, N)
  5. sem := make(chan Empty, N)
  6. for i, xi := range data {
  7. go func(i int, xi float64){
  8. res[i] = doSomething(i, xi)
  9. sem <- empty
  10. }(i,xi)
  11. }
  12. for i:=0; i<N; i++ { <-sem }

用带缓冲通道实现一个信号量

信号量是实现互斥锁(排外锁)常见的同步机制,限制对资源的访问,解决读写问题,比如没有实现信号量的sync的Go包,使用带缓冲的通道可以轻松实现:

  • 带缓冲通道的容量和要同步的资源容量相同
  • 通道的长度(当前存放的元素个数)与当前资源被使用的数量相同
  • 容量减去通道的长度就是未处理的资源个数(标准信号量的整数值)

不用管通道中存放的是什么,只关注长度;因此创建一个长度可变但容量为0(字节)的通道:

  1. type Empty interface {}
  2. type semaphore chan Empty

将可用资源的数量N来初始化信号量semaphore: sem = make(semaphore, N)

然后直接对信号量进行操作:

  1. func (s semaphore) P(n int) {
  2. e := new(Empty)
  3. for i := 0; i < n; i++ {
  4. s <- e
  5. }
  6. }
  7. func (s semaphore) V(n int) {
  8. for i := 0; i < n; i++ {
  9. <- s
  10. }
  11. }

练习:有两个协程,第一个提供数字 0,10,20,…90 并将他们放入通道,第二个协程从通道中读取并打印。main() 等待两个协程完成后再结束。

  1. package main
  2. import "fmt"
  3. func numGen(start, count int, out chan<- int) {
  4. for i := 0; i < count; i++ {
  5. out <- start
  6. start = start + count
  7. }
  8. close(out)
  9. }
  10. func numEchoRange(in <-chan int, done chan<- bool) {
  11. for num := range in {
  12. fmt.Printf("%d\n", num)
  13. }
  14. done <- true
  15. }
  16. func main() {
  17. numChan := make(chan int)
  18. done := make(chan bool)
  19. go numGen(0, 10, numChan)
  20. go numEchoRange(numChan, done)
  21. <-done
  22. }

编程中常见的另外一种模式如下:不将通道作为参数传递给协程,而用函数来生成一个通道并返回;函数内有个匿名函数被协程调用。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func pump() chan int {
  7. ch := make(chan int)
  8. go func() {
  9. for i := 0; ; i++ {
  10. ch <- i
  11. }
  12. }()
  13. return ch
  14. }
  15. func suck(ch chan int) {
  16. for {
  17. fmt.Println(<-ch)
  18. }
  19. }
  20. func main() {
  21. stream := pump()
  22. go suck(stream)
  23. time.Sleep(1e9)
  24. }

给通道使用for循环

for循环的range语句可以用在通道ch上,便可以从通道中获取值,像这样:

  1. for v := range ch {
  2. fmt.Printf("The value is %v\n", v)
  3. }

它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func pump() chan int {
  7. ch := make(chan int)
  8. go func() {
  9. for i := 0; ; i++ {
  10. ch <- i
  11. }
  12. }()
  13. return ch
  14. }
  15. func suck(ch chan int) {
  16. go func() {
  17. for v := range ch {
  18. fmt.Println(v)
  19. }
  20. }()
  21. }
  22. func main() {
  23. suck(pump())
  24. time.Sleep(1e9)
  25. }

这个模式用到了生产者-消费者模式,通常,需要从包含了地址索引字段items的容器给通道填入元素。为容器的类型定义一个方法Iter(),返回一个只读的通道items:

  1. func (c *container) Iter() <- chan item {
  2. ch := make(chan item)
  3. go func() {
  4. for i := 0; i < c.Len(); i++ {
  5. ch <- c.items[i]
  6. }
  7. }()
  8. return ch
  9. }

在协程里,一个for循环迭代容器c中的元素(对于树或图的算法,这种简单的for循环可以替换为深度优先搜索)。

调用这个方法的代码可以这样迭代容器:for x := range container.Iter() { ... }

通道的方向

通道类型可以用注解来表示它只发送或者只接收:

  1. var send_only chan <- int // channel can only receive data
  2. var recv_only <- chan int // channel can only send data

只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是双向的,但也可以分配有方向的通道变量,像以下代码:

  1. var c = make(chan int)
  2. go source(c)
  3. go sink(c)
  4. func source(ch chan<- int) {
  5. for { ch <- 1 }
  6. }
  7. func sink(ch <-chan int) {
  8. for { <- ch }
  9. }

sieve1.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func pump() chan int {
  7. ch := make(chan int)
  8. go func() {
  9. for i := 0; ; i++ {
  10. ch <- i
  11. }
  12. }()
  13. return ch
  14. }
  15. func suck(ch chan int) {
  16. go func() {
  17. for v := range ch {
  18. fmt.Println(v)
  19. }
  20. }()
  21. }
  22. func main() {
  23. suck(pump())
  24. time.Sleep(1e9)
  25. }

sieve2.go:

  1. package main
  2. import "fmt"
  3. func generate() chan int {
  4. ch := make(chan int)
  5. go func() {
  6. for i := 2; ; i++ {
  7. ch <- i
  8. }
  9. }()
  10. return ch
  11. }
  12. func filter(in chan int, prime int) chan int {
  13. out := make(chan int)
  14. go func() {
  15. for {
  16. if i := <-in; i%prime != 0 {
  17. out <- i
  18. }
  19. }
  20. }()
  21. return out
  22. }
  23. func sieve() chan int {
  24. out := make(chan int)
  25. go func() {
  26. ch := generate()
  27. for {
  28. prime := <-ch
  29. ch = filter(ch, prime)
  30. out <- prime
  31. }
  32. }()
  33. return out
  34. }
  35. func main() {
  36. primes := sieve()
  37. for {
  38. fmt.Println(<-primes)
  39. }
  40. }

协程的同步:关闭通道-测试阻塞的通道

通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。

在上边的goroutine.go示例中,如何在通道的sendData()完成的时候发送一个信号,getData()又如何检测到通道是否关闭或阻塞?

第一个可以通过函数close(ch)来完成:这个将通道标记为无法通过发送操作<-接受更多的值;给已经关闭的通道发送或者再次关闭都会导致运行时的panic。在创建一个通道后使用defer语句是个不错的办法:

  1. ch := make(chan float64)
  2. defer close(ch)

第二个问题可以使用,ok操作符:用来检测通道是否被关闭。v, ok := <- ch // ok is true if v received value

通常和if语句一起使用:

  1. if v, ok := <- ch; ok {
  2. process(v)
  3. }
  4. v, ok := <- ch
  5. if !ok {
  6. break
  7. }
  8. process(v)

goroutine3.go:

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func sendData(ch chan string) {
  6. ch <- "test1"
  7. ch <- "test2"
  8. ch <- "test3"
  9. ch <- "test4"
  10. ch <- "test5"
  11. close(ch)
  12. }
  13. func getData(ch chan string) {
  14. for {
  15. input, open := <-ch
  16. if !open {
  17. break
  18. }
  19. fmt.Printf("%s ", input)
  20. }
  21. }
  22. func main() {
  23. ch := make(chan string)
  24. go sendData(ch)
  25. getData(ch)
  26. }

在for循环的getData()中,在每次接收通道的数据之前都使用 if !open 来检测,或者使用for-range语句来读取通道是更好的办法,因为会自动检测通道是否关闭:

  1. for input := range ch {
  2. process(input)
  3. }

使用select切换协程

从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似也被称作通信开关;select监听进入通道的数据,也可以是用通道发送值的时候:

  1. select {
  2. case u := <- ch1:
  3. ...
  4. case u := <- ch2:
  5. ...
  6. default:
  7. }

default语句是可选的,fallthrough行为和普通的switch相似,是不允许的。在任何一个case中执行break或return,select就结束了。

select做的就是:选择处理列出的多个通信情况中的一个:

  • 如果都阻塞了,会等待到其中一个可以处理;
  • 如果多个可以处理,随机选择一个;
  • 如果没有通道操作可以处理并且写了default,它就会执行:default永远是可运行的;

在select中使用发送操作并且有default可以确保发送不被阻塞!如果没有default,select就会一直阻塞;

goroutine_select.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func pump1(ch chan int) {
  7. for i := 0; ; i++ {
  8. ch <- i * 2
  9. }
  10. }
  11. func pump2(ch chan int) {
  12. for i := 0; ; i++ {
  13. ch <- i + 5
  14. }
  15. }
  16. func suck(ch1, ch2 chan int) {
  17. for {
  18. select {
  19. case v := <-ch1:
  20. fmt.Printf("Received on channel 1: %d\n", v)
  21. case v := <-ch2:
  22. fmt.Printf("Received on channel 2: %d\n", v)
  23. }
  24. }
  25. }
  26. func main() {
  27. ch1 := make(chan int)
  28. ch2 := make(chan int)
  29. go pump1(ch1)
  30. go pump2(ch2)
  31. go suck(ch1, ch2)
  32. time.Sleep(1e9)
  33. }

习惯用法:后台服务模式

服务通常是用后台协程中的无限循环实现的,在循环中使用select获取并处理通道中的数据:

  1. func backend() {
  2. for {
  3. select {
  4. case cmd := <-ch1:
  5. //...
  6. case cmd := <-ch2:
  7. //...
  8. case cmd := <-chStop:
  9. //stop server
  10. }
  11. }
  12. }

通道、超时和计时器(Ticker)

time包中包含了time.Ticker结构体,这个对象以指定的时间间隔重复的向通道C发送时间值:

  1. type Ticker struct {
  2. C <-chan Time
  3. }

时间间隔的单位是ns,在工厂函数time.NewTicker中以Duration类型的参数传入:func NewTicker(dur) *Ticker。在协程周期性的执行一些事情(打印状态日志,输出,计算等)的时候非常有用。

调用Stop()使计时器停止,在defer语句中使用。这些都很好的适应select语句:

  1. ticker := time.NewTicker(updateInterval)
  2. defer ticker.Stop()
  3. ...
  4. select {
  5. case u := <-ch1:
  6. ...
  7. case u := <-ch2:
  8. ...
  9. case <- ticker.C:
  10. logState(status)
  11. default:
  12. }

time.Tick()函数声明为Tick(d Duration) <-chan time,当你想返回一个通道而不必关闭它的时候这个函数非常有用:它以d为周期给返回的通道发送时间,d是纳秒数。

  1. rate_pre_sec := 10
  2. var dur Duration = 1e9
  3. chRate := time.Tick(dur)
  4. for req := range requests {
  5. <- chRate
  6. go client.Call("Service.Method", req, ...)
  7. }

timer_goroutine.go:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. tick := time.Tick(1e8)
  8. boom := time.After(5e8)
  9. for {
  10. select {
  11. case <-tick:
  12. fmt.Println("tick.")
  13. case <-boom:
  14. fmt.Println("BOOM!")
  15. return
  16. default:
  17. fmt.Println(" .")
  18. time.Sleep(5e7)
  19. }
  20. }
  21. }

协程和恢复(recover)

一个用到recover的程序停掉了服务器内部一个失败的协程而不影响其他协程的工作。

  1. func server(workChan <-chan *Work) {
  2. for work := range workChan {
  3. go safelyDo(work)
  4. }
  5. }
  6. func safelyDo(work *Work) {
  7. defer func() {
  8. if err := recover(); err != nil {
  9. log.Printf("Work failed with %s in %v", errm, work)
  10. }
  11. }()
  12. do(work)
  13. }

上边的代码,如果do(work)发生panic,错误会被记录且协程会退出并释放,而其他协程不受影响。

惰性生成器的实现

生成器是指当被调用时返回一个序列中下一个值的函数,例如:

  1. generateInteger() => 0
  2. generateInteger() => 1
  3. generateInteger() => 2

生成器每次返回的是序列下一个值而非整个序列;这种特性也称之为惰性求值:只在你需要时进行求值,同时保留相关变量资源(内存和CPU):这是一项在需要时对表达式进行求值的技术。例如,生成一个无限数量的偶数序列:要产生这样一个序列并且在一个一个的使用可能会很困难,而且内存会溢出!但是一个含有通道和go协程的函数能轻易实现这个需求。

  1. package main
  2. import "fmt"
  3. var resume chan int
  4. func integers() chan int {
  5. yield := make(chan int)
  6. count := 0
  7. go func() {
  8. for {
  9. yield <- count
  10. count++
  11. }
  12. }()
  13. return yield
  14. }
  15. func generateInteger() int {
  16. return <-resume
  17. }
  18. func main() {
  19. resume = integers()
  20. fmt.Println(generateInteger())
  21. fmt.Println(generateInteger())
  22. fmt.Println(generateInteger())
  23. }

通过巧妙地使用空接口、闭包和高阶函数,实现一个通用的惰性生产器的工厂函数BuildLazyEvaluator(这个应该放在一个工具包中实现)。工厂函数需要一个函数和一个初始状态作为输入参数,返回一个无参、返回值是生成序列的函数。传入的函数需要计算出下一个返回值以及下一个状态参数。在工厂函数中,创建一个通道和无限循环的go协程。返回值被放到了该通道中,返回函数稍后被调用时从该通道中取得该返回值。每当取得一个值时,下一个值即被计算。

在下面的例子中,定义了一个evenFunc函数,其是一个惰性生成函数:在main函数中,创建了前10个偶数,每个都通过调用even()函数取得下一个值的。

  1. package main
  2. import "fmt"
  3. type Any interface{}
  4. type EvalFunc func(Any) (Any, Any)
  5. func BuildLazyEvaluator(EvalFunc EvalFunc, initState Any) func() Any {
  6. retValChan := make(chan Any)
  7. loopFunc := func() {
  8. var actState Any = initState
  9. var retVal Any
  10. for {
  11. retVal, actState = EvalFunc(actState)
  12. retValChan <- retVal
  13. }
  14. }
  15. retFunc := func() Any {
  16. return <-retValChan
  17. }
  18. go loopFunc()
  19. return retFunc
  20. }
  21. func BuildLazyIntEvaluator(EvalFunc EvalFunc, initState Any) func() int {
  22. ef := BuildLazyEvaluator(EvalFunc, initState)
  23. return func() int {
  24. return ef().(int)
  25. }
  26. }
  27. func main() {
  28. evenFunc := func(state Any) (Any, Any) {
  29. os := state.(int)
  30. ns := os + 2
  31. return os, ns
  32. }
  33. even := BuildLazyIntEvaluator(evenFunc, 0)
  34. for i := 0; i < 10; i++ {
  35. fmt.Printf("%vth even: %v\n", i, even())
  36. }
  37. }

实现Futures模式

所谓Futures就是指:有时候在你使用某一个值之前需要先对其进行计算。这种情况下,就可以在另一个处理器上进行该值的计算,到使用时,该值就已经计算完毕了。

Futures模式通过闭包和通道可以很容器实现,类似于生成器,不同地方在于Futures需要返回一个值。

假设我们有一个矩阵类型,我们需要计算两个矩阵 A 和 B 乘积的逆,首先我们通过函数 Inverse(M) 分别对其进行求逆运算,再将结果相乘。如下函数 InverseProduct() 实现了如上过程:

  1. func InverseProduct(a Matrix, b Matrix) {
  2. a_inv := Inverse(a)
  3. b_inv := Inverse(b)
  4. return Product(a_inv, b_inv)
  5. }

在这个例子中,a 和 b 的求逆矩阵需要先被计算。那么为什么在计算 b 的逆矩阵时,需要等待 a 的逆计算完成呢?显然不必要,这两个求逆运算其实可以并行执行的。换句话说,调用 Product 函数只需要等到 a_inv 和 b_inv 的计算完成。如下代码实现了并行计算方式:

  1. func InverseProduct(a Matrix, b Matrix) {
  2. a_inv_future := InverseFuture(a)
  3. b_inv_future := InverseFuture(b)
  4. a_inv := <- a_inv_future
  5. b_inv := <- b_inv_future
  6. return Product(a_inv, b_inv)
  7. }

InverseFuture 函数以 goroutine 的形式起了一个闭包,该闭包会将矩阵求逆结果放入到 future 通道中:

  1. func InverseFuture(a Matrix) chan Matrix {
  2. future := make(chan Matrix)
  3. go func() {
  4. future <- Inverse(a)
  5. }()
  6. return future
  7. }

复用

典型的客户端/服务端(C/S)模式

使用 Go 的服务器通常会在协程中执行向客户端的响应,故而会对每一个客户端请求启动一个协程。一个常用的操作方法是客户端请求自身中包含一个通道,而服务器则向这个通道发送响应。

在以下测试例子中,100 个请求会被发送到服务器,只有它们全部被送达后我们才会按相反的顺序检查响应:

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. type Request struct {
  6. a, b int
  7. replyc chan int
  8. }
  9. type binOp func(a, b int) int
  10. func run(op binOp, req *Request) {
  11. req.replyc <- op(req.a, req.b)
  12. }
  13. func server(op binOp, service chan *Request) {
  14. for {
  15. req := <-service
  16. go run(op, req)
  17. }
  18. }
  19. func startServer(op binOp) chan *Request {
  20. reqChan := make(chan *Request)
  21. go server(op, reqChan)
  22. return reqChan
  23. }
  24. func main() {
  25. adder := startServer(func(a, b int) int { return a + b })
  26. const N = 100
  27. var reqs [N]Request
  28. for i := 0; i < N; i++ {
  29. req := &reqs[i]
  30. req.a = i
  31. req.b = i + N
  32. req.replyc = make(chan int)
  33. adder <- req
  34. }
  35. for i := N - 1; i >= 0; i-- {
  36. if <-reqs[i].replyc != N+2*i {
  37. fmt.Println("fail at", i)
  38. } else {
  39. fmt.Println("Request", i, "is ok!")
  40. }
  41. }
  42. fmt.Println("done")
  43. }

Request结构中内嵌了一个replyc通道,接下来使用简单的形式服务器为每一个请求启动一个协程并在其中执行run()函数,会将类型为binOp的op操作返回的int值发送到replyc通道。server协程会无限循环从chan *Request接收请求,并且为了避免被长时间操作所阻塞,它将为每一个请求启动一个协程来做具体的工作,server本身则是以协程的方式在startServer函数中启动。

卸载:通过信号通道关闭服务器

上一个例子server在main函数返回后并没有完全关闭,而被强制结束了。server函数现在则使用select在service通道和quit通道之间做出选择,当quit通道接收到一个true时,server就会返回并结束。

multiplex_server2.go:

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. type Request struct {
  6. a, b int
  7. replyc chan int
  8. }
  9. type binOp func(a, b int) int
  10. func run(op binOp, req *Request) {
  11. req.replyc <- op(req.a, req.b)
  12. }
  13. func server(op binOp, service chan *Request, quit chan bool) {
  14. for {
  15. select {
  16. case req := <-service:
  17. go run(op, req)
  18. case <-quit:
  19. return
  20. }
  21. }
  22. }
  23. func startServer(op binOp) (service chan *Request, quit chan bool) {
  24. service = make(chan *Request)
  25. quit = make(chan bool)
  26. go server(op, service, quit)
  27. return service, quit
  28. }
  29. func main() {
  30. adder, quit := startServer(func(a, b int) int { return a + b })
  31. const N = 100
  32. var reqs [N]Request
  33. for i := 0; i < N; i++ {
  34. req := &reqs[i]
  35. req.a = i
  36. req.b = i + N
  37. req.replyc = make(chan int)
  38. adder <- req
  39. }
  40. for i := N - 1; i >= 0; i-- {
  41. if <-reqs[i].replyc != N+2*i {
  42. fmt.Println("fail at", i)
  43. } else {
  44. fmt.Println("Request", i, "is ok!")
  45. }
  46. }
  47. quit <- true
  48. fmt.Println("done")
  49. }

限制同时处理的请求数

使用带缓冲区的通道很容易实现这一点,其缓冲区容量就是同时处理请求的最大数量,超过 MAXREQS 的请求将不会被同时处理,因为当信号通道表示缓冲区已满时 handle 函数会阻塞且不再处理其他请求,直到某个请求从 sem 中被移除。sem 就像一个信号量,这一专业术语用于在程序中表示特定条件的标志变量。

max_tasks.go:

  1. package main
  2. import "fmt"
  3. const MAXREQS = 50
  4. var sem = make(chan int, MAXREQS)
  5. type Request struct {
  6. a, b int
  7. replyc chan int
  8. }
  9. func process(r *Request) {
  10. fmt.Println("process...")
  11. }
  12. func handler(r *Request) {
  13. sem <- 1
  14. process(r)
  15. <-sem
  16. }
  17. func server(service chan *Request) {
  18. for {
  19. request := <-service
  20. go handler(request)
  21. }
  22. }
  23. func main() {
  24. service := make(chan *Request)
  25. go server(service)
  26. }

并行化大量数据的计算

假设我们需要处理一些数量巨大且互不相关的数据项,它们从一个 in 通道被传递进来,当我们处理完以后又要将它们放入另一个 out 通道,就像一个工厂流水线一样。处理每个数据项也可能包含许多步骤:Preprocess(预处理) / StepA(步骤A) / StepB(步骤B) / … / PostProcess(后处理)

一个典型的用于解决按顺序执行每个步骤的顺序流水线算法可以写成下面这样:

  1. func SerialProcessData(in <-chan *Data, out chan<- *Data) {
  2. for data := range in {
  3. tmpA := PreprocessData(data)
  4. tmpB := ProcessStepA(tmpA)
  5. tmpC := ProcessStepB(tmpB)
  6. out <- PostProcessData(tmpC)
  7. }
  8. }

一次只执行一个步骤,并且按顺序处理每个项目:在第 1 个项目没有被 PostProcess 并放入 out 通道之前绝不会处理第 2 个项目,很快就会发现这将会造成巨大的时间浪费。

一个更高效的计算方式是让每一个处理步骤作为一个协程独立工作。每一个步骤从上一步的输出通道中获得输入数据。这种方式仅有极少数时间会被浪费,而大部分时间所有的步骤都在一直执行中:

  1. func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
  2. // make channels:
  3. preOut := make(chan *Data, 100)
  4. stepAOut := make(chan *Data, 100)
  5. stepBOut := make(chan *Data, 100)
  6. stepCOut := make(chan *Data, 100)
  7. // start parallel computations:
  8. go PreprocessData(in, preOut)
  9. go ProcessStepA(preOut,StepAOut)
  10. go ProcessStepB(StepAOut,StepBOut)
  11. go ProcessStepC(StepBOut,StepCOut)
  12. go PostProcessData(StepCOut,out)
  13. }

使用通道并发访问对象

为了保护对象被并发访问修改,我们可以使用协程在后台顺序执行匿名函数来替代使用同步互斥锁。在下面的程序中我们有一个类型 Person 中包含一个字段 chF ,这是一个用于存放匿名函数的通道。

这个结构在构造函数NewPerson()中初始化的同时会启动一个后台协程backend()。backend()方法会在一个无限循序中执行chF中放置的所有函数,有效地将它们序列化从而提供了安全的并发访问。更改和读取salary的方法会通过将一个匿名函数写入chF通道中,然后让backend()按顺序执行以达到其目的。需注意的是salary方法创建的闭包函数是如何将fChan通道包含在其中的。

conc_access.go:

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. )
  6. type Person struct {
  7. Name string
  8. salary float64
  9. chF chan func()
  10. }
  11. func NewPerson(name string, salary float64) *Person {
  12. p := &Person{name, salary, make(chan func())}
  13. go p.backend()
  14. return p
  15. }
  16. func (p *Person) backend() {
  17. for f := range p.chF {
  18. f()
  19. }
  20. }
  21. func (p *Person) SetSalary(sal float64) {
  22. p.chF <- func() {
  23. p.salary = sal
  24. }
  25. }
  26. func (p *Person) Salary() float64 {
  27. fChan := make(chan float64)
  28. p.chF <- func() { fChan <- p.salary }
  29. return <-fChan
  30. }
  31. func (p *Person) String() string {
  32. return "Person - name is:" + p.Name + "- salary is: " + strconv.FormatFloat(p.Salary(), 'f', 2, 64)
  33. }
  34. func main() {
  35. bs := NewPerson("zky", 2500.5)
  36. fmt.Println(bs)
  37. bs.SetSalary(4000.25)
  38. fmt.Println("Salary changed")
  39. fmt.Println(bs)
  40. }