sync.WaitGroup

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

  1. var wg sync.WaitGroup
  2. func hello() {
  3. defer wg.Done()
  4. fmt.Println("Hello Goroutine!")
  5. }
  6. func main() {
  7. wg.Add(1)
  8. go hello() // 启动另外一个goroutine去执行hello函数
  9. fmt.Println("main goroutine done!")
  10. wg.Wait()
  11. }

sync.Once

  1. func (o *Once) Do(f func()) {

sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。但也有所不同。

  • init 函数是在文件包首次被加载的时候执行,且只执行一次
  • sync.Onc 是在代码运行中需要的时候执行,且只执行一次

当一个函数不希望程序在一开始的时候就被执行的时候,可以使用 sync.Once
例如:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. var once sync.Once
  8. onceBody := func() {
  9. fmt.Println("Only once")
  10. }
  11. done := make(chan bool)
  12. for i := 0; i < 10; i++ {
  13. go func() {
  14. once.Do(onceBody)
  15. done <- true
  16. }()
  17. }
  18. for i := 0; i < 10; i++ {
  19. <-done
  20. }
  21. }

sync.Once 使用变量 done 来记录函数的执行状态,使用 sync.Mutexsync.atomic 来保证线程安全的读取 done

  1. // Once is an object that will perform exactly one action.
  2. type Once struct {
  3. // done indicates whether the action has been performed.
  4. // It is first in the struct because it is used in the hot path.
  5. // The hot path is inlined at every call site.
  6. // Placing done first allows more compact instructions on some architectures (amd64/x86),
  7. // and fewer instructions (to calculate offset) on other architectures.
  8. done uint32
  9. m Mutex
  10. }
  11. // Do calls the function f if and only if Do is being called for the
  12. // first time for this instance of Once. In other words, given
  13. // var once Once
  14. // if once.Do(f) is called multiple times, only the first call will invoke f,
  15. // even if f has a different value in each invocation. A new instance of
  16. // Once is required for each function to execute.
  17. //
  18. // Do is intended for initialization that must be run exactly once. Since f
  19. // is niladic, it may be necessary to use a function literal to capture the
  20. // arguments to a function to be invoked by Do:
  21. // config.once.Do(func() { config.init(filename) })
  22. //
  23. // Because no call to Do returns until the one call to f returns, if f causes
  24. // Do to be called, it will deadlock.
  25. //
  26. // If f panics, Do considers it to have returned; future calls of Do return
  27. // without calling f.
  28. //
  29. func (o *Once) Do(f func()) {
  30. // Note: Here is an incorrect implementation of Do:
  31. //
  32. // if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
  33. // f()
  34. // }
  35. //
  36. // Do guarantees that when it returns, f has finished.
  37. // This implementation would not implement that guarantee:
  38. // given two simultaneous calls, the winner of the cas would
  39. // call f, and the second would return immediately, without
  40. // waiting for the first's call to f to complete.
  41. // This is why the slow path falls back to a mutex, and why
  42. // the atomic.StoreUint32 must be delayed until after f returns.
  43. if atomic.LoadUint32(&o.done) == 0 {
  44. // Outlined slow-path to allow inlining of the fast-path.
  45. o.doSlow(f)
  46. }
  47. }
  48. func (o *Once) doSlow(f func()) {
  49. o.m.Lock()
  50. defer o.m.Unlock()
  51. if o.done == 0 {
  52. defer atomic.StoreUint32(&o.done, 1)
  53. f()
  54. }
  55. }

这里 done 是一个状态位,用于判断变量是否初始化完成,其有效值是:

  • 0: 函数 f 尚未执行或执行中,Once对象创建时 done 默认值就是0
  • 1: 函数 f 已经执行结束,保证 f 不会被再次执行

而 m Mutex 用于控制临界区的进入,保证同一时间点最多有一个 f在执行。
done 在 m.Lock() 前后的两次校验都是必要的。

Mutex 只能保证临界区内的操作是可观测的 即只有处于o.m.Lock() 和 defer o.m.Unlock()之间的代码

在 Scala 里,有一个关键词 lazy,实现了 sync.Once 同样的功能。具体实现上,早期版本使用了 volatile 修饰状态变量 done,使用 synchronized 替代 m Mutex;后来,也改成了基于CAS的方式。
使用体验上,显然 lazy 更香!

sync.Map

Go语言中内置的map不是并发安全的

实现

  • 空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。
  • 优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。
  • double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。
  • 延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。 ```go import ( “fmt” “strconv” “sync” )

//以下的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。

var m = make(map[string]int)

func get(key string) int { return m[key] }

func set(key string, value int) { m[key] = value }

func main() { wg := sync.WaitGroup{} for i := 0; i < 200; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) set(key, n) fmt.Printf(“k=:%v,v:=%v\n”, key, get(key)) wg.Done() }(i) } wg.Wait() }

  1. 并发安全版mapsync.Map
  2. ```go
  3. var m = sync.Map{}
  4. func main() {
  5. wg := sync.WaitGroup{}
  6. for i := 0; i < 20; i++ {
  7. wg.Add(1)
  8. go func(n int) {
  9. key := strconv.Itoa(n)
  10. m.Store(key, n)
  11. value, _ := m.Load(key)
  12. fmt.Printf("k=:%v,v:=%v\n", key, value)
  13. wg.Done()
  14. }(i)
  15. }
  16. wg.Wait()
  17. }

sync.Mutex 同步锁

Go 语言的 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

  1. type Mutex struct {
  2. state int32
  3. sema uint32
  4. }

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLockedmutexWokenmutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:

