在 Go 中,channel 类型为一等公民(first-class citizen),这意味着我们可以像普通变量那样使用 channel,比如:定义 channel 类型变量、给 channel 变量赋值、将 channel 作为参数传递给函数/方法、将 channel 作为返回值从函数/方法中返回,甚至将 channel 发送到其他 channel 中。

正是由于 channel 一等公民的特性,channel 原语使用起来很简单:

  1. c := make(chan int) // 创建一个无缓冲(unbuffered)的int类型的channel
  2. c := make(chan int, 5) // 创建一个带缓冲的int类型的Channel
  3. c <- x // 向channel c中发送一个值
  4. <- c // 从channel c中接收一个值
  5. x = <- c // 从channel c接收一个值并将其存储到变量x中
  6. x, ok = <- c // 从channel c接收一个值。如果channel关闭了,那么ok将被置为false
  7. for i := range c { ... ... } // for range与channel结合使用
  8. close(c) // 关闭channel c
  9. c := make(chan chan int) // 创建一个无缓冲的chan int类型的channel
  10. func stream(ctx context.Context, out chan<- Value) error // 将只发送(send-only) channel作为函数参数
  11. func spawn(...) <-chan T // 将只接收(receive-only)类型channel作为返回值

当涉及同时对多个 channel 进行操作时,我们会结合使用到 Go 为 CSP 并发模型提供的另外一个原语:select。通过 select,我们可以同时在多个 channel 上进行发送/接收操作:

  1. select {
  2. case x := <-c1: // 从channel c1接收数据
  3. ... ...
  4. case y, ok := <-c2: // 从channel c2接收数据,并根据ok值判断c2是否已经关闭
  5. ... ...
  6. case c3 <- z: // 将z值发送到channel c3中:
  7. ... ...
  8. default: // 当上面case中的channel通信均无法实施时,执行该默认分支
  9. }

1. 无缓冲(unbuffered)channel

无缓冲 channel 兼具通信和同步特性,在并发程序中应用颇为广泛。我们可以通过不带有capacity参数的内置 make 函数创建一个可用的无缓冲 channel:

  1. c := make(chan T) // T为channel中元素的类型

由于无缓冲 channel 的运行时层实现不带有缓冲区,因此对无缓冲 channel 的接收和发送操作是同步的,即对同一个无缓冲 channel,只有对其进行接收操作的 goroutine 和对其进行发送操作的 goroutine 都存在的情况下,通信才能得以进行,否则单方面的操作会让对应的 goroutine 陷入挂起状态。

各种顺序:

  • 如果一个无缓冲 channel 没有任何 goroutine 对其进行接收操作,一旦此时有 goroutine 先对其进行发送操作,那么动作发生和完成的时序如下:
  1. 发送动作发生 -> 接收动作发生(有goroutine对其进行接收操作) -> 发送动作完成/接收动作完成(先后顺序不能确定)
  • 如果一个无缓冲 channel 没有任何 goroutine 对其进行发送操作,一旦此时有 goroutine 先对其进行接收操作,那么动作发生和完成的时序如下:
  1. 接收动作发生 -> 发送动作发生(有goroutine对其进行发送操作) -> 发送动作完成/接收动作完成(先后顺序不确定)

因此,根据上述时序结果,对于无缓冲 channel 而言,我们得到以下结论:

  • 发送动作一定发生在接收动作完成之前;
  • 接收动作一定发生在发送动作完成之前。

想接收要先有发送, 想发送要先有接收.

这与 Go 官方“Go 内存模型”一文中对 channel 通信的描述是一致的。也正因为如此,下面的代码可以保证main输出的变量 a 的值为”hello, world”,因为函数 f 中的 channel 接收动作发生在主 goroutine 对 channel 发送动作完成之前,而a = “hello, world”语句又发生在 channel 接收动作之前,因此主 goroutine 在 channel 发送操作完成后看到的变量 a 的值一定是”hello, world”,而不是空字符串。

  1. // go-channel-case-1.go
  2. package main
  3. import "time"
  4. var c = make(chan int)
  5. var a string
  6. func f() {
  7. a = "hello, world"
  8. <-c
  9. }
  10. func main() {
  11. go f()
  12. c <- 5
  13. println(a)
  14. }

