1.goroutine

goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。
Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。
Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字
一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

2.启动单个goroutine

串行运行

  1. //串行
  2. func hello() {
  3. fmt.Println("Hello World!!!")
  4. }
  5. func main() {
  6. hello()
  7. fmt.Println("main")
  8. }

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字
当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束

  1. func hello() {
  2. fmt.Println("Hello World!!!")
  3. }
  4. func main() {
  5. go hello()
  6. fmt.Println("main")
  7. }

我们在创建新的goroutine的时候需要花费一些时间,而此时main函数所在的goroutine是继续执行的。

  1. func hello() {
  2. fmt.Println("Hello World!!!")
  3. }
  4. func main() {
  5. go hello()
  6. fmt.Println("main")
  7. time.Sleep(time.Second)
  8. }
  9. //main
  10. //Hello World!!!

3.启动多个goroutine

sync.WaitGroup来实现多个goroutine的同步

  1. import (
  2. "fmt"
  3. "sync"
  4. )
  5. var wg sync.WaitGroup
  6. func hello(i int) {
  7. defer wg.Done() //goroutine结束就登记-1
  8. fmt.Println("Hello",i)
  9. }
  10. func main() {
  11. for i := 0; i < 10; i++ {
  12. wg.Add(1) //启动一个goroutine就登记+1
  13. go hello(i)
  14. }
  15. //time.Sleep(time.Second)
  16. wg.Wait() //等待所有登记的goroutine都结束
  17. }

4.goroutine与线程

4.1可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine也是可以的。

4.2goroutine调度

GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。

  • G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
  • P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
  • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

4.3GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

  1. import (
  2. "fmt"
  3. "runtime"
  4. "time"
  5. )
  6. func a() {
  7. for i := 0; i < 10; i++ {
  8. fmt.Println("A:",i)
  9. }
  10. }
  11. func b() {
  12. for i := 0; i < 10; i++ {
  13. fmt.Println("B",i)
  14. }
  15. }
  16. func main() {
  17. //两个任务只有一个逻辑核心
  18. //此时是做完一个任务再做另一个任务
  19. runtime.GOMAXPROCS(1) //一个逻辑核心
  20. go a()
  21. go b()
  22. time.Sleep(time.Second)
  23. }

Go语言中的操作系统线程和goroutine的关系:

  1. 一个操作系统线程对应用户态多个goroutine。
  2. go程序可以同时使用多个操作系统线程。
  3. goroutine和OS线程是多对多的关系,即m:n。

5.channel

Go语言的并发模型是CSP(Communicating Sequential Processes),
提倡通过通信共享内存而不是通过共享内存而实现通信
goroutine是Go程序并发的执行体,channel就是它们之间的连接。
channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
goroutine是Go程序并发的执行体,channel就是它们之间的连接。
channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
通道(channel)是一种特殊的类型。通道像一个传送带或者队列,
总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。

5.1channel类型

channel是一种类型,一种引用类型。

  1. var ch1 chan int // 声明一个传递整型的通道
  2. var ch2 chan bool // 声明一个传递布尔型的通道
  3. var ch3 chan []int // 声明一个传递int切片的通道

5.2创建channel

通道是引用类型,通道类型的空值是nil

  1. var ch chan int
  2. fmt.Println(ch) //<nil>
  3. //声明的通道后需要使用make函数初始化之后才能使用
  1. make(chan 元素类型, [缓冲大小])
  1. ch := make(chan int)
  2. fmt.Println(ch) //0xc000048060

5.3channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。

  1. ch := make(chan int)

将一个值发送到通道中。

  1. ch <- 10 // 把10发送到ch中

从一个通道中接收值。

  1. x := <- ch // 从ch中接收值并赋值给变量x
  2. <-ch // 从ch中接收值,忽略结果

我们通过调用内置的close函数来关闭通道。

  1. close(ch)

5.4无缓冲的通道

无缓冲的通道又称为阻塞的通道

  1. ch := make(chan int)
  2. ch <- 10
  3. fmt.Println("发送成功")

image.png
ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。

  1. func reviver(c chan int) {
  2. r := <-c
  3. fmt.Println(r)
  4. }
  5. func main() {
  6. ch := make(chan int)
  7. go reviver(ch) //启用goroutine从通道接收值
  8. ch <- 10
  9. fmt.Println("发送成功")
  10. }

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

5.5有缓冲的通道

使用make函数初始化通道的时候为其指定通道的容量
只要通道的容量大于零,那么该通道就是有缓冲的通道

  1. ch := make(chan int,1) // 创建一个容量为1的有缓冲区通道
  2. ch <- 10
  3. fmt.Println("发送成功")

5.6通道循环取值

  1. func main() {
  2. ch1 := make(chan int)
  3. ch2 := make(chan int)
  4. go func() {
  5. //开启goroutine将0~100的数发送到ch1
  6. for i := 0; i < 100; i++ {
  7. ch1 <- i
  8. }
  9. close(ch1)
  10. }()
  11. go func() {
  12. for {
  13. i,ok := <-ch1
  14. if !ok {
  15. break
  16. }
  17. ch2 <- i * i
  18. }
  19. close(ch2)
  20. }()
  21. for i := range ch2 {
  22. fmt.Println(i)
  23. }
  24. //使用for range遍历通道
  25. //当通道被关闭的时候就会退出for range。
  26. }

