深入理解 golang 的互斥锁

How to implement Golang Mutex golang 是如何实现互斥锁的


# 信号量 操作系统中有 P 和 V 操作。P 操作是将信号量值减去 1,V 操作是将信号量值增加 1.因此信号量的操作模式为:

  1. 初始化,给它一个非负整数值。
  2. 在程序试图进入临界区之前需要先运行 P 操作,然后会有两种情况。

    • 当信号量 S 减少到负值时,该过程将被阻止并且无法继续。这个时候,进程会被阻塞。
    • 当信号量 S 不为负时,进程可以进入临界区。
  3. 在程序离开临界区时,需要执行 V 操作。当信号量 S 不是为负时,之前被阻止的其他进程将允许进临界区。

# 信号量和锁 尽管信号量和锁看起来相似,例如,当信号量为 1 时,实现了互斥锁,但实际上,它们具有不同的含义。


信号量是用于确保进程 (线程或 goroutine) 被调度。比如,三个进程共同计算 c = a+b。首先,a+b 的计算和赋值操作不能同时执行。其次,必须确保首先执行 a+b。c 在赋值之后执行,因此这个位置需要以信号量的形式执行。

此外,可以通过信号量实现锁,然后 goroutine 可以根据规则阻塞和唤醒锁;也可以通过自旋的方式实现锁,goroutine 将持有 CPU,直到解锁。

这两种方式之间的区别是是否需要调度 goroutine,但是从本质上讲,锁是为了确保不会错误地访问临界资源。

# 自旋锁 CAS 理论是一种自旋锁。


  • 一直循环等待,以确定资源是否释放了锁。这种锁称为自旋锁,它不会阻塞线程(NON-BLOCKING)。
  • 一直阻塞,等待重新调度,这种是互斥锁。



线程持有锁的时间越长,持有锁的线程被 OS 调度中断的风险就越大。



# #悲观锁定和乐观锁定 悲观锁定是一种悲观思维。它总是认为最坏的情况可能会发生。它认为这些数据很可能被其他人修改过。无论是读还是写,悲观锁都是在执行操作之前锁定的。


乐观锁定的实现方案主要包括 CAS 和版本号机制。乐观锁定适用于多读场景这可以提高吞吐量。

CAS 是一个著名的基于比较和交换无锁算法。


CAS 涉及三种关系: 指向内存区域的指针 V、旧值 a 和要写入的新值 B。

由 CAS 实现的乐观锁将带来 ABA 问题。同时,整个乐观锁会在数据不一致的情况下触发等待和重试机制,这对性能有很大的影响。


有了以上的基础知识,我们就可以开始分析 golang 是如何实现互斥锁的了。

Golang 的 Mutex 实现一直在改进,到目前为止,主要经历了 4 个版本:

  • V1: 简单实现的版本
  • V2: 新的 goroutine 参加锁的竞争
  • V3: 新的 goroutines 更多参与竞争的机会
  • V4: 解决老 goroutine 饥饿的问题

每一次改进都是为了提高系统的整体性能。这个升级是渐进的、持续的,因此有必要从 V1 版本开始慢慢地看 Mutex 的演变过程。

V1: 简单实现的版本 在 V1 版本中,互斥的完整源代码如下。commit


  1. func cas(val *int32, old, new int32) bool
  2. func semacquire(*int32)
  3. func semrelease(*int32)
  4. // The structure of the mutex, containing two fields
  5. type Mutex struct {
  6. key int32 // Indication of whether the lock is held
  7. sema int32 // Semaphore dedicated to block/wake up goroutine
  8. }
  9. // Guaranteed to successfully increment the value of delta on val
  10. func xadd(val *int32, delta int32) (new int32) {
  11. for {
  12. v := *val
  13. if cas(val, v, v+delta) {
  14. return v + delta
  15. }
  16. }
  17. panic("unreached")
  18. }
  19. // request lock
  20. func (m *Mutex) Lock() {
  21. if xadd(&m.key, 1) == 1 { // Add 1 to the ID, if it is equal to 1, the lock is successfully acquired
  22. return
  23. }
  24. semacquire(&m.sema) // Otherwise block waiting
  25. }
  26. func (m *Mutex) Unlock() {
  27. if xadd(&m.key, -1) == 0 { // Subtract 1 from the flag, if equal to 0, there are no other waiters
  28. return
  29. }
  30. semrelease(&m.sema) // Wake up other blocked goroutines
  31. }