我觉得 因为函数 f 中的 channel 接收动作发生在主 goroutine 对 channel 发送动作完成之前 这句还有另一种情况: 发送动作在接收动作之前.

1) 用作信号传递

a) 1 对 1 通知信号

无缓冲 channel 常被用于在两个 goroutine 之间 1 对 1 的传递通知信号,比如下面这个例子:

  1. // go-channel-case-2.go
  2. package main
  3. import (
  4. "fmt"
  5. "time"
  6. )
  7. type signal struct{}
  8. func worker() {
  9. println("worker is working...")
  10. time.Sleep(1 * time.Second)
  11. }
  12. func spawn(f func()) <-chan signal {
  13. c := make(chan signal)
  14. go func() {
  15. println("worker start to work...")
  16. f()
  17. c <- signal(struct{}{})
  18. }()
  19. return c
  20. }
  21. func main() {
  22. println("start a worker...")
  23. c := spawn(worker)
  24. <-c
  25. fmt.Println("worker work done!")
  26. }

我们来运行一下这个例子:

  1. $go run go-channel-case-2.go
  2. start a worker...
  3. worker start to work...
  4. worker is working...
  5. worker work done!

b) 1 对 n 通知信号

有些时候,无缓冲 channel 还被用来实现1 对 n 的信号通知机制。这样的信号通知机制常被用于协调多个 goroutine 一起工作,比如下面的例子:

  1. // go-channel-case-3.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. type signal struct{}
  9. func worker(i int) {
  10. fmt.Printf("worker %d: is working...\n", i)
  11. time.Sleep(1 * time.Second)
  12. fmt.Printf("worker %d: works done\n", i)
  13. }
  14. func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
  15. c := make(chan signal)
  16. var wg sync.WaitGroup
  17. for i := 0; i < num; i++ {
  18. wg.Add(1)
  19. go func(i int) {
  20. <-groupSignal
  21. fmt.Printf("worker %d: start to work...\n", i)
  22. f(i)
  23. wg.Done()
  24. }(i + 1)
  25. }
  26. go func() {
  27. wg.Wait()
  28. c <- signal(struct{}{})
  29. }()
  30. return c
  31. }
  32. func main() {
  33. fmt.Println("start a group of workers...")
  34. groupSignal := make(chan signal)
  35. c := spawnGroup(worker, 5, groupSignal)
  36. time.Sleep(5 * time.Second)
  37. fmt.Println("the group of workers start to work...")
  38. close(groupSignal)
  39. <-c
  40. fmt.Println("the group of workers work done!")
  41. }

在上面例子中,main goroutine 创建了一组 5 个 worker goroutine,这些 goroutine 启动后会阻塞在名为 groupSignal 的无缓冲 channel 上。main goroutine 通过close(groupSignal)向所有 worker goroutine 广播出“开始工作”的信号,所有 worker goroutine 在收到 groupSignal 后“一起”开始工作,就像起跑线上的运动员听到了裁判员发出的起跑信号枪声。
**
这个例子的运行结果如下:

  1. $go run go-channel-case-3.go
  2. start a group of workers...
  3. the group of workers start to work...
  4. worker 3: start to work...
  5. worker 3: is working...
  6. worker 4: start to work...
  7. worker 4: is working...
  8. worker 1: start to work...
  9. worker 1: is working...
  10. worker 5: start to work...
  11. worker 5: is working...
  12. worker 2: start to work...
  13. worker 2: is working...
  14. worker 3: works done
  15. worker 4: works done
  16. worker 5: works done
  17. worker 1: works done
  18. worker 2: works done
  19. the group of workers work done!

