1. Goroutine

1.1. Go实现并发的方式

1.1.1. 并发与并行

  • 并发:同一时间段内执行多个任务,不同的任务通过上下文切换实现调度
  • 并行:同一时刻执行多个任务,多个任务在不同的CPU上同时运行

    1.1.2. Goroutine

    在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换。
    Go语言中的goroutine概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
    在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了。
    OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
    单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

    1.1.3. GPM调度系统

    GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。

  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。P的个数是通过 runtime.GOMAXPROCS 设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。

    1.1.4. 原子操作

    原子操作是指并发编程中最小且不可以并行的操作,当并发线程对一个共享资源的的操作时原子操作的话,那么同一时间就只能有一个线程能操作该资源,实现方式就是加锁。

    1.2. Goroutine的使用

    1.2.1. goroutine

    golang 中使用 goroutine 非常方便,只需要将需要放到代码封装为一个函数(命名函数或者匿名函数),然后使用 go func 执行函数即可。 ```go package main

import ( “fmt” “runtime” “time” )

func f0() { , , line, _ := runtime.Caller(0) fmt.Printf(“time:%v; 当前行号:%v\n”, time.Now().Format(“15:04:05”), line+1) }

func main() { go func() { , , line, _ := runtime.Caller(0) fmt.Printf(“time:%v; 当前行号:%v\n”, time.Now().Format(“15:04:05”), line+1) }() // 执行匿名函数 go f0() // 执行命名函数

  1. fmt.Printf("time:%v; 当前行号:%v\n", time.Now().Format("15:04:05"), 22)
  2. time.Sleep(time.Second)
  3. fmt.Printf("time:%v; 当前行号:%v\n", time.Now().Format("15:04:05"), 24)

}

  1. ```
  2. E:\Projects\Go\src\learn\day10-goroutine\goroutine>go run mian.go
  3. time:11:31:06; 当前行号:22
  4. time:11:31:06; 当前行号:17
  5. time:11:31:06; 当前行号:11
  6. time:11:31:07; 当前行号:24

1.2.2. WatiGroup

上述案例中,如果不加入 line22 会出现mian函数在go线程之前退出,main退出后,整个程序就结束了。在实际的编程中,主进程需要等待子进程完成所有任务后才能退出, sync.WaitGroup 用于实现goroutine同步。

  1. 1. type WaitGroup struct { }
  2. WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。
  3. 每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。
  4. 2. func (wg *WaitGroup) Add(delta int)
  5. Add方法向内部计数加上deltadelta可以是负数;
  6. 如果内部计数器变为0Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic
  7. 注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。
  8. 一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。
  9. 3. func (wg *WaitGroup) Done()
  10. Done方法减少WaitGroup计数器的值,应在线程的最后执行,一般配合 defer 使用
  11. 4. func (wg *WaitGroup) Wait()
  12. Wait方法阻塞直到WaitGroup计数器减为0
  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "time"
  7. )
  8. var wg sync.WaitGroup
  9. func f0() {
  10. defer wg.Done()
  11. _, _, line, _ := runtime.Caller(0)
  12. fmt.Printf("time:%v; 当前行号:%v\n", time.Now().Format("15:04:05"), line+1)
  13. }
  14. func main() {
  15. wg.Add(2)
  16. go func() {
  17. defer wg.Done()
  18. _, _, line, _ := runtime.Caller(0)
  19. fmt.Printf("time:%v; 当前行号:%v\n", time.Now().Format("15:04:05"), line+1)
  20. }()
  21. go f0()
  22. fmt.Printf("time:%v; 当前行号:%v\n", time.Now().Format("15:04:05"), 28)
  23. wg.Wait()
  24. }
  1. time:14:08:11; 当前行号:28
  2. time:14:08:11; 当前行号:15
  3. time:14:08:11; 当前行号:24