key 表示有几个 gorutines 正在使用或准备使用该锁。如果 key==0,表示当前互斥锁已解锁,否则,key>0 表示当前互斥锁已锁定。

sema 是实际导致 goroutine 阻塞和唤醒的信号量。

xadd 是一个基于 cas 的加减法函数。

Lock 和 Unlock 是锁定当前互斥锁的核心函数,但逻辑非常简单。

Unlock 使用 xadd 方法对 key 进行 -1 操作。如果结果不是 0,则意味着 goroutine 当前正在等待,需要调用 semrelease 来唤醒 goroutine。

想象一下这样一个场景: 被阻塞的 goroutine(命名为 g1)不能占用 CPU,因此需要在 g1 被唤醒后执行上下文切换。如果此时出现一个新的 goroutine (g2),它就拥有 CPU 资源。

如果把锁给 g2,那么它可以立即执行并返回结果(而不是等待 g1 的上下文切换),这样整体效率就可以提高到更高的水平。

V2:新的 goroutine 参加锁竞争.

完整的源代码地址 https://github.com/golang/go/blob/weekly.2011-07-07/src/pkg/sync/mutex.go

在 V2 版本中,核心特性是 goroutine 在被唤醒后不会立即执行任务,而是重复抢占锁的过程,以便新的 goroutine 有机会获得锁,这就是给后来者机会。


  1. // A Mutex is a mutual exclusion lock.
  2. // Mutexes can be created as part of other structures;
  3. // the zero value for a Mutex is an unlocked mutex.
  4. type Mutex struct {
  5. state int32
  6. sema uint32
  7. }
  8. const (
  9. mutexLocked = 1 << iota // mutex is locked
  10. mutexWoken
  11. mutexWaiterShift = iota
  12. )

虽然互斥锁结构的定义基本不变,但从 V1 的 key 到 V2 的 state,它们的内部结构却有很大的不同。

第 0 位表示锁定状态(L),即 0 表示解锁,1 表示锁定; 第 1 位表示唤醒状态(W),第 2 位到第 31 位表示阻塞等待的计数。


mutexLocked 的值是 0x1, mutexawake 的值是 0x2, mutexWaiterShift 的值是 0x2, mutexWaiterShift 表示任何表示阻塞等待数量的数组都需要左移两位。

V2 版本的主要在于 Lock 方法的改进,代码如下:

  1. // Lock locks m.
  2. // If the lock is already in use, the calling goroutine
  3. // blocks until the mutex is available.
  4. func (m *Mutex) Lock() {
  5. // Fast path: grab unlocked mutex.
  6. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  7. return
  8. }
  9. awoke := false
  10. for {
  11. old := m.state
  12. new := old | mutexLocked
  13. if old&mutexLocked != 0 {
  14. new = old + 1<<mutexWaiterShift
  15. }
  16. if awoke {
  17. // The goroutine has been woken from sleep,
  18. // so we need to reset the flag in either case.
  19. new &^= mutexWoken
  20. }
  21. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  22. if old&mutexLocked == 0 {
  23. break
  24. }
  25. runtime.Semacquire(&m.sema)
  26. awoke = true
  27. }
  28. }
  29. }

第 2 行和第 4 行中的代码逻辑应用于解锁状态,goroutine 通过 CAS 将 L 位从 0 设置为 1 获得锁。


然后,goroutine 进入一个循环,其中 CAS 方法可确保正确叠加新状态。

  • 尝试获得锁。
  • 尝试将等待锁的数量增加 1。

由于更改不是原子的,它可能会导致旧 state 的值变得无效。




如果旧状态未解锁,则直接获得锁,否则,通过信号量机制阻塞当前 goroutine。

与 V1 不同的是,当前进程在被唤醒后,仍然会陷入 for 循环再次获取锁,这主要是给新的 goroutine 机会

  • 如果旧的 goroutine 在上下文切换期间有一个新的 goroutine,锁将被赋予新的 goroutine。
  • 如果在完成上下文切换后,旧的 goroutine 仍然没有新的 goroutine,那么锁将被交给旧的 goroutine。

