在channel使用的时候,时常会遇到一些阻塞的场景,让人傻傻分不清。无论是无缓冲的通道,还是缓冲通道,都会存在阻塞的情况。下面,让我们来总结一下哪些情况下会存在阻塞,以及如何使用select来解决阻塞。在我们了解这些阻塞场景的同时,也可以学会更好的使用管道。

channel的简介

不同于传统的多线程并发模型使用共享内存来实现线程间通信,golang的哲学是使用channel进行协程之间的通信,来实现数据共享。这种方式的优点是通过提供原子的通信原语,避免了竞争情形(race condition)下复杂的锁机制。channel可以看成是一个FIFO的队列,对FIFO队列的读写都是原子操作,不需要加锁。

从字面上看,channel的意思大概就是管道的意思。channel是一种go协程用以接收或发送消息的消息队列,channel就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:
image.png

无缓冲通道

无缓冲的通道的特点是,发送的数据需要被读取后,发送才会完成。他的阻塞场景为:

  • 管道中无数据,但执行读管道:

    1. func ReadNoDataFromNoBufCh() {
    2. noBufCh := make(chan int) // 无缓冲信道的声明方式
    3. <-noBufCh
    4. fmt.Println("read from no buffer channel success")
    5. // Output
    6. // fatal error: all goroutines are asleep - deadlock!
    7. }
  • 管道中无数据,向管道写数据,但是无协程读取:

    1. func WriteNoBufCh() {
    2. ch := make(chan int)
    3. ch <- 1
    4. fmt.Println("write success no block")
    5. // Output
    6. // fatal error: all goroutines are asleep - deadlock!
    7. }

    上述的示例代码中output注释表示了函数的执行结果,这两个函数都由于阻塞在管道的操作而无法继续向下执行,最后会导致死锁。下面是正确的使用方法,避免死锁:

    1. func testSimple() {
    2. ch := make(chan int)
    3. go func() {
    4. time.sleep(20 * time.Second)
    5. ch <- 1
    6. }()
    7. value := <- ch
    8. fmt.Println("value : ", value)
    9. }

    上面这个简单的例子就是新开启的goroutine向一个无缓冲信道发送了一个1的值,那么主线程的ch在20s之后就会收到这个值的消息,一次线程间的通信就完成了。

有缓冲通道

有缓冲通道的特点是,可以向管道中写入数据后直接返回,缓冲中有数据时可以从管道中读到数据直接返回,这时候缓存管道是不会阻塞的,它的阻塞场景是:

  • 管道的缓存无数据,但执行读管道

    1. func ReadNoDataFromBufCh() {
    2. bufCh := make(chan int, 1)
    3. <-bufCh
    4. fmt.Println("read from no buffer channel success")
    5. // Output
    6. // fatal error: all goroutines are asleep - deadlock!
    7. }
  • 管道中缓存已经占满,向管道写数据,但是无协程读数据

    1. func WriteBufChButFull() {
    2. ch := make(chan int, 1)
    3. // make ch full
    4. ch <- 100
    5. ch <- 1
    6. fmt.Println("write success no block")
    7. // Output
    8. // fatal error: all goroutines are asleep - deadlock!
    9. }

    检查通道是否关闭

读已经关闭的channel会造成panic,如果不确定channel,需要使用ok来进行检测。ok的结果和含义:1. true,读到数据,并且数据没有关闭 2. false,通道关闭,无数据读到:

  1. if v, ok := <- ch; ok {
  2. fmt.Println(v)
  3. }

使用for range 遍历读channel

有时候,我们需要不断地从channel读取数据,可以使用for-channel,这样既安全又便利。当channel关闭时,for循环循环会自动退出,无需主动监控channel是否关闭。如果在其他协程中调用了close(ch),那么就会跳出for range循环。这也就是for range的特别之处。

  1. ch:=make(chan int ,3)
  2. ch<-1
  3. ch<-2
  4. ch<-3
  5. for value:=range ch{
  6. fmt.Print(value)
  7. }

只读通道和只写通道

在我们日常的项目中,经常会使用<-chan 和 chan<-这两种数据类型:

  1. //只读通道
  2. func channelOnlyForReda(mes <-chan string) {
  3. msg := <-mes
  4. }
  5. // 只写通道
  6. func channelF2Writer(mes chan<- string) {
  7. mes <- "mynamezy"
  8. }