我们看到:关闭一个无缓冲 channel 会让所有阻塞在该 channel 上的接收操作返回,从而实现一种 1 对 n 的“广播”机制。该 1 对 n 的信号通知机制还常用于通知一组 worker goroutine 退出,比如下面例子:

  1. // go-channel-case-4.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. type signal struct{}
  9. func worker(i int, quit <-chan signal) {
  10. fmt.Printf("worker %d: is working...\n", i)
  11. LOOP:
  12. for {
  13. select {
  14. default:
  15. // 模拟worker工作
  16. time.Sleep(1 * time.Second)
  17. case <-quit:
  18. break LOOP
  19. }
  20. }
  21. fmt.Printf("worker %d: works done\n", i)
  22. }
  23. func spawnGroup(f func(int, <-chan signal), num int, groupSignal <-chan signal) <-chan signal {
  24. c := make(chan signal)
  25. var wg sync.WaitGroup
  26. for i := 0; i < num; i++ {
  27. wg.Add(1)
  28. go func(i int) {
  29. fmt.Printf("worker %d: start to work...\n", i)
  30. f(i, groupSignal)
  31. wg.Done()
  32. }(i + 1)
  33. }
  34. go func() {
  35. wg.Wait()
  36. c <- signal(struct{}{})
  37. }()
  38. return c
  39. }
  40. func main() {
  41. fmt.Println("start a group of workers...")
  42. groupSignal := make(chan signal)
  43. c := spawnGroup(worker, 5, groupSignal)
  44. fmt.Println("the group of workers start to work...")
  45. time.Sleep(5 * time.Second)
  46. // 通知workers退出
  47. fmt.Println("notify the group of workers to exit...")
  48. close(groupSignal)
  49. <-c
  50. fmt.Println("the group of workers work done!")
  51. }

运行该示例:

  1. $go run go-channel-case-4.go
  2. start a group of workers...
  3. the group of workers start to work...
  4. worker 1: start to work...
  5. worker 1: is working...
  6. worker 3: start to work...
  7. worker 3: is working...
  8. worker 5: start to work...
  9. worker 5: is working...
  10. worker 4: start to work...
  11. worker 4: is working...
  12. worker 2: start to work...
  13. worker 2: is working...
  14. notify the group of workers to exit...
  15. worker 2: works done
  16. worker 4: works done
  17. worker 5: works done
  18. worker 1: works done
  19. worker 3: works done
  20. the group of workers work done!

2) 用于替代锁机制

无缓冲 channel 具有同步特性,这让它在某些场合可以替代锁,从而使得程序更加清晰,可读性更好。下面是一个传统的基于“共享内存”+“锁”模式的 goroutine 安全的计数器的实现:

  1. // go-channel-case-5.go
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. type counter struct {
  9. sync.Mutex
  10. i int
  11. }
  12. var cter counter
  13. func Increase() int {
  14. cter.Lock()
  15. defer cter.Unlock()
  16. cter.i++
  17. return cter.i
  18. }
  19. func main() {
  20. for i := 0; i < 10; i++ {
  21. go func(i int) {
  22. v := Increase()
  23. fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
  24. }(i)
  25. }
  26. time.Sleep(5 * time.Second)
  27. }

下面是我们使用无缓冲 channel 替代锁后的实现:

  1. // go-channel-case-6.go
  2. package main
  3. import (
  4. "fmt"
  5. "time"
  6. )
  7. type counter struct {
  8. c chan int
  9. i int
  10. }
  11. var cter counter
  12. func InitCounter() {
  13. cter = counter{
  14. c: make(chan int),
  15. }
  16. go func() {
  17. for {
  18. cter.i++
  19. cter.c <- cter.i
  20. }
  21. }()
  22. fmt.Println("counter init ok")
  23. }
  24. func Increase() int {
  25. return <-cter.c // 多个 goroutine 顺序读
  26. }
  27. func init() {
  28. InitCounter()
  29. }
  30. func main() {
  31. for i := 0; i < 10; i++ {
  32. go func(i int) {
  33. v := Increase()
  34. fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
  35. }(i)
  36. }
  37. time.Sleep(5 * time.Second)
  38. }

运行该示例,我们得到如下结果:

  1. $go run go-channel-case-6.go
  2. counter init ok
  3. goroutine-9: current counter value is 10
  4. goroutine-0: current counter value is 1
  5. goroutine-6: current counter value is 7
  6. goroutine-2: current counter value is 3
  7. goroutine-8: current counter value is 9
  8. goroutine-4: current counter value is 5
  9. goroutine-5: current counter value is 6
  10. goroutine-1: current counter value is 2
  11. goroutine-7: current counter value is 8
  12. goroutine-3: current counter value is 4

2. 带缓冲(buffered)channel

和无缓冲 channel 不同,我们通过带有capacity参数的内置 make 函数可以创建一个可用的带缓冲 channel:

  1. c := make(chan T, capacity) // T为channel中元素的类型, capacity为带缓冲channel的缓冲区容量

1) 用作消息队列

