go并发编程

①、协程Goroutine

:::success Goroutine:每个协程的堆栈,go语言默认一开始分配2k但是很智能,可以自适应增长;Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。

语法:**<font style="color:#DF2A3F;">go</font>**** func**

子协程异常退出:子协程的异常退出会将异常传播到主协程,直接会导致主协程也跟着挂掉,然后整个程序就崩溃了使用panic和recover解决

:::

:::color2 主协程(**main**)退出了,其他该协程的子协程任务全部结束执行

父协程(**非main**)退出了只要main没结束,其他该协程的子协程任务继续执行

:::

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. // 合起来写
  8. go func() {
  9. i := 0
  10. for {
  11. i++
  12. fmt.Printf("new goroutine: i = %d\n", i)
  13. time.Sleep(time.Second)
  14. }
  15. }()
  16. i := 0
  17. for {
  18. i++
  19. fmt.Printf("main goroutine: i = %d\n", i)
  20. time.Sleep(time.Second)
  21. if i == 2 {
  22. break
  23. }
  24. }
  25. }
  26. /* 输出打印
  27. main goroutine: i = 1
  28. new goroutine: i = 1
  29. new goroutine: i = 2
  30. main goroutine: i = 2
  31. */
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func main() {
  7. // 合起来写
  8. go func() {
  9. i := 0
  10. go func() {
  11. i := 0
  12. for {
  13. i++
  14. fmt.Printf("zi zi goroutine: i = %d\n", i)
  15. time.Sleep(time.Second)
  16. }
  17. }()
  18. for {
  19. i++
  20. fmt.Printf("new goroutine: i = %d\n", i)
  21. time.Sleep(time.Second)
  22. if i == 2 {
  23. break
  24. }
  25. }
  26. }()
  27. i := 0
  28. for {
  29. i++
  30. fmt.Printf("main goroutine: i = %d\n", i)
  31. time.Sleep(time.Second)
  32. if i == 10 {
  33. break
  34. }
  35. }
  36. }
  37. /* 输出
  38. main goroutine: i = 1
  39. new goroutine: i = 1
  40. zi zi goroutine: i = 1
  41. new goroutine: i = 2
  42. zi zi goroutine: i = 2
  43. main goroutine: i = 2
  44. zi zi goroutine: i = 3
  45. main goroutine: i = 3
  46. zi zi goroutine: i = 4
  47. main goroutine: i = 4
  48. zi zi goroutine: i = 5
  49. main goroutine: i = 5
  50. zi zi goroutine: i = 6
  51. main goroutine: i = 6
  52. zi zi goroutine: i = 7
  53. main goroutine: i = 7
  54. zi zi goroutine: i = 8
  55. main goroutine: i = 8
  56. zi zi goroutine: i = 9
  57. main goroutine: i = 9
  58. zi zi goroutine: i = 10
  59. main goroutine: i = 10
  60. zi zi goroutine: i = 11
  61. */

:::color2 goroutine与OS线程对比
可增长的栈OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统(调度是在用户态下完成的)。不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

:::

②、通道channel

:::warning 协程的同步问题解决方案1、channel 2、 锁

go语言中,控制协程间的并发主要是用channel;如果说goroutine是Go语言程序的并发体的话,那么channel是它们之间的通信机制(锁用的很少,协程间channel是并发安全的**)**。

channel

  1. 作为协程的输出,通道是一个容器,它可以容纳数据。
  2. 作为协程的输入,通道是一个生产者,它可以向协程提供数据。
  3. 通道作为容器是有限定大小的,满了就写不进去,空了就读不出来
  4. 通道有它自己的类型,它可以限定进入通道的数据的类型。

go并发编程(2) - 图1

:::

:::color2 channel是一种类型,一种引用类型声明通道类型:var 变量 chan 元素类型如:

var ch1 chan int // 声明一个传递整型的双向通道 ch1 --> <nil>

var ch2 chan bool // 声明一个传递布尔型的双向通道 ch2 --> <nil>

var ch3 chan []int // 声明一个传递int切片的双向通道 ch3 --> <nil>

声明的通道后需要**使用make函数初始化之后才能使用**:创建channel的格式:

make(chan 元素类型, [缓冲大小])如:

ch4 := make(chan int)

ch5 := make(chan bool)

ch6 := make(chan []int)

「缓冲型通道」:上述通道就是缓冲型通道;

「非缓冲型通道」省略[缓冲大小]为非缓冲型通道只能写入一个,等对方读取了才能再写,对方没读,你还写你就会阻塞

比较:两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相通的对象,那么比较的结果为真。一个channel也可以和nil进行比较

:::

:::color2 channel操作

发送(send)ch := make(chan int) //创建一个通道ch <- 10 // 把10发送到ch中

接收(receive)x := <- ch // 从ch中接收值并赋值给变量x<-ch // 从ch中接收值,**<font style="color:#DF2A3F;">忽略结果</font>**data, ok := <-ch // 从ch中接收值,并通过ok判断是否通道关闭

