Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成。

Go语言还提供channel在多个goroutine间进行通信。goroutinechannel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

主线程与协程

Go的主线程也可以理解为是进程,它可以起多个协程,而协程可以理解为是轻量级的线程。

主线程是一个物理线程,直接作用在CPU上,是重量级的,非常耗费资源。

协程是通过主线程开启的,是轻量级的线程,是逻辑态,是轻量级的,对资源消耗相对较少。

协程的主要特点有:

  • 有自己独立的栈空间
  • 共享程序的堆空间
  • 协程由用户控制
  • 协程是轻量级的线程

并发与并行

并发:同一时间段内同时执行多个任务;

并行:同一时刻同时执行多个任务;

并发的关键是具有处理多个任务的能力,并不要求要在同一时刻进行。并行的关键是同时具有处理多个任务处理的能力。

goroutine

在Go语言中,goroutine类似于线程,但是goroutine是由运行时runtime调度与管理。Go语言程序会智能的将goroutine中的任务合理的分配给每个CPU。

使用goroutine

在Go语言中使用goroutine只需要在调用函数的时候在前面加一个go关键字。如下:

  1. func f1(){
  2. fmt.Println("Hello World")
  3. }
  4. func main(){
  5. go f1()
  6. }

一个goroutine必须对应一个函数,可以创建多个goroutine去执行相同的函数。如下:

  1. func f1(){
  2. fmt.Println("Hello World")
  3. }
  4. func main(){
  5. go f1()
  6. go f1()
  7. }

上面介绍了启用goroutine的方法,下面通过一个例子来看启用goroutine与不启用goroutine执行程序所得的结果有什么不同。

先看如下代码:

  1. package main
  2. import (
  3. "fmt"
  4. )
  5. func hello(i int) {
  6. fmt.Println("hello", i)
  7. }
  8. func main() {
  9. for i := 0; i < 5; i++ {
  10. hello(i)
  11. }
  12. fmt.Println("hello")
  13. }

不执行代码,我们就可以知道这段代码的输出结果是顺序输出。如下:

  1. hello 0
  2. hello 1
  3. hello 2
  4. hello 3
  5. hello 4
  6. hello

现在给调用hello()函数的时候加上goroutine,如下:

  1. func main() {
  2. for i := 0; i < 5; i++ {
  3. go hello(i)
  4. }
  5. fmt.Println("hello")
  6. }

其输出结果如下:

  1. hello

为啥只输出了hello呢,我们函数调用里的输出哪去了呢?

在Go语言中,main()函数是作为程序的入口函数,在它启动的时候,Go程序会默认为其添加一个goroutinemain()函数执行完后整个程序就结束了,所以上面的代码输出hello后整个程序就结束了。而没有输出hello() 方法中的程序是由于goroutine启动是需要一点时间的,而main()函数结束的又太快了,导致hello()函数都还没有来得及打印整个程序就结束了。

我们可以对其加一个time.Sleep()方法让main()稍等片刻,如下:

  1. func main() {
  2. for i := 0; i < 5; i++ {
  3. go hello(i)
  4. }
  5. fmt.Println("hello")
  6. time.Sleep(time.Second)
  7. }

其执行结果如下:

  1. hello 3
  2. hello 1
  3. hello 4
  4. hello 2
  5. hello
  6. hello 0

多次执行上面的代码,会发现每次打印的顺序都不一致。这是因为每个goroutine是并发执行的,而goroutine的调度是随机的。

上面的例子是我们用循环来启动了多个goroutine,我们为了得到goroutine所调用函数的结果使用了time.Sleep()函数,这种方法不好之处在于对于复杂的函数我们并不知道它到底需要多久才能执行完成,如果我们把时间设置过长则影响性能,如果我们设置过断则影响应用,非常不好去判断这个点。在Go语言中对于并行操作有一个更优雅的退出方式,那就是使用sync包中的WaitGroup。如下:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // 并发
  7. func hello(i int) {
  8. defer wg.Done()
  9. fmt.Println("hello", i)
  10. }
  11. var wg sync.WaitGroup
  12. func main() {
  13. for i := 0; i < 5; i++ {
  14. wg.Add(1)
  15. go hello(i)
  16. }
  17. fmt.Println("hello")
  18. wg.Wait()
  19. }

