sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
| 方法名 | 功能 |
|---|---|
| (wg * WaitGroup) Add(delta int) | 计数器+delta |
| (wg *WaitGroup) Done() | 计数器-1 |
| (wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
var wg sync.WaitGroupfunc hello() {defer wg.Done()fmt.Println("Hello Goroutine!")}func main() {wg.Add(1)go hello() // 启动另外一个goroutine去执行hello函数fmt.Println("main goroutine done!")wg.Wait()}
sync.Once
func (o *Once) Do(f func()) {
sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。但也有所不同。
- init 函数是在文件包首次被加载的时候执行,且只执行一次
- sync.Onc 是在代码运行中需要的时候执行,且只执行一次
当一个函数不希望程序在一开始的时候就被执行的时候,可以使用 sync.Once 。
例如:
package mainimport ("fmt""sync")func main() {var once sync.OnceonceBody := func() {fmt.Println("Only once")}done := make(chan bool)for i := 0; i < 10; i++ {go func() {once.Do(onceBody)done <- true}()}for i := 0; i < 10; i++ {<-done}}
sync.Once 使用变量 done 来记录函数的执行状态,使用 sync.Mutex 和 sync.atomic 来保证线程安全的读取 done 。
// Once is an object that will perform exactly one action.type Once struct {// done indicates whether the action has been performed.// It is first in the struct because it is used in the hot path.// The hot path is inlined at every call site.// Placing done first allows more compact instructions on some architectures (amd64/x86),// and fewer instructions (to calculate offset) on other architectures.done uint32m Mutex}// Do calls the function f if and only if Do is being called for the// first time for this instance of Once. In other words, given// var once Once// if once.Do(f) is called multiple times, only the first call will invoke f,// even if f has a different value in each invocation. A new instance of// Once is required for each function to execute.//// Do is intended for initialization that must be run exactly once. Since f// is niladic, it may be necessary to use a function literal to capture the// arguments to a function to be invoked by Do:// config.once.Do(func() { config.init(filename) })//// Because no call to Do returns until the one call to f returns, if f causes// Do to be called, it will deadlock.//// If f panics, Do considers it to have returned; future calls of Do return// without calling f.//func (o *Once) Do(f func()) {// Note: Here is an incorrect implementation of Do://// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {// f()// }//// Do guarantees that when it returns, f has finished.// This implementation would not implement that guarantee:// given two simultaneous calls, the winner of the cas would// call f, and the second would return immediately, without// waiting for the first's call to f to complete.// This is why the slow path falls back to a mutex, and why// the atomic.StoreUint32 must be delayed until after f returns.if atomic.LoadUint32(&o.done) == 0 {// Outlined slow-path to allow inlining of the fast-path.o.doSlow(f)}}func (o *Once) doSlow(f func()) {o.m.Lock()defer o.m.Unlock()if o.done == 0 {defer atomic.StoreUint32(&o.done, 1)f()}}
这里 done 是一个状态位,用于判断变量是否初始化完成,其有效值是:
- 0: 函数 f 尚未执行或执行中,Once对象创建时 done 默认值就是0
- 1: 函数 f 已经执行结束,保证 f 不会被再次执行
而 m Mutex 用于控制临界区的进入,保证同一时间点最多有一个 f在执行。
done 在 m.Lock() 前后的两次校验都是必要的。
Mutex 只能保证临界区内的操作是可观测的 即只有处于o.m.Lock() 和 defer o.m.Unlock()之间的代码
在 Scala 里,有一个关键词 lazy,实现了 sync.Once 同样的功能。具体实现上,早期版本使用了 volatile 修饰状态变量 done,使用 synchronized 替代 m Mutex;后来,也改成了基于CAS的方式。
使用体验上,显然 lazy 更香!
sync.Map
Go语言中内置的map不是并发安全的
实现
- 空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。
- 优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。
- double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。
- 延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。 ```go import ( “fmt” “strconv” “sync” )
//以下的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。
var m = make(map[string]int)
func get(key string) int { return m[key] }
func set(key string, value int) { m[key] = value }
func main() { wg := sync.WaitGroup{} for i := 0; i < 200; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) set(key, n) fmt.Printf(“k=:%v,v:=%v\n”, key, get(key)) wg.Done() }(i) } wg.Wait() }
并发安全版map–sync.Map```govar m = sync.Map{}func main() {wg := sync.WaitGroup{}for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)m.Store(key, n)value, _ := m.Load(key)fmt.Printf("k=:%v,v:=%v\n", key, value)wg.Done()}(i)}wg.Wait()}
sync.Mutex 同步锁
Go 语言的 sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。
type Mutex struct {state int32sema uint32}
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
mutexLocked— 表示互斥锁的锁定状态;mutexWoken— 表示从正常模式被从唤醒;mutexStarving— 当前的互斥锁进入饥饿状态;waitersCount— 当前互斥锁上等待的 Goroutine 个数;
正常模式和饥饿模式
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
互斥锁的加锁是靠 sync.Mutex.Lock 完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock 方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked 位置成 1:
func (m *Mutex) Lock() {if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {return}m.lockSlow()}
如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:
- 判断当前 Goroutine 能否进入自旋;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
runtime.sync_runtime_canSpin需要返回true:- 运行在多 CPU 的机器上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
计算了新的互斥锁状态之后,会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // 通过 CAS 函数获取了锁}...runtime_SemacquireMutex(&m.sema, queueLifo, 1)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}}
互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用
sync.Mutex.unlockSlow开始慢速解锁:
func (m *Mutex) Unlock() {new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {m.unlockSlow(new)}}
sync.Mutex.unlockSlow 会先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 { // 正常模式old := newfor {if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else { // 饥饿模式runtime_Semrelease(&m.sema, true, 1)}}
小结
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位
mutexLocked加锁; - 如果互斥锁处于
mutexLocked状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock会直接抛出异常; - 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过
sync.runtime_Semrelease唤醒对应的 Goroutine;
RWMutex 读写互斥锁
适用于读远多于写的场景。
写锁
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:
type RWMutex struct {w Mutex // 互斥锁writerSem uint32 // 写锁用的信号量readerSem uint32 // 读锁用的信号量readerCount int32 // 当前正在执行读操作的goroutine数量readerWait int32 // 获取写锁时,当前还持有读锁的goroutine数量}const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {// 首先调用Mutex的Lock方法获取到锁rw.w.Lock()// 把readerCount改成负数,这样后续的读操作就会被阻塞// r 就是当前正在执行读操作的goroutine数量r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders// 如果当前有正在执行读操作的goroutine// 把r赋值给readerWaitif r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {// 获取写锁的goroutine进入休眠,等待被唤醒runtime_SemacquireMutex(&rw.writerSem, false, 0)}}
- 调用结构体持有的
sync.Mutex结构体的sync.Mutex.Lock阻塞后续的写操作;- 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
- 调用
sync/atomic.AddInt32函数阻塞后续的读操作: - 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用
runtime.sync_runtime_SemacquireMutex进入休眠状态等待所有读锁所有者执行结束后释放writerSem信号量将当前协程唤醒;
写锁的释放会调用 sync.RWMutex.Unlock:
func (rw *RWMutex) Unlock() {r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)if r >= rwmutexMaxReaders {throw("sync: Unlock of unlocked RWMutex")}for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}rw.w.Unlock()}
与加锁的过程正好相反,写锁的释放分以下几个执行:
- 调用
sync/atomic.AddInt32函数将readerCount变回正数,释放读锁; - 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock释放写锁;
小结
虽然读写互斥锁 sync.RWMutex 提供的功能比较复杂,但是因为它建立在 sync.Mutex 上,所以实现会简单很多。我们总结一下读锁和写锁的关系:
- 调用
sync.RWMutex.Lock尝试获取写锁时;- 每次
sync.RWMutex.RUnlock都会将readerCount其减一,当它归零时该 Goroutine 会获得写锁; - 将
readerCount减少rwmutexMaxReaders个数以阻塞后续的读操作;
- 每次
- 调用
sync.RWMutex.Unlock释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
sync.Pool 的使用场景
一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。
Go 语言从 1.3 版本开始提供了对象重用的机制,即 sync.Pool。sync.Pool 是可伸缩的,同时也是并发安全的,其大小仅受限于内存的大小。sync.Pool 用于存储那些被分配了但是没有被使用,而未来可能会使用的值。这样就可以不用再次经过内存分配,可直接复用已有对象,减轻 GC 的压力,从而提升系统的性能。
sync.Pool 的大小是可伸缩的,高负载时会动态扩容,存放在池中的对象如果不活跃了会被自动清理。
sync.Pool 的使用方式非常简单:
只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。
var studentPool = sync.Pool{New: func() interface{} {return new(Student)},}
stu := studentPool.Get().(*Student)json.Unmarshal(buf, stu)studentPool.Put(stu)
Get()用于从对象池中获取对象,因为返回值是interface{},因此需要类型转换。Put()则是在对象使用完毕后,返回对象池。
参考