关闭(close)**<font style="color:#DF2A3F;">内置函数</font>**close(ch)关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic

:::

:::success 无缓冲的通道:无缓冲的通道又称为阻塞的通道,无缓冲通道也被称为同步通道

go并发编程(2) - 图2

无缓冲的通道主线程不能阻塞,deadlock只检查主线程,主线程发送时,必须有人接收值的时候才能发送值,主线程接收时,必须要有人发送值才能接收。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。 ::: go // ① 主发送,无接收 func main() { ch := make(chan int) ch <- 10 fmt.Println("发送成功") } /* 会报deadlock错,因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() .../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54 */ // ② 协程发送,无人接收 func recv(c chan int) { fmt.Println("发送成功") c <- 10 time.Sleep(time.Second*2) } func main() { ch := make(chan int) go recv(ch) // 启用goroutine从通道接收值 // // ret := <-ch time.Sleep(time.Second) //fmt.Println("接收成功", ret) } // 编译通过 // ③ 主线程接收,无人发送 func recv(c chan int) { // c <- 10 fmt.Println("发送成功") time.Sleep(time.Second*2) } func main() { ch := make(chan int) go recv(ch) // 启用goroutine从通道接收值 // ret := <-ch time.Sleep(time.Second) fmt.Println("接收成功", ret) } /* 发送成功 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() E:/桌面整理/00零声学院/0voice_go/2.资料/01-GO语言基础精讲和test方法/1-src/1-src/src/3/printa-z.go:17 +0x85 exit status 2 */ // ④ 协程接收,无人发送 func recv(c chan int) { ret := <-c time.Sleep(time.Second) fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 启用goroutine从通道接收值 time.Sleep(time.Second*2) } /* 无输出结果 */ :::success 有缓冲区的通道:使用make函数初始化通道的时候为其指定通道的容量,如make(chan int, 1)go并发编程(2) - 图3 只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量; 我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量。

主线程不能永远阻塞**,若逻辑出现问题就会报deadlock的错误;**

:::

  1. func main() {
  2. ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
  3. ch <- 10
  4. fmt.Println("发送成功")
  5. }
  6. // 不会报错
  7. func main() {
  8. ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
  9. ch <- 10
  10. ch <- 10
  11. fmt.Println("发送成功")
  12. }
  13. // 又会报死锁错,主要思想就是主线程不能阻塞

:::success 正确的通道读方式当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic从该通道里接收的值一直都是类型零值如果通道里的元素是整型的,读操作是不能通过返回值来确定通道是否关闭的。

有两种方式在接收值的时候判断通道是否被关闭,我们通常使用的是for range的方式。

:::

  1. package main
  2. import "fmt"
  3. func main() {
  4. c := make(chan int)
  5. go func() {
  6. for i := 0; i < 5; i++ {
  7. c <- i
  8. }
  9. close(c)
  10. }()
  11. for {
  12. if data, ok := <-c; ok {
  13. fmt.Println(data)
  14. } else {
  15. break
  16. }
  17. }
  18. fmt.Println("main结束")
  19. }
  20. /*
  21. PS E:\2.资料\01-GO语言基础精讲和test方法\1-src\1-src\src> go run .\3\printa-z.go
  22. 0
  23. 1
  24. 2
  25. 3
  26. 4
  27. main结束
  28. PS E:\2.资料\01-GO语言基础精讲和test方法\1-src\1-src\src>
  29. */
  1. package main
  2. import "fmt"
  3. // channel 练习
  4. func main() {
  5. ch1 := make(chan int)
  6. ch2 := make(chan int)
  7. // 开启goroutine将0~100的数发送到ch1中
  8. go func() {
  9. for i := 0; i < 100; i++ {
  10. ch1 <- i
  11. }
  12. close(ch1)
  13. }()
  14. // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
  15. go func() {
  16. for {
  17. i, ok := <-ch1 // 通道关闭后再取值ok=false
  18. if !ok {
  19. break
  20. }
  21. ch2 <- i * i
  22. }
  23. close(ch2)
  24. }()
  25. // 在主goroutine中从ch2中接收值打印
  26. for i := range ch2 { // 通道关闭后会退出for range循环
  27. fmt.Println(i)
  28. }
  29. }
  30. /*
  31. PS E:\01-GO语言基础精讲和test方法\1-src\1-src\src> go run .\3\printa-z.go
  32. 0
  33. 1
  34. 4
  35. 9
  36. 16
  37. 25
  38. 36
  39. 49
  40. 64
  41. 81
  42. 100
  43. 121
  44. 144
  45. 169
  46. 196
  47. 225
  48. 256
  49. 289
  50. 324
  51. 361
  52. 400
  53. 441
  54. 484
  55. 529
  56. 576
  57. 625
  58. 676
  59. 729
  60. 784
  61. 841
  62. 900
  63. 961
  64. 1024
  65. 1089
  66. 1156
  67. 1225
  68. 1296
  69. 1369
  70. 1444
  71. 1521
  72. 1600
  73. 1681
  74. 1764
  75. 1849
  76. 1936
  77. 2025
  78. 2116
  79. 2209
  80. 2304
  81. 2401
  82. 2500
  83. 2601
  84. 2704
  85. 2809
  86. 2916
  87. 3025
  88. 3136
  89. 3249
  90. 3364
  91. 3481
  92. 3600
  93. 3721
  94. 3844
  95. 3969
  96. 4096
  97. 4225
  98. 4356
  99. 4489
  100. 4624
  101. 4761
  102. 4900
  103. 5041
  104. 5184
  105. 5329
  106. 5476
  107. 5625
  108. 5776
  109. 5929
  110. 6084
  111. 6241
  112. 6400
  113. 6561
  114. 6724
  115. 6889
  116. 7056
  117. 7225
  118. 7396
  119. 7569
  120. 7744
  121. 7921
  122. 8100
  123. 8281
  124. 8464
  125. 8649
  126. 8836
  127. 9025
  128. 9216
  129. 9409
  130. 9604
  131. 9801
  132. PS E:\01-GO语言基础精讲和test方法\1-src\1-src\src>
  133. */