sync.WaitGroup是一个结构体,它只有三个方法,分别是:Add()Done()Wait()

  • Add():增加计数器
  • Done():计数器减1操作
  • Wait():只要计数器不为0则一致等待

goroutine和线程

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。

  • M:主线程,是一个物理线程,是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
  • P:协程需要的上下文环境,P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务
  • G:协程,就是goroutine,里面除了存放本goroutine信息外 还有与所在P的绑定等信息;

他们的关系如下图:

P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行,如上图的G1和G2。当一个G长久阻塞在一个M上时,比如上图的G0处于阻塞状态,这时候runtime会新建一个M,上图中的M1,阻塞G0所在的P会把其他的G 挂载在新建的M1上去运行,上图中的G1和G2就会挂到新的M1上去执行,G0继续处于阻塞状态,当G0不阻塞了,M会被放到空间的主线程继续执行,同时G0会被唤醒,当G0执行完成或者认为其已经死掉时回收旧的M。

P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

点我了解更多

GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

我们可以通过将任务分配到不同的CPU逻辑核心上实现并行的效果,这里举个例子:

  1. func a() {
  2. for i := 1; i < 10; i++ {
  3. fmt.Println("A:", i)
  4. }
  5. }
  6. func b() {
  7. for i := 1; i < 10; i++ {
  8. fmt.Println("B:", i)
  9. }
  10. }
  11. func main() {
  12. runtime.GOMAXPROCS(1)
  13. go a()
  14. go b()
  15. time.Sleep(time.Second)
  16. }

两个任务只有一个逻辑核心,此时是做完一个任务再做另一个任务。 将逻辑核心数设为2,此时两个任务并行执行,代码如下。

  1. func a() {
  2. for i := 1; i < 10; i++ {
  3. fmt.Println("A:", i)
  4. }
  5. }
  6. func b() {
  7. for i := 1; i < 10; i++ {
  8. fmt.Println("B:", i)
  9. }
  10. }
  11. func main() {
  12. runtime.GOMAXPROCS(2)
  13. go a()
  14. go b()
  15. time.Sleep(time.Second)
  16. }

Go语言中的操作系统线程和goroutine的关系:

  1. 一个操作系统线程对应用户态多个goroutine。
  2. go程序可以同时使用多个操作系统线程。
  3. goroutine和OS线程是多对多的关系,即m:n。

channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。

例子(我们使用goroutine来计算1-20的阶乘,并将结果保存到map中):

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // 定义一个Map用来保存计算结果
  7. var retMap = make(map[int]int)
  8. var wg sync.WaitGroup
  9. // 计算阶乘的函数
  10. func test(n int) {
  11. defer wg.Done()
  12. ret := 1
  13. for i := 1; i <= n; i++ {
  14. ret *= i
  15. }
  16. // 保存结果
  17. retMap[n] = ret
  18. }
  19. func main() {
  20. for i := 1; i <= 20; i++ {
  21. wg.Add(1)
  22. go test(i)
  23. }
  24. wg.Wait()
  25. // 打印map
  26. for i, v := range retMap {
  27. fmt.Printf("retMap[%d]=%d\n", i, v)
  28. }
  29. }

我们直接运行上面的代码,会报并发写的错误,如下:

  1. fatal error: concurrent map writes
  2. fatal error: concurrent map writes

我们在build的时候加上-race参数来查看其资源竞选情况:

  1. go build -race .\main.go

然后运行可执行文件:

  1. ......
  2. retMap[75]=0
  3. retMap[89]=0
  4. retMap[92]=0
  5. retMap[98]=0
  6. retMap[184]=0
  7. Found 1 data race(s)

从输出可以看到有一个数据在进行资源竞选。