5.7单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

  1. func counter(out chan <- int) {
  2. for i := 0; i < 100; i++ {
  3. out <- i
  4. }
  5. close(out)
  6. }
  7. func squarer(out chan <- int,in <- chan int){
  8. for i := range in {
  9. out <- i * i
  10. }
  11. close(out)
  12. }
  13. func printer(in <-chan int) {
  14. for i := range in {
  15. fmt.Println(i)
  16. }
  17. }
  18. func main() {
  19. ch1 := make(chan int)
  20. ch2 := make(chan int)
  21. go counter(ch1)
  22. go squarer(ch2,ch1)
  23. printer(ch2)
  24. }
  • chan<- int是一个只写单向通道,可以对其执行发送操作但是不能执行接收操作;
  • <-chan int是一个只读单向通道,可以对其执行接收操作但是不能执行发送操作。

6.goroutine池

可以指定启动的goroutine数量–worker pool模式,控制goroutine的数量

  1. func worker(id int,jobs <- chan int,results chan <- int) {
  2. for job := range jobs {
  3. fmt.Printf("worker:%d start:%d\n",id,job)
  4. results <- job * 3
  5. time.Sleep(time.Second)
  6. fmt.Printf("worker:%d end:%d\n",id,job)
  7. }
  8. }
  9. func main() {
  10. jobs := make(chan int,100)
  11. results := make(chan int,100)
  12. for i := 0; i < 3; i++ {
  13. go worker(i,jobs,results)
  14. }
  15. for j := 0; j < 5; j++ {
  16. jobs <- j
  17. }
  18. close(jobs)
  19. for i := 0; i < 5; i++ {
  20. result :=<-results
  21. fmt.Println("result:",result)
  22. }
  23. close(results)
  24. }

7.select多路复用

Go内置了select关键字,可以同时响应多个通道的操作。

  1. ch := make(chan int,1)
  2. for i := 0; i < 10; i++ {
  3. select {
  4. case ch <- i:
  5. case x:=<-ch:
  6. fmt.Println(x)
  7. }
  8. }

使用select语句能提高代码的可读性。

  • 可处理一个或多个channel的发送/接收操作。
  • 如果多个case同时满足,select会随机选择一个。
  • 对于没有case的select{}会一直等待,可用于阻塞main函数

    8.并发安全和锁

    Go代码中可能会存在多个goroutine同时操作一个资源(临界区)
    Go语言中使用sync包的Mutex类型来实现互斥锁。 ```go var x int64 var lock sync.Mutex var wg sync.WaitGroup

func add() { for i := 0; i < 50; i++ { lock.Lock() //加锁 x = x + 1 lock.Unlock() //解锁 } wg.Done() } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }

  1. 使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
  2. <a name="lkIE9"></a>
  3. # 9.读写互斥锁
  4. 互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。<br />读写锁在Go语言中使用sync包中的RWMutex类型
  5. ```go
  6. import (
  7. "fmt"
  8. "sync"
  9. "time"
  10. )
  11. var(
  12. x int64
  13. wg sync.WaitGroup
  14. lock sync.Mutex
  15. rwlock sync.RWMutex
  16. )
  17. func write() {
  18. //lock.Lock() //加互斥锁
  19. rwlock.Lock() // 加写锁
  20. x = x + 1
  21. time.Sleep(10 * time.Millisecond) //假设读操作耗时10毫秒
  22. rwlock.Unlock() //解写锁
  23. //lock.Unlock() //解互斥锁
  24. wg.Done()
  25. }
  26. func read(){
  27. //lock.Lock() //加互斥锁
  28. rwlock.RLock() //加读锁
  29. time.Sleep(time.Millisecond)
  30. rwlock.RUnlock() //解读锁
  31. //lock.Unlock() //解互斥锁
  32. wg.Done()
  33. }
  34. func main() {
  35. start := time.Now()
  36. for i := 0; i < 100; i++ {
  37. wg.Add(1)
  38. go write()
  39. }
  40. for i := 0; i < 100; i++ {
  41. wg.Add(1)
  42. go read()
  43. }
  44. wg.Wait()
  45. end := time.Now()
  46. fmt.Println(end.Sub(start))
  47. }

10.sync.WaitGroup

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。

  1. import (
  2. "fmt"
  3. "sync"
  4. )
  5. var wg sync.WaitGroup
  6. func hello() {
  7. defer wg.Done()
  8. fmt.Println("Go!!!")
  9. }
  10. func main() {
  11. wg.Add(1)
  12. go hello() //启动goroutine
  13. fmt.Println("main!!!")
  14. wg.Wait()
  15. }