:::success 通道写安全

单写多读确保通道写安全的最好方式是由负责写通道的协程自己来关闭通道,读通道的协程不要去关闭通道。

多写确保通道写安全方式是使用到内置 sync 包提供的 WaitGroup 对象,它使用计数来等待指定事件完成。

:::

:::success 单向通道有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收

**chan<- int**是一个只能发送的通道,可以发送但是不能接收;

**<-chan int**是一个只能接收的通道,可以接收但是不能发送。

函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

:::

  1. func counter(out chan<- int) {
  2. for i := 0; i < 100; i++ {
  3. out <- i
  4. }
  5. close(out)
  6. }
  7. func squarer(out chan<- int, in <-chan int) {
  8. for i := range in {
  9. out <- i * i
  10. }
  11. close(out)
  12. }
  13. func printer(in <-chan int) {
  14. for i := range in {
  15. fmt.Println(i)
  16. }
  17. }
  18. func main() {
  19. ch1 := make(chan int)
  20. ch2 := make(chan int)
  21. go counter(ch1)
  22. go squarer(ch2, ch1)
  23. printer(ch2)
  24. }
  1. // 2-6 WaitGroup 在写端关闭channel对单写的程序有效,但是多写的时候呢?
  2. package main
  3. import (
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. func send(ch chan int, wg *sync.WaitGroup) {
  9. defer wg.Done() // 计数值减一
  10. i := 0
  11. for i < 4 {
  12. i++
  13. ch <- i
  14. }
  15. }
  16. func recv(ch chan int) {
  17. for v := range ch {
  18. fmt.Println(v)
  19. }
  20. }
  21. // 只要一个值能做界定符 比如nil, 比如0xfffe
  22. func main() {
  23. var ch = make(chan int, 4)
  24. var wg = new(sync.WaitGroup)
  25. wg.Add(2) // 增加计数值
  26. go send(ch, wg) // 写
  27. go send(ch, wg) // 写
  28. go recv(ch)
  29. // Wait() 阻塞等待所有的写通道协程结束
  30. // 待计数值变成零,Wait() 才会返回
  31. wg.Wait()
  32. // 关闭通道
  33. close(ch)
  34. time.Sleep(time.Second)
  35. }
  36. /*
  37. PS E:\3-src\3-src\src> go run .\2\2-6-channel-waitgroup.go
  38. 1
  39. 2
  40. 3
  41. 4
  42. 1
  43. 2
  44. 3
  45. 4
  46. PS E
  47. */

:::success go并发编程(2) - 图4

:::

③、WaitGroup

:::success **<font style="color:rgb(51, 51, 51);">sync.WaitGroup</font>**实现并发任务的同步;引包<font style="color:rgb(51, 51, 51);">import "sync"</font>;

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成

:::

方法名 功能
(wg WaitGroup) *Add(delta int) 计数器+delta
(wg WaitGroup) *Done() 计数器-1
(wg WaitGroup) *Wait() 阻塞直到计数器变为0
  1. var wg sync.WaitGroup
  2. func hello() {
  3. defer wg.Done()
  4. fmt.Println("Hello Goroutine!")
  5. }
  6. func main() {
  7. wg.Add(1)
  8. go hello() // 启动另外一个goroutine去执行hello函数
  9. fmt.Println("main goroutine done!")
  10. wg.Wait()
  11. }

:::success 需要注意sync.WaitGroup是一个结构体,传递参数的时候要传递指针。

:::

④、多路复用select

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。Go内置了**<font style="color:rgb(51, 51, 51);">select</font>**关键字,可以同时响应多个通道的操作。

<font style="color:rgb(51, 51, 51);">select</font>的使用类似于<font style="color:rgb(51, 51, 51);">switch</font>语句,它有一系列<font style="color:rgb(51, 51, 51);">case</font>分支和一个<font style="color:rgb(51, 51, 51);">默认</font>的分支。每个case会对应一个通道的通信(接收或发送)过程。<font style="color:rgb(51, 51, 51);">select</font>会一直等待,直到某个<font style="color:rgb(51, 51, 51);">case</font>的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

  1. select {
  2. case <-chan1:
  3. // 如果chan1成功读到数据,则进行该case处理语句
  4. case chan2 <- 1:
  5. // 如果成功向chan2写入数据,则进行该case处理语句
  6. default:
  7. // 如果上面都没有成功,则进入default处理流程
  8. }

:::success select功能

  • select可以同时监听一个或多个channel,直到其中一个channel ready;
  • 如果多个channel同时ready,则随机选择一个执行
  • 可以用于判断管道是否存满

:::

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func test1(ch chan string) {
  7. time.Sleep(time.Second * 5)
  8. ch <- "test1"
  9. }
  10. func test2(ch chan string) {
  11. time.Sleep(time.Second * 2)
  12. ch <- "test2"
  13. }
  14. func main() {
  15. // 2个管道
  16. output1 := make(chan string)
  17. output2 := make(chan string)
  18. // 跑2个子协程,写数据
  19. go test1(output1)
  20. go test2(output2)
  21. // 用select监控
  22. select {
  23. case s1 := <-output1:
  24. fmt.Println("s1=", s1)
  25. case s2 := <-output2:
  26. fmt.Println("s2=", s2)
  27. }
  28. }
  29. /* s2= test2 */
  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func main() {
  6. // 创建2个管道
  7. int_chan := make(chan int, 1)
  8. string_chan := make(chan string, 1)
  9. go func() {
  10. //time.Sleep(2 * time.Second)
  11. int_chan <- 1
  12. }()
  13. go func() {
  14. string_chan <- "hello"
  15. }()
  16. select {
  17. case value := <-int_chan:
  18. fmt.Println("int:", value)
  19. case value := <-string_chan:
  20. fmt.Println("string:", value)
  21. }
  22. fmt.Println("main结束")
  23. }
  24. /*
  25. string: hello
  26. main结束
  27. */
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // 判断管道有没有存满
  7. func main() {
  8. // 创建管道
  9. output1 := make(chan string, 10)
  10. // 子协程写数据
  11. go write(output1)
  12. // 取数据
  13. for s := range output1 {
  14. fmt.Println("res:", s)
  15. time.Sleep(time.Second)
  16. }
  17. }
  18. func write(ch chan string) {
  19. for {
  20. select {
  21. // 写数据
  22. case ch <- "hello":
  23. fmt.Println("write hello")
  24. default:
  25. fmt.Println("channel full")
  26. }
  27. time.Sleep(time.Millisecond * 500)
  28. }
  29. }
  30. /*
  31. res: hello
  32. write hello
  33. write hello
  34. write hello
  35. res: hello
  36. write hello
  37. res: hello
  38. write hello
  39. write hello
  40. res: hello
  41. write hello
  42. write hello
  43. res: hello
  44. write hello
  45. write hello
  46. res: hello
  47. write hello
  48. write hello
  49. res: hello
  50. write hello
  51. write hello
  52. res: hello
  53. write hello
  54. write hello
  55. res: hello
  56. write hello
  57. write hello
  58. res: hello
  59. write hello
  60. write hello
  61. res: hello
  62. write hello
  63. channel full
  64. res: hello
  65. write hello
  66. channel full
  67. res: hello
  68. write hello
  69. channel full
  70. res: hello
  71. write hello
  72. channel full
  73. res: hello
  74. write hello
  75. channel full
  76. res: hello
  77. write hello
  78. channel full
  79. res: hello
  80. write hello
  81. channel full
  82. res: hello
  83. write hello
  84. channel full
  85. res: hello
  86. write hello
  87. channel full
  88. res: hello
  89. write hello
  90. channel full
  91. res: hello
  92. write hello
  93. channel full
  94. res: hello
  95. write hello
  96. channel full
  97. res: hello
  98. write hello
  99. channel full
  100. res: hello
  101. write hello
  102. channel full
  103. res: hello
  104. write hello
  105. channel full
  106. res: hello
  107. write hello
  108. channel full
  109. res: hello
  110. write hello
  111. channel full
  112. res: hello
  113. write hello
  114. channel full
  115. res: hello
  116. write hello
  117. channel full
  118. res: hello
  119. write hello
  120. channel full
  121. res: hello
  122. write hello
  123. channel full
  124. res: hello
  125. write hello
  126. channel full
  127. res: hello
  128. write hello
  129. channel full
  130. res: hello
  131. write hello
  132. channel full
  133. res: hello
  134. write hello
  135. channel full
  136. res: hello
  137. write hello
  138. channel full
  139. res: hello
  140. write hello
  141. channel full
  142. res: hello
  143. write hello
  144. channel full
  145. res: hello
  146. write hello
  147. channel full
  148. res: hello
  149. write hello
  150. channel full
  151. res: hello
  152. */

:::success 非阻塞读写:当通道空时,读操作不会阻塞,当通道满时,写操作也不会阻塞。非阻塞读写需要依靠 select 语句的 default 分支。当 select 语句所有通道都不可读写时,如果定义了 default 分支,那就会执行 default 分支逻辑,这样就起到了不阻塞的效果。

:::

  1. // 非阻塞读写
  2. package main
  3. import (
  4. "fmt"
  5. "time"
  6. )
  7. func send(ch1 chan int, ch2 chan int) {
  8. i := 0
  9. for {
  10. i++
  11. select {
  12. case ch1 <- i:
  13. fmt.Printf("send ch1 %d\n", i)
  14. case ch2 <- i:
  15. fmt.Printf("send ch2 %d\n", i)
  16. default:
  17. fmt.Printf("ch block\n")
  18. time.Sleep(2 * time.Second) // 这里只是为了演示
  19. }
  20. }
  21. }
  22. func recv(ch chan int, gap time.Duration, name string) {
  23. for v := range ch {
  24. fmt.Printf("receive %s %d\n", name, v)
  25. time.Sleep(gap)
  26. }
  27. }
  28. func main() {
  29. // 无缓冲通道
  30. var ch1 = make(chan int)
  31. var ch2 = make(chan int)
  32. // 两个消费者的休眠时间不一样,名称不一样
  33. go recv(ch1, time.Second, "ch1")
  34. go recv(ch2, 2*time.Second, "ch2")
  35. send(ch1, ch2)
  36. }
  37. /* 截取开始的一部分
  38. ch block
  39. send ch2 2
  40. send ch1 3
  41. ch block
  42. receive ch1 3
  43. receive ch2 2
  44. send ch1 5
  45. send ch2 6
  46. ch block
  47. receive ch2 6
  48. receive ch1 5
  49. send ch1 8
  50. send ch2 9
  51. ch block
  52. receive ch2 9
  53. receive ch1 8
  54. send ch1 11
  55. send ch2 12
  56. ch block
  57. receive ch2 12
  58. receive ch1 11
  59. send ch2 14
  60. send ch1 15
  61. ch block
  62. */

⑤、生产者消费者模型

  1. // 生产者、消费者模型
  2. package main
  3. import (
  4. "fmt"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. "time"
  9. )
  10. // 生产者
  11. func Producer(factor int, out chan<- int) {
  12. for i := 0; ; i++ {
  13. out <- i * factor
  14. time.Sleep(3 * time.Second)
  15. }
  16. }
  17. // 消费者
  18. func Consumer(in <-chan int) {
  19. for v := range in {
  20. fmt.Println(v)
  21. }
  22. }
  23. func main() {
  24. ch := make(chan int, 64)
  25. go Producer(3, ch) // 生成3的倍数序列
  26. go Producer(5, ch) // 生成5的倍数序列
  27. go Consumer(ch)
  28. //Ctrl +C 退出
  29. sig := make(chan os.Signal, 1)
  30. signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
  31. fmt.Printf("wait Ctrl +C\n")
  32. fmt.Printf("quit (%v)\n", <-sig)
  33. }

⑥、并发安全和锁

:::success 数据竞态:可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题;

竞态检查工具**是基于运行时代码检查,而不是通过代码静态分析来完成的。这意味着那些没有机会运行到的代码逻辑中如果存在安全隐患,它是检查不出来的。需要加上-race 执行**。go run <font style="color:#DF2A3F;">-race</font> 代码

:::

:::success 互斥锁(**sync.Mutex**:互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

:::

  1. // 不加锁版本 输出6379
  2. var x int64
  3. var wg sync.WaitGroup
  4. var lock sync.Mutex
  5. func add() {
  6. for i := 0; i < 5000; i++ {
  7. lock.Lock() // 加锁
  8. x = x + 1
  9. lock.Unlock() // 解锁
  10. }
  11. wg.Done()
  12. }
  13. func main() {
  14. wg.Add(2)
  15. go add()
  16. go add()
  17. wg.Wait()
  18. fmt.Println(x)
  19. }
  20. // 加锁版本 输出10000
  21. var x int64
  22. var wg sync.WaitGroup
  23. var lock sync.Mutex
  24. func add() {
  25. for i := 0; i < 5000; i++ {
  26. lock.Lock() // 加锁
  27. x = x + 1
  28. lock.Unlock() // 解锁
  29. }
  30. wg.Done()
  31. }
  32. func main() {
  33. wg.Add(2)
  34. go add()
  35. go add()
  36. wg.Wait()
  37. fmt.Println(x)
  38. }

:::success 读写互斥锁:互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的<font style="color:rgb(51, 51, 51);">RWMutex</font>类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

读写锁优势:读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

:::

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "sync"
  6. )
  7. var (
  8. x int64
  9. wg sync.WaitGroup
  10. lock sync.Mutex
  11. rwlock sync.RWMutex
  12. )
  13. func write() {
  14. lock.Lock() // 加互斥锁
  15. // rwlock.Lock() // 加写锁
  16. x = x + 1
  17. time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
  18. // rwlock.Unlock() // 解写锁
  19. lock.Unlock() // 解互斥锁
  20. wg.Done()
  21. }
  22. func read() {
  23. lock.Lock() // 加互斥锁
  24. // rwlock.RLock() // 加读锁
  25. time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
  26. // rwlock.RUnlock() // 解读锁
  27. lock.Unlock() // 解互斥锁
  28. wg.Done()
  29. }
  30. func main() {
  31. start := time.Now()
  32. for i := 0; i < 10; i++ {
  33. wg.Add(1)
  34. go write()
  35. }
  36. for i := 0; i < 1000; i++ {
  37. wg.Add(1)
  38. go read()
  39. }
  40. wg.Wait()
  41. end := time.Now()
  42. fmt.Println(end.Sub(start))
  43. }
  44. // 读写锁 146.8492ms
  45. // 互斥锁 15.0116383s

:::success 避免锁复制sync.Mutex是一个结构体对象,这个对象在使用的过程中要避免被复制 —— 浅拷贝。复制会导致锁被「分裂」了,也就起不到保护的作用。所以在平时的使用中要尽量使用它的指针类型

锁复制存在于结构体变量的赋值、函数参数传递、方法参数传递中,都需要注意。

:::

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type SafeDict struct {
  7. data map[string]int
  8. mutex *sync.Mutex
  9. }
  10. func NewSafeDict(data map[string]int) *SafeDict {
  11. return &SafeDict{
  12. data: data,
  13. mutex: &sync.Mutex{},
  14. }
  15. }
  16. // defer 语句总是要推迟到函数尾部运行,所以如果函数逻辑运行时间比较长,
  17. // 这会导致锁持有的时间较长,这时使用 defer 语句来释放锁未必是一个好注意。
  18. func (d *SafeDict) Len() int {
  19. d.mutex.Lock()
  20. defer d.mutex.Unlock()
  21. return len(d.data)
  22. }
  23. // func (d *SafeDict) Test() int {
  24. // d.mutex.Lock()
  25. // length := len(d.data)
  26. // d.mutex.Unlock() // 手动解锁 减少粒度 // 这种情况就不要用 defer d.mutex.Unlock()
  27. // fmt.Println("length: ", length)
  28. // // 这里还有耗时处理 耗时1000ms
  29. // }
  30. func (d *SafeDict) Put(key string, value int) (int, bool) {
  31. d.mutex.Lock()
  32. defer d.mutex.Unlock()
  33. old_value, ok := d.data[key]
  34. d.data[key] = value
  35. return old_value, ok
  36. }
  37. func (d *SafeDict) Get(key string) (int, bool) {
  38. d.mutex.Lock()
  39. defer d.mutex.Unlock()
  40. old_value, ok := d.data[key]
  41. return old_value, ok
  42. }
  43. func (d *SafeDict) Delete(key string) (int, bool) {
  44. d.mutex.Lock()
  45. defer d.mutex.Unlock()
  46. old_value, ok := d.data[key]
  47. if ok {
  48. delete(d.data, key)
  49. }
  50. return old_value, ok
  51. }
  52. func write(d *SafeDict) {
  53. d.Put("banana", 5)
  54. }
  55. func read(d *SafeDict) {
  56. fmt.Println(d.Get("banana"))
  57. }
  58. // go run -race 3-2-lock.go
  59. func main() {
  60. d := NewSafeDict(map[string]int{
  61. "apple": 2,
  62. "pear": 3,
  63. })
  64. go read(d)
  65. write(d)
  66. }

:::info 匿名锁字段:在结构体章节,我们知道外部结构体可以自动继承匿名内部结构体的所有方法。如果将上面的SafeDict 结构体进行改造,将锁字段匿名,就可以稍微简化一下代码

:::

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type SafeDict struct {
  7. data map[string]int
  8. *sync.Mutex
  9. }
  10. func NewSafeDict(data map[string]int) *SafeDict {
  11. return &SafeDict{
  12. data,
  13. &sync.Mutex{}, // 一样是要初始化的
  14. }
  15. }
  16. func (d *SafeDict) Len() int {
  17. d.Lock()
  18. defer d.Unlock()
  19. return len(d.data)
  20. }
  21. func (d *SafeDict) Put(key string, value int) (int, bool) {
  22. d.Lock()
  23. defer d.Unlock()
  24. old_value, ok := d.data[key]
  25. d.data[key] = value
  26. return old_value, ok
  27. }
  28. func (d *SafeDict) Get(key string) (int, bool) {
  29. d.Lock()
  30. defer d.Unlock()
  31. old_value, ok := d.data[key]
  32. return old_value, ok
  33. }
  34. func (d *SafeDict) Delete(key string) (int, bool) {
  35. d.Lock()
  36. defer d.Unlock()
  37. old_value, ok := d.data[key]
  38. if ok {
  39. delete(d.data, key)
  40. }
  41. return old_value, ok
  42. }
  43. func write(d *SafeDict) {
  44. d.Put("banana", 5)
  45. }
  46. func read(d *SafeDict) {
  47. fmt.Println(d.Get("banana"))
  48. }
  49. func main() {
  50. d := NewSafeDict(map[string]int{
  51. "apple": 2,
  52. "pear": 3,
  53. })
  54. go read(d)
  55. write(d)
  56. }

⑦、发布订阅练习

⑧、Context上下文

:::success Context每一个处理都应该有个超时限制,需要在调用中传递这个超时,Context是协程安全的。代码中可以将单个Context传递给任意数量的goroutine,并在取消该Context时可以将信号传递给所有的goroutine。

对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancel、WithDeadline、WithTimeout或WithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消

:::

:::success Context接口

**Deadline**方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消;

**<font style="color:#DF2A3F;">Done</font>**方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源;

**Err**方法返回取消的错误原因,因为什么Context被取消;

  • 如果当前Context被取消就会返回<font style="color:rgb(51, 51, 51);">Canceled</font>错误;
  • 如果当前Context超时就会返回<font style="color:rgb(51, 51, 51);">DeadlineExceeded</font>错误;

**Value**方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。

:::

  1. type Context interface {
  2. Deadline() (deadline time.Time, ok bool)
  3. Done() <-chan struct{}
  4. Err() error
  5. Value(key interface{}) interface{}
  6. }

:::success Background()和TODO()

Go内置两个函数:**<font style="color:#DF2A3F;">Background()和TODO()</font>**,这两个函数分别返回一个实现了Context接口的background和todo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。 Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。 TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。 background和todo本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context

:::

:::success With系列函数:四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思;

**WithCancel**函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context

**WithDeadline**函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消

**WithTimeout**和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思,只是传参数不一样。

**WithValue**函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,WithValue返回父节点的副本,其中与key关联的值为val。仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数

:::

  1. func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  2. func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
  3. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  4. func WithValue(parent Context, key interface{}, val interface{}) Context
  1. func gen(ctx context.Context) <-chan int {
  2. dst := make(chan int)
  3. n := 1
  4. go func() {
  5. for {
  6. select {
  7. case <-ctx.Done():
  8. return // return结束该goroutine,防止泄露
  9. case dst <- n:
  10. n++
  11. }
  12. }
  13. }()
  14. return dst
  15. }
  16. func main() {
  17. ctx, cancel := context.WithCancel(context.Background())
  18. defer cancel() // 当我们取完需要的整数后调用cancel
  19. for n := range gen(ctx) {
  20. fmt.Println(n)
  21. if n == 5 {
  22. break
  23. }
  24. }
  25. }
  26. /*上面的示例代码中,gen函数在单独的goroutine中生成整数并将它们发送到返回的通道。
  27. gen的调用者在使用生成的整数之后需要取消上下文,以免gen启动的内部goroutine发生泄漏*/
  28. /* 输出
  29. 1
  30. 2
  31. 3
  32. 4
  33. 5
  34. */
  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. "context"
  6. )
  7. func main() {
  8. d := time.Now().Add(50 * time.Millisecond)
  9. ctx, cancel := context.WithDeadline(context.Background(), d)
  10. // 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
  11. // 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
  12. defer cancel()
  13. select {
  14. case <-time.After(1 * time.Second):
  15. fmt.Println("overslept")
  16. case <-ctx.Done():
  17. fmt.Println(ctx.Err())
  18. }
  19. }
  20. // 输出 context deadline exceeded
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // context.WithTimeout
  9. var wg sync.WaitGroup
  10. func worker(ctx context.Context) {
  11. LOOP:
  12. for {
  13. fmt.Println("db connecting ...")
  14. time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
  15. select {
  16. case <-ctx.Done(): // 50毫秒后自动调用
  17. break LOOP
  18. default:
  19. }
  20. }
  21. fmt.Println("worker done!")
  22. wg.Done()
  23. }
  24. func main() {
  25. // 设置一个50毫秒的超时
  26. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
  27. wg.Add(1)
  28. go worker(ctx)
  29. time.Sleep(time.Second * 5)
  30. cancel() // 通知子goroutine结束
  31. wg.Wait()
  32. fmt.Println("over")
  33. }
  34. /*
  35. db connecting ...
  36. db connecting ...
  37. db connecting ...
  38. worker done!
  39. over
  40. */
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. // context.WithValue
  9. type TraceCode string
  10. var wg sync.WaitGroup
  11. func worker(ctx context.Context) {
  12. key := TraceCode("TRACE_CODE")
  13. traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace code
  14. if !ok {
  15. fmt.Println("invalid trace code")
  16. }
  17. LOOP:
  18. for {
  19. fmt.Printf("worker, trace code:%s\n", traceCode)
  20. time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
  21. select {
  22. case <-ctx.Done(): // 50毫秒后自动调用
  23. break LOOP
  24. default:
  25. }
  26. }
  27. fmt.Println("worker done!")
  28. wg.Done()
  29. }
  30. func main() {
  31. // 设置一个50毫秒的超时
  32. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
  33. // 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合
  34. ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")
  35. wg.Add(1)
  36. go worker(ctx)
  37. time.Sleep(time.Second * 5)
  38. cancel() // 通知子goroutine结束
  39. wg.Wait()
  40. fmt.Println("over")
  41. }
  42. /*
  43. worker, trace code:12512312234
  44. worker, trace code:12512312234
  45. worker, trace code:12512312234
  46. worker, trace code:12512312234
  47. worker done!
  48. over
  49. */