正常的解决思路是对内存进行加锁,不让其同时写,但是这样就会造成性能问题。如下:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. // 定义一个Map用来保存计算结果
  7. var retMap = make(map[int]int)
  8. var wg sync.WaitGroup
  9. // 声明全局锁
  10. var lock sync.Mutex
  11. // 计算阶乘的函数
  12. func test(n int) {
  13. defer wg.Done()
  14. ret := 1
  15. for i := 1; i <= n; i++ {
  16. ret *= i
  17. }
  18. // 保存结果
  19. // 加锁
  20. lock.Lock()
  21. retMap[n] = ret
  22. lock.Unlock()
  23. }
  24. func main() {
  25. for i := 1; i <= 20; i++ {
  26. wg.Add(1)
  27. go test(i)
  28. }
  29. wg.Wait()
  30. // 打印map
  31. for i, v := range retMap {
  32. fmt.Printf("retMap[%d]=%d\n", i, v)
  33. }
  34. }

我们通过加锁可以解决这种竞选的问题,但是这是内存级别的加锁,对性能有很大的影响。在Go语言中还可以使用channel来解决这种竞选问题,如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

创建channel

channel是一种类型,一种引用类型。声明通道类型的格式如下:

  1. var 变量 chan 元素类型

举几个例子:

  1. var ch1 chan int // 声明一个传递整型的通道
  2. var ch2 chan bool // 声明一个传递布尔型的通道
  3. var ch3 chan []int // 声明一个传递int切片的通道

声明了什么类型的channel,就只能往里面存放相应类型的数据。

通道是引用类型,通道类型的空值是nil

  1. var ch chan int
  2. fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用。

创建channel的格式如下:

  1. make(chan 元素类型, [缓冲大小])

channel的缓冲大小是可选的,如果确定了缓冲区大小,是固定的,并不会动态扩容。

举几个例子:

  1. ch4 := make(chan int)
  2. ch5 := make(chan bool)
  3. ch6 := make(chan []int)

操作channel

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

  1. ch := make(chan int)

发送

将一个值发送到通道中。

  1. ch <- 10 // 把10发送到ch中

接收

从一个通道中接收值。

  1. x := <- ch // 从ch中接收值并赋值给变量x
  2. <-ch // 从ch中接收值,忽略结果

关闭

我们通过调用内置的close函数来关闭通道。

  1. close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

channel操作的注意事项

  • channel只能存放指定的数据类型
  • channel的数据放满后,就不能再向里面存放数据,否则会抛出deadlock
  • 如果从channel中取出数据后,则可以继续向里存放数据
  • 在没有任何协程的情况下,如果channel里的数据取完了,再取就会报deadlock

无缓冲的channel

上面介绍过在用make初始化channel的时候可以指定缓冲区大小,也可以不指定。如果不指定其就为无缓冲区的channel。

如下:

  1. package main
  2. import "fmt"
  3. func main() {
  4. // 声明一个无缓冲的channel
  5. ch := make(chan int)
  6. // 向里面存放一个数据
  7. ch <- 10
  8. fmt.Println(ch)
  9. }

这时候我们运行代码会报如下错误:

  1. fatal error: all goroutines are asleep - deadlock!

这是因为无缓冲的channel是没办法直接给它发送值的,必须要有地方接受channel的值才行。

如下我们开启一个goroutine来接受值。

  1. package main
  2. import "fmt"
  3. func getRet(ch chan int) {
  4. ret := <-ch
  5. fmt.Println(ret)
  6. }
  7. func main() {
  8. // 声明一个无缓冲的channel
  9. ch := make(chan int)
  10. // 启动 一个goroutine去接受值
  11. go getRet(ch)
  12. // 向里面存放一个数据
  13. ch <- 10
  14. }

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道

有缓冲channel

解决上面问题的方法还有一种就是使用有缓冲区的通道。我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:

  1. func main() {
  2. ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
  3. ch <- 10
  4. fmt.Println("发送成功")
  5. }

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。如果缓冲区满了则不能继续向里面存放数据,只有等别人从缓冲区取走数据后才能继续向里面存放数。

我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。

单向channel

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

Go语言中提供了单向通道来处理这种情况。

如下即为只能向channel里存数据:

  1. package main
  2. import "fmt"
  3. func testChan(x chan<- int) {
  4. for i := 1; i < 3; i++ {
  5. x <- i
  6. }
  7. }
  8. func main() {
  9. // 声明一个channel
  10. ch1 := make(chan int, 3)
  11. testChan(ch1)
  12. }