和无缓冲 channel 更多用于信号/事件管道相比,可自行设置容量、异步收发的带缓冲 channel 更适合被用作为消息队列,并且带缓冲 channel 在数据收发性能上要明显好于无缓冲 channel。下面是一些关于无缓冲 channel 和带缓冲 channel 收发性能测试的结果(Go 1.13.6, MacBook Pro 8 核):

  • 单接收单发送性能基准测试

我们先来看看针对一个 channel 只有一个发送 goroutine 和一个接收 goroutine 的情况下,两种 channel 的收发性能比对数据:

  1. // 无缓冲channel
  2. // go-channel-operation-benchmark/unbuffered-chan
  3. $go test -bench . one_to_one_test.go
  4. goos: darwin
  5. goarch: amd64
  6. BenchmarkUnbufferedChan1To1Send-8 6202120 198 ns/op
  7. BenchmarkUnbufferedChan1To1Recv-8 6752820 178 ns/op
  8. PASS
  9. ok command-line-arguments 2.811s
  10. // 带缓冲channel
  11. // go-channel-operation-benchmark/buffered-chan
  12. $go test -bench . one_to_one_cap_10_test.go
  13. goos: darwin
  14. goarch: amd64
  15. BenchmarkBufferedChan1To1SendCap10-8 14397186 83.7 ns/op
  16. BenchmarkBufferedChan1To1RecvCap10-8 14275723 82.2 ns/op
  17. PASS
  18. ok command-line-arguments 2.555s
  19. $go test -bench . one_to_one_cap_100_test.go
  20. goos: darwin
  21. goarch: amd64
  22. BenchmarkBufferedChan1To1SendCap100-8 18011007 65.5 ns/op
  23. BenchmarkBufferedChan1To1RecvCap100-8 18031082 65.4 ns/op
  24. PASS
  25. ok command-line-arguments 2.499s
  • 多接收多发送性能基准测试

我们再来看看针对一个 channel 有多个发送 goroutine 和多个接收 goroutine 的情况下,两种 channel 的收发性能比对数据(这里建立 10 个发送 goroutine 和 10 个接收 goroutine):

  1. // 无缓冲channel
  2. // go-channel-operation-benchmark/unbuffered-chan
  3. $go test -bench . multi_to_multi_test.go
  4. goos: darwin
  5. goarch: amd64
  6. BenchmarkUnbufferedChanNToNSend-8 317324 3793 ns/op
  7. BenchmarkUnbufferedChanNToNRecv-8 295288 4139 ns/op
  8. PASS
  9. ok command-line-arguments 2.516s
  10. // 带缓冲channel
  11. // go-channel-operation-benchmark/buffered-chan
  12. $go test -bench . multi_to_multi_cap_10_test.go
  13. goos: darwin
  14. goarch: amd64
  15. BenchmarkBufferedChanNToNSendCap10-8 534625 2252 ns/op
  16. BenchmarkBufferedChanNToNRecvCap10-8 476221 2752 ns/op
  17. PASS
  18. ok command-line-arguments 2.573s
  19. $go test -bench . multi_to_multi_cap_100_test.go
  20. goos: darwin
  21. goarch: amd64
  22. BenchmarkBufferedChanNToNSendCap100-8 1000000 1283 ns/op
  23. BenchmarkBufferedChanNToNRecvCap100-8 1000000 1250 ns/op
  24. PASS
  25. ok command-line-arguments 2.564s

综合以上结果数据,我们可以得到两个结论:

  • 无论是 1 收 1 发还是多收多发,带缓冲 channel 的收发性能要好于无缓冲 channel;
  • 对于带缓冲 channel 而言,选择适当容量会在一定程度上提升一定收发性能

2) 用作计数信号量(counting semaphore)

Go 并发设计的一个惯用法就是将带缓冲 channel 用作计数信号量(counting semaphore)。带缓冲 channel 中的当前数据个数代表的是当前同时处于活动状态(处理业务)的 goroutine 的数量,而带缓冲 channel 的容量(capacity)就代表了允许同时处于活动状态的 goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。

