上下文
概述
上下文 context.Context
用来设置截止时间,同步信号,传递请求相关的结构体。上下文与 goroutine 的关系比较密切,是 go 语言独特的设计。
type Context interface {
Done() <-chan struct{}
Err() error
Deadline() (deadline time.Time, ok bool)
Value(key interface{}) interface{}
}
● Deadline
返回 context.Context
被取消的时间,即完成工作的截止日期。
● Done
返回一个 channel,这个 channel 会在当前工作完成或上下文被取消后关闭,多次调用 Done
方法返回同一个 channel。
● Err
返回 context.Context
结束的原因,只会在 Done
方法对应的 channel 关闭时返回非空值:
▶ 如果 context.Context
被取消,会返回 Canceled
错误。
▶ 如果 context.Context
超时,会返回 DeadlineExceeded
。
● Value
从 context.Context
中获取键对应的值,对于同一个上下文来说,多次调用 Value
并传入相同的 key 会返回相同的结果,该方法可以用来传递特定的数据。
设计原理
context.Context 最大的作用是在 goroutine 构成的树桩结构中同步信号以减少计算资源的浪费。go 语言服务器的每一个请求都是通过单独的 goroutine 来处理的。
有时为了处理一次请求要创建多个 goroutine,而 context.Context 的作用是在不同的 goroutine 之间同步请求特定数据,取消信号,请求处理的截止时期。
每一个 context.Context 都会从最顶层的 goroutine 传递到最底层,context.Context 可以在上层 goroutine 出现错误时将信号及时同步给下层。
默认上下文
context 包中最常用的方法是 context.Background 和 context.TODO,这两个方法都会返回初始化好的私有变量 background 和 todo,它们会在一个 go 语言程序中被复用。
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
代码中可见,context.emptyCtx 通过空方法实现了 context.Context 中所有的方法,但是没有任何功能。
context.Background 和 context.TODO 只是互为别名,只是使用和语义上有所不同:
● context.background 是上下文的默认值,其他所有上下文都是从他衍生而来。
● context.TODO 只是在不确定使用哪种上下文时使用。
多数情况下,如果当前函数没有上下文作为入参,会使用 context.Background 作为起始的上下文传递。
取消信号
context.WithCancel 函数能够从 context.Context 中衍生出新的上下文,并返回用于取消该上下文的函数。一旦执行返回的取消函数,当前上下文和其子上下文都会被取消,所有 goroutine 都会同步收到这一取消信号。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
可认为该函数做了两件事:
● 将传入的上下文封装成私有结构体。
● 构建父子上下文之间的关系,当父上下文被取消,子上下文也会被取消。
父子上下文关联代码:
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
//关闭上下文中的 channel,向所有子上下文同步取消信号。
case <-child.Done():
}
}()
}
}
该函数的作用是在 parent 和 child 之间同步取消和结束的信号,保证在 parent 被取消时,child 也能收到信号,不会出现状态不一的情况。
分三种情况:
● parent.Done() == nil
即 parent 不会触发取消事件,当前函数直接返回。
● child
的集成连包含了可以取消的上下文,会判断 parent 是否已经触发了取消信号:
▶ 如果已经取消,child 会立即取消。
▶ 如果没有被取消,child 会加入到 parent 的 children
列表中,等待 parent 的取消信号。
● 当前上下文是开发者自定义类型,实现了 context.Context
接口并在 Done
方法中返回了非空 channel 时:
▶ 运行一个新的 goroutine 同时监听 parent.Done()
和 child.Done()
两个 channel 。
▶ 在 parent.Done()
关闭时调用 child.Done()
取消子上下文。
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
函数在创建 context.timeCtx
的过程中判断父上下文的截止日期与当前日期,并通过 time.AfterFunc
创建定时器,当时间超过截止日期后会会调用 context.timerCtx.cancel
同步取消信号。
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
`context.timerCtx` 不仅通过嵌入 `context.cancelCtx` 结构体继承了相关变量和方法,还通过持有的定时器 timer 和截止时间 deadline 实现了定时取消的功能。
传值方法
func WithValue(parent Context, key, val interface{}) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
context 包中的 `context.WithValue` 能从父上下文中创建子上下文,传值得子上下文使用 `context.valueCtx` 类型。
type valueCtx struct {
Context
key, val interface{}
}
context.valueCtx
会将 Value 外的 Err,Deadline 等方法代理到父上下文中,它只会响应 context.valueCtx.Value
方法
func (c *valueCtx) String() string {
return contextName(c.Context) + ".WithValue(type " +
reflectlite.TypeOf(c.key).String() +
", val " + stringify(c.val) + ")"
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
//WithValue 最后是 return &valueCtx{parent, key, val},所以说是父上下文
如果 context.valueCtx
存储的键值对与 context.valueCtx.Value
方法传入的参数不匹配,就会从父上下文中查找该键对应的值,直到某个父上下文返回 nil 或者查找到对应的值。
同步原语与锁
概述
锁是并发编程中的一种同步原语,能够保障多个 goroutine 在访问同一块内存时不会出现竞争条件等问题。(竞争条件:多个线程或者进程在读写一个共享数据时结果依赖于它们执行的相对时间的情形。)
基本原语
常见同步原语 | 容器 | 互斥锁 |
---|---|---|
Cond | Map | Mutex |
Once | Pool | |
WaitGroup |
Mutex
type Mutex struct {
state int32 //当前互斥锁的状态
sema uint32 //空值锁状态的信号量
}
其中 sema 被分为四部分:
● mutexLocked
表示互斥锁的锁定状态。
● mutexWoken
表示被从正常模式唤醒。
● mutexStraving
表示当前互斥锁进入饥饿状态。
● waitersCount
表示当前互斥锁上等待的 goroutine 的数量。
正常模式和饥饿模式
正常模式下,锁的等待者会按照先入先出的顺序获取锁,但是刚被唤醒的 goroutine 与新创建的 goroutine 竞争时,大概率获取不到锁。为减少这种情况,一旦 goroutine 超过 1ms 没有获取到锁,就会将当前互斥锁切换为饥饿模式。
饥饿模式目的是保证互斥锁的公平性,防止部分 goroutine 被饿死。饥饿模式下,互斥锁会直接交给等待队列最前边的 goroutine,新的 goroutine 在该状态下不能获取锁,也不会进入自旋状态,而是在队列末尾等待。
如果一个 goroutine 获得了互斥锁并且它在队列的最后边或者其等待时间少于 1ms,则将当前的互斥锁从饥饿状态转为正常状态。
相比之下,正常模式的互斥锁能够提供更好的性能,饥饿模式能够避免 goroutine 由于陷入等待无法获取锁而造成的高尾延时。
加锁和解锁
当锁的状态即 mutexLocked
的状态是零时,将其设置为一;如果不是零,会调用 sync.Mutex.lockSlow
尝试通过自旋等方式等待锁的释放,该方法主体是一个非常大的 for 循环,可将其分为以下几个部分:
● 判断当前 goroutine 能否进入自旋。
● 通过自旋等待互斥锁的释放。
● 计算互斥锁的最新状态。
● 更新互斥锁的状态并获取锁。
首先判断能否进入自旋等待互斥锁的释放:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
自旋是一种多线程同步机制,当前进程进入自旋状态以后会一直占用 CPU 资源,检查某个条件是否为真,在多核 CPU 上,自旋可以避免 goroutine 的切换,使用恰当会对性能带来很大的增益,使用不当会拖慢整个程序。所以判断条件非常严格:
● 互斥锁只有在普通1模式下才能进入自旋。
● runtime_canSpin
需要返回 true:
▶ 在有多个 CPU 的机器上运行。
▶ 当前 goroutine 为获取该锁进入自旋的次数小于四。
▶ 当前机器上至少存在一个正在运行的处理器 P 并且处理的队列为空。
一旦当前 goroutine 可以进入自旋,就会调用 funcsync_runtime_doSpin
和 procyield
并执行 30 次 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间。
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
处理完自旋的先关逻辑之后,互斥锁会根据上下文计算当前互斥锁的最新状态,几个不同的条件会分别更新 state 字段中存储的不同信息。
计算了新的互斥状态之后,会使用 CAS 函数 sync/atomic.CompareAndSwapInt32
更新状态。如果没有通过 CAS 获得锁,也会通过其他方式使用信号量来保障资源不会被两个 goroutine 获取。
与加锁相比,互斥锁的解锁过程非常简单:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
该函数首先使用 atomic.AddInt32
函数快速解锁,通常发生一下两种情况:
● 如果该函数的新状态等于零,当前 goroutine 就成果解锁了互斥锁。
● 不等于零,调用 m.unlockSlow
开始缓慢解锁。
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { //正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { //饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
该函数首先会校验锁状态的合法性,入过当前互斥锁已经被解锁,直接抛出异常:”sync: unlock of unlocked mutex”,终止当前程序。
正常情况下,会根据当前锁的状态分别处理正常模式和饥饿模式下的互斥锁:
● 正常模式下使用如下流程处理:
▶ 如果互斥锁不存在等待者,或者互斥锁的 mutexLocked
,mutexWoken
,mutexStarving
,状态不都为零,那么当前方法直接返回,不必唤醒其他等待者。
▶ 如果互斥锁存在等待者,会通过 runtime_Semrelease
唤醒等待者并移交锁的所有权。
● 在饥饿模式下,上述代码会调用 runtime_Semrelease
,将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,这时互斥锁不会退出饥饿状态,等待者会负责设置 mutexLocked
标志位。
RWMutex
读写互斥锁是细粒度的互斥锁,它不限制资源的并发读,但是读写,写写,操作无法并行执行。这里读写锁可以做一个简单的比喻:
● 老师在讲台写,写的东西被身体挡住,学生们看不到,对应写时不能读。
● 一个老师在写,其他老师不能跟着一起写,对应着写时不能再写。
● 老师写完了身体闪开,同学们一起读,互不影响。
● 老师在下一次写之前,确定之前的内容是否所有的学生都已经读完,每一个学生读完之后给老师发一个信号,老师就查看一下还有没有没有读完的,如果全部完成则开始写。
读 | 写 | |
---|---|---|
读 | Y | N |
写 | N | N |
常见服务资源读写比例会非常高,因为大多数请求之间互不影响,所以可以分离读写操作,以提高服务性能。
type RWMutex struct {
w Mutex //提供锁的能力
writerSem uint32 //用于写等待读
readSem uint32 //用于读等待写
readerCount int32 //当前正在执行的读操作数量
readerWait int32 //表示操作被堵塞时等待的读操作个数
}
● 写操作:sync.RWMutex.Lock
和 sync.RWMutex.Unlock
方法。
● 读操作:sync.RWMutex.RLock
和 sync.RWMutex.RUnlock
方法。
写锁
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
代码逻辑如下:
● 调用 rw.w.Lock
阻塞后续的写操作,因为互斥锁已经被获取,所以其他的 goroutine 在获取写锁时会进入自旋或者休眠。
● 调用 atomic.AddInt32
阻塞后边的读操作。
● 如果当前还有其它的 goroutine 持有互斥锁的读锁,该 goroutine 会调用 runtime_SemacquireMutex
进入休眠状态,等所有读锁执行完成之后,释放 writerSem
信号量获取当前协程。
写锁的释放会调用 sync.RWMutex.Unlock
方法:
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
与加锁过程相反,释放分为以下几个步骤:
● 调用 atomic.AddInt32
将 readerCount
变成正数,释放读锁。
● 通过 for 循环释放所有因为获取读锁而陷入等待的 goroutine。
● 通过调用 rw.w.Unlock
释放写锁。
获取写锁时会先阻塞写锁的获取,然后阻塞读锁的获取,这种策略保障读操作不会因为连续的写操作饿死。
读锁
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
代码核心逻辑如下:
● 调用 atomic.AddInt32
如果该方法返回负数——其他 goroutine 获取了写锁,当前 goroutine 就会调用 runtime_SemacquireMutex
函数陷入休眠,等待锁的释放。
● 如果该方法返回非负数——没有 goroutine 获取锁,当前方法成功返回。
当 goroutine 想要释放读锁时,会调用 RUnLock
方法:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
该方法会减少正在读资源的 readerCount
,根据 atomic.AddInt32
的返回值分别处理:
● 返回值大于或等于零,解锁成功。
● 返回值小于零,有一个写操作正在执行,这时会调用 rw.rUnlockSlow
代码如下:
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
● 该函数会减少获取锁的写操作等待的读操作数 readerWait
,并在所有读操作被释放后触发写操作的的信号量 writerSem
。该信号量被触发时,调度器会唤醒尝试获取写锁的 goroutine。
读写锁之间的关系
● 调用 sync.RWMutex.Lock
尝试获取写锁时:
▶ 每次 sync.RWMutex.RUnLock
都会将 readerCount
减一,当它归零时 goroutine 会获得写锁。
▶ 将 readerCount
减少 rwmutexMaxReaders
个数后阻塞后续读操作。
● 调用 sync.RWMutex.Unlock
释放写锁时,会先通知所有读操作,然后才会释放所持有的互斥锁。
WaitGroup
概述
sync.WaitGroup
可以等待一组 goroutine 返回,我们可以通过它将原本顺序执行的代码在多个 goroutine 中并发执行,加快代码执行速度。
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
● noCopy
——保证 sync.WaitGroup
不会被通过再赋值的方式复制:wg := sync.WaitGroup{} ; cwg := wg;
代码段报错。
● state1
——存储状态和信号量。该数组会存储当前结构体的状态,包括:waiter
, counter
, sema
,这三个字段在 64 位和 32 位计算机上的表现不同。
● sync.WaitGroup
提供的私有方法 sync.WaitGroup.state
能够从 state1
字段中取出其状态和信号量。
接口
该结构体对外暴露了三个方法:Add
, Wait
, Done
,其中 Done 方法只是向 Add
中传入了 -1,所以重点分析其他两个。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
该函数能够更新结构体中的计数器 counter。Add
函数的参数可以是负数,但结构体的计数器只能是非负数,一旦出现负数,程序崩溃。当调用计数器归零,才会通过 runtime_Semrelease
唤醒处于等待状态的 goroutine。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Wait
函数会在计数器大于零且并不存在等待的 goroutine 时,调用 runtime_Semacquire
陷入睡眠状态。当 sync..WaitGroup 的计数器归零时,陷入睡眠状态的 goroutine 会被唤醒,上述方法也会立即返回。
小结
● sync.WaitGroup
必须在 sync.WaitGroup.Wait
方法返回之前才能重新使用。
● sync.WaitGroup.Done
只是对 sync.WaitGroup.Add
简单封装,可以向 sync.WaitGroup.Add
传入负数(要保障计数器为正),快速将计数器归零以唤醒等待的 goroutine。
● 可以同时有多个 goroutine 等待 sync.WaitGroup
计数器归零,这些 goroutine 会被同时唤醒。
Once
概述
Once 保障 go 语言在运行期间某一段代码只会执行一次,举例如下:
func main() {
o := sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println(i)
})
o.Do(p)
}
}
func p() {
fmt.Println("only once")
}
//0
结构体
每一个 sync.Once
结构体中都只包含一个用于标识代码块是否执行过的 done
,以及一个互斥锁 sync.Mutex
。
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do 是该结构体唯一对外暴露的方法,该方法会接受一个入参为空的函数:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
● 如果传入的函数已经执行过,直接返回。
● 如果函数没有执行过,调用 doSlow
执行传入的函数,修改 done
字段的值。
doslow
函数核心逻辑如下:
● 为当前 goroutine 获取互斥锁。
● 执行传入的无参函数。
● 运行延时函数调用,将成员变量 done 更新为一
小结
● Do
方法中传入的函数只会运行一次,哪怕过程中发生了 panic。
● 两次调用 Do 方法传入不同函数,只会执行第一次调用传入的函数。
Cond
概述
条件变量 sync.Cond
可以让一组 goroutine 都在满足特定的条件时被唤醒。每一个 sync.Cond
结构体在初始化时都要传入一个互斥锁:
var done = false
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
log.Println(name, "starts reading")
c.L.Unlock()
}
func write(name string, c *sync.Cond) {
log.Println(name, "starts writing")
time.Sleep(time.Second)
c.L.Lock()
done = true
c.L.Unlock()
log.Println(name, "wakes all")
c.Broadcast()
}
func main() {
cond := sync.NewCond(&sync.Mutex{})
go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)
time.Sleep(time.Second * 3)
}
//2022/03/08 09:57:11 writer starts writing
//2022/03/08 09:57:12 writer wakes all
//2022/03/08 09:57:12 reader3 starts reading
//2022/03/08 09:57:12 reader1 starts reading
//2022/03/08 09:57:12 reader2 starts reading
● done
即互斥锁需要保护的条件变量。
● read
调用 Wait
等待通知,直到 done 为 true。
● write
接收数据,接收完成后,将 done 置为 true,调用 Broadcast
通知所有等待的协程。
● write
中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait
,处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。
结构体
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
● noCopy
保障结构体不会在编译期间复制。
● L
用于保护内部的 notify 字段,Locker 接口类型的变量。
● notify
一个 goroutine 列表,是实现同步机制的核心结构。
● copyChecker
用于禁止运行期间发生的复制。
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
该结构体中,head
和 tail
分别指向链表的头和尾,wait
和 notify
分别表示当前正在等待的和已经通知到的 goroutine 索引。
接口
Wait 方法会使当前 goroutine 陷入睡眠状态,代码如下:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
● 调用 runtime_notifyListAdd
将等待计数器加一并解锁。
● 调用 runtime_notifyListWait
等待其他 goroutine 被唤醒并解锁。
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
notifyListWait
会获取当前 goroutine 并将其添加到 goroutine 通知列表的末端。调用 goparkunlock
令当前 goroutine 陷入休眠状态该函数是 go 语言切换 goroutine 的常用方式,它会直接让出当前处理器的使用权并等待调度器唤醒。
唤醒陷入休眠的 goroutine 有两种方式:
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Signal
会唤醒队列最前面的 goroutine,BroadCast
会唤醒队列中所有的 goroutine。
func notifyListNotifyAll(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
func notifyListNotifyOne(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
notifyListNotifyOne
只会从 lockRankNotifyList
链表中找到满足 sudog.ticket == l.notify
条件的 goroutine,通过 readyWithTime
将其唤醒。
notifyListNotifyAll
会依次通过 readyWithTime
唤醒所有的 goroutine。
小结
Cond 不是常用的同步机制,但是条件长时间无法满足时,与使用 for 循环进行忙碌相比,它能够让出处理器的使用权,提高 CPU 的利用率,使用时注意以下几点:
● Wait 在调用之前一定要获取互斥锁,否则会触发程序崩溃,可见概述部分代码。
● Signal 唤醒的都是队列最前面,等待时间最长的 goroutine。
● Broadcast 会按照一定的顺序广播通知所有的 goroutine。
扩展原语
概要
golang/sync/errgroup.Group
, golang/sync/seamphore.Weighted
, golang/sync/singleflight.Group
, golang/sync/syncmap.Map
。
ErrorGroup
概述
该结构体在一组 goroutine 中提供了同步,错误传播以及上下文取消的功能。
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
var urls = []string{
"https://pkg.go.dev/golang.org/x/sync/errgroup",
"https://pkg.go.dev/golang.org/x/sync",
}
for i := 0; i < len(urls); i++ {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("successfully fetched add urls")
}
}
g.Go 方法能够创建一个 goroutine 并在其中执行传入的函数,而 g.Wait 等待所有 goroutine 返回,该方法的不同返回结果会有不同的含义:
● 如果返回错误,表示这一组 goroutine 中最少返回一个错误。
● 如果返回空值,表示所有的 goroutine 都成功执行。
结构体
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
● cancel
创建 context.Context
时返回的取消函数,用于在多个 goroutine 之间同步取消信号。
● wg
用于等待一组 goroutine 完成子任务的同步原语。
● errOnce
用于保证只接收一个子任务返回的错误。
可以通过函数 WithContext 创建新的 Group 结构体:
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
运行新的并行子任务需要使用 Group.Go 方法,代码如下:
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
Go
函数核心逻辑如下:
● 调用 wg.Add
函数增加待处理的任务。
● 创建新的 goroutine 并运行子任务。
● 返回错误时及时调用 cancel
并对 err
赋值,只有返回最早的错误才能被上游感知到,后续错误会被抛弃。
而另一个 Wait
方法只是调用了 waitgroup.wait 在子任务全部完成时取消 context.Context
并返回可能的错误。
小结
Group 的实现没有涉及底层和运行时包中的 API 只是封装了基本同步语句用于提供更加复杂的功能,在此要注意:
● Group 在出现错误或者等待结束之后,会调用 context.Context 的 cancel 方法同步取消信号。
● 只有出现的第一个错误才会被返回,其余被丢弃。
Semaphore
概述
信号量(semaphore)是并发编程中常见的同步机制,在需要控制访问资源的进程数量时就会使用到信号量,它会保障持有的计数器数量在 0 到初始化权重之间波动。
● 每次获取资源时会将信号量中的计数器减去对应的值,在释放时重新加回来。
● 当遇到计数器大于信号量大小时,会进入休眠状态等待其他线程释放信号。
结构体
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
● size 表示最大权重。
● cur 计数器,其范围是 [0, size] 。
● waiters 列表,存储着等待获取资源的 goroutine。
信号量中的计数器会随着用户对资源的访问和释放而改变,引入权重的概念能够提供更细粒度的资源访问控制,尽可能满足常见用例。
获取
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
该方法能够获取指定权重的资源,其中包含三种情况:
● 当信号量中的资源大于获取的资源并且没有等待的 goroutine 时,直接获取信号量。
● 当需要获取的信号量大于 Weighted
的上限时,由于不能满足条件,直接返回错误。
● 遇到其他情况,会将当前 goroutine 加入到等待列表,并通过 select
等调度器获取当前 goroutine,goroutine 被唤醒后会获取信号量。
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
这一个获取信号量的方法只会阻塞的判断当前是否有充足的资源,如果有,立刻返回 true,否则返回 false。因为该方法不会等待资源的释放,所以会更加适用于一些对延时敏感,用户需要立刻感知结果的场景。
释放
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
该方法从头到尾遍历 waiters 列表中的全部等待者,如果释放资源后的信号量有充足的剩余资源,就会通过 channel 唤醒指定 goroutine。
可见当一个信号量占用的资源非常多的时候,他可能长时间都获取不到锁,这也是 Weighted.Acquire
引入上下文的原因,即为信号量的获取设置超时时间。
小结
● Acquire
和 TryAcquire
都可以用于获取资源,但是前者会阻塞的获取信号量,后者会非阻塞的获取信号量并加以判断。
● Release 方法会按照先入先出的书序唤醒可以被唤醒的 goroutine。
● 如果一个 goroutine 获得了较多资源,由于 Release 的释放策略可能会等待较长时间。
SingleFlight
概述
Singleflight 是 go 语言拓展包中提供的另一种同步原语,能够在一个服务中抑制对下游的多次重复请求,比较常见的使用场景是使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量请求会打到数据库上进而影响服务的尾延时。
而 Singleflight 可以限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。
在使用昂贵的资源时(访问缓存,数据库)就很适合使用 Singleflight.Group
优化服务。
结构体
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
forgotten bool
dups int
chans []chan<- Result
}
Group 结构体由一个互斥锁 sync.Mutex
和一个映射表组成,每一个 call 结构体都保存了当前调用对应的信息。
call 结构体中的 val 和 err 都只会在执行传入的函数时赋值一次,并且在 sync.WaitGroup.Wait
返回时被读取;dups 和 chans 两个字段分别存储了抑制的请求的数量以及用于同步结果的 channel。
接口
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
该方法会先获取互斥锁,随后判断是否已经存在键对应的 singleflight.call
。
● 当不存在对应的 call 时:
▶ 初始化一个新的 call 指针。
▶ 增加 sync.WaitGroup
持有的计数器。
▶ 将 call 指针添加到映射表。
▶ 释放所有的互斥锁。
▶ 阻塞地调用 doCall
方法等待结果返回。
● 当存在对应的 call 时:
▶ 增加 dups
计数器,它表示当前重复的调用次数。
▶ 释放持有的互斥锁。
▶ 通过 sync.WaitGroup
等待请求返回。
因为 val 和 err 两个字段都只会在 doCall
方法中赋值,所以当 doCall
和 WaitGroup.Wait
返回时,函数调用的结果和错误都会返回给 Do 方法。
小结
● Do 和 DoChan 一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 channel 接收函数的返回值。
● Group.Forget 可以通知 Group 在持有的映射表中删除某个键,接下来对该键的调用就不用等待前面的函数返回了。
● 一旦调用的函数返回了错误,所有等待的 goroutine 都会收到错误。
小结
这些并发原语可以帮助我们更好的利用 go 语言特性构建高量吞吐,低延时的服务,解决并发带来的问题,在设计同步原语时,不仅要考虑 API 的调用,解决并发线程中可能遇到的竞争问题,还要优化尾延时保障公平性。