如果我们在testChan函数中加上如下代码读取channel

  1. for i := range x {
  2. fmt.Println(i)
  3. }

则会抛出一下错误:

  1. // invalid operation: range x (receive from send-only type chan<- int)

上面写的是只能向channel里写入数据,只能读取类似,代码如下:

  1. package main
  2. import "fmt"
  3. func readChan(in <-chan int) {
  4. for i := range in {
  5. fmt.Println(i)
  6. }
  7. }
  8. func writeChan(in chan<- int) {
  9. for i := 1; i < 4; i++ {
  10. in <- i
  11. }
  12. close(in)
  13. }
  14. func main() {
  15. in := make(chan int, 3)
  16. go writeChan(in)
  17. readChan(in)
  18. }

其中:

  • chan<- int是一个只写单向通道(只能对其写入int类型值),可以对其执行发送操作但是不能执行接收操作;
  • <-chan int是一个只读单向通道(只能从其读取int类型值),可以对其执行接收操作但是不能执行发送操作。

在函数传参及任何赋值操作中可以将双向通道转换为单向通道,但反过来是不可以的。

用for range 从channel取值

当向通道中发送完数据时,我们可以通过close函数来关闭通道。

当通道被关闭时,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值,再然后取到的值一直都是对应类型的零值。那如何判断一个通道是否被关闭了呢?

我们来看下面这个例子:

  1. package main
  2. import "fmt"
  3. func main() {
  4. // 声明channel
  5. ch1 := make(chan int, 10)
  6. ch2 := make(chan int, 10)
  7. // 启动goroutine向里写值
  8. go func() {
  9. for i := 0; i < 10; i++ {
  10. ch1 <- i
  11. }
  12. close(ch1)
  13. }()
  14. // 启动goroutine,取ch1存进ch2
  15. go func() {
  16. for {
  17. i, ok := <-ch1
  18. if !ok {
  19. fmt.Println("ch1中的数据已取完")
  20. break
  21. }
  22. ch2 <- i
  23. }
  24. close(ch2)
  25. }()
  26. // 输出ch2
  27. for i := range ch2 {
  28. fmt.Println(i)
  29. }
  30. }

从上面的例子中我们看到有两种方式在接收值的时候判断该通道是否被关闭,不过我们通常使用的是for range的方式。使用for range遍历通道,当通道被关闭的时候就会退出for range

注意事项:

  • 在遍历时,如果channel没有关闭,则会出现deadlock
  • 在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后会自动关闭

接口类型的channel

使用接口类型的channel可以向里面存任何类型的数据,如下:

  1. package main
  2. import "fmt"
  3. type cat struct {
  4. name string
  5. age int
  6. }
  7. func main() {
  8. // 定义接口类型的channel
  9. allChan := make(chan interface{}, 5)
  10. allChan <- 10 // 向里面存Int类型的值
  11. allChan <- "hello" // 向里面存string类型的值
  12. newCat := cat{name: "咖啡猫", age: 80}
  13. allChan <- newCat // 向里面存结构体类型的值
  14. newSlice := []int{1, 2, 3}
  15. allChan <- newSlice // 向里面存切片类型的值
  16. newMap := map[string]int{
  17. "apple": 10,
  18. }
  19. allChan <- newMap // 向里面存map类型的值
  20. // 第一此取值
  21. ret1 := <-allChan
  22. fmt.Println("ret1:", ret1)
  23. // 第二次取值
  24. ret2 := <-allChan
  25. fmt.Println("ret2:", ret2)
  26. // 第三次取值
  27. ret3 := <-allChan
  28. fmt.Println("ret3:", ret3)
  29. // 第三次取值
  30. ret4 := <-allChan
  31. fmt.Println("ret4:", ret4)
  32. // 第五次取值
  33. ret5 := <-allChan
  34. fmt.Println("ret5:", ret5)
  35. }

其输出如下:

  1. ret1: 10
  2. ret2: hello
  3. ret3: {咖啡猫 80}
  4. ret4: [1 2 3]
  5. ret5: map[apple:10]

