sync 包提供了一系列有关并发、同步、锁相关的功能。

参考 Go 中锁的那些姿势,估计你不知道

基本定义

Go 中关于锁的接口定义如下

  1. // A Locker represents an object that can be locked and unlocked.
  2. type Locker interface {
  3. Lock()
  4. Unlock()
  5. }
  6. // A Mutex is a mutual exclusion lock.
  7. // The zero value for a Mutex is an unlocked mutex.
  8. //
  9. // A Mutex must not be copied after first use.
  10. type Mutex struct {
  11. state int32
  12. sema uint32
  13. }

Mutex 使用也非常的简单,声明一个 Mutex 变量就可以直接调用 Lock 和 Unlock 方法了,但使用的过程中有一些注意点,如下:

  • 同一个协程不能连续多次调用 Lock,否则发生死锁
  • 锁资源时尽量缩小资源的范围,以免引起其它协程超长时间等待
  • Mutex 传递给外部的时候需要传指针,不然就是实例的拷贝,会引起锁失败
  • 善用 defer 确保在函数内释放了锁
  • 使用 -race 在运行时检测数据竞争问题,go test -race .../go build -race ...
  • 善用静态工具检查锁的使用问题
  • 使用 go-deadlock 检测死锁,和指定锁超时的等待问题
  • 能用 channel 的场景别使用成了 lock

关键常量

核心常量如下,理解常量对于理解锁的实现有很大帮助

  1. const (
  2. mutexLocked = 1 << iota // 0001 最后一位表示当前锁的状态,0未锁,1已锁
  3. mutexWoken // 0010 倒数第二位表示当前锁是否会被唤醒,0唤醒,1未唤醒
  4. mutexStarving //0100 倒数第三位表示当前对象是否为饥饿模式,0正常,1饥饿
  5. mutexWaiterShift = iota //3 通常用作右移的底数,从倒数第四位往前的bit表示排队的gorouting数量
  6. starvationThresholdNs = 1e6 // 饥饿的阈值:1ms
  7. )

互斥锁 Mutex

互斥锁指的是在 Go 编程中,同一资源的锁定对各个协程是相互排斥的,当其中一个协程获取到该锁时,其它协程只能等待,直到这个获取锁的协程释放锁之后,其它的协程才能获取。

Mutex 实现中有两种模式:

  1. 正常模式

协程按照 FIFO 顺序排队,锁释放时会唤醒最早排队的协程,这个协程会和正在 CPU 上运行的协程竞争锁,但是大概率会失败——因为 CPU 正在执行的协程比刚被唤醒的协程有更大优势,如果这个被唤醒的协程竞争失败,并且超过了 1ms,那么就会转变为饥饿模式

  1. 饥饿模式

这种模式下,互斥锁直接由还未解锁的协程交给队列的第一个处于等待的协程,新到协程不再尝试获取互斥锁即使互斥锁是处于解锁状态,同时也不会尝试 spin,本质是为了防止协程等待锁的时间太长。

接下来我们跟随源码查看 Mutex 是如何加锁和解锁的

  1. // Lock locks m.
  2. // If the lock is already in use, the calling goroutine
  3. // blocks until the mutex is available.
  4. func (m *Mutex) Lock() {
  5. // Fast path: grab unlocked mutex.
  6. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  7. if race.Enabled {
  8. race.Acquire(unsafe.Pointer(m))
  9. }
  10. return
  11. }
  12. // Slow path (outlined so that the fast path can be inlined)
  13. m.lockSlow()
  14. }

快速获取锁的路径,通过一次 Int32 的 CAS 操作,结果为 true 则表明加锁成功,结果为 false 则进入满路径加锁,也叫做自旋 (spin),这个过程需要等待锁被其他协程释放
// TODO:目前只看懂了开头一小部分

  1. func (m *Mutex) lockSlow() {
  2. var waitStartTime int64 // 开始等待时间
  3. // 这几个变量含义依次是:是否饥饿,是否唤醒,自旋次数,锁的当前状态
  4. starving := false
  5. awoke := false
  6. iter := 0
  7. old := m.state
  8. for { // 进入死循环,直到获得锁成功(获得锁成功就是有别的协程释放锁了)
  9. // 判断:已经被加锁并且不是饥饿模式 && 可以自旋(与cpu核数有关)
  10. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
  11. // 没有被唤醒 && 有排队等待的协程 && 尝试设置通知被唤醒
  12. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  13. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  14. awoke = true
  15. }
  16. // 自旋一段时间,次数加一
  17. runtime_doSpin()
  18. iter++
  19. old = m.state
  20. continue
  21. }
  22. // 判断各种状态,特殊情况处理
  23. new := old
  24. // 1:原协程已经unlock了,对new的修改为已锁
  25. if old&mutexStarving == 0 {
  26. new |= mutexLocked
  27. }
  28. // 2:这里是执行完自旋或者没执行自旋(原协程没有unlock)
  29. if old&(mutexLocked|mutexStarving) != 0 {
  30. new += 1 << mutexWaiterShift //排队
  31. }
  32. // 3:如果是饥饿模式,并且已锁的状态
  33. if starving && old&mutexLocked != 0 {
  34. new |= mutexStarving //设置new为饥饿状态
  35. }
  36. //4:上面的awoke被设置为true
  37. if awoke {
  38. // 当前协程被唤醒了,肯定不为0
  39. if new&mutexWoken == 0 {
  40. throw("sync: inconsistent mutex state")
  41. }
  42. // 既然当前协程被唤醒了,重置唤醒标志为0
  43. new &^= mutexWoken
  44. }
  45. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  46. if old&(mutexLocked|mutexStarving) == 0 {
  47. break // locked the mutex with CAS
  48. }
  49. // If we were already waiting before, queue at the front of the queue.
  50. queueLifo := waitStartTime != 0
  51. if waitStartTime == 0 {
  52. waitStartTime = runtime_nanotime()
  53. }
  54. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  55. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  56. old = m.state
  57. if old&mutexStarving != 0 {
  58. // If this goroutine was woken and mutex is in starvation mode,
  59. // ownership was handed off to us but mutex is in somewhat
  60. // inconsistent state: mutexLocked is not set and we are still
  61. // accounted as waiter. Fix that.
  62. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
  63. throw("sync: inconsistent mutex state")
  64. }
  65. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  66. if !starving || old>>mutexWaiterShift == 1 {
  67. // Exit starvation mode.
  68. // Critical to do it here and consider wait time.
  69. // Starvation mode is so inefficient, that two goroutines
  70. // can go lock-step infinitely once they switch mutex
  71. // to starvation mode.
  72. delta -= mutexStarving
  73. }
  74. atomic.AddInt32(&m.state, delta)
  75. break
  76. }
  77. awoke = true
  78. iter = 0
  79. } else {
  80. old = m.state
  81. }
  82. }
  83. if race.Enabled {
  84. race.Acquire(unsafe.Pointer(m))
  85. }
  86. }

读写锁 RWMutex

读写锁依赖于互斥锁的实现,这个指的是当多个协程对某一个资源都是只读操作,那么多个协程可以获取该资源的读锁,并且互相不影响,但当有协程要修改该资源时就必须获取写锁,如果获取写锁时,已经有其它协程获取了读写或者写锁,那么此次获取失败,也就是说读写互斥,读读共享,写写互斥。