goroutine

goroutine抢占与非抢占

非抢占式多任务处理(go1.14前),由协程主动交出控制权,集中处理切换的点,如IO编译器/解释器/虚拟机层面的多任务。
go1.14引入了可剥夺的调度,即使goroutine占着cpu的死循环也会被剥夺。

下面这个例子在go1.13.11会卡死,go1.14.2则执行正常。

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. // "time"
  6. )
  7. func main() {
  8. var array [10]int
  9. runtime.GOMAXPROCS(2) // 限制使用CPU的数量
  10. fmt.Println("Running in", runtime.Version()) // version
  11. for i := 0; i < 10; i++ {
  12. go func(i int) {
  13. for {
  14. array[i]++
  15. }
  16. }(i)
  17. }
  18. // sleep时,会进入子goroutine,如果是go1.14前版本,会阻塞这这个子进程;go1.14后是可以被强占,故不会阻塞
  19. time.Sleep(time.Second)
  20. fmt.Println(array)
  21. }

runtime.Gosched()一般极少用到。一般我们面临的都是高并发,很多i/o的场景,这样的场景下io操作就会造成切换。channel的读取写入,log打印等等,都会造成切换。 runtime.GOMAXPROCS(1),强制go语言只使用一个核

goroutine的特点

  • 任何函数只要加上go就可以以协程的方式在调度器上运行
  • 不需要再定义时区分是否是异步函数
  • 调度器在合适的点进行切换
  • goroutine和OS线程是多对多的关系,即 m:n
  • 活动线程数不会超过CPU的核数

    示例 斐波那契数列

    计算斐波那契数列+可见的标识来表明程序在正常运行
    1. // animation
    2. func spinner(delay time.Duration) {
    3. for {
    4. for _, r := range `-\|/`{
    5. fmt.Printf("\r%c", r)
    6. time.Sleep(delay)
    7. }
    8. }
    9. }
    10. // get fibonacci of Nth
    11. func fib(x int) int {
    12. if x < 2 {
    13. return x
    14. }
    15. return fib(x-1) + fib(x-2)
    16. }
    17. func main() {
    18. go spinner(100 * time.Millisecond)
    19. const n = 40
    20. fibN := fib(n) // slow
    21. fmt.Printf("\rfibonacci(%d) = %d\n", n, fibN)
    22. }

    程序分析

    main()函数开始运行创建了一个main goroutine
    go spinner创建一个子协程,子协程中存在死循环,所以只有在main goroutine退出时,子协程才会结束。在子协程printsleep时,子协程会主动交出控制权,然后主协程继续运行;
    主协程在计算斐波那契数列也会花费较多的时间,在函数递归调用时,也会交出控制权,让其子他协程运行;
    主函数返回时,所有的goroutine都会被直接打断,程序退出。
    主协程、子协程交替取得控制权,就形成了并发的效果:一边进行显示输出,一边进行斐波那契计算。

channel

goroutine是Go语音程序的并发机制,channels是goroutine间的通信机制。
一个 channels是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息。每 个channel都有一个特殊的类型,也就是channels可发送数据的类型。一个可以发送int类型数据的channel一般写为chan int。

特点

  • goroutine之间通过通信channel实现共享内存,而不是通过共享内存实现通信。
  • channel是引用类型,声明后必须使用 make 初始化后才能使用。还有 slicemap

顺序通信进程 CSP

“顺序通信进程”(communicating sequential processes)或被简称为CSP。CSP是一种现代的并发编程模型,在这种编程模型中值会在不同的运行实例(goroutine)中传递,尽管大多数情况下仍然是被限制在单一实例中。
当一个程序启动时,其主函数即在一个单独的goroutine中运行,我们叫它 main goroutine 。新的goroutine会用go语句来创建。在语法上,go语句是一个普通的函数或方法调用前加上关键字go。go语句会使其语句中的函数在一个新创建的goroutine中运行。而go语句本身会迅速地完成。

channel类型

channel 是一种引用类型,声明通道类型的格式如下:

  1. var 变量 chan 元素类型

举例

  1. ch4 := make(chan int)
  2. ch5 := make(chan bool)
  3. ch6 := make(chan []int)

channel操作

channel支持发送,接收,和关闭三种操作。

发送

将一个值发送到通道中。

  1. ch <- 10 // 把10发送到ch中

接收

从一个通道中接收值。

  1. x := <- ch // 从ch中接收值并赋值给变量x
  2. <-ch // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

  1. close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

    tmp, ok := <- ch 即使通道关闭,ok为true表示能取出值,false表示通道中已经没有值且通道已经关闭。 —> 通道值被取空并且关闭ok才能为false

无缓冲区通道

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道

有缓冲的通道

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。

下面两段代码中
同步通道运行到往通道里发送数据 ch <- 1 时,由于是同步通道,所以阻塞在这里;等待 goroutine 中的 <- ch 这里取到值,执行 Println ;主函数中的阻塞释放,函数运行结束。
异步通道运行到往通道里发送数据 ch <- 1 时,由于是异步通道,所以不会阻塞,主函数直接结束了, goroutine 还没来得及运行,所以没有输出。

  1. 同步通道 ```go package main

import ( “fmt” )

func main() { ch := make(chan int) go func(){ fmt.Println(<- ch) }() ch <- 1 // 阻塞 close(ch) } // out 1

  1. 2. 异步通道
  2. ```go
  3. package main
  4. import (
  5. "fmt"
  6. )
  7. func main() {
  8. ch := make(chan int, 1)
  9. go func(){
  10. fmt.Println(<- ch)
  11. }()
  12. ch <- 1
  13. close(ch)
  14. }
  15. // out
  16. // 无输出

