转载请声明出处哦~,本篇文章发布于 luozhiyun 的博客:https://www.luozhiyun.com

本文使用的 go 的源码时 14.4

Mutex 介绍

Mutex 结构体包含两个字段:

  • 字段 state:表示当前互斥锁的状态。
  • 字段 sema:是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒。
  1. type Mutex struct {
  2. state int32
  3. sema uint32
  4. }

在 Go 的 1.9 版本中,为了解决等待中的 goroutine 可能会一直获取不到锁,增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在 1 毫秒。

state 状态字段所表示的含义较为复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken、mutexStarving,state 总共是 32 位长度,所以剩下的位置,用来表示可以有 1<<(32-3) 个 Goroutine 等待互斥锁的释放:

多图详解Go的互斥锁Mutex - luozhiyun - 博客园 - 图1

代码表示如下:

  1. const (
  2. mutexLocked = 1 << iota
  3. mutexWoken
  4. mutexStarving
  5. )

加锁流程

fast path

  1. func (m *Mutex) Lock() {
  2. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  3. if race.Enabled {
  4. race.Acquire(unsafe.Pointer(m))
  5. }
  6. return
  7. }
  8. m.lockSlow()
  9. }

加锁的时候,一开始会通过 CAS 看一下能不能直接获取锁,如果可以的话,那么直接获取锁成功。

lockSlow

  1. var waitStartTime int64
  2. starving := false
  3. awoke := false
  4. iter := 0
  5. old := m.state
  6. for {
  7. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
  8. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  9. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  10. awoke = true
  11. }
  12. runtime_doSpin()
  13. iter++
  14. old = m.state
  15. continue
  16. }
  17. ...
  18. }

进入到 lockSlow 方法之后首先会判断以下能否可以自旋,判断依据就是通过计算:

  1. old&(mutexLocked|mutexStarving) == mutexLocked

可以知道当前锁的状态必须是上锁,并且不能处于饥饿状态,这个判断才为 true,然后再看看 iter 是否满足次数的限制,如果都为 true,那么则往下继续。

内层 if 包含了四个判断:

  • 首先判断了 awoke 是不是唤醒状态;
  • old&mutexWoken == 0为真表示没有其他正在唤醒的节点;
  • old>>mutexWaiterShift != 0表明当前有正在等待的 goroutine;
  • CAS 将 state 的 mutexWoken 状态位设置为old|mutexWoken,即为 1 是否成功。

如果都满足,那么将 awoke 状态设置为真,然后将自旋次数加一,并重新设置状态。

继续往下看:

  1. new := old
  2. if old&mutexStarving == 0 {
  3. new |= mutexLocked
  4. }
  5. if old&(mutexLocked|mutexStarving) != 0 {
  6. new += 1 << mutexWaiterShift
  7. }
  8. if starving && old&mutexLocked != 0 {
  9. new |= mutexStarving
  10. }
  11. if awoke {
  12. if new&mutexWoken == 0 {
  13. throw("sync: inconsistent mutex state")
  14. }
  15. new &^= mutexWoken
  16. }

走到这里有两种情况:1. 自旋超过了次数;2. 目前锁没有被持有。

所以第一个判断,如果当前加了锁,但是没有处于饥饿状态,也会重复设置new |= mutexLocked,即将 mutexLocked 状态设置为 1;

如果是 old 已经是饥饿状态或者已经被上锁了,那么需要设置 Waiter 加一,表示这个 goroutine 下面不会获取锁,会等待;

如果 starving 为真,表示当前 goroutine 是饥饿状态,并且 old 已经被上锁了,那么设置new |= mutexStarving,即将 mutexStarving 状态位设置为 1;

awoke 如果在自旋时设置成功,那么在这里要new &^= mutexWoken消除 mutexWoken 标志位。因为后续流程很有可能当前线程会被挂起, 就需要等待其他释放锁的 goroutine 来唤醒,如果 unlock 的时候发现 mutexWoken 的位置不是 0,则就不会去唤醒,则该线程就无法再醒来加锁。

继续往下:

  1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  2. if old&(mutexLocked|mutexStarving) == 0 {
  3. break
  4. }
  5. queueLifo := waitStartTime != 0
  6. if waitStartTime == 0 {
  7. waitStartTime = runtime_nanotime()
  8. }
  9. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  10. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  11. old = m.state
  12. if old&mutexStarving != 0 {
  13. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
  14. throw("sync: inconsistent mutex state")
  15. }
  16. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  17. if !starving || old>>mutexWaiterShift == 1 {
  18. delta -= mutexStarving
  19. }
  20. atomic.AddInt32(&m.state, delta)
  21. break
  22. }
  23. awoke = true
  24. iter = 0
  25. } else {
  26. old = m.state
  27. }