但是如果我们直接取结构体的某个键,可以直接取吗?如下:

  1. package main
  2. import "fmt"
  3. type dog struct {
  4. name string
  5. age int
  6. }
  7. func main() {
  8. aChan := make(chan interface{}, 1)
  9. d := dog{
  10. name: "小黑",
  11. age: 10,
  12. }
  13. aChan <- d
  14. ret := <-aChan
  15. fmt.Println(ret.name)
  16. }

如果向上面这样直接取值则编译不过去,会panic,如下:

  1. ret.name undefined (type interface {} is interface with no methods)

其意思就是说interface没有name这个方法。这是为什么呢?

因为从编译层面来说,编译器依然认为ret是一个interface类型,而不是struct类型,而且上面定义的interface是没有name这个字段的,所以编译会报错。

解决办法即为做类型断言,如下:

  1. package main
  2. import "fmt"
  3. type dog struct {
  4. name string
  5. age int
  6. }
  7. func main() {
  8. aChan := make(chan interface{}, 1)
  9. d := dog{
  10. name: "小黑",
  11. age: 10,
  12. }
  13. aChan <- d
  14. ret := <-aChan
  15. ret1 := ret.(dog)
  16. fmt.Println(ret1.name)
  17. }

channel总结

channel常见的异常总结,如下图:第十六章 并发 - 图1

关闭已经关闭的channel也会引发panic

select

如果我们需要从多个channel里读取数据,可以使用select,它的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

  1. select{
  2. case <-ch1:
  3. ...
  4. case data := <-ch2:
  5. ...
  6. case ch3<-data:
  7. ...
  8. default:
  9. 默认操作
  10. }

l例子:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var wg sync.WaitGroup
  7. func insertData(ch1 chan int, num int) {
  8. for i := 0; i < num; i++ {
  9. ch1 <- i
  10. }
  11. defer wg.Done()
  12. }
  13. func main() {
  14. ch1 := make(chan int, 10)
  15. ch2 := make(chan int, 20)
  16. wg.Add(2)
  17. go insertData(ch1, 10)
  18. go insertData(ch2, 20)
  19. wg.Wait()
  20. // 循环取
  21. for {
  22. select {
  23. case v := <-ch1:
  24. fmt.Printf("从ch1取出数据,其值为:%d\n", v)
  25. case v := <-ch2:
  26. fmt.Printf("从ch2取出数据,其值为:%d\n", v)
  27. default:
  28. fmt.Printf("没有数据了\n")
  29. }
  30. }
  31. }

如果取完了就会一直打印default语句。

如果要退出可以使用如下方式:

(1)、用label标签

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var wg sync.WaitGroup
  7. func insertData(ch1 chan int, num int) {
  8. for i := 0; i < num; i++ {
  9. ch1 <- i
  10. }
  11. defer wg.Done()
  12. }
  13. func main() {
  14. ch1 := make(chan int, 10)
  15. ch2 := make(chan int, 20)
  16. wg.Add(2)
  17. go insertData(ch1, 10)
  18. go insertData(ch2, 20)
  19. // 循环取
  20. wg.Wait()
  21. label:
  22. for {
  23. select {
  24. case v := <-ch1:
  25. fmt.Printf("从ch1取出数据,其值为:%d\n", v)
  26. case v := <-ch2:
  27. fmt.Printf("从ch2取出数据,其值为:%d\n", v)
  28. default:
  29. fmt.Printf("没有数据了\n")
  30. break label
  31. }
  32. }
  33. }

(2)、用return语句,用return会退出当前所在函数

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var wg sync.WaitGroup
  7. func insertData(ch1 chan int, num int) {
  8. for i := 0; i < num; i++ {
  9. ch1 <- i
  10. }
  11. defer wg.Done()
  12. }
  13. func main() {
  14. ch1 := make(chan int, 10)
  15. ch2 := make(chan int, 20)
  16. wg.Add(2)
  17. go insertData(ch1, 10)
  18. go insertData(ch2, 20)
  19. // 循环取
  20. wg.Wait()
  21. for {
  22. select {
  23. case v := <-ch1:
  24. fmt.Printf("从ch1取出数据,其值为:%d\n", v)
  25. case v := <-ch2:
  26. fmt.Printf("从ch2取出数据,其值为:%d\n", v)
  27. default:
  28. fmt.Printf("没有数据了\n")
  29. return
  30. }
  31. }
  32. }