但只是单单定义只读只写的channel没有太大的意义,一般是用在参数传递中:

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. c := make(chan int)
  8. go send(c)
  9. go recv(c)
  10. time.Sleep(3 * time.Second)
  11. }
  12. //只能向chan里写数据
  13. func send(c chan<- int) {
  14. for i := 0; i < 10; i++ {
  15. c <- i
  16. }
  17. }
  18. //只能取channel中的数据
  19. func recv(c <-chan int) {
  20. for i := range c {
  21. fmt.Println(i)
  22. }
  23. }

这里例子解决了我很多的困惑:

  1. 如果将上面send方法和recv方法中的参数对调:
  2. func send(c <-chanint) {
  3. func recv(c chan<- int) {
  4. 编译就会报错:
  5. ./channel.go:18: invalid operation: c <- i (send to receive-only type <-chan int)
  6. ./channel.go:24: invalid operation: range c (receive from send-only type chan<- int)

使用select + timeout,实现无阻塞读写

早期的select函数是用来监控一系列的文件句柄,一旦其中一个文件句柄发生IO操作,该select调用就会被返回,这就是著名的多路复用。golang在语言级别直接支持select,用于处理异步IO问题。select语句属于条件分支流程控制语句,不过它只能用于通道。使用方法和switch非常类似。
下面的这组case是利用select实现选择操作,里面有一组case语句,它会执行其中无阻塞的那一个。如果都阻塞了,那就等待其中一个不阻塞,进而继续执行,它有一个default语句,该语句是永远不会阻塞的,我们可以借助它实现无阻塞的操作。但是,我有个疑问,这个select的执行顺序是怎么样的?随机的吗?

  • 读操作 ```go // 无缓冲管道读 func ReadNoDataFromNoBufChWithSelect() { bufCh := make(chan int) if v, err := ReadWithSelect(bufCh); err != nil {
    1. fmt.Println(err)
    } else {
    1. fmt.Printf("read: %d\n", v)
    } // output // channel has no data }

// 有缓冲管道读 func ReadNoDataFromBufChWithSelect() { bufCh := make(chan int, 1) if v, err := ReadWithSelect(bufCh); err != nil { fmt.Println(err) } else { fmt.Printf(“read: %d\n”, v) } // Output // channel has no data }

// select结构实现管道读

func ReadWithSelect(ch chan int) (x int, err error) { select { case x = <-ch: return x, nil default: return 0, errors.New(“channel has no data”) } }

  1. - 写操作
  2. ```go
  3. // 无缓冲管道写
  4. func WriteNoBufChWithSelect() {
  5. ch := make(chan int)
  6. if err := WriteChWithSelect(ch); err != nil {
  7. fmt.Println(err)
  8. } else {
  9. fmt.Println("write success")
  10. }
  11. // output
  12. // channel blocked, can not write
  13. }
  14. // 有缓冲管道写
  15. func WriteBufChButFullWithSelect() {
  16. ch := make(chan int, 1)
  17. // make ch full
  18. ch <- 100
  19. if err := WriteChWithSelect(ch); err != nil {
  20. fmt.Println(err)
  21. } else {
  22. fmt.Println("write success")
  23. }
  24. // Output
  25. // channel blocked, can not write
  26. }
  27. // select结构实现管道写
  28. func WriteChWithSelect(ch chan int) error {
  29. select {
  30. case ch <- 1:
  31. return nil
  32. default:
  33. return errors.New("channel blocked, can not write")
  34. }
  35. }
  • 使用定时器实现超时

使用default实现的无阻塞管道阻塞有一个缺陷:当管道不可读或者不可写的时候,会立即返回。而实际的场景和需求是:我们先读或者写数据,如果在规定的时间内无法读写,程序返回。
定时器材可以帮助我们解决这个问题,比如说,我给管道读写数据的容忍时间是500ms, 如果无法读写,便立即返回:

  1. func ReadWithSelect(ch chan int) (x int, err error) {
  2. timeout := time.NewTimer(time.Microsecond * 500)
  3. select {
  4. case x = <-ch:
  5. return x, nil
  6. case <-timeout.C:
  7. return 0, errors.New("read time out")
  8. }
  9. }
  10. func WriteChWithSelect(ch chan int) error {
  11. timeout := time.NewTimer(time.Microsecond * 500)
  12. select {
  13. case ch <- 1:
  14. return nil
  15. case <-timeout.C:
  16. return errors.New("write time out")
  17. }
  18. }

参考:
golang channel(1): channel的无阻塞读写
总结了才知道,原来channel有这么多用法!
Go 只读/只写channel