CSP

  1. Go实现了两种并发形式,第一种是大家普遍认知的多线程共享内存,其实就是 JavaC++ 等语言中的多线程开发。通常采用锁来解决资源共享问题。
  2. 另外一种是Go语言特有的,也是Go语言推荐的 CSP (communicating sequential processes)并发模型的一些概念为之实现并发的,但是Go语言并没有完全实现了 CSP 并发模型的所有理论,仅仅是实现了 process (goroutine)和 channel 这两个概念。通常共享的资源通过通道传递。

    为了更好地编写并发程序,从设计之初Go语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让开发人员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些烦琐的操作分散精力。


goroutine

介绍

  1. Go语言的并发是基于 goroutine 的,中文称协程,goroutine 类似于线程,但并非线程。可以将 goroutine 理解为一种虚拟线程。
  2. Go语言运行时会调度 goroutine,并将 goroutine 合理地分配到每个 CPU 中,最大限度地使用 CPU 性能。
  3. 多个 goroutine 中,Go语言使用通道(channel)进行通信,通道是一种内置的数据结构,可以让用户在不同的 goroutine 之间同步发送具有类型的消息。这让编程模型更倾向于在 goroutine 之间发送消息,而不是让多个 goroutine 争夺同一个数据的使用权。
  4. 如果希望让 goroutine 并行,必须使用多于一个逻辑处理器。当有多个逻辑处理器时,调度器会将 goroutine 平等分配到每个逻辑处理器上。这会让 goroutine 在不同的线程上运行。不过要想真的实现并行的效果,用户需要让自己的程序运行在有多个物理处理器的机器上。否则,哪怕 Go语言运行时使用多个线程,goroutine 依然会在同一个物理处理器上并发运行,达不到并行的效果。

    使用

  5. 使用 go 关键字创建 goroutine 时,被调用函数的返回值会被忽略。如果需要在 goroutine 中返回数据,请使用后面介绍的通道(channel)特性,通过通道把数据从 goroutine 中作为返回值传出。

  6. 所有 goroutine 在 main() 函数结束时会一同结束。

    普通方式

    ```go go 函数名( 参数列表 )

go GetThingDone(param1, param2)

  1. <a name="LzgOx"></a>
  2. #### 匿名函数
  3. ```go
  4. go func(参数列表) {
  5. 函数体
  6. }(参数)
  7. go func(param1, param2) {
  8. }(val1, val2)

通道控制

终止 goroutine 的最好方法就是自然返回 goroutine 对应的函数。虽然可以用 golang.org/x/net/context 包进行 goroutine 生命期深度控制,但这种方法仍然处于内部试验阶段,并不是官方推荐的特性。这里举例几个:

  1. // 让当前 goroutine 暂停的意思,退回执行队列,让其他等待的 goroutine 运行
  2. runtime.Gosched()
  3. // 开发者可以维护 协程 与 CPU 核心数量的对应关系
  4. // 调度器包含一些聪明的算法,这些算法会随着 Go语言的发布被更新和改进,所以不推荐盲目修改语言运行时对逻辑处理器的默认设置。
  5. // 从 Go 1.5 版本开始,默认执行下面语句以便让代码并发执行
  6. // 可以使用 runtime.NumCPU() 查询 CPU 数量,并使用 runtime.GOMAXPROCS() 函数进行设置
  7. runtime.GOMAXPROCS(runtime.NumCPU())

channel

介绍

  1. channel 是Go语言在语言级别提供的 goroutine 间的通信方式。我们可以使用 channel 在多个 goroutine 之间传递消息。
  2. channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,我们建议用分布式系统的方法来解决,比如使用 Socket 或者 HTTP 等通信协议。Go语言对于网络方面也有非常完善的支持。
  3. channel 是类型相关的,也就是说,一个 channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。如果对 Unix 管道有所了解的话,就不难理解 channel,可以将其认为是一种类型安全的管道。
  4. Go语言提倡使用通信的方法代替共享内存。多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。
  5. 在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。
  6. 通道的内部实现代码在Go语言开发包的 src/runtime/chan.go 中,经过分析后大概了解到通道也是用常见的互斥量等进行同步。因此通道虽然是一个语言级特性,但也不是被神化的特性,通道的运行和使用都要比传统互斥量、等待组(sync.WaitGroup)有一定的消耗。


使用

创建

  1. 定义一个 channel 时,也需要定义发送到 channel 的值的类型,chan 类型的空值是 nil,声明后需要配合 make 创建后才能使用。 ```go // var 通道变量 chan 通道类型

ci := make(chan int) // 创建一个整型类型的通道 cs := make(chan string) // 创建一个字符串类型的通道 cf := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式

  1. <a name="AN5Mu"></a>
  2. #### 发送
  3. ```go
  4. // 通道变量 <- 值
  5. ch := make(chan interface{})
  6. ch <- 0 // 将0放入通道中
  7. ch <- "hello"// 将hello字符串放入通道中