1.2.3. GOMAXPROCS

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
在部分场景中,比如日志采集插件或者监控插件等非业务进程中,需要降低对业务进程的性能影响,需要开发者手动调整GOMAXPROCS的值。

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. )
  7. var wg sync.WaitGroup
  8. func func01() {
  9. defer wg.Done()
  10. for i := 0; i < 10; i++ {
  11. fmt.Printf("函数:%v; value:%v\n", "func01", i)
  12. }
  13. }
  14. func func02() {
  15. defer wg.Done()
  16. for i := 0; i < 10; i++ {
  17. fmt.Printf("函数:%v; value:%v\n", "func02", i)
  18. }
  19. }
  20. func main() {
  21. wg.Add(2)
  22. runtime.GOMAXPROCS(2)
  23. go func01()
  24. go func02()
  25. wg.Wait()
  26. }

2. channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

2.1. Channel基本使用

07-并发编程 - 图1

2.1.1. channel定义

channel 是引用类型,声明后需要使用make初始化分配内存才能使用。

  1. func main() {
  2. var ch1 chan int
  3. fmt.Printf("type:%T;value:%v\n", ch1, ch1)
  4. ch1 = make(chan int, 10)
  5. fmt.Printf("type:%T;value:%v\n", ch1, ch1)
  6. }
  1. [root@heyingsheng channel]# go run main.go
  2. type:chan int;value:<nil>
  3. type:chan int;value:0xc0000b6000
  1. type student struct {
  2. id string
  3. name string
  4. age uint8
  5. }
  6. func main() {
  7. stuChan := make(chan student, 10)
  8. fmt.Printf("type:%T;value:%v;cap:%d\n", stuChan, stuChan, cap(stuChan))
  9. }
  1. [root@heyingsheng channel]# go run main.go
  2. type:chan main.student;value:0xc0000b0120;cap:10

2.1.2. channel 简单使用

  1. package main
  2. import "fmt"
  3. type student struct {
  4. name string
  5. age uint8
  6. }
  7. func main() {
  8. ch1 := make(chan student, 2)
  9. stu1 := student{
  10. name: "张三",
  11. age: 18,
  12. }
  13. ch1 <- stu1 // 向 ch1 写入数据
  14. ch1 <- student{
  15. name: "李四",
  16. age: 20,
  17. }
  18. <-ch1 // 取出ch1中的值,并丢弃
  19. stu1 = <-ch1 // 取出ch1中的值,并赋值给 stu1
  20. close(ch1) // 关闭通道 ch1
  21. fmt.Println(stu1) // {李四 20}
  22. }

2.1.3. 遍历channel

遍历channel中的值,有两种方式:

  • 使用 for 配合 if 判断,使用 v, ok := ch 取值时,如果channel关闭则ok为 false,否则为 true
  • 使用 range 遍历ch,当ch关闭时,则自动退出 range 遍历

注意:如果注释掉下面代码的 line 17,则会出现死锁 fatal error: all goroutines are asleep - deadlock!

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var wg sync.WaitGroup
  8. func f0(ch1 chan int) {
  9. defer wg.Done()
  10. for i:=0;i<100;i++ {
  11. ch1 <- i
  12. time.Sleep(time.Millisecond * 50)
  13. }
  14. close(ch1)
  15. }
  16. func f1(ch1 chan int) {
  17. defer wg.Done()
  18. for {
  19. if v, ok := <- ch1; ok {
  20. fmt.Printf("time:%v;func:f1;value:%v\n", time.Now().Format("15:04:05"), v)
  21. continue
  22. }
  23. break
  24. }
  25. }
  26. func f2(ch1 chan int) {
  27. defer wg.Done()
  28. for v:=range ch1 {
  29. fmt.Printf("time:%v;func:f2;value:%v\n", time.Now().Format("15:04:05"), v)
  30. }
  31. }
  32. func main() {
  33. wg.Add(3)
  34. num := make(chan int, 15)
  35. go f0(num)
  36. go f1(num)
  37. go f2(num)
  38. wg.Wait()
  39. }