:::success Context使用原则

◼ 不要把Context放在结构体中,要以参数的方式进行传递

◼ 以 Context 作为参数的函数方法,应该把 Context 作为第一个参数

◼ 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO;

Context 的 Value 相关方法应该传递请求域的必要数据,不应该用于传递可选参数;

Context 是线程安全的,可以放心的在多个 Goroutine 中传递。

:::

:::success contexts派生上下文:Context包提供了从现有Context值派生新Context值的函数。这些值形成一个树:当一个Context被取消时,从它派生的所有Context也被取消。

:::

⑨、调用服务端API时如何在客户端实现超时控制?

  1. // context_timeout/server/main.go
  2. package main
  3. import (
  4. "fmt"
  5. "math/rand"
  6. "net/http"
  7. "time"
  8. )
  9. // server端,随机出现慢响应
  10. func indexHandler(w http.ResponseWriter, r *http.Request) {
  11. number := rand.Intn(2)
  12. if number == 0 {
  13. time.Sleep(time.Second * 10) // 耗时10秒的慢响应
  14. fmt.Fprintf(w, "slow response")
  15. return
  16. }
  17. fmt.Fprint(w, "quick response")
  18. }
  19. func main() {
  20. http.HandleFunc("/", indexHandler)
  21. err := http.ListenAndServe(":8000", nil)
  22. if err != nil {
  23. panic(err)
  24. }
  25. }
  1. // context_timeout/client/main.go
  2. package main
  3. import (
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "sync"
  9. "time"
  10. )
  11. // 客户端
  12. type respData struct {
  13. resp *http.Response
  14. err error
  15. }
  16. func doCall(ctx context.Context) {
  17. transport := http.Transport{
  18. // 请求频繁可定义全局的client对象并启用长链接
  19. // 请求不频繁使用短链接
  20. DisableKeepAlives: true, }
  21. client := http.Client{
  22. Transport: &transport,
  23. }
  24. respChan := make(chan *respData, 1)
  25. req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
  26. if err != nil {
  27. fmt.Printf("new requestg failed, err:%v\n", err)
  28. return
  29. }
  30. req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client request
  31. var wg sync.WaitGroup
  32. wg.Add(1)
  33. defer wg.Wait()
  34. go func() {
  35. resp, err := client.Do(req)
  36. fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
  37. rd := &respData{
  38. resp: resp,
  39. err: err,
  40. }
  41. respChan <- rd
  42. wg.Done()
  43. }()
  44. select {
  45. case <-ctx.Done():
  46. //transport.CancelRequest(req)
  47. fmt.Println("call api timeout")
  48. case result := <-respChan:
  49. fmt.Println("call server api success")
  50. if result.err != nil {
  51. fmt.Printf("call server api failed, err:%v\n", result.err)
  52. return
  53. }
  54. defer result.resp.Body.Close()
  55. data, _ := ioutil.ReadAll(result.resp.Body)
  56. fmt.Printf("resp:%v\n", string(data))
  57. }
  58. }
  59. func main() {
  60. // 定义一个100毫秒的超时
  61. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
  62. defer cancel() // 调用cancel释放子goroutine资源
  63. doCall(ctx)
  64. }
  65. /* 请求失败打印
  66. call api timeout
  67. client.do resp:<nil>, err:Get "http://127.0.0.1:8000/": context deadline exceeded
  68. */
  69. /* 请求成功打印
  70. client.do resp:&{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[14] Content-Type:[text/plain; charset=utf-8] Date:[Sat, 08 Apr 2023 13:55:48 GMT]] 0xc00019e040 14 [] true
  71. false map[] 0xc00012e100 <nil>}, err:<nil>
  72. call server api success
  73. resp:quick response
  74. */