下面是一个将带缓冲 channel 用作计数信号量的例子:

  1. // go-channel-case-7.go
  2. package main
  3. import (
  4. "log"
  5. "sync"
  6. "time"
  7. )
  8. var active = make(chan struct{}, 3)
  9. var jobs = make(chan int, 10)
  10. func main() {
  11. go func() {
  12. for i := 0; i < 8; i++ {
  13. jobs <- (i + 1)
  14. }
  15. close(jobs)
  16. }()
  17. var wg sync.WaitGroup
  18. for j := range jobs {
  19. wg.Add(1)
  20. go func(j int) {
  21. active <- struct{}{}
  22. log.Printf("handle job: %d\n", j)
  23. time.Sleep(2 * time.Second)
  24. <-active
  25. wg.Done()
  26. }(j)
  27. }
  28. wg.Wait()
  29. }

我们看到:上面的示例创建了一组 goroutines 来处理 job,同一时间允许的最多 3 个 goroutine 处于活动状态。为达成这一目标,我们看到示例使用了一个容量(capacity)为 3 的带缓冲 channel: active作为计数信号量,这意味着允许同时处于活动状态的最大 goroutine 数量为 3。我们运行一下该示例:

  1. $go run go-channel-case-7.go
  2. 2020/02/04 09:57:02 handle job: 8
  3. 2020/02/04 09:57:02 handle job: 4
  4. 2020/02/04 09:57:02 handle job: 1
  5. 2020/02/04 09:57:04 handle job: 2
  6. 2020/02/04 09:57:04 handle job: 3
  7. 2020/02/04 09:57:04 handle job: 7
  8. 2020/02/04 09:57:06 handle job: 6
  9. 2020/02/04 09:57:06 handle job: 5

3) len(channel)的应用

len是 Go 语言的一个built-in 函数,它支持接受数组、切片、map、字符串和 channel 类型的参数,并返回对应类型的“长度” - 一个整型值。以len(s)为例:

  • 如果 s 是字符串类型(string),len(s) 返回字符串中的字节个数;
  • 如何 s 是 [n]T 或 *[n]T 的数组类型,len(s) 返回数组的长度 n;
  • 如果 s 是 []T 的切片类型(slice),len(s) 返回切片的当前长度;
  • 如果 s 是 map[K]T 的 map 类型,len(s) 返回 map 中的已定义的 key 的个数;
  • 如果 s 是 chan T 类型,那么 len(s) 针对 channel 的类型不同,有如下两种语义:
    • 当 s 为无缓冲 channel 时,len(s) 总是返回 0;
    • 当 s 为带缓冲 channel 时,len(s) 返回当前 channel s 中尚未被读取的元素个数。

这样一来,针对带缓冲 channel 的 len 调用似乎才是有意义的。那我们是否可以使用 len 函数来实现带缓冲 channel 的“判满”、“判有”和“判空”逻辑呢,就像下面示例中伪代码这样:

  1. var c chan T = make(chan T, capacity)
  2. // 判空
  3. if len(c) == 0 {
  4. // 此时channel c空了?
  5. }
  6. // 判有
  7. if len(c) > 0 {
  8. // 此时channel c有数据?
  9. }
  10. // 判满
  11. if len(channel) == cap(channel) {
  12. // 此时channel c满了?
  13. }

大家看到我在上面代码注释的“空了”、“有数据”和“满了”的后面打上了问号!channel 原语用于多个 goroutine 间的通信,一旦多个 goroutine 共同对 channel 进行收发操作,len(channel)就会在多个 goroutine 间形成“竞态”,单纯地依靠 len(channel)来判断 channel 中元素状态,不能保证在后续对 channel 的收发时 channel 状态是不变的。以判空为例:

image.png