2.1.4. 阻塞通道

上述案例中,每个通道指定了容量 cap,如果在make初始化时,不指定容量,则会通道cap为0,该通道称为阻塞通道或者无缓冲通道,实际开发中使用的很少。

  1. // 以下代码编译会通过,但是运行会报错: fatal error: all goroutines are asleep - deadlock!
  2. func main() {
  3. ch0 := make(chan int)
  4. ch0 <- 100
  5. fmt.Println(<-ch0)
  6. }
  1. // 正确的代码如下
  2. func main() {
  3. ch0 := make(chan int)
  4. go func() {
  5. ch0 <- 100
  6. }()
  7. fmt.Println(<- ch0)
  8. }

2.1.5. 单向通道

通常的channel都是可读可写,但是在部分场景中,需要对通道进行限制,如设置只读通道,或者只写通道,通常是作为函数的参数传递。

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type result struct {
  7. num int
  8. res int
  9. }
  10. var wg0 sync.WaitGroup
  11. // 生产 0-100 的数字,存入只写通道 cha
  12. func createNum(cha chan <- int) {
  13. defer wg0.Done()
  14. for i:=0; i<100; i++ {
  15. cha <- i
  16. }
  17. close(cha)
  18. }
  19. // 取出只读通道 in 中的数字,取平方后存入只写通道 out
  20. func squarer(in <- chan int, out chan <- result) {
  21. defer wg0.Done()
  22. for i:=range in {
  23. out <- result{
  24. num: i,
  25. res: i*i,
  26. }
  27. }
  28. close(out)
  29. }
  30. // 从只读通道 in 中读取计算结果
  31. func printRes(in <- chan result) {
  32. defer wg0.Done()
  33. for i := range in {
  34. fmt.Printf("num:%d --> res:%d\n", i.num, i.res)
  35. }
  36. }
  37. func main() {
  38. ch0 := make(chan int,10)
  39. ch1 := make(chan result, 10)
  40. wg0.Add(3)
  41. go createNum(ch0)
  42. go squarer(ch0, ch1)
  43. go printRes(ch1)
  44. wg0.Wait()
  45. }

2.2. goroutine 池

用于指定某类goroutine的数量,并不是总的goroutine的数量,类似于其它编程语言中的线程池。其目的在于避免突如其来的的高并发导致goroutine暴涨或者泄露。控制思路如下:

  1. 1. 设置一个任务channel,所有的任务都发往整个队列
  2. 2. 设置一个 for 循环,该循环的次数就是woker的数量,也就是控制的 goroutine 数量
  3. 3. 这些 worker 都监听同一个任务队列,当有新的任务来了,会被其中一个 worker 抢到
  1. package main
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. )
  7. var wg1 sync.WaitGroup
  8. type student struct {
  9. name string
  10. age int
  11. }
  12. // 随机产生n位字符串
  13. func getName(n int) string {
  14. ret := make([]byte, n) // [a-z] --> 97-->122 --> 5
  15. for i:=0; i < n; i ++ {
  16. ret[i] = byte(rand.Intn(26) + 97)
  17. }
  18. return string(ret)
  19. }
  20. // 随机产生学生信息
  21. func producer(n int, out chan <- *student) {
  22. defer wg1.Done()
  23. for i:=0; i<n; i++ {
  24. out <- &student{
  25. name: getName(5),
  26. age : rand.Intn(16) + 10, // [10,25]
  27. }
  28. }
  29. close(out)
  30. }
  31. // 工作函数,即消费者
  32. func worker(n int, job chan *student) {
  33. defer wg1.Done()
  34. for stu := range job {
  35. ret := fmt.Sprintf("name:%v;age:%d", stu.name, stu.age)
  36. fmt.Printf("worker-%d --> %v\n", n, ret)
  37. }
  38. }
  39. func main() {
  40. students := make(chan *student, 10)
  41. for i:=0; i<5 ; i ++ {
  42. wg1.Add(1)
  43. go worker(i, students)
  44. }
  45. go producer(15, students)
  46. wg1.Wait()
  47. }