使用select语句能提高代码的可读性。

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个。
  • 对于没有caseselect{}会一直等待,可用于阻塞main函数。

并发安全与锁

注:锁是一个结构体类型,当作为参数传递的时候须用指针类型

在使用goroutine的时候,如果多个goroutine同时对一个资源进行操作,可能会造成竞态问题

比如:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var x = 0
  7. var wg sync.WaitGroup
  8. func add() {
  9. for i := 0; i < 50000; i++ {
  10. x++
  11. }
  12. wg.Done()
  13. }
  14. func main() {
  15. wg.Add(2)
  16. go add()
  17. go add()
  18. wg.Wait()
  19. fmt.Println(x)
  20. }

运行代码,结果如下:

  1. PS E:\DEV\Go\src\code.rookieops.com\day07\14lock> go run .\main.go
  2. 65768
  3. PS E:\DEV\Go\src\code.rookieops.com\day07\14lock> go run .\main.go
  4. 69379

可以看到其输出的结果并不是我们想要的,这时候就可以引入锁机制。

互斥锁

互斥锁也叫排他锁。当使用这种的锁的时候,它能保证同时只有一个goroutine可以访问共享资源。

Go语言中使用sync.Mutex来实现互斥锁。

如下对上段代码进行重构:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var x = 0
  7. var wg sync.WaitGroup
  8. var lock sync.Mutex
  9. func add() {
  10. for i := 0; i < 50000; i++ {
  11. lock.Lock()
  12. x++
  13. lock.Unlock()
  14. }
  15. wg.Done()
  16. }
  17. func main() {
  18. wg.Add(2)
  19. go add()
  20. go add()
  21. wg.Wait()
  22. fmt.Println(x)
  23. }

然后就能得到我们想要的结果。

读写互斥锁

在一种特殊场景下,比如新闻网站这种读多写少的场景下,用互斥锁的话就会很浪费性能,Go语言提供了一种读写互斥锁,是用sync.RWMutx来实现。

我们写两个对比示例:

(1)、用互斥锁来模拟这种场景

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. x = 0
  9. lock sync.Mutex
  10. wg sync.WaitGroup
  11. rwlock sync.RWMutex
  12. )
  13. func write() {
  14. defer wg.Done()
  15. lock.Lock()
  16. // rwlock.Lock()
  17. x++
  18. lock.Unlock()
  19. // rwlock.Unlock()
  20. }
  21. func read() {
  22. defer wg.Done()
  23. lock.Lock()
  24. // rwlock.RLock()
  25. fmt.Println(x)
  26. lock.Unlock()
  27. // rwlock.RUnlock()
  28. }
  29. func main() {
  30. start := time.Now()
  31. for i := 0; i < 10; i++ {
  32. wg.Add(1)
  33. go write()
  34. }
  35. for i := 0; i < 200; i++ {
  36. wg.Add(1)
  37. go read()
  38. }
  39. wg.Wait()
  40. fmt.Println(time.Now().Sub(start))
  41. }

输出的执行时间为:

  1. 156.5827ms

(2)、用读写互斥锁来模拟上面场景

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. var (
  8. x = 0
  9. lock sync.Mutex
  10. wg sync.WaitGroup
  11. rwlock sync.RWMutex
  12. )
  13. func write() {
  14. defer wg.Done()
  15. // lock.Lock()
  16. rwlock.Lock()
  17. x++
  18. // lock.Unlock()
  19. rwlock.Unlock()
  20. }
  21. func read() {
  22. defer wg.Done()
  23. // lock.Lock()
  24. rwlock.RLock()
  25. fmt.Println(x)
  26. // lock.Unlock()
  27. rwlock.RUnlock()
  28. }
  29. func main() {
  30. start := time.Now()
  31. for i := 0; i < 10; i++ {
  32. wg.Add(1)
  33. go write()
  34. }
  35. for i := 0; i < 200; i++ {
  36. wg.Add(1)
  37. go read()
  38. }
  39. wg.Wait()
  40. fmt.Println(time.Now().Sub(start))
  41. }