从上图可以看到,当 goroutine1 使用 len(channel)判空后,便尝试从 channel 中接收数据。但在真正从 channel 读数据前,另外一个 goroutine2 已经将数据读了出去,goroutine1 后面的读取将阻塞在 channel 上,导致后面逻辑的失效。因此,为了不阻塞在 channel 上,常见的方法是将“判空与读取”放在一个“事务”中,将“判满与写入”放在一个“事务”中,而这类“事务”我们可以通过 select 实现。我们来看下面示例:

  1. // go-channel-case-8.go
  2. package main
  3. import (
  4. "fmt"
  5. "time"
  6. )
  7. func producer(c chan<- int) {
  8. var i int = 1
  9. for {
  10. time.Sleep(2 * time.Second)
  11. ok := trySend(c, i)
  12. if ok {
  13. fmt.Printf("[producer]: send [%d] to channel\n", i)
  14. i++
  15. continue
  16. }
  17. fmt.Printf("[producer]: try send [%d], but channel is full\n", i)
  18. }
  19. }
  20. func tryRecv(c <-chan int) (int, bool) {
  21. select {
  22. case i := <-c:
  23. return i, true
  24. default:
  25. return 0, false
  26. }
  27. }
  28. func trySend(c chan<- int, i int) bool {
  29. select {
  30. case c <- i:
  31. return true
  32. default:
  33. return false
  34. }
  35. }
  36. func consumer(c <-chan int) {
  37. for {
  38. i, ok := tryRecv(c)
  39. if !ok {
  40. fmt.Println("[consumer]: try to recv from channel, but the channel is empty")
  41. time.Sleep(1 * time.Second)
  42. continue
  43. }
  44. fmt.Printf("[consumer]: recv [%d] from channel\n", i)
  45. if i >= 3 {
  46. fmt.Println("[consumer]: exit")
  47. return
  48. }
  49. }
  50. }
  51. func main() {
  52. c := make(chan int, 3)
  53. go producer(c)
  54. go consumer(c)
  55. select {} // 故意阻塞在此
  56. }

我们看到由于用到了 select 原语的 default 分支语义,当 channel 空的时候,tryRecv 不会阻塞;当 channel 满的时候,trySend 也不会阻塞。我们运行一下该示例:

  1. $go run go-channel-case-8.go
  2. [consumer]: try to recv from channel, but the channel is empty
  3. [consumer]: try to recv from channel, but the channel is empty
  4. [producer]: send [1] to channel
  5. [consumer]: recv [1] from channel
  6. [consumer]: try to recv from channel, but the channel is empty
  7. [consumer]: try to recv from channel, but the channel is empty
  8. [producer]: send [2] to channel
  9. [consumer]: recv [2] from channel
  10. [consumer]: try to recv from channel, but the channel is empty
  11. [consumer]: try to recv from channel, but the channel is empty
  12. [producer]: send [3] to channel
  13. [consumer]: recv [3] from channel
  14. [consumer]: exit
  15. [producer]: send [4] to channel
  16. [producer]: send [5] to channel
  17. [producer]: send [6] to channel
  18. [producer]: try send [7], but channel is full
  19. [producer]: try send [7], but channel is full
  20. [producer]: try send [7], but channel is full

这种方法可以适合大多数的场合,但是这种方法有一个“问题”,那就是它改变了 channel 的状态:接收了一个元素或发送了一个元素。有些时候我们不想这么做,我们想在不改变 channel 状态的前提下单纯地侦测 channel 的状态而又不会因 channel 满或空阻塞在 channel 上。但很遗憾,目前没有一种方法可以在实现这样的功能的同时又适用于所有场合。但是在特定的场景下,我们可以用 len(channel)来实现。比如下面这两种场景:

image.png

只能是1对多的情况下使用 len(chan).

3. nil channel 的妙用

对一个没有初始化的 channel(nil channel)进行读写操作都将发生阻塞,比如下面这段代码:

  1. func main() {
  2. var c chan int
  3. <-c
  4. }
  5. 或者
  6. func main() {
  7. var c chan int
  8. c<-1
  9. }

上述无论哪段代码被执行,我们将得到类似如下的错误信息:

  1. fatal error: all goroutines are asleep - deadlock!
  2. goroutine 1 [chan receive (nil chan)]:
  3. goroutine 1 [chan send (nil chan)]:

main goroutine 被阻塞在 channel 上,导致 Go 运行时认为“deadlock”状态出现而抛出 panic。

但 nil channel 也不是一无是处,有些时候妙用 nil channel 可以得到事半功倍的效果。我们来看一个例子:

  1. // go-channel-case-9.go
  2. package main
  3. import "fmt"
  4. import "time"
  5. func main() {
  6. c1, c2 := make(chan int), make(chan int)
  7. go func() {
  8. time.Sleep(time.Second * 5)
  9. c1 <- 5
  10. close(c1)
  11. }()
  12. go func() {
  13. time.Sleep(time.Second * 7)
  14. c2 <- 7
  15. close(c2)
  16. }()
  17. var ok1, ok2 bool
  18. for {
  19. select {
  20. case x := <-c1:
  21. ok1 = true
  22. fmt.Println(x)
  23. case x := <-c2:
  24. ok2 = true
  25. fmt.Println(x)
  26. }
  27. if ok1 && ok2 {
  28. break
  29. }
  30. }
  31. fmt.Println("program end")
  32. }