2.3. Select

select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。除了空 select{} ,其它一般放在for循环中处理通道数据。

  • 可处理一个或多个channel的发送/接收操作
  • 如果多个case同时满足,select会随机选择一个
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数 ```go package main

import “fmt”

func main() { ch := make(chan int, 1) for i:=0;i<10;i++ { select { case x := <-ch:
fmt.Println(x) case ch <- i:
} } }

  1. ```
  2. [root@heyingsheng channel]# go run select.go
  3. 0
  4. 2
  5. 4
  6. 6
  7. 8

3. 并发安全

3.1. 互斥锁

在实际并发编程中,会涉及到多个goroutine对一个公共变量进行修改,比如同时来了A和B两个任务需要对同一个实例的字段进行更新,此时就出现了资源争抢,需要对这个实例进行加锁,避免数据异常。
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

  1. type student struct {
  2. name string
  3. metric int
  4. }
  5. func main() {
  6. stu01 := student{
  7. name: "张三",
  8. metric: 0,
  9. }
  10. var wg01 sync.WaitGroup
  11. wg01.Add(1000)
  12. for i := 0; i < 1000; i++ {
  13. go func(stu *student) {
  14. defer wg01.Done()
  15. stu.metric++
  16. }(&stu01)
  17. }
  18. wg01.Wait()
  19. fmt.Println(stu01) // metric 数字不固定,因为不同的 goroutine 直接有竞争
  20. }
  1. type student struct {
  2. lock sync.Mutex // 添加一把互斥锁
  3. name string
  4. metric int
  5. }
  6. func main() {
  7. stu01 := student{
  8. name: "张三",
  9. metric: 0,
  10. }
  11. var wg01 sync.WaitGroup
  12. wg01.Add(1000)
  13. for i := 0; i < 1000; i++ {
  14. go func(stu *student) {
  15. defer wg01.Done()
  16. stu.lock.Lock()
  17. stu.metric++
  18. stu.lock.Unlock()
  19. }(&stu01)
  20. }
  21. wg01.Wait()
  22. fmt.Printf("name:%v;metric:%v\n", stu01.name, stu01.metric)
  23. }

3.2. 读写锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
在读多写少的情况,且数据并不是始终存在内存中的情况下,使用读写锁比互斥锁有着更高的性能

  1. var wg02 sync.WaitGroup
  2. type Person struct {
  3. rwlock sync.RWMutex
  4. lock sync.Mutex
  5. name string
  6. metric int
  7. }
  8. func (p *Person) read(lockType string) {
  9. defer wg02.Done()
  10. if lockType == "rwlock" {
  11. p.rwlock.RLock()
  12. time.Sleep(time.Millisecond) // 模拟 1ms I/O 阻塞
  13. _ = p.name
  14. _ = p.metric
  15. p.rwlock.RUnlock()
  16. return
  17. }
  18. p.lock.Lock()
  19. time.Sleep(time.Millisecond) // 模拟 1ms I/O 阻塞
  20. _ = p.name
  21. _ = p.metric
  22. p.lock.Unlock()
  23. }
  24. func (p *Person) write(lockType string) {
  25. wg02.Done()
  26. if lockType == "rwlock" {
  27. p.rwlock.Lock()
  28. time.Sleep(time.Millisecond*2) // 模拟 1ms I/O 阻塞
  29. p.metric++
  30. p.rwlock.Unlock()
  31. return
  32. }
  33. p.lock.Lock()
  34. time.Sleep(time.Millisecond*2) // 模拟 1ms I/O 阻塞
  35. p.metric++
  36. p.lock.Unlock()
  37. }
  38. func main() {
  39. p0 := &Person{name: "张三"}
  40. // 开始测试读写锁性能
  41. start := time.Now()
  42. wg02.Add(11000)
  43. for i := 0; i < 1000; i++ {
  44. go p0.write("rwlock") // 写1000次
  45. }
  46. for i := 0; i < 10000; i++ {
  47. go p0.read("rwlock") // 读10000次
  48. }
  49. wg02.Wait()
  50. fmt.Printf("读写锁耗时:%v\n", time.Now().Sub(start))
  51. // 开始测试互斥锁性能
  52. start = time.Now()
  53. wg02.Add(11000)
  54. for i := 0; i < 1000; i++ {
  55. go p0.write("lock") // 写1000次
  56. }
  57. for i := 0; i < 10000; i++ {
  58. go p0.read("lock") // 读10000次
  59. }
  60. wg02.Wait()
  61. fmt.Printf("互斥锁耗时:%v\n", time.Now().Sub(start))
  62. }
  1. [root@heyingsheng locks]# go run rwlock.go
  2. 读写锁耗时:10.9732ms
  3. 互斥锁耗时:14.5726255s