Unlock 方法相对简单。代码如下:

  1. // Unlock unlocks m.
  2. // It is a run-time error if m is not locked on entry to Unlock.
  3. //
  4. // A locked Mutex is not associated with a particular goroutine.
  5. // It is allowed for one goroutine to lock a Mutex and then
  6. // arrange for another goroutine to unlock it.
  7. func (m *Mutex) Unlock() {
  8. // Fast path: drop lock bit.
  9. new := atomic.AddInt32(&m.state, -mutexLocked)
  10. if (new+mutexLocked)&mutexLocked == 0 {
  11. panic("sync: unlock of unlocked mutex")
  12. }
  13. old := new
  14. for {
  15. // If there are no waiters or a goroutine has already
  16. // been woken or grabbed the lock, no need to wake anyone.
  17. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
  18. return
  19. }
  20. // Grab the right to wake someone.
  21. new = (old - 1<<mutexWaiterShift) | mutexWoken
  22. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  23. runtime.Semrelease(&m.sema)
  24. return
  25. }
  26. old = m.state
  27. }
  28. }

  • 如果没有等待锁的 goroutine,或者当前系统处于解锁状态,并且有唤醒程序,则直接返回。
  • 如果不满足上述要求,则等待的数量将减少 1,队列头部的 goroutine 将通过信号量被唤醒。

V3:给新的 goroutines 更多机会 这个版本的优化,关键提交是https://github.com/golang/go/commit/edcad8639a902741dc49f77d000ed62b0cc6956f

在 V2 的基础上,如何进一步提高性能? 在大多数情况下,协程互斥锁定期间的数据操作的耗时是非常低的,这可能比唤醒+切换上下文的耗时更低。

想象一个场景: GoroutineA 拥有 CPU 资源,GoroutineB 位于阻塞队列的头部。然后,当 GoroutineA 试图获取锁时,它发现当前锁已被占用。根据 V2 策略,GoroutineA 立即被阻塞,假设锁此时处于阻塞状态。如果释放,则 GoroutineB 将按计划被唤醒,即整个运行时间包括 GoroutineB 的唤醒+上下文切换时间。

在 V3 版本的实现中,新的 Goroutine (GoroutineA)被允许通过旋转等待一段时间。如果在等待时间内释放锁,新的 Goroutine 立即获取锁资源,避免了旧 Goroutine 唤醒+上下文切换的耗时,提高了整体工作效率。

同样,V3 的改进主要集中在 Lock 方法上,代码如下。

  1. // Lock locks m.
  2. // If the lock is already in use, the calling goroutine
  3. // blocks until the mutex is available.
  4. func (m *Mutex) Lock() {
  5. // Fast path: grab unlocked mutex.
  6. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  7. if raceenabled {
  8. raceAcquire(unsafe.Pointer(m))
  9. }
  10. return
  11. }
  12. awoke := false
  13. iter := 0
  14. for {
  15. old := m.state
  16. new := old | mutexLocked
  17. if old&mutexLocked != 0 {
  18. if runtime_canSpin(iter) {
  19. // Active spinning makes sense.
  20. // Try to set mutexWoken flag to inform Unlock
  21. // to not wake other blocked goroutines.
  22. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  23. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  24. awoke = true
  25. }
  26. runtime_doSpin()
  27. iter++
  28. continue
  29. }
  30. new = old + 1<<mutexWaiterShift
  31. }
  32. if awoke {
  33. // The goroutine has been woken from sleep,
  34. // so we need to reset the flag in either case.
  35. if new&mutexWoken == 0 {
  36. panic("sync: inconsistent mutex state")
  37. }
  38. new &^= mutexWoken
  39. }
  40. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  41. if old&mutexLocked == 0 {
  42. break
  43. }
  44. runtime_Semacquire(&m.sema)
  45. awoke = true
  46. iter = 0
  47. }
  48. }
  49. if raceenabled {
  50. raceAcquire(unsafe.Pointer(m))
  51. }
  52. }

与 V2 的实现代码相比,它主要集中在第 14-25 行,多了两个自旋锁的核心函数 runtime_canSpinruntime_doSpin

