Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond
image.png

Mutex

Go 语言的 sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

  1. type Mutex struct {
  2. state int32
  3. sema uint32
  4. }

上述两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。

状态

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:
image.png
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;
  • mutexWoken — 表示从正常模式被从唤醒;
  • mutexStarving — 当前的互斥锁进入饥饿状态;
  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

正常模式和饥饿模式

sync.Mutex 有两种模式 — 正常模式和饥饿模式。我们需要在这里先了解正常模式和饥饿模式都是什么以及它们有什么样的关系。
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
image.png
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

加锁和解锁

我们在这一节中将分别介绍互斥锁的加锁和解锁过程,它们分别使用 sync.Mutex.Locksync.Mutex.Unlock 方法。

互斥锁的加锁是靠 sync.Mutex.Lock 完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:

  1. func (m *Mutex) Lock() {
  2. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  3. return
  4. }
  5. m.lockSlow()
  6. }

如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;

我们先来介绍互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放:

  1. func (m *Mutex) lockSlow() {
  2. // 标记本goroutine的等待时间
  3. var waitStartTime int64
  4. // 本goroutine是否已经处于饥饿状态
  5. starving := false
  6. // 本goroutine是否已唤醒
  7. awoke := false
  8. // 自旋次数
  9. iter := 0
  10. // 复制锁的当前状态
  11. old := m.state
  12. for {
  13. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
  14. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
  15. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
  16. awoke = true
  17. }
  18. runtime_doSpin()
  19. iter++
  20. old = m.state
  21. continue
  22. }

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:

  1. 互斥锁只有在普通模式才能进入自旋;
  2. runtime.sync_runtime_canSpin 需要返回 true:
    1. 运行在多 CPU 的机器上;
    2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
    3. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpinruntime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

  1. func sync_runtime_doSpin() {
  2. procyield(active_spin_cnt)
  3. }
  4. TEXT runtime·procyield(SB),NOSPLIT,$0-0
  5. MOVL cycles+0(FP), AX
  6. again:
  7. PAUSE
  8. SUBL $1, AX
  9. JNZ again
  10. RET

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

  1. // 到了这一步, state的状态可能是:
  2. // 1. 锁还没有被释放,锁处于正常状态
  3. // 2. 锁还没有被释放, 锁处于饥饿状态
  4. // 3. 锁已经被释放, 锁处于正常状态
  5. // 4. 锁已经被释放, 锁处于饥饿状态
  6. // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
  7. // new 复制 state的当前状态, 用来设置新的状态
  8. // old 是锁当前的状态
  9. new := old
  10. // 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
  11. // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个
  12. if old&mutexStarving == 0 {
  13. new |= mutexLocked
  14. }
  15. // 将等待队列的等待者的数量加1
  16. if old&(mutexLocked|mutexStarving) != 0 {
  17. new += 1 << mutexWaiterShift
  18. }
  19. // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
  20. // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态
  21. if starving && old&mutexLocked != 0 {
  22. new |= mutexStarving
  23. }
  24. // 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
  25. // 总之state的新状态不再是woken状态.
  26. if awoke {
  27. new &^= mutexWoken
  28. }

计算了新的互斥锁状态之后,会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新状态:

  1. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  2. if old&(mutexLocked|mutexStarving) == 0 {
  3. break // 通过 CAS 函数获取了锁
  4. }
  5. ...
  6. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  7. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  8. old = m.state
  9. if old&mutexStarving != 0 {
  10. delta := int32(mutexLocked - 1<<mutexWaiterShift)
  11. if !starving || old>>mutexWaiterShift == 1 {
  12. delta -= mutexStarving
  13. }
  14. atomic.AddInt32(&m.state, delta)
  15. break
  16. }
  17. awoke = true
  18. iter = 0
  19. } else {
  20. old = m.state
  21. }
  22. }
  23. }

如果没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 通过信号量保证资源不会被两个 Goroutine 获取。runtime.sync_runtime_SemacquireMutex 会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 的剩余代码也会继续执行。

  • 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
  • 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;