3.3. sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等,为了避免多次操作造成报错或者不必要的资源浪费,Go提供了 sync.Once 结构体,该结构体只有一个方法 func (o *Once) Do(f func()) {} ,该方法接收一个没有参数和返回值的函数。

3.3.1. 加载配置文件

  1. var icons map[string]image.Image
  2. func loadIcons() {
  3. icons = map[string]image.Image{
  4. "left": loadIcon("left.png"),
  5. "up": loadIcon("up.png"),
  6. "right": loadIcon("right.png"),
  7. "down": loadIcon("down.png"),
  8. }
  9. }
  10. // Icon 被多个goroutine调用时不是并发安全的
  11. func Icon(name string) image.Image {
  12. if icons == nil {
  13. loadIcons()
  14. }
  15. return icons[name]
  16. }

多个goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

  1. func loadIcons() {
  2. icons = make(map[string]image.Image)
  3. icons["left"] = loadIcon("left.png")
  4. icons["up"] = loadIcon("up.png")
  5. icons["right"] = loadIcon("right.png")
  6. icons["down"] = loadIcon("down.png")
  7. }

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的goroutine操作,但是这样做又会引发性能问题。使用sync.Once改造的示例代码如下:

  1. var icons map[string]image.Image
  2. var loadIconsOnce sync.Once
  3. func loadIcons() {
  4. icons = map[string]image.Image{
  5. "left": loadIcon("left.png"),
  6. "up": loadIcon("up.png"),
  7. "right": loadIcon("right.png"),
  8. "down": loadIcon("down.png"),
  9. }
  10. }
  11. // Icon 是并发安全的
  12. func Icon(name string) image.Image {
  13. loadIconsOnce.Do(loadIcons)
  14. return icons[name]
  15. }

3.3.2. 并发安全的单例模式

下面是借助sync.Once实现的并发安全的单例模式:

  1. package singleton
  2. import (
  3. "sync"
  4. )
  5. type singleton struct {}
  6. var instance *singleton
  7. var once sync.Once
  8. func GetInstance() *singleton {
  9. once.Do(func() {
  10. instance = &singleton{}
  11. })
  12. return instance
  13. }

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

3.4. sync.Map