输出时间为:

  1. 91.7481ms

可以看到这两种的区别。

sync.Once

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个Do方法,其签名如下:

  1. func (o *Once) Do(f func()) {}

备注:如果要执行的函数f需要传递参数就需要搭配闭包来使用。

示例1:使用sync.Once来关闭channel

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. var (
  7. once sync.Once
  8. wg sync.WaitGroup
  9. )
  10. func test(ch1 chan int) {
  11. for i := 0; i < 10; i++ {
  12. ch1 <- i
  13. }
  14. once.Do(func() { defer close(ch1) })
  15. defer wg.Done()
  16. }
  17. func main() {
  18. wg.Add(1)
  19. ch1 := make(chan int, 10)
  20. go test(ch1)
  21. wg.Wait()
  22. for v := range ch1 {
  23. fmt.Println(v)
  24. }
  25. }

示例2:用sync.Once来加载配置文件示例

  1. var icons map[string]image.Image
  2. var loadIconsOnce sync.Once
  3. func loadIcons() {
  4. icons = map[string]image.Image{
  5. "left": loadIcon("left.png"),
  6. "up": loadIcon("up.png"),
  7. "right": loadIcon("right.png"),
  8. "down": loadIcon("down.png"),
  9. }
  10. }
  11. // Icon 是并发安全的
  12. func Icon(name string) image.Image {
  13. loadIconsOnce.Do(loadIcons)
  14. return icons[name]
  15. }

示例3:用sync.Once来实现并发安全的单例模式

  1. package singleton
  2. import (
  3. "sync"
  4. )
  5. type singleton struct {}
  6. var instance *singleton
  7. var once sync.Once
  8. func GetInstance() *singleton {
  9. once.Do(func() {
  10. instance = &singleton{}
  11. })
  12. return instance
  13. }

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

如果要并发操作Map的话可以用sync.Map,它是一个开箱即用的Map,只用它的时候不需要再用make进行初始化了。sync.Map内置了一些常用方法,比如:SotreloadDeleteRange等。

示例:

  1. package main
  2. import (
  3. "fmt"
  4. "strconv"
  5. "sync"
  6. )
  7. var (
  8. m sync.Map
  9. wg sync.WaitGroup
  10. )
  11. func main() {
  12. for i := 0; i < 20; i++ {
  13. wg.Add(1)
  14. go func(n int) {
  15. key := strconv.Itoa(n)
  16. // 存值
  17. m.Store(key, n)
  18. // 取值
  19. value, _ := m.Load(key)
  20. fmt.Printf("key: %s, value: %d\n", key, value)
  21. defer wg.Done()
  22. }(i)
  23. }
  24. wg.Wait()
  25. }

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

其操作有:

(1)、读取操作

  1. func LoadInt32(addr *int32) (val int32)
  2. func LoadInt64(addr *int64) (val int64)
  3. func LoadUint32(addr *uint32) (val uint32)
  4. func LoadUint64(addr *uint64) (val uint64)
  5. func LoadUintptr(addr *uintptr) (val uintptr)
  6. func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

(2)、写入操作

  1. func StoreInt32(addr *int32, val int32)
  2. func StoreInt64(addr *int64, val int64)
  3. func StoreUint32(addr *uint32, val uint32)
  4. func StoreUint64(addr *uint64, val uint64)
  5. func StoreUintptr(addr *uintptr, val uintptr)
  6. func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

(3)、修改操作

  1. func AddInt32(addr *int32, delta int32) (new int32)
  2. func AddInt64(addr *int64, delta int64) (new int64)
  3. func AddUint32(addr *uint32, delta uint32) (new uint32)
  4. func AddUint64(addr *uint64, delta uint64) (new uint64)
  5. func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

(4)、交换操作

  1. func SwapInt32(addr *int32, new int32) (old int32)
  2. func SwapInt64(addr *int64, new int64) (old int64)
  3. func SwapUint32(addr *uint32, new uint32) (old uint32)
  4. func SwapUint64(addr *uint64, new uint64) (old uint64)
  5. func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
  6. func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

(5)、比较并交换操作

  1. func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
  2. func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
  3. func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
  4. func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
  5. func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
  6. func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)