在这个示例中,我们期望程序在接收完 c1 和 c2 两个 channel 上的数据后就退出。但实际的运行情况如下:

  1. $go run go-channel-case-9.go
  2. 5
  3. 0
  4. 0
  5. 0
  6. ... ... //循环输出0
  7. 7
  8. program end

我们怎么来改进一下这个程序使之能按照我们的预期输出呢?nil channel 是时候登场了!改进后的示例代码如下:

  1. // go-channel-case-10.go
  2. package main
  3. import "fmt"
  4. import "time"
  5. func main() {
  6. c1, c2 := make(chan int), make(chan int)
  7. go func() {
  8. time.Sleep(time.Second * 5)
  9. c1 <- 5
  10. close(c1)
  11. }()
  12. go func() {
  13. time.Sleep(time.Second * 7)
  14. c2 <- 7
  15. close(c2)
  16. }()
  17. for {
  18. select {
  19. case x, ok := <-c1:
  20. if !ok {
  21. c1 = nil
  22. } else {
  23. fmt.Println(x)
  24. }
  25. case x, ok := <-c2:
  26. if !ok {
  27. c2 = nil
  28. } else {
  29. fmt.Println(x)
  30. }
  31. }
  32. if c1 == nil && c2 == nil {
  33. break
  34. }
  35. }
  36. fmt.Println("program end")
  37. }

上面改进后的示例程序的一个最关键的变化是在判断 c1 或 c2 被关闭后,将显式地将 c1 或 c2 置为 nil。我们知道:对一个 nil channel 执行获取操作,该操作将阻塞,于是已经被置为 nil 的 c1 或 c2 的分支将再也不会被 select 选中执行。于是上述改进后的示例的运行结果如下:

  1. $go run go-channel-case-10.go
  2. 5
  3. 7
  4. program end

4. 与 select 结合使用的一些惯用法

1) 利用 default 分支避免阻塞

在 Go 标准库中,这个惯用法也有应用,比如:

  1. // $GOROOT/src/time/sleep.go
  2. func sendTime(c interface{}, seq uintptr) {
  3. // 无阻塞的向c发送当前时间
  4. // ...
  5. select {
  6. case c.(chan Time) <- Now():
  7. default:
  8. }
  9. }

2) 实现超时机制

带超时机制的 select 是 Go 一种常见的 select 和 channel 的组合用法,通过超时事件,我们既可以避免长期陷入某种操作的等待中,也可以做一些异常处理工作。下面示例代码实现了一次具有 30s 超时的 select:

  1. func worker() {
  2. select {
  3. case <-c:
  4. // ... do some stuff
  5. case <-time.After(30 *time.Second):
  6. return
  7. }
  8. }

应用带有超时机制的 select 时,要特别注意 timer 使用后的释放,尤其在大量创建 timer 时。

  • Go 语言标准库提供的 timer 实质上是由 Go 运行时自行维护的,而不是操作系统级的定时器资源。
  • Go 运行时启动了一个单独的 goroutine,该 goroutine 执行了一个名为timerproc的函数,维护了一个“最小堆”。
  • 该 goroutine 会定期被唤醒并读取堆顶的 timer 对象,执行该 timer 对象对应的函数(向 timer.C 中发送一条数据,触发定时器),执行完毕后就会从最小堆中移除该 timer 对象。
  • 创建一个 time.Timer 实则就是在这个最小堆中添加一个 timer 对象实例,而调用 timer.Stop 方法则是从堆中删除对应的 timer 对象。

3) 实现心跳机制

结合 time 包的 Ticker,我们可以实现带有心跳机制的 select。这种机制使得我们可以在监听 channel 的同时,执行一些周期性的任务,比如下面这段代码:

  1. func worker() {
  2. heartbeat := time.NewTicker(30 * time.Second)
  3. defer heartbeat.Stop()
  4. for {
  5. select {
  6. case <-c:
  7. // ... do some stuff
  8. case <- heartbeat.C:
  9. //... do heartbeat stuff
  10. }
  11. }
  12. }