image.png

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被从唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

正常模式和饥饿模式

在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。

在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

互斥锁的加锁是靠 sync.Mutex.Lock 完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:

  1. func (m *Mutex) Lock() {
  2. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  3. return
  4. }
  5. m.lockSlow()
  6. }

如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

  1. 互斥锁只有在普通模式才能进入自旋;
  2. runtime.sync_runtime_canSpin需要返回true
    1. 运行在多 CPU 的机器上;
    2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
    3. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpinruntime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLockedmutexStarvingmutexWokenmutexWaiterShift

计算了新的互斥锁状态之后,会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新状态:

  1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  2. if old&(mutexLocked|mutexStarving) == 0 {
  3. break // 通过 CAS 函数获取了锁
  4. }
  5. ...
  6. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  7. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  8. old = m.state
  9. if old&mutexStarving != 0 {
  10. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  11. if !starving || old>>mutexWaiterShift == 1 {
  12. delta -= mutexStarving
  13. }
  14. atomic.AddInt32(&m.state, delta)
  15. break
  16. }
  17. awoke = true
  18. iter = 0
  19. } else {
  20. old = m.state
  21. }
  22. }
  23. }

互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:

  • 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
  • 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁:
  1. func (m *Mutex) Unlock() {
  2. new := atomic.AddInt32(&m.state, -mutexLocked)
  3. if new != 0 {
  4. m.unlockSlow(new)
  5. }
  6. }

sync.Mutex.unlockSlow 会先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:

  1. func (m *Mutex) unlockSlow(new int32) {
  2. if (new+mutexLocked)&mutexLocked == 0 {
  3. throw("sync: unlock of unlocked mutex")
  4. }
  5. if new&mutexStarving == 0 { // 正常模式
  6. old := new
  7. for {
  8. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
  9. return
  10. }
  11. new = (old - 1<<mutexWaiterShift) | mutexWoken
  12. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  13. runtime_Semrelease(&m.sema, false, 1)
  14. return
  15. }
  16. old = m.state
  17. }
  18. } else { // 饥饿模式
  19. runtime_Semrelease(&m.sema, true, 1)
  20. }
  21. }

小结

互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

RWMutex 读写互斥锁

适用于读远多于写的场景

它不限制资源的并发读,但是读写、写写操作无法并行执行。

写锁

当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:

  1. type RWMutex struct {
  2. w Mutex // 互斥锁
  3. writerSem uint32 // 写锁用的信号量
  4. readerSem uint32 // 读锁用的信号量
  5. readerCount int32 // 当前正在执行读操作的goroutine数量
  6. readerWait int32 // 获取写锁时,当前还持有读锁的goroutine数量
  7. }
  8. const rwmutexMaxReaders = 1 << 30
  1. func (rw *RWMutex) Lock() {
  2. // 首先调用Mutex的Lock方法获取到锁
  3. rw.w.Lock()
  4. // 把readerCount改成负数,这样后续的读操作就会被阻塞
  5. // r 就是当前正在执行读操作的goroutine数量
  6. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
  7. // 如果当前有正在执行读操作的goroutine
  8. // 把r赋值给readerWait
  9. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
  10. // 获取写锁的goroutine进入休眠,等待被唤醒
  11. runtime_SemacquireMutex(&rw.writerSem, false, 0)
  12. }
  13. }
  1. 调用结构体持有的sync.Mutex结构体的sync.Mutex.Lock阻塞后续的写操作;
    • 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
  2. 调用 sync/atomic.AddInt32 函数阻塞后续的读操作:
  3. 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;

写锁的释放会调用 sync.RWMutex.Unlock

  1. func (rw *RWMutex) Unlock() {
  2. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
  3. if r >= rwmutexMaxReaders {
  4. throw("sync: Unlock of unlocked RWMutex")
  5. }
  6. for i := 0; i < int(r); i++ {
  7. runtime_Semrelease(&rw.readerSem, false, 0)
  8. }
  9. rw.w.Unlock()
  10. }

与加锁的过程正好相反,写锁的释放分以下几个执行:


  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 释放写锁;

小结

虽然读写互斥锁 sync.RWMutex 提供的功能比较复杂,但是因为它建立在 sync.Mutex 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:

  • 调用sync.RWMutex.Lock尝试获取写锁时;
    • 每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;
    • readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;
  • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;

读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。

sync.Pool 的使用场景

一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。

Go 语言从 1.3 版本开始提供了对象重用的机制,即 sync.Pool。sync.Pool 是可伸缩的,同时也是并发安全的,其大小仅受限于内存的大小。sync.Pool 用于存储那些被分配了但是没有被使用,而未来可能会使用的值。这样就可以不用再次经过内存分配,可直接复用已有对象,减轻 GC 的压力,从而提升系统的性能。
sync.Pool 的大小是可伸缩的,高负载时会动态扩容,存放在池中的对象如果不活跃了会被自动清理。

sync.Pool 的使用方式非常简单:

只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。

  1. var studentPool = sync.Pool{
  2. New: func() interface{} {
  3. return new(Student)
  4. },
  5. }
  1. stu := studentPool.Get().(*Student)
  2. json.Unmarshal(buf, stu)
  3. studentPool.Put(stu)
  • Get() 用于从对象池中获取对象,因为返回值是 interface{},因此需要类型转换。
  • Put() 则是在对象使用完毕后,返回对象池。

参考

6.2 同步原语与锁
参考
深度解密 Go 语言之 sync.Pool