在Go语言中,Map并不是并发安全的,如下的案例中,就可能出现报错: fatal error: concurrent map writes ,当然可以在 setMap() 和 getMap 上加互斥锁或者读写锁来实现数据安全,避免报错,但是Go本身提供了一个带锁的map和对对应的方法,可以开箱即用。

  1. var wg3 sync.WaitGroup
  2. func setMap(m map[int]string, key int, value string) {
  3. defer wg3.Done()
  4. wg3.Add(1)
  5. m[key] = value
  6. }
  7. func main() {
  8. m0 := make(map[int]string, 10)
  9. for i := 0; i < 10000; i++ {
  10. go setMap(m0, i, strconv.Itoa(i))
  11. }
  12. wg3.Wait()
  13. }
  1. # 手动添加互斥锁
  2. var wg3 sync.WaitGroup
  3. var lock sync.Mutex
  4. func setMap(m map[int]string, key int, value string) {
  5. defer wg3.Done()
  6. wg3.Add(1)
  7. lock.Lock()
  8. m[key] = value
  9. lock.Unlock()
  10. }
  11. func main() {
  12. m0 := make(map[int]string, 10)
  13. for i := 0; i < 10000; i++ {
  14. go setMap(m0, i, strconv.Itoa(i))
  15. }
  16. wg3.Wait()
  17. }
  1. var wg3 sync.WaitGroup
  2. func main() {
  3. m0 := sync.Map{}
  4. wg3.Add(10000)
  5. for i := 0; i < 10000; i++ {
  6. go func(i int) {
  7. defer wg3.Done()
  8. m0.Store(i, strconv.Itoa(i))
  9. }(i)
  10. }
  11. wg3.Wait()
  12. }
  1. 1. func (m *Map) Store(key, value interface{}) {}
  2. 存数据
  3. 2. func (m *Map) Load(key interface{}) (value interface{}, ok bool) {}
  4. 取数据
  5. 3. func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {}
  6. 存在则返回值,不存在存储并返回
  7. 4. func (m *Map) Delete(key interface{}) {}
  8. 删除值

4. Context包

在Go语言中,常常用一个goroutine来处理一个请求,而这个请求往往需要继续开子goroutine来处理其中部分请求,达能请求超时需要关闭这个goroutine和它的子goroutine,这种需求可以使用全局变量或者管道来实现。

4.1. 通知子goroutine关闭

4.1.1. 全局变量方式

  1. var wg01 sync.WaitGroup
  2. var flag bool
  3. func func01() {
  4. defer wg01.Done()
  5. for ! flag {
  6. fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
  7. time.Sleep(time.Second)
  8. }
  9. }
  10. func main() {
  11. wg01.Add(1)
  12. go func01()
  13. time.Sleep(time.Second * 5)
  14. flag = true
  15. wg01.Wait()
  16. }

4.1.2. 管道方式

  1. var wg02 sync.WaitGroup
  2. var exitChan = make(chan struct{}) // 空结构体或者bool都行
  3. func func02() {
  4. defer wg02.Done()
  5. for {
  6. fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
  7. time.Sleep(time.Second)
  8. select {
  9. case <-exitChan:
  10. fmt.Println("exit")
  11. return
  12. default:
  13. }
  14. }
  15. }
  16. func main() {
  17. wg02.Add(1)
  18. go func02()
  19. time.Sleep(time.Second * 5)
  20. exitChan <- struct{}{}
  21. wg02.Wait()
  22. }

4.1.3. context方式

  1. var wg03 sync.WaitGroup
  2. func func03(ctx context.Context) {
  3. defer wg03.Done()
  4. for {
  5. fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
  6. time.Sleep(time.Second)
  7. select {
  8. case <-ctx.Done():
  9. return
  10. default:
  11. }
  12. }
  13. }
  14. func main() {
  15. ctx, cancel := context.WithCancel(context.Background())
  16. wg03.Add(1)
  17. go func03(ctx)
  18. time.Sleep(time.Second * 5)
  19. cancel()
  20. wg03.Wait()
  21. }

4.1.4. 多级goroutine情况