并发编程-goroutine - 图1

单方向的channel

调用counter(naturals)时,naturals的类型将隐式地从chan int转换成chan<- int。任何双向channel向单向channel变量的赋值操作都将导致该隐式转换。这里并没有反向转换的语法:也就是不能将一个类似chan<- int类型的单向型的channel转换为chan int类型的双向型的channel。

  1. package main
  2. import "fmt"
  3. func counter(out chan<- int) {
  4. for x := 0; x < 100; x++ {
  5. out <- x
  6. }
  7. close(out)
  8. }
  9. func squarer(out chan<- int, in <-chan int) {
  10. for v := range in {
  11. out <- v * v
  12. }
  13. close(out)
  14. }
  15. func printer(in <-chan int) {
  16. for v := range in {
  17. fmt.Println(v)
  18. }
  19. }
  20. func main() {
  21. naturals := make(chan int)
  22. squares := make(chan int)
  23. go counter(naturals)
  24. go squarer(squares, naturals)
  25. printer(squares)
  26. }

for range从通道循环取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道。
当通道被关闭时,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值,再然后取到的值一直都是对应类型的零值。
当通道被关闭且数据被取完后, for range 会退出。

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. // channel 练习
  6. func main() {
  7. ch1 := make(chan int, 10)
  8. ch2 := make(chan int, 10)
  9. // 开启goroutine将0~100的数发送到ch1中
  10. go func() {
  11. for i := 0; i < 10; i++ {
  12. ch1 <- i
  13. }
  14. close(ch1)
  15. }()
  16. // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
  17. go func() {
  18. for {
  19. i, ok := <-ch1 // 通道关闭后再取值ok=false
  20. if !ok {
  21. break
  22. }
  23. ch2 <- i * i
  24. }
  25. close(ch2)
  26. }()
  27. // 在主goroutine中从ch2中接收值打印
  28. for i := range ch2 { // 通道关闭后会退出for range循环
  29. fmt.Println(i)
  30. }
  31. }

channel总结

channel nil(未初始化) 非空 满了
发送 阻塞 可以发送 可以发送 阻塞
接收 阻塞 阻塞 可以接收 可以接收
关闭 panic 关闭成功后,返回类型零值 关闭成功,读取完数据后返回类型零值 关闭成功,读取完数据后返回类型零值

work pool

通常会使用可以指定启动的goroutine数量–worker pool模式,控制goroutine的数量,防止goroutine泄漏和暴涨。一个简易的work pool示例代码如下:

  1. package main
  2. import (
  3. "time"
  4. "fmt"
  5. )
  6. // 若干个等待从tasksPool中取任务的worker
  7. func work(id int, tasksPool <-chan int, resultsPool chan<- int) {
  8. for task := range tasksPool {
  9. fmt.Printf("worker:%d start task: %d\n", id, task)
  10. time.Sleep(time.Second)
  11. fmt.Printf("worker:%d end start: %d\n", id, task)
  12. resultsPool <- task * 2
  13. }
  14. }
  15. func main() {
  16. // 工作人数
  17. workers := 2
  18. // 任务数量,每个任务都有一个结果,故结果数量和任务数量相同
  19. tasks := 5
  20. tasksPool := make(chan int, tasks)
  21. // 结果数量
  22. results := 5
  23. resultsPool := make(chan int, results)
  24. // 开启3个goroutine, 表示有3个人在干活
  25. for w := 1; w <= workers; w++ {
  26. go work(w, tasksPool, resultsPool)
  27. }
  28. // 5个任务
  29. for task := 1; task <= tasks; task++ {
  30. tasksPool <- task
  31. }
  32. close(tasksPool)
  33. // 输出结果
  34. for result := 1; result <= results; result++ {
  35. fmt.Println(<-resultsPool)
  36. }
  37. close(resultsPool)
  38. }

补充

goroutines泄漏

如果我们使用了无缓存的channel,那么两个慢的goroutines将会因为没有人接收而被永远卡住。这种情况,称为goroutines泄漏,这将是一个BUG。和垃圾变量不同,泄漏的goroutines并不会被自动回收,因此确保每个不再需要的goroutine能正常退出是重要的。

channel的选择—缓存和不带缓存

关于无缓存或带缓存channels之间的选择,或者是带缓存channels的容量大小的选择,都可能影响程序的正确性。无缓存channel更强地保证了每个发送操作与相应的同步接收操作;但是对于带缓存channel,这些操作是解耦的。同样,即使我们知道将要发送到一个channel的信息的数量上限,创建一个对应容量大小的带缓存channel也是不现实的,因为这要求在执行任何接收操作之前缓存所有已经发送的值。如果未能分配足够的缓冲将导致程序死锁。

消息事件

有些消息事件并不携带额外的信息,它仅仅是用作两个goroutine之间的同步,这时候可以用struct{}空结构体作为channels元素的类型。