❗️把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时会发现一些永远无法发送成功的语句并抛出 panic:fatal error: all goroutines are asleep - deadlock!

接收

  1. 通道的收发操作在不同的两个 goroutine 间进行。
  2. 接收将持续阻塞直到收到发送方发送的数据。
  3. 通道一次只能接收一个数据元素。
  4. 通道的数据接收一共有以下 4 种写法:
    1. 阻塞接收数据
    1. data := <-ch
    2. 非阻塞接收数据
    非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,可以参见后面的内容。
    1. // data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。
    2. // ok:表示是否接收到数据。
    3. data, ok := <-ch
    3. 接收任意数据,忽略接收的数据
    执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。
    1. <-ch
    并发同步:
    1. func main() {
    2. ch := make(chan int)
    3. go func() {
    4. fmt.Println("go goroutine")
    5. ch <- 0
    6. }()
    7. fmt.Println("wait goroutine")
    8. // 等待匿名goroutine
    9. <-ch
    10. fmt.Println("all done")
    11. }
    4. 循环接收
    1. for data := range ch {
    2. }

关闭

1. 普通使用

通道是一个引用对象,在没有任何外部引用时,Go语言程序在运行时(runtime)会自动对内存进行垃圾回收(Garbage Collection, GC)。当然,通道也可以被主动关闭。

  1. close(ch)

2. 判断关闭

如何判断一个 channel 是否已经被关闭?我们可以在读取的时候使用非阻塞式方式来判断。

  1. close(ch)
  2. data,ok := <-ch
  3. if !ok {
  4. fmt.Println("close ok")
  5. }

3. 给被关闭通道发送数据将会触发 panic

被关闭的通道不会被置为 nil。如果尝试对已经关闭的通道进行发送,将会触发 panic:panic: send on closed channel,代码如下:

  1. func main() {
  2. // 创建一个整型的通道
  3. ch := make(chan int)
  4. // 关闭通道
  5. close(ch)
  6. // 打印通道的指针, 容量和长度
  7. fmt.Printf("ptr:%p cap:%d len:%d\n", ch, cap(ch), len(ch))
  8. // 给关闭的通道发送数据
  9. ch <- 1
  10. }

4. 从已关闭的通道接收数据时将不会发生阻塞

从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值,然后停止阻塞并返回。

  1. func main() {
  2. // 创建一个整型带两个缓冲的通道
  3. ch := make(chan int, 2)
  4. // 给通道放入两个数据
  5. ch <- 0
  6. ch <- 1
  7. // 关闭缓冲
  8. close(ch)
  9. // 遍历缓冲所有数据, 且多遍历1个
  10. for i := 0; i < cap(ch)+1; i++ {
  11. // 从通道中取出数据
  12. v, ok := <-ch
  13. // 打印取出数据的状态
  14. fmt.Println(v, ok)
  15. }
  16. }
  1. 0 true
  2. 1 true
  3. 0 false

单向 channel

介绍

  1. Go语言的类型系统提供了单方向的 channel 类型,顾名思义,单向 channel 就是只能用于写入或者只能用于读取数据。所谓的单向 channel 概念,其实只是对 channel 的一种使用限制,没有实际意义。因此,单向通道只是有利于代码接口的严谨性。 ```go // var 通道实例 chan<- 元素类型 // 只能写入数据的通道 // var 通道实例 <-chan 元素类型 // 只能读取数据的通道

ch := make(chan int) // 声明一个只能写入数据的通道类型, 并赋值为ch var chSendOnly chan<- int = ch //声明一个只能读取数据的通道类型, 并赋值为ch var chRecvOnly <-chan int = ch

  1. 上面的例子中,chSendOnly 只能写入数据,如果尝试读取数据,将会抛出panicinvalid operation: <-chSendOnly (receive from send-only type chan<- int)。同理,chRecvOnly 也是不能写入数据的。
  2. <a name="JHlla"></a>
  3. #### 例子
  4. time 包中的计时器会返回一个 timer 实例,代码如下:
  5. ```go
  6. type Timer struct {
  7. C <-chan Time
  8. r runtimeTimer
  9. }
  10. // 创建一个 Timer,它会在最少过去时间段 d 后到期,向其自身的 C 字段发送当时的时间
  11. func main() {
  12. t := time.NewTimer(time.Second * 2)
  13. defer t.Stop()
  14. for {
  15. <-t.C
  16. fmt.Println("timer running...")
  17. // 需要重置Reset 使 t 重新开始计时
  18. t.Reset(time.Second * 2)
  19. }
  20. }