第一个是 runtime_canSpin 函数。传入参数是迭代器(代表当前旋转的数量)。runtime_canSpin 函数的实现功能是确定是否可以输入当前的旋转等待状态。

如前所述,旋转锁在等待时不会释放 CPU 资源,也不会消耗上下文切换,但如果旋转时间过长,则会导致无意义的 CPU 消耗,这将进一步影响性能。


最后一个是 runtime_doSpin 函数,可以简单地理解为 CPU 空闲一段时间,这就是自旋过程。

整个过程非常清晰。所有持有 CPU 的 goroutine 在 runtime_canSpin 函数通过检查后执行自旋操作。如果在旋转操作完成后,它们仍然没有持有锁,则 Goroutine 将被阻塞。其他逻辑与 V2 相同。

V4:解决老 Goroutine 的饥饿问题.

源地址如下: https://github.com/golang/go/blob/go1.15.5/src/sync/mutex.go

从 V2 到 V3 的改进是为新的 goroutines 提供更多的机会,这导致了老的 goroutine 饥饿问题,即新的获取锁的机会不会让给老的 goroutines,因此这个问题主要集中在 V4 中改进。

首先是互斥锁结构的 State 字段有了新的变化,增加了饥饿指示器(S)。

0 表示没有发生饥饿,1 表示发生了饥饿。(图片中的 S 标志位)



  1. const (
  2. mutexLocked = 1 << iota // mutex is locked
  3. mutexWoken
  4. mutexStarving // separate out a starvation token from the state field
  5. mutexWaiterShift = iota
  6. starvationThresholdNs = 1e6
  7. )

Lock 方法的逻辑如下所示。

  1. // Lock locks m.
  2. // If the lock is already in use, the calling goroutine
  3. // blocks until the mutex is available.
  4. func (m *Mutex) Lock() {
  5. // Fast path: grab unlocked mutex.
  6. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  7. if race.Enabled {
  8. race.Acquire(unsafe.Pointer(m))
  9. }
  10. return
  11. }
  12. // Slow path (outlined so that the fast path can be inlined)
  13. m.lockSlow()
  14. }
  15. func (m *Mutex) lockSlow() {
  16. var waitStartTime int64
  17. starving := false
  18. awoke := false
  19. iter := 0
  20. old := m.state
  21. for {
  22. // Don't spin in starvation mode, ownership is handed off to waiters
  23. // so we won't be able to acquire the mutex anyway.
  24. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
  25. // Active spinning makes sense.
  26. // Try to set mutexWoken flag to inform Unlock
  27. // to not wake other blocked goroutines.
  28. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  29. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  30. awoke = true
  31. }
  32. runtime_doSpin()
  33. iter++
  34. old = m.state
  35. continue
  36. }
  37. new := old
  38. // Don't try to acquire starving mutex, new arriving goroutines must queue.
  39. if old&mutexStarving == 0 {
  40. new |= mutexLocked
  41. }
  42. if old&(mutexLocked|mutexStarving) != 0 {
  43. new += 1 << mutexWaiterShift
  44. }
  45. // The current goroutine switches mutex to starvation mode.
  46. // But if the mutex is currently unlocked, don't do the switch.
  47. // Unlock expects that starving mutex has waiters, which will not
  48. // be true in this case.
  49. if starving && old&mutexLocked != 0 {
  50. new |= mutexStarving
  51. }
  52. if awoke {
  53. // The goroutine has been woken from sleep,
  54. // so we need to reset the flag in either case.
  55. if new&mutexWoken == 0 {
  56. throw("sync: inconsistent mutex state")
  57. }
  58. new &^= mutexWoken
  59. }
  60. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  61. if old&(mutexLocked|mutexStarving) == 0 {
  62. break // locked the mutex with CAS
  63. }
  64. // If we were already waiting before, queue at the front of the queue.
  65. queueLifo := waitStartTime != 0
  66. if waitStartTime == 0 {
  67. waitStartTime = runtime_nanotime()
  68. }
  69. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  70. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  71. old = m.state
  72. if old&mutexStarving != 0 {
  73. // If this goroutine was woken and mutex is in starvation mode,
  74. // ownership was handed off to us but mutex is in somewhat
  75. // inconsistent state: mutexLocked is not set and we are still
  76. // accounted as waiter. Fix that.
  77. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
  78. throw("sync: inconsistent mutex state")
  79. }
  80. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  81. if !starving || old>>mutexWaiterShift == 1 {
  82. // Exit starvation mode.
  83. // Critical to do it here and consider wait time.
  84. // Starvation mode is so inefficient, that two goroutines
  85. // can go lock-step infinitely once they switch mutex
  86. // to starvation mode.
  87. delta -= mutexStarving
  88. }
  89. atomic.AddInt32(&m.state, delta)
  90. break
  91. }
  92. awoke = true
  93. iter = 0
  94. } else {
  95. old = m.state
  96. }
  97. }
  98. if race.Enabled {
  99. race.Acquire(unsafe.Pointer(m))
  100. }
  101. }