如果存在多级的goroutine调用,把ctx继续传递下去就行,ctx内部实现了多级goroutine的安全退出,不会因为其中一个goroutine取了管道中的值,而导致其它goroutine无法取值。

  1. var wg04 sync.WaitGroup
  2. func childWorker(ctx context.Context) {
  3. for {
  4. fmt.Println("child", time.Now().Format("2006-01-02 15:04:05"))
  5. time.Sleep(time.Second)
  6. select {
  7. case <-ctx.Done():
  8. return
  9. default:
  10. }
  11. }
  12. }
  13. func func04(ctx context.Context) {
  14. defer wg04.Done()
  15. go childWorker(ctx)
  16. for {
  17. fmt.Println("parent", time.Now().Format("2006-01-02 15:04:05"))
  18. time.Sleep(time.Second)
  19. select {
  20. case <-ctx.Done():
  21. return
  22. default:
  23. }
  24. }
  25. }
  26. func main() {
  27. ctx, cancel := context.WithCancel(context.Background())
  28. wg04.Add(1)
  29. go func04(ctx)
  30. time.Sleep(time.Second * 5)
  31. cancel()
  32. wg04.Wait()
  33. }

4.1.5. goroutine kill 问题

golang 并不能停止一个正在运行的 goroutine,上面提到的通过goroutine关闭,并不是最底层处理阻塞任务的那个gorountine,比如下面的这个案例:
Run() 方法可以提前结束退出,但是 exec() 方法需要等待 ansible执行完毕。无法在 Run()中强制结束 exec() 方法。为了避免exec()超时后,仍然修改执行结果,增加了 timeout 这个字段,当该字段为 true 时,主动放弃exec() 的执行结果。

  1. type Ansible struct {
  2. ticker <-chan time.Time // 定时器,用于超时处理
  3. result string // ansible运行结果
  4. timeout bool // 超时状态
  5. done chan bool // 完成任务的标志位
  6. }
  7. func NewAnsible(duration time.Duration) *Ansible {
  8. return &Ansible{
  9. ticker: time.After(duration),
  10. done: make(chan bool),
  11. }
  12. }
  13. // 执行ansible 任务,并更新实例的状态,比如将 result 写入数据等
  14. func (a *Ansible) Run() {
  15. go a.exec()
  16. for {
  17. select {
  18. case <-a.ticker:
  19. a.timeout = true
  20. a.result = "task failed, error: timeout"
  21. return
  22. case <-a.done:
  23. a.result = "task success, ..... "
  24. return
  25. }
  26. }
  27. }
  28. // 正在执行ansible命令的函数,该函数是阻塞的,会等待 ansible 执行完毕
  29. func (a *Ansible) exec() {
  30. time.Sleep(time.Second * 2) // 模拟ansible 执行,此处为了快速出结果,仅休眠2秒
  31. if a.timeout {
  32. return
  33. }
  34. a.done <- true
  35. }
  36. func main() {
  37. task01 := NewAnsible(time.Second * 1)
  38. task01.Run()
  39. fmt.Printf("task01 resutl:%s\n", task01.result)
  40. task02 := NewAnsible(time.Second * 3)
  41. task02.Run()
  42. fmt.Printf("task02 resutl:%s\n", task02.result)
  43. }
  1. [root@duduniao go_learn]# go run day23/groutine/test01.go
  2. task01 resutl:task failed, error: timeout
  3. task02 resutl:task success, .....

4.2. Context基础

Go1.7加入了一个新的标准库context,它定义了Context类型,专门用来简化 对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

  1. 1. type Context interface
  2. type Context interface {
  3. Deadline() (deadline time.Time, ok bool)
  4. Done() <-chan struct{}
  5. Err() error
  6. Value(key interface{}) interface{}
  7. }
  8. 定义接口
  9. 2. func Background() Context
  10. 返回一个不会被取消,没有值,也不会过期的Context类型,一般作为顶层节点的 Context,用于mian函数、init函数等。
  11. 3. func TODO() Context
  12. 保留项,目前官方还没有定义具体的用途
  13. 4. type CancelFunc func()
  14. 定义一个没有参数和返回值的函数类型: CancelFunc
  15. 调用后,释放 Context 上下文资源。
  16. 5. func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  17. 返回带有新Done通道的父节点的副本,从 ctx.Done() 中获取空结构体,用于goroutine之间的通信
  18. 6. func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
  19. 带过期时间的context
  20. 7. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
  21. 带过期时间的context
  22. 8. func WithValue(parent Context, key, val interface{}) Context
  23. 可以在不同的API和进程之间传递数据,一般用于追踪一个请求的链路,比如用 traceID 作为唯一值,在不同API之间转递
  24. 仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。
  25. 所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。
  26. WithValue的用户应该为键定义自己的类型。