互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:

  • 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
  • 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 开始慢速解锁:
    1. func (m *Mutex) Unlock() {
    2. new := atomic.AddInt32(&m.state, -mutexLocked)
    3. if new != 0 {
    4. m.unlockSlow(new)
    5. }
    6. }
    sync.Mutex.unlockSlow 会先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
    在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
  1. func (m *Mutex) unlockSlow(new int32) {
  2. if (new+mutexLocked)&mutexLocked == 0 {
  3. throw("sync: unlock of unlocked mutex")
  4. }
  5. if new&mutexStarving == 0 { // 正常模式
  6. old := new
  7. for {
  8. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
  9. return
  10. }
  11. new = (old - 1<<mutexWaiterShift) | mutexWoken
  12. if atomic.CompareAndSwapInt32(&m.state, old, new) {
  13. runtime_Semrelease(&m.sema, false, 1)
  14. return
  15. }
  16. old = m.state
  17. }
  18. } else { // 饥饿模式
  19. runtime_Semrelease(&m.sema, true, 1)
  20. }
  21. }
  • 在正常模式下,上述代码会使用如下所示的处理过程:
    • 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者;
    • 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease 唤醒等待者并移交锁的所有权;
  • 在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;

小结

我们已经从多个方面分析了互斥锁 sync.Mutex 的实现原理,这里我们从加锁和解锁两个方面总结注意事项。
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

RWMutex

读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

结构体

  1. type RWMutex struct {
  2. w Mutex
  3. writerSem uint32
  4. readerSem uint32
  5. readerCount int32
  6. readerWait int32
  7. }
  • w — 复用互斥锁提供的能力;
  • writerSem 和 readerSem — 分别用于写等待读和读等待写:
  • readerCount 存储了当前正在执行的读操作数量;
  • readerWait 表示当写操作被阻塞时等待的读操作个数;


我们会依次分析获取写锁和读锁的实现原理,其中:

写锁

当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:

  1. func (rw *RWMutex) Lock() {
  2. rw.w.Lock()
  3. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
  4. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
  5. runtime_SemacquireMutex(&rw.writerSem, false, 0)
  6. }
  7. }
  1. 调用结构体持有的 sync.Mutex 结构体的 sync.Mutex.Lock 阻塞后续的写操作;
    • 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
  2. 调用 sync/atomic.AddInt32 函数阻塞后续的读操作:
  3. 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;

写锁的释放会调用 sync.RWMutex.Unlock

  1. func (rw *RWMutex) Unlock() {
  2. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
  3. if r >= rwmutexMaxReaders {
  4. throw("sync: Unlock of unlocked RWMutex")
  5. }
  6. for i := 0; i < int(r); i++ {
  7. runtime_Semrelease(&rw.readerSem, false, 0)
  8. }
  9. rw.w.Unlock()
  10. }

与加锁的过程正好相反,写锁的释放分以下几个执行:

  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 释放写锁;

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。

读锁

读锁的加锁方法 sync.RWMutex.RLock 很简单,该方法会通过 sync/atomic.AddInt32 将 readerCount 加一:

  1. func (rw *RWMutex) RLock() {
  2. if atomic.AddInt32(&rw.readerCount, 1) < 0 {
  3. runtime_SemacquireMutex(&rw.readerSem, false, 0)
  4. }
  5. }
  1. 如果该方法返回负数 — 其他 Goroutine 获得了写锁,当前 Goroutine 就会调用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待锁的释放;
  2. 如果该方法的结果为非负数 — 没有 Goroutine 获得写锁,当前方法会成功返回;

当 Goroutine 想要释放读锁时,会调用如下所示的 sync.RWMutex.RUnlock 方法:

  1. func (rw *RWMutex) RUnlock() {
  2. if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
  3. rw.rUnlockSlow(r)
  4. }
  5. }

该方法会先减少正在读资源的 readerCount 整数,根据 sync/atomic.AddInt32 的返回值不同会分别进行处理:

  • 如果返回值大于等于零 — 读锁直接解锁成功;
  • 如果返回值小于零 — 有一个正在执行的写操作,在这时会调用sync.RWMutex.rUnlockSlow 方法;
    1. func (rw *RWMutex) rUnlockSlow(r int32) {
    2. if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    3. throw("sync: RUnlock of unlocked RWMutex")
    4. }
    5. if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    6. runtime_Semrelease(&rw.writerSem, false, 1)
    7. }
    8. }
    sync.RWMutex.rUnlockSlow 会减少获取锁的写操作等待的读操作数 readerWait 并在所有读操作都被释放之后触发写操作的信号量 writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。

小结

虽然读写互斥锁 sync.RWMutex 提供的功能比较复杂,但是因为它建立在 sync.Mutex 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:

  • 调用 sync.RWMutex.Lock 尝试获取写锁时;
    • 每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;
    • 将 readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;
  • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;

读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。

WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求
image.png

结构体

sync.WaitGroup 结构体中只包含两个成员变量:

  1. type WaitGroup struct {
  2. noCopy noCopy
  3. state1 [3]uint32
  4. }
  • noCopy — 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
  • state1 — 存储着状态和信号量;


sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:

  1. func main() {
  2. wg := sync.WaitGroup{}
  3. yawg := wg
  4. fmt.Println(wg, yawg)
  5. }
  6. $ go vet proc.go
  7. ./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
  8. ./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
  9. ./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup

除了 sync.noCopy 之外,sync.WaitGroup` 结构体中还包含一个总共占用 12 字节的数组,这个数组会存储当前结构体的状态,在 64 位与 32 位的机器上表现也非常不同。
image.png
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能够帮我们从 state1 字段中取出它的状态和信号量。

接口

sync.WaitGroup 对外暴露了三个方法 — sync.WaitGroup.Addsync.WaitGroup.Waitsync.WaitGroup.Done
因为其中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1,所以我们重点分析另外两个方法,即 sync.WaitGroup.Addsync.WaitGroup.Wait

  1. func (wg *WaitGroup) Add(delta int) {
  2. statep, semap := wg.state()
  3. state := atomic.AddUint64(statep, uint64(delta)<<32)
  4. v := int32(state >> 32)
  5. w := uint32(state)
  6. if v < 0 {
  7. panic("sync: negative WaitGroup counter")
  8. }
  9. if v > 0 || w == 0 {
  10. return
  11. }
  12. *statep = 0
  13. for ; w != 0; w-- {
  14. runtime_Semrelease(semap, false, 0)
  15. }
  16. }

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的计数器 counter。虽然 sync.WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数就会发生程序崩溃。当调用计数器归零,即所有任务都执行完成时,才会通过 sync.runtime_Semrelease 唤醒处于等待状态的 Goroutine。

sync.WaitGroup 的另一个方法 sync.WaitGroup.Wait 会在计数器大于 0 并且不存在等待的 Goroutine 时,调用 runtime.sync_runtime_Semacquire 陷入睡眠。

  1. func (wg *WaitGroup) Wait() {
  2. statep, semap := wg.state()
  3. for {
  4. state := atomic.LoadUint64(statep)
  5. v := int32(state >> 32)
  6. if v == 0 {
  7. return
  8. }
  9. if atomic.CompareAndSwapUint64(statep, state, state+1) {
  10. runtime_Semacquire(semap)
  11. if +statep != 0 {
  12. panic("sync: WaitGroup is reused before previous Wait has returned")
  13. }
  14. return
  15. }
  16. }
  17. }

sync.WaitGroup 的计数器归零时,陷入睡眠状态的 Goroutine 会被唤醒,上述方法也会立刻返回。

小结

通过对 sync.WaitGroup 的分析和研究,我们能够得出以下结论:

Once

Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。在运行如下所示的代码时,我们会看到如下所示的运行结果:

  1. func main() {
  2. o := &sync.Once{}
  3. for i := 0; i < 10; i++ {
  4. o.Do(func() {
  5. fmt.Println("only once")
  6. })
  7. }
  8. }
  9. $ go run main.go
  10. only once

结构体

每一个 sync.Once 结构体中都只包含一个用于标识代码块是否执行过的 done 以及一个互斥锁 sync.Mutex

  1. type Once struct {
  2. done uint32
  3. m Mutex
  4. }

接口

sync.Once.Dosync.Once 结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:

  • 如果传入的函数已经执行过,会直接返回;
  • 如果传入的函数没有执行过,会调用 sync.Once.doSlow 执行传入的函数: ```go func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 0 {
    1. o.doSlow(f)
    } }