我们主要关注 lockSlow 方法。在深入研究代码之前,我们首先需要理解在什么情况下当前锁被认为是饥饿的:

  • (场景 1)旧的 Goroutine 被唤醒,但锁被新的 Goroutine 占据,对于旧的 Goroutine,我被唤醒了,但什么也没做,并立即再次被阻塞。
  • (场景2)Goroutine 被阻塞的总时间超过阈值(默认为 1ms)。

所以核心是记录当前 Goroutine 开始等待的时间: 对于第一次进入锁的 Goroutine,开始等待时间为 0。对于场景 1,判断标准为开始等待时间是否为 0。如果它不是 0,就意味着它以前被阻塞过。通过(第 45 行)。

对于场景 2,判断标准为当前时间与开始等待时间的差值是否超过阈值。如果是这样,这意味着 Goroutine 已经等待太久,应该进入饥饿状态(第 50 行)。


首先,如果当前锁被耗尽,任何新的 goroutine 都不会旋转(第 15 行)。

其次,如果当前 Goroutine 处于饥饿状态,那么当它被阻塞时,它将被添加到等待队列的头部(下一个唤醒操作肯定会唤醒当前处于饥饿状态的 Goroutine,第 45 行和第 49 行)。

最后,允许饥饿的 goroutine 在被唤醒后立即持有锁,而不必与其他 goroutine 重新竞争锁(比较 V2,第 52 行和 62 行)。

V4 对应的 Unlock 也根据饥饿状态进行了调整。代码如下

  1. func (m *Mutex) Unlock() {
  2. if race.Enabled {
  3. _ = m.state
  4. race.Release(unsafe.Pointer(m))
  5. }
  6. // Fast path: drop lock bit.
  7. new := atomic.AddInt32(&m.state, -mutexLocked)
  8. if new != 0 {
  9. // Outlined slow path to allow inlining the fast path.
  10. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
  11. m.unlockSlow(new)
  12. }
  13. }
  14. func (m *Mutex) unlockSlow(new int32) {
  15. if (new+mutexLocked)&mutexLocked == 0 {
  16. throw("sync: unlock of unlocked mutex")
  17. }
  18. if new&mutexStarving == 0 {
  19. old := new
  20. for {
  21. // If there are no waiters or a goroutine has already
  22. // been woken or grabbed the lock, no need to wake anyone.
  23. // In starvation mode ownership is directly handed off from unlocking
  24. // goroutine to the next waiter. We are not part of this chain,
  25. // since we did not observe mutexStarving when we unlocked the mutex above.
  26. // So get off the way.
  27. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
  28. return
  29. }
  30. // Grab the right to wake someone.
  31. new = (old - 1<<mutexWaiterShift) | mutexWoken
  32. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  33. runtime_Semrelease(&m.sema, false, 1)
  34. return
  35. }
  36. old = m.state
  37. }
  38. } else {
  39. // Starving mode: handoff mutex ownership to the next waiter, and yield
  40. // our time slice so that the next waiter can start to run immediately.
  41. // Note: mutexLocked is not set, the waiter will set it after wakeup.
  42. // But mutex is still considered locked if mutexStarving is set,
  43. // so new coming goroutines won't acquire it.
  44. runtime_Semrelease(&m.sema, true, 1)
  45. }
  46. }