4.3. 案例

4.3.1. timeout

  1. func main() {
  2. ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5)
  3. defer cancel()
  4. for {
  5. fmt.Println(time.Now().Format("15:04:05"))
  6. time.Sleep(time.Second)
  7. select {
  8. case <- ctx.Done():
  9. return
  10. default:
  11. }
  12. }
  13. }

4.3.2. deadline

  1. var wg01 sync.WaitGroup
  2. func func01(ctx context.Context) {
  3. defer wg01.Done()
  4. for {
  5. fmt.Println(time.Now().Format("15:04:05"))
  6. time.Sleep(time.Second)
  7. select {
  8. case <- ctx.Done():
  9. return
  10. default:
  11. }
  12. }
  13. }
  14. func main() {
  15. ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second * 5))
  16. defer cancel()
  17. wg01.Add(2)
  18. go func01(ctx)
  19. go func01(ctx)
  20. wg01.Wait()
  21. }

4.3.3. value

  1. type TraceCode string
  2. var wg sync.WaitGroup
  3. func worker(ctx context.Context) {
  4. key := TraceCode("TRACE_CODE")
  5. traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace code
  6. if !ok {
  7. fmt.Println("invalid trace code")
  8. }
  9. LOOP:
  10. for {
  11. fmt.Printf("worker, trace code:%s\n", traceCode)
  12. time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
  13. select {
  14. case <-ctx.Done(): // 50毫秒后自动调用
  15. break LOOP
  16. default:
  17. }
  18. }
  19. fmt.Println("worker done!")
  20. wg.Done()
  21. }
  22. func main() {
  23. // 设置一个50毫秒的超时
  24. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
  25. // 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合
  26. ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")
  27. wg.Add(1)
  28. go worker(ctx)
  29. time.Sleep(time.Second * 5)
  30. cancel() // 通知子goroutine结束
  31. wg.Wait()
  32. fmt.Println("over")
  33. }

5. 并发模式

5.1. 生产者和消费者模型

并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题。

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. var (
  7. ch = make(chan int, 20)
  8. )
  9. func producer(num int, queue chan<- int) {
  10. for i := 0; ; i++ {
  11. queue <- num * i
  12. time.Sleep(100 * time.Millisecond)
  13. }
  14. }
  15. func consumer(name string, queue <-chan int) {
  16. for v := range queue {
  17. fmt.Printf("consumer:%s;res:%d\n", name, v)
  18. }
  19. }
  20. func main() {
  21. go producer(5, ch)
  22. go producer(7, ch)
  23. go consumer("c1", ch)
  24. go consumer("c2", ch)
  25. select {}
  26. }

5.2. 发布订阅模型

发布订阅者模型(publish-and-subscribe),一般是多个服务应用之间,通过中间件消息队列,如Kafka实现。消费者和生产者模型中,是通过队列进行异步的消息传递的。而发布订阅模型则引入了Topic概念,发布者将消息发布到一个主题,只有订阅了该主题的程序才能收到和消费这条消息。

5.3. 赢者为王

并发编程在大部分情况下,目的是为了避免阻塞,提高程序运行的性能。但是在部分场景中,如果一个问题由多个处理方法,但是不同情况下不同处理方式耗费的时间不同,此时可以让多个方式一起运行,仅取第一个完成的结果。

  1. func main() {
  2. ch := make(chan string, 32)
  3. go func() {
  4. ch <- searchByBing("golang")
  5. }()
  6. go func() {
  7. ch <- searchByGoogle("golang")
  8. }()
  9. go func() {
  10. ch <- searchByBaidu("golang")
  11. }()
  12. fmt.Println(<-ch)
  13. }