sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
sync.Once
func (o *Once) Do(f func()) {
sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。但也有所不同。
- init 函数是在文件包首次被加载的时候执行,且只执行一次
- sync.Onc 是在代码运行中需要的时候执行,且只执行一次
当一个函数不希望程序在一开始的时候就被执行的时候,可以使用 sync.Once 。
例如:
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
sync.Once 使用变量 done 来记录函数的执行状态,使用 sync.Mutex 和 sync.atomic 来保证线程安全的读取 done 。
// Once is an object that will perform exactly one action.
type Once struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
// The hot path is inlined at every call site.
// Placing done first allows more compact instructions on some architectures (amd64/x86),
// and fewer instructions (to calculate offset) on other architectures.
done uint32
m Mutex
}
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
// Note: Here is an incorrect implementation of Do:
//
// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
// f()
// }
//
// Do guarantees that when it returns, f has finished.
// This implementation would not implement that guarantee:
// given two simultaneous calls, the winner of the cas would
// call f, and the second would return immediately, without
// waiting for the first's call to f to complete.
// This is why the slow path falls back to a mutex, and why
// the atomic.StoreUint32 must be delayed until after f returns.
if atomic.LoadUint32(&o.done) == 0 {
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
这里 done 是一个状态位,用于判断变量是否初始化完成,其有效值是:
- 0: 函数 f 尚未执行或执行中,Once对象创建时 done 默认值就是0
- 1: 函数 f 已经执行结束,保证 f 不会被再次执行
而 m Mutex 用于控制临界区的进入,保证同一时间点最多有一个 f在执行。
done 在 m.Lock() 前后的两次校验都是必要的。
Mutex 只能保证临界区内的操作是可观测的 即只有处于o.m.Lock() 和 defer o.m.Unlock()之间的代码
在 Scala 里,有一个关键词 lazy,实现了 sync.Once 同样的功能。具体实现上,早期版本使用了 volatile 修饰状态变量 done,使用 synchronized 替代 m Mutex;后来,也改成了基于CAS的方式。
使用体验上,显然 lazy 更香!
sync.Map
Go语言中内置的map不是并发安全的
实现
- 空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。
- 优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。
- double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。
- 延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。 ```go import ( “fmt” “strconv” “sync” )
//以下的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。
var m = make(map[string]int)
func get(key string) int { return m[key] }
func set(key string, value int) { m[key] = value }
func main() { wg := sync.WaitGroup{} for i := 0; i < 200; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) set(key, n) fmt.Printf(“k=:%v,v:=%v\n”, key, get(key)) wg.Done() }(i) } wg.Wait() }
并发安全版map–sync.Map
```go
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
sync.Mutex 同步锁
Go 语言的 sync.Mutex
由两个字段 state
和 sema
组成。其中 state
表示当前互斥锁的状态,而 sema
是用于控制锁状态的信号量。
type Mutex struct {
state int32
sema uint32
}
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked
、mutexWoken
和 mutexStarving
,剩下的位置用来表示当前有多少个 Goroutine 在等待互斥锁的释放:
在默认情况下,互斥锁的所有状态位都是 0,int32
中的不同位分别表示了不同的状态:
mutexLocked
— 表示互斥锁的锁定状态;mutexWoken
— 表示从正常模式被从唤醒;mutexStarving
— 当前的互斥锁进入饥饿状态;waitersCount
— 当前互斥锁上等待的 Goroutine 个数;
正常模式和饥饿模式
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
互斥锁的加锁是靠 sync.Mutex.Lock
完成的,最新的 Go 语言源代码中已经将 sync.Mutex.Lock
方法进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked
位置成 1:
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow
尝试通过自旋(Spinnig)等方式等待锁的释放,该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:
- 判断当前 Goroutine 能否进入自旋;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
runtime.sync_runtime_canSpin
需要返回true
:- 运行在多 CPU 的机器上;
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin
和 runtime.procyield
并执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间:
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新 state
字段中存储的不同信息 — mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
:
计算了新的互斥锁状态之后,会使用 CAS 函数 sync/atomic.CompareAndSwapInt32
更新状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过 CAS 函数获取了锁
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
互斥锁的解锁过程 sync.Mutex.Unlock
与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32
函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用
sync.Mutex.unlockSlow
开始慢速解锁:
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow
会先校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了会直接抛出异常 “sync: unlock of unlocked mutex” 中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
小结
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,会通过置位
mutexLocked
加锁; - 如果互斥锁处于
mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock
会直接抛出异常; - 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine;
RWMutex 读写互斥锁
适用于读远多于写的场景。
写锁
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock
方法:
type RWMutex struct {
w Mutex // 互斥锁
writerSem uint32 // 写锁用的信号量
readerSem uint32 // 读锁用的信号量
readerCount int32 // 当前正在执行读操作的goroutine数量
readerWait int32 // 获取写锁时,当前还持有读锁的goroutine数量
}
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// 首先调用Mutex的Lock方法获取到锁
rw.w.Lock()
// 把readerCount改成负数,这样后续的读操作就会被阻塞
// r 就是当前正在执行读操作的goroutine数量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果当前有正在执行读操作的goroutine
// 把r赋值给readerWait
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 获取写锁的goroutine进入休眠,等待被唤醒
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
- 调用结构体持有的
sync.Mutex
结构体的sync.Mutex.Lock
阻塞后续的写操作;- 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠;
- 调用
sync/atomic.AddInt32
函数阻塞后续的读操作: - 如果仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用
runtime.sync_runtime_SemacquireMutex
进入休眠状态等待所有读锁所有者执行结束后释放writerSem
信号量将当前协程唤醒;
写锁的释放会调用 sync.RWMutex.Unlock
:
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
与加锁的过程正好相反,写锁的释放分以下几个执行:
- 调用
sync/atomic.AddInt32
函数将readerCount
变回正数,释放读锁; - 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock
释放写锁;
小结
虽然读写互斥锁 sync.RWMutex
提供的功能比较复杂,但是因为它建立在 sync.Mutex
上,所以实现会简单很多。我们总结一下读锁和写锁的关系:
- 调用
sync.RWMutex.Lock
尝试获取写锁时;- 每次
sync.RWMutex.RUnlock
都会将readerCount
其减一,当它归零时该 Goroutine 会获得写锁; - 将
readerCount
减少rwmutexMaxReaders
个数以阻塞后续的读操作;
- 每次
- 调用
sync.RWMutex.Unlock
释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
sync.Pool 的使用场景
一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。
Go 语言从 1.3 版本开始提供了对象重用的机制,即 sync.Pool。sync.Pool 是可伸缩的,同时也是并发安全的,其大小仅受限于内存的大小。sync.Pool 用于存储那些被分配了但是没有被使用,而未来可能会使用的值。这样就可以不用再次经过内存分配,可直接复用已有对象,减轻 GC 的压力,从而提升系统的性能。
sync.Pool 的大小是可伸缩的,高负载时会动态扩容,存放在池中的对象如果不活跃了会被自动清理。
sync.Pool 的使用方式非常简单:
只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。
var studentPool = sync.Pool{
New: func() interface{} {
return new(Student)
},
}
stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
Get()
用于从对象池中获取对象,因为返回值是interface{}
,因此需要类型转换。Put()
则是在对象使用完毕后,返回对象池。
参考