func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }

  1. 1. 为当前 Goroutine 获取互斥锁;
  2. 1. 执行传入的无入参函数;
  3. 1. 运行延迟函数调用,将成员变量 done 更新成 1
  4. [sync.Once](https://draveness.me/golang/tree/sync.Once) 会通过成员变量 done 确保函数不会执行第二次。
  5. <a name="YDVPJ"></a>
  6. ### Cond
  7. Go 语言标准库中还包含条件变量 [sync.Cond](https://draveness.me/golang/tree/sync.Cond),它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 [sync.Cond](https://draveness.me/golang/tree/sync.Cond) 结构体在初始化时都需要传入一个互斥锁,我们可以通过下面的例子了解它的使用方法:
  8. ```go
  9. var status int64
  10. func main() {
  11. c := sync.NewCond(&sync.Mutex{})
  12. for i := 0; i < 10; i++ {
  13. go listen(c)
  14. }
  15. time.Sleep(1 * time.Second)
  16. go broadcast(c)
  17. ch := make(chan os.Signal, 1)
  18. signal.Notify(ch, os.Interrupt)
  19. <-ch
  20. }
  21. func broadcast(c *sync.Cond) {
  22. c.L.Lock()
  23. atomic.StoreInt64(&status, 1)
  24. c.Broadcast()
  25. c.L.Unlock()
  26. }
  27. func listen(c *sync.Cond) {
  28. c.L.Lock()
  29. for atomic.LoadInt64(&status) != 1 {
  30. c.Wait()
  31. }
  32. fmt.Println("listen")
  33. c.L.Unlock()
  34. }
  35. $ go run main.go
  36. listen
  37. ...
  38. listen

上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:

调用 sync.Cond.Broadcast 方法后,上述代码会打印出 10 次 “listen” 并结束调用。
image.png

结构体

  1. type Cond struct {
  2. noCopy noCopy
  3. L Locker
  4. notify notifyList
  5. checker copyChecker
  6. }
  • noCopy — 用于保证结构体不会在编译期间拷贝;
  • copyChecker — 用于禁止运行期间发生的拷贝;
  • L — 用于保护内部的 notify 字段,Locker 接口类型的变量;
  • notify — 一个 Goroutine 的链表,它是实现同步机制的核心结构;

    1. type notifyList struct {
    2. wait uint32
    3. notify uint32
    4. lock mutex
    5. head *sudog
    6. tail *sudog
    7. }

    sync.notifyList 结构体中,head 和 tail 分别指向的链表的头和尾,wait 和 notify 分别表示当前正在等待的和已经通知到的 Goroutine 的索引。

接口

sync.Cond 对外暴露的 sync.Cond.Wait 方法会将当前 Goroutine 陷入休眠状态,它的执行过程分成以下两个步骤:

  1. 调用 runtime.notifyListAdd 将等待计数器加一并解锁;
  2. 调用 runtime.notifyListWait 等待其他 Goroutine 的唤醒并加锁: ```go func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的链接名 c.L.Unlock() runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的链接名 c.L.Lock() }

func notifyListAdd(l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1) - 1 }

  1. [runtime.notifyListWait](https://draveness.me/golang/tree/runtime.notifyListWait) 会获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端:
  2. ```go
  3. func notifyListWait(l *notifyList, t uint32) {
  4. s := acquireSudog()
  5. s.g = getg()
  6. s.ticket = t
  7. if l.tail == nil {
  8. l.head = s
  9. } else {
  10. l.tail.next = s
  11. }
  12. l.tail = s
  13. goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
  14. releaseSudog(s)
  15. }

除了将当前 Goroutine 追加到链表的末端之外,我们还会调用 runtime.goparkunlock 将当前 Goroutine 陷入休眠,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。
image.png
sync.Cond.Signalsync.Cond.Broadcast 就是用来唤醒陷入休眠的 Goroutine 的方法,它们的实现有一些细微的差别:

  • sync.Cond.Signal 方法会唤醒队列最前面的 Goroutine;
  • sync.Cond.Broadcast 方法会唤醒队列中全部的 Goroutine; ```go func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) }

func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }

  1. [runtime.notifyListNotifyOne](https://draveness.me/golang/tree/runtime.notifyListNotifyOne) 只会从 [sync.notifyList](https://draveness.me/golang/tree/sync.notifyList) 链表中找到满足 sudog.ticket == l.notify 条件的 Goroutine 并通过 [runtime.readyWithTime](https://draveness.me/golang/tree/runtime.readyWithTime) 唤醒:
  2. ```go
  3. func notifyListNotifyOne(l *notifyList) {
  4. t := l.notify
  5. atomic.Store(&l.notify, t+1)
  6. for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  7. if s.ticket == t {
  8. n := s.next
  9. if p != nil {
  10. p.next = n
  11. } else {
  12. l.head = n
  13. }
  14. if n == nil {
  15. l.tail = p
  16. }
  17. s.next = nil
  18. readyWithTime(s, 4)
  19. return
  20. }
  21. }
  22. }

runtime.notifyListNotifyAll 会依次通过 runtime.readyWithTime 唤醒链表中 Goroutine:

  1. func notifyListNotifyAll(l *notifyList) {
  2. s := l.head
  3. l.head = nil
  4. l.tail = nil
  5. atomic.Store(&l.notify, atomic.Load(&l.wait))
  6. for s != nil {
  7. next := s.next
  8. s.next = nil
  9. readyWithTime(s, 4)
  10. s = next
  11. }
  12. }

Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。
在一般情况下,我们都会先调用 sync.Cond.Wait 陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。

小结

sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:

  • sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
  • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
  • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;