到这里,首先会 CAS 设置新的状态,如果设置成功则往下走,否则返回之后循环设置状态。设置成功之后:

  1. 首先会判断 old 状态,如果没有饥饿,也没有获取到锁,那么直接返回,因为这种情况在进入到这段代码之前会将 new 状态设置为 mutexLocked,表示已经获取到锁。这里还判断了一下 old 状态不能为饥饿状态,否则也不能获取到锁;
  2. 判断 waitStartTime 是否已经初始化过了,如果是新的 goroutine 来抢占锁,那么 queueLifo 会返回 false;如果不是新的 goroutine 来抢占锁,那么加入到等待队列头部,这样等待最久的 goroutine 优先能够获取到锁;
  3. 如果等待时间为 0,那么初始化等待时间;
  4. 阻塞等待,当前 goroutine 进行休眠;
  5. 唤醒之后检查锁是否应该处于饥饿状态,并设置 starving 变量值;
  6. 判断是否已经处于饥饿状态,如果不处于饥饿状态,那么这里直接进入到下一个 for 循环中获取锁;
  7. 加锁并且将 waiter 数减 1,这里我看了一会,没用懂什么意思,其实需要分两步来理解,相当于 state+mutexLocked,然后 state 再将 waiter 部分的数减一;
  8. 如果当前 goroutine 不是饥饿状态或者 waiter 只有一个,就从饥饿模式切换会正常模式;
  9. 设置状态;

下面用图例来解释:

这部分的图解是休眠前的操作,休眠前会根据 old 的状态来判断能不能直接获取到锁,如果 old 状态没有上锁,也没有饥饿,那么直接 break 返回,因为这种情况会在 CAS 中设置加上锁;

接着往下判断,waitStartTime 是否等于 0,如果不等于,说明不是第一次来了,而是被唤醒后来到这里,那么就不能直接放到队尾再休眠了,而是要放到队首,防止长时间抢不到锁;

多图详解Go的互斥锁Mutex - luozhiyun - 博客园 - 图2

下面这张图是处于唤醒后的示意图,如何被唤醒的可以直接到跳到解锁部分看完再回来。

被唤醒一开始是需要判断一下当前的 starving 状态以及等待的时间如果超过了 1ms,那么会将 starving 设置为 true;

接下来会有一个 if 判断, 这里有个细节,因为是被唤醒的,所以判断前需要重新获取一下锁,如果当前不是饥饿模式,那么会直接返回,然后重新进入到 for 循环中;

如果当前是处于饥饿模式,那么会计算一下 delta 为加锁,并且当前的 goroutine 是可以直接抢占锁的,所以需要将 waiter 减一,如果 starving 不为饥饿,或者等待时间没有超过 1ms,或者 waiter 只有一个了,那么还需要将 delta 减去 mutexStarving,表示退出饥饿模式;

最后通过 AddInt32 将 state 加上 delta,这里之所以可以直接加上,因为这时候 state 的 mutexLocked 值肯定为 0,并且 mutexStarving 位肯定为 1,并且在获取锁之前至少还有当前一个 goroutine 在等待队列中,所以 waiter 可以直接减 1。

多图详解Go的互斥锁Mutex - luozhiyun - 博客园 - 图3

解锁流程

fast path

  1. func (m *Mutex) Unlock() {
  2. if race.Enabled {
  3. _ = m.state
  4. race.Release(unsafe.Pointer(m))
  5. }
  6. new := atomic.AddInt32(&m.state, -mutexLocked)
  7. if new != 0 {
  8. m.unlockSlow(new)
  9. }
  10. }

这里主要就是 AddInt32 重新设置 state 的 mutexLocked 位为 0,然后判断新的 state 值是否不为 0,不为 0 则调用 unlockSlow 方法。

unlockSlow

多图详解Go的互斥锁Mutex - luozhiyun - 博客园 - 图4

unlockSlow 方法里面也分为正常模式和饥饿模式下的解锁:

  1. func (m *Mutex) unlockSlow(new int32) {
  2. if (new+mutexLocked)&mutexLocked == 0 {
  3. throw("sync: unlock of unlocked mutex")
  4. }
  5. if new&mutexStarving == 0 {
  6. old := new
  7. for {
  8. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
  9. return
  10. }
  11. new = (old - 1<<mutexWaiterShift) | mutexWoken
  12. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  13. runtime_Semrelease(&m.sema, false, 1)
  14. return
  15. }
  16. old = m.state
  17. }
  18. } else {
  19. runtime_Semrelease(&m.sema, true, 1)
  20. }
  21. }

在正常模式下,如果没有 waiter,或者 mutexLocked、mutexStarving、mutexWoken 有一个不为零说明已经有其他 goroutine 在处理了,直接返回;如果互斥锁存在等待者,那么通过 runtime_Semrelease 直接唤醒等待队列中的 waiter;

在饥饿模式,直接调用 runtime_Semrelease 方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁。

总结

Mutex 的设计非常的简洁的,从代码可以看出为了设计出这么简洁的代码 state 一个字段可以当 4 个字段使用。并且为了解决 goroutine 饥饿问题,在 1.9 中 Mutex 增加了饥饿模式让锁变得更公平,不公平的等待时间限制在 1 毫秒,但同时,代码也变得越来越难懂了,所以要理解它上面的思想需要慢慢的废些时间细细的体会一下了。
https://www.cnblogs.com/luozhiyun/p/14157542.html