无缓冲 channel

介绍

  1. Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。
  2. 如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。


  1. 阻塞指的是由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足才解除阻塞。
  2. 同步指的是在两个或多个协程(线程)之间,保持数据内容一致性的机制。

缓冲 channel

介绍

  1. Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。
  2. 通道阻塞的条件发送和接收也不同。
    1. 通道为空时,尝试接收数据时发生阻塞。
    2. 通道填满时,尝试发送数据时发生阻塞。
  3. 有缓冲的通道和无缓冲的通道之间的一个很大的不同:
    1. 无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换
    2. 有缓冲的通道没有这种保证,是一个异步过程。
  4. 带缓冲通道在很多特性上和无缓冲通道是类似的。无缓冲通道可以看作是长度永远为 0 的带缓冲通道。
  5. 为什么Go语言对通道要限制长度而不提供无限长度的通道?
    1. 我们知道通道(channel)是在两个 goroutine 间通信的桥梁。使用 goroutine 的代码必然有一方提供数据,一方消费数据。当提供数据一方的数据供给速度大于消费方的数据处理速度时,如果通道不限制长度,那么内存将不断膨胀直到应用崩溃。因此,限制通道的长度有利于约束数据提供方的供给速度,供给数据量必须在消费方处理量+通道长度的范围内,才能正常地处理数据。

      无缓冲通道保证收发过程同步: 举例快递员给你电话让你下楼取快递,整个递交快递的过程是同步发生的,你和快递员不见不散。但这样做快递员就必须等待所有人下楼完成操作后才能完成所有投递工作。 有缓冲通道不要求收发过程同步: 如果快递员将快递放入快递柜中,并通知用户来取,快递员和用户就成了异步收发过程,效率可以有明显的提升。带缓冲的通道就是这样的一个“快递柜”。

使用

  1. // 通道实例 := make(chan 通道类型, 缓冲大小)
  2. package main
  3. import "fmt"
  4. func main() {
  5. // 创建一个3个元素缓冲大小的整型通道
  6. ch := make(chan int, 3)
  7. // 查看当前通道的大小
  8. fmt.Println(len(ch))
  9. // 发送3个整型元素到通道
  10. ch <- 1
  11. ch <- 2
  12. ch <- 3
  13. // 查看当前通道的大小
  14. fmt.Println(len(ch))
  15. }

Select 多路复用

  1. 多路复用是通信和网络中的专业术语。多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术
  2. go 提供了select 来实现多通道复用,可以同时处理接收和发送多个通道的数据。
  1. select 语句实现了一种监听模式,通常用在(无限)循环中。由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。在某种情况下,通过 break 或 goto 语句使循环退出。
  2. select 的用法与 switch 语言非常类似:
    • default 语句是可选的;
    • fallthrough 是不允许的;
    • 只要其中有一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况。
    • 在任何一个 case 中执行 break 或者 return,select 就结束了,每一个case默认最后带有break。
  3. select 有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作。
  4. 在一个 select 语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。
  5. 如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有如下两种可能的情况:
    • 如果给出 default 语句,就会执行 default 语句,然后程序会跳出 select 语句,执行后面的程序。
    • 如果没有 default 语句,那么 select 语句将被阻塞,直到至少有一个通信可以进行下去。
      1. select {
      2. case <-chan1: // 接收任意数据
      3. case data := <-chan2: // 接收data从chan2
      4. case chan2 <- 1: // 发送数据
      5. default:
      6. // 如果上面都没有成功,则进入default处理流程
      7. }

channel 超时

Go语言没有提供直接的超时处理机制。我们可以使用 select 来设置超时,虽然 select 机制不是专门为超时而设计的,却能很方便的解决超时问题。

  1. func main() {
  2. ch := make(chan int)
  3. quit := make(chan bool)
  4. go func() {
  5. for {
  6. select {
  7. case num := <-ch:
  8. fmt.Println("num = ", num)
  9. case <-time.After(3 * time.Second):
  10. fmt.Println("超时")
  11. goto StopHere
  12. }
  13. }
  14. // 退出
  15. StopHere:
  16. fmt.Println("done")
  17. quit <- true
  18. }()
  19. for i := 0; i < 5; i++ {
  20. ch <- i
  21. time.Sleep(time.Second)
  22. }
  23. <-quit
  24. fmt.Println("程序结束")
  25. }
  1. num = 0
  2. num = 1
  3. num = 2
  4. num = 3
  5. num = 4
  6. 超时
  7. 程序结束