11.sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次
例如只加载一次配置文件、只关闭一次通道等。sync包中提供了一个针对只执行一次场景的解决方案sync.Once

  1. //sync.Once实现的并发安全的单例模式
  2. type singleton struct {}
  3. var instance *singleton
  4. var once sync.Once
  5. func GetInstance() *singleton {
  6. once.Do(func() {
  7. instance = &singleton{}
  8. })
  9. return instance
  10. }

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

12.sync.Map

Go语言中内置的map不是并发安全的

  1. import (
  2. "fmt"
  3. "strconv"
  4. "sync"
  5. )
  6. var m = make(map[string]int)
  7. func get(key string) int {
  8. return m[key]
  9. }
  10. func set(key string, value int) {
  11. m[key] = value
  12. }
  13. func main() {
  14. wg := sync.WaitGroup{}
  15. for i := 0; i < 5; i++ {
  16. wg.Add(1)
  17. go func(n int) {
  18. key := strconv.Itoa(n)
  19. set(key,n)
  20. fmt.Printf("k=%v,v=%v\n",key,get(key))
  21. wg.Done()
  22. }(i)
  23. }
  24. wg.Wait()
  25. }

上面的代码开启少量几个goroutine的时候可能没什么问题
当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误
Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。
开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。

  1. var m = sync.Map{}
  2. func main() {
  3. wg := sync.WaitGroup{}
  4. for i := 0; i < 20; i++ {
  5. wg.Add(1)
  6. go func(n int) {
  7. key := strconv.Itoa(n)
  8. m.Store(key,n)
  9. value, _ := m.Load(key)
  10. fmt.Printf("k=%v v=%v\n",key,value)
  11. wg.Done()
  12. }(i)
  13. }
  14. wg.Wait()
  15. }

同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。

13.原子操作

锁机制的底层是基于原子操作的,其一般直接通过CPU指令实现。
Go语言中原子操作由内置的标准库sync/atomic提供

方法 解释
func LoadInt32(addr int32) (val int32)
func LoadInt64(addr
int64) (val int64)
func LoadUint32(addr uint32) (val uint32)
func LoadUint64(addr
uint64) (val uint64)
func LoadUintptr(addr uintptr) (val uintptr)
func LoadPointer(addr
unsafe.Pointer) (val unsafe.Pointer)
读取操作
func StoreInt32(addr int32, val int32)
func StoreInt64(addr
int64, val int64)
func StoreUint32(addr uint32, val uint32)
func StoreUint64(addr
uint64, val uint64)
func StoreUintptr(addr uintptr, val uintptr)
func StorePointer(addr
unsafe.Pointer, val unsafe.Pointer)
写入操作
func AddInt32(addr int32, delta int32) (new int32)
func AddInt64(addr
int64, delta int64) (new int64)
func AddUint32(addr uint32, delta uint32) (new uint32)
func AddUint64(addr
uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
修改操作
func SwapInt32(addr int32, new int32) (old int32)
func SwapInt64(addr
int64, new int64) (old int64)
func SwapUint32(addr uint32, new uint32) (old uint32)
func SwapUint64(addr
uint64, new uint64) (old uint64)
func SwapUintptr(addr uintptr, new uintptr) (old uintptr)
func SwapPointer(addr
unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
交换操作
func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr
int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr
uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr
unsafe.Pointer, old, new unsafe.Pointer)
(swapped bool)
比较并交换操作
  1. import (
  2. "fmt"
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. type Counter interface {
  8. Ioc()
  9. Load() int64
  10. }
  11. //普通版
  12. type CommonCounter struct {
  13. counter int64
  14. }
  15. func (c *CommonCounter) Ioc() {
  16. c.counter++
  17. }
  18. func (c *CommonCounter) Load() int64 {
  19. return c.counter
  20. }
  21. type MutexCounter struct {
  22. counter int64
  23. lock sync.Mutex
  24. }
  25. func (m *MutexCounter) Ioc() {
  26. m.lock.Lock()
  27. defer m.lock.Unlock()
  28. m.counter++
  29. }
  30. func (m *MutexCounter) Load() int64{
  31. m.lock.Lock()
  32. defer m.lock.Unlock()
  33. m.counter++
  34. return m.counter
  35. }
  36. //原子操作版
  37. type AtomicCounter struct {
  38. counter int64
  39. }
  40. func (a *AtomicCounter) Ioc() {
  41. atomic.AddInt64(&a.counter,1)
  42. }
  43. func (a *AtomicCounter) Load() int64 {
  44. return atomic.LoadInt64(&a.counter)
  45. }
  46. func test(c Counter) {
  47. var wg sync.WaitGroup
  48. start := time.Now()
  49. for i := 0; i < 10000; i++ {
  50. wg.Add(1)
  51. go func() {
  52. c.Ioc()
  53. wg.Done()
  54. }()
  55. }
  56. wg.Wait()
  57. end := time.Now()
  58. fmt.Println(c.Load() , end.Sub(start))
  59. }
  60. func main() {
  61. c1 := CommonCounter{}
  62. test(&c1)
  63. c2 := MutexCounter{}
  64. test(&c2)
  65. c3 := AtomicCounter{}
  66. test(&c3)
  67. }

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。
这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。