go运行时调度器做什么?
Go 调度器可以决定只在它知道内存一致的点进行调度。这意味着当我们停止垃圾收集时,我们只需要等待 CPU 内核上正在运行的线程。
讲解顺序:
小->大->详细
小:只是讲下 go-schedule的发展历史 及 所谓的GMP的结构
大:粗略的讲schedule的整个函数调用流程 函数到函数的调用
详细:具体函数的作用,以及gmp这几个结构的流转
前生今世gmp
gmp的历史
- 单线程调度器 · 0.x
- 只包含 40 多行代码;
- 程序中只能存在一个活跃线程,由 G-M 模型组成;
- 多线程调度器 · 1.0
- 允许运行多线程的程序;
- 全局锁导致竞争严重;
- 任务窃取调度器 · 1.1
- 引入了处理器 P,构成了目前的 G-M-P 模型;
- 在处理器 P 的基础上实现了基于工作窃取的调度器;
- 在某些情况下,Goroutine 不会让出线程,进而造成饥饿问题;
- 时间过长的垃圾回收(Stop-the-world,STW)会导致程序长时间无法工作;
- 抢占式调度器 · 1.2 ~ 至今
- 基于协作的抢占式调度器 - 1.2 ~ 1.13
- 通过编译器在函数调用时插入抢占检查指令,在函数调用时检查当前 Goroutine 是否发起了抢占请求,实现基于协作的抢占式调度;
- Goroutine 可能会因为垃圾回收和循环长时间占用资源导致程序暂停;
- 基于信号的抢占式调度器 - 1.14 ~ 至今
- 实现基于信号的真抢占式调度;
- 垃圾回收在扫描栈时会触发抢占调度;
- 抢占的时间点不够多,还不能覆盖全部的边缘情况;
- 基于协作的抢占式调度器 - 1.2 ~ 1.13
- 非均匀存储访问调度器 · 提案
- 对运行时的各种资源进行分区;
- 实现非常复杂,到今天还没有提上日程;
go1.13版本之前的一个bug
func main() {
var x int
threads := runtime.GOMAXPROCS(0)
for i := 0; i < threads; i++ {
go func() {
for { x++ }
}()
}
time.Sleep(time.Second)
fmt.Println("x =", x)
}
GMP 模型
G
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
// ==> 堆栈参数。stack 描述了实际的堆栈内存:[stack.lo, stack.hi).stackguard0 是 Go 堆栈增长序言中比较的堆栈指针。
//正常情况下是stack.lo+StackGuard,但也可以是StackPreempt来触发抢占。
// stackguard1 是在 C 堆栈增长序言中比较的堆栈指针。在 g0 和 gsignal 堆栈上是 stack.lo+StackGuard。它在其他 goroutine 堆栈上为 ~0,以触发对 morestackc 的调用(并崩溃)。
stack stack // offset known to runtime/cgo ==> goroutine 使用的栈
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
// goroutine 的运行现场
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc ==> if status==Gsyscall, syscallsp = sched.sp 在 gc 期间使用
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc ==> if status==Gsyscall, syscallpc = sched.pc 在 gc 期间使用
stktopsp uintptr // expected sp at top of stack, to check in traceback ==> 堆栈顶部的预期 sp,以检查回溯
param unsafe.Pointer // passed parameter on wakeup ==> 唤醒时传递参数
atomicstatus uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
goid int64
schedlink guintptr
waitsince int64 // approx time when the g become blocked ==> g 被阻塞的大约时间
waitreason waitReason // if status==Gwaiting
// 抢占调度标志。这个为 true 时,stackguard0 等于 stackpreempt
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt ==> 抢占信号,复制 stackguard0 = stackpreempt
preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule ==> 抢占时过渡到_Gpreempted;否则,只需取消计划
preemptShrink bool // shrink stack at synchronous safe point ==> 在同步安全点收缩堆栈
// asyncSafePoint is set if g is stopped at an asynchronous
// safe point. This means there are frames on the stack
// without precise pointer information. ==> 如果 g 在异步安全点停止,则设置 asyncSafePoint。 这意味着堆栈中存在没有精确指针信息的帧。
asyncSafePoint bool
paniconfault bool // panic (instead of crash) on unexpected fault address
gcscandone bool // g has scanned stack; protected by _Gscan bit in status
throwsplit bool // must not split stack
// activeStackChans indicates that there are unlocked channels
// pointing into this goroutine's stack. If true, stack
// copying needs to acquire channel locks to protect these
// areas of the stack. ===> activeStackChans 表示有指向此 goroutine 堆栈的未锁定通道。 如果为 true,堆栈复制需要获取通道锁以保护堆栈的这些区域。
activeStackChans bool
// parkingOnChan indicates that the goroutine is about to
// park on a chansend or chanrecv. Used to signal an unsafe point
// for stack shrinking. It's a boolean value, but is updated atomically.
parkingOnChan uint8
raceignore int8 // ignore race detection events
sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
sysexitticks int64 // cputicks when syscall has returned (for tracing)
traceseq uint64 // trace event sequencer
tracelastp puintptr // last P emitted an event for this goroutine
// 如果调用了 LockOsThread,那么这个 g 会绑定到某个 m 上
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
gopc uintptr // pc of go statement that created this goroutine ===> 创建该 goroutine 的语句的指令地址
ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
// goroutine 函数的指令地址
startpc uintptr // pc of goroutine function
racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
// time.Sleep 缓存的定时器
timer *timer // cached timer for time.Sleep
selectDone uint32 // are we participating in a select and did someone win the race?
// Per-G GC state
// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}
G结构体下嵌套了两个重要的结构体
stack stack 表示 goroutine 运行时的栈
// Stack describes a Go execution stack.
// The bounds of the stack are exactly [lo, hi),
// with no implicit data structures on either side.\
// ==> Stack 描述了一个 Go 执行栈。栈的边界正好是 [lo, hi),两边都没有隐含的数据结构。
type stack struct {
lo uintptr ==> // 栈顶,低地址
hi uintptr ==> 栈低,高地址
}
gobuf
Goroutine 运行时,光有栈还不行,至少还得包括 PC,SP 等寄存器,gobuf 就保存了这些值:
type gobuf struct {
// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
//
// ctxt is unusual with respect to GC: it may be a
// heap-allocated funcval, so GC needs to track it, but it
// needs to be set and cleared from assembly, where it's
// difficult to have write barriers. However, ctxt is really a
// saved, live register, and we only ever exchange it between
// the real register and the gobuf. Hence, we treat it as a
// root during stack scanning, which means assembly that saves
// and restores it doesn't need write barriers. It's still
// typed as a pointer so that any other writes from Go get
// write barriers.
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret sys.Uintreg
lr uintptr
bp uintptr // for GOEXPERIMENT=framepointer
}
M
当 M 没有工作可做的时候,在它休眠前,会“自旋”地来找工作:检查全局队列,查看 network poller,试图执行 gc 任务,或者“偷”工作。
type m struct {
g0 *g // goroutine with scheduling stack ==>带有调度堆栈的 goroutine
morebuf gobuf // gobuf arg to morestack ==>gobuf arg 到 morestack
divmod uint32 // div/mod denominator for arm - known to liblink ==>arm 的 div/mod 分母 - liblink 已知
// Fields not known to debuggers. ==>调试器不知道的字段。
procid uint64 // for debuggers, but offset not hard-coded ==>用于调试器,但偏移量不是硬编码的
gsignal *g // signal-handling g ==>信号处理g
goSigStack gsignalStack // Go-allocated signal handling stack ==>Go 分配的信号处理堆栈
sigmask sigset // storage for saved signal mask ==>存储已保存的信号掩码
tls [6]uintptr // thread-local storage (for x86 extern register) ==>线程本地存储(用于 x86 外部寄存器)
mstartfn func()
curg *g // current running goroutine ==>当前运行的 goroutine
caughtsig guintptr // goroutine running during fatal signal ==>goroutine 在致命信号期间运行
p puintptr // attached p for executing go code (nil if not executing go code) ==>附加 p 用于执行 go 代码(如果不执行 go 代码,则为零)
nextp puintptr
oldp puintptr // the p that was attached before executing a syscall ==> 在执行系统调用之前附加的 p
id int64
mallocing int32
throwing int32
preemptoff string // if != "", keep curg running on this m ==> 如果 != "", 继续在这个 m 上运行 curg
locks int32
dying int32
profilehz int32
spinning bool // m is out of work and is actively looking for work ==> m 失业了,正在积极找工作
blocked bool // m is blocked on a note ==> m 在笔记上被屏蔽
newSigstack bool // minit on C thread called sigaltstack ==> C 线程上的 minit 称为 sigaltstack
printlock int8
incgo bool // m is executing a cgo call ==> m 正在执行一个 cgo 调用
freeWait uint32 // if == 0, safe to free g0 and delete m (atomic) ==>if == 0,可以安全地释放 g0 并删除 m(原子)
fastrand [2]uint32
needextram bool
traceback uint8
ncgocall uint64 // number of cgo calls in total ==> cgo 调用总数
ncgo int32 // number of cgo calls currently in progress ==>当前正在进行的 cgo 调用数
cgoCallersUse uint32 // if non-zero, cgoCallers in use temporarily ==>如果非零,则暂时使用 cgoCallers
cgoCallers *cgoCallers // cgo traceback if crashing in cgo call ==>如果在 cgo 调用中崩溃,则 cgo 回溯
park note
alllink *m // on allm
schedlink muintptr
mcache *mcache
lockedg guintptr
createstack [32]uintptr // stack that created this thread. ==> 创建此线程的堆栈。
lockedExt uint32 // tracking for external LockOSThread ==> 跟踪外部 LockOSThread
lockedInt uint32 // tracking for internal lockOSThread ==> 跟踪内部 lockOSThread
nextwaitm muintptr // next m waiting for lock ==> 下一个等待锁定
waitunlockf func(*g, unsafe.Pointer) bool
waitlock unsafe.Pointer
waittraceev byte
waittraceskip int
startingtrace bool
syscalltick uint32
freelink *m // on sched.freem ==> 在 sched.freem 上
// these are here because they are too large to be on the stack
// of low-level NOSPLIT functions. ==> mFixup 用于同步 OS 相关的 m 状态(凭据等)使用互斥量访问。
libcall libcall
libcallpc uintptr // for cpu profiler ==> 用于 CPU 分析器
libcallsp uintptr
libcallg guintptr
syscall libcall // stores syscall parameters on windows ==> 在 Windows 上存储系统调用参数
vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call) ==> 在 VDSO 调用中用于回溯的 SP(如果不在调用中,则为 0)
vdsoPC uintptr // PC for traceback while in VDSO call ==> 在 VDSO 调用中用于回溯的 PC
// preemptGen counts the number of completed preemption
// signals. This is used to detect when a preemption is
// requested, but fails. Accessed atomically.
// ===> preemptGen 统计完成抢占的次数 信号。这用于检测何时发生抢占请求,但失败。以原子方式访问。
preemptGen uint32
// Whether this is a pending preemption signal on this M.
// Accessed atomically. ===>这是否是此 M 上的未决抢占信号。原子访问。
signalPending uint32
dlogPerM
mOS
}
P
为 M 的执行提供“上下文”,保存 M 执行 G 时的一些资源,例如本地可运行 G 队列,memeory cache 等。
一个 M 只有绑定 P 才能执行 goroutine,当 M 被阻塞时,整个 P 会被传递给其他 M ,或者说整个 P 被接管。
type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call ==>在每次调度程序调用时递增
syscalltick uint32 // incremented on every system call ==>在每次系统调用时递增
sysmontick sysmontick // last tick observed by sysmon ==>sysmon 观察到的最后一个滴答声
m muintptr // back-link to associated m (nil if idle) ==>反向链接到关联的 m(如果空闲则为零)
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) ==>不同大小的可用延迟结构池(参见 panic.go)
deferpoolbuf [5][32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. ==>goroutine id 的缓存,分摊对 runtime·sched.goidgen 的访问。
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock. ==>可运行的 goroutine 队列。无锁访问。
runqhead uint32
runqtail uint32
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
// runnext,如果非零,是一个可运行的 G,它已由当前 G 准备好,如果在运行 G 的时间片中有剩余时间,则应该运行下一个而不是 runq 中的内容。
//它将继承当前时间片中剩余的时间。 如果一组 goroutine 被锁定在一个通信和等待模式,这种调度设置为一个单元,并消除了(可能很大)调度延迟,
//否则由于将准备好的 goroutine 添加到运行队列的末尾而产生的调度延迟。
runnext guintptr
// Available G's (status == Gdead) ==> 可用 G(状态 == Gdead)
gFree struct {
gList
n int32
}
sudogcache []*sudog
sudogbuf [128]*sudog
// Cache of mspan objects from the heap. ==>从堆缓存 mspan 对象。
mspancache struct {
// We need an explicit length here because this field is used
// in allocation codepaths where write barriers are not allowed,
// and eliminating the write barrier/keeping it eliminated from
// slice updates is tricky, moreso than just managing the length
// ourselves. ==> 我们在这里需要一个明确的长度,因为这个字段用于不允许写屏障的分配代码路径中,并且消除写屏障/保持它从切片更新中消除是棘手的,不仅仅是我们自己管理长度。
len int
buf [128]*mspan
}
tracebuf traceBufPtr
// traceSweep indicates the sweep events should be traced.
// This is used to defer the sweep start event until a span
// has actually been swept. ==> traceSweep 指示应该跟踪扫描事件。这用于推迟扫描开始事件,直到实际扫描了跨度。
traceSweep bool
// traceSwept and traceReclaimed track the number of bytes
// swept and reclaimed by sweeping in the current sweep loop. ==> traceSwept 和 traceReclaimed 跟踪在当前扫描循环中通过扫描扫描和回收的字节数。
traceSwept, traceReclaimed uintptr
palloc persistentAlloc // per-P to avoid mutex ==> per-P 避免互斥
_ uint32 // Alignment for atomic fields below ==> 下面的原子场对齐
// The when field of the first entry on the timer heap.
// This is updated using atomic functions.
// This is 0 if the timer heap is empty. ==> 定时器堆上第一个条目的 when 字段。这是使用原子函数更新的。 如果计时器堆为空,则为 0。
timer0When uint64
// Per-P GC state
gcAssistTime int64 // Nanoseconds in assistAlloc
gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)
gcBgMarkWorker guintptr // (atomic)
gcMarkWorkerMode gcMarkWorkerMode
// gcMarkWorkerStartTime is the nanotime() at which this mark
// worker started.
gcMarkWorkerStartTime int64
// gcw is this P's GC work buffer cache. The work buffer is
// filled by write barriers, drained by mutator assists, and
// disposed on certain GC state transitions.
//==> gcw 是这个 P 的 GC 工作缓冲区缓存。工作缓冲区是由写屏障填充,由增变器辅助耗尽,以及处理某些 GC 状态转换。
gcw gcWork
// wbBuf is this P's GC write barrier buffer.
//
// TODO: Consider caching this in the running G.
wbBuf wbBuf
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
//锁定计时器。我们通常在运行时访问计时器在这个 P 上,但调度器也可以从不同的 P 上做。
timersLock mutex
// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
// ==> 在某个时间采取的行动。 这是用来实现标准库的时间包的。必须持有timersLock才能访问。
timers []*timer
// Number of timers in P's heap.
// Modified using atomic instructions.
numTimers uint32
// Number of timerModifiedEarlier timers on P's heap.
// This should only be modified while holding timersLock,
// or while the timer status is in a transient state
// such as timerModifying.
//==> P 堆上的 timerModifiedEarlier 定时器的数量。这应该只在持有 timersLock 时修改,或者当定时器状态处于诸如 timerModifying 的瞬态状态时。
adjustTimers uint32
// Number of timerDeleted timers in P's heap.
// Modified using atomic instructions.
//==> P 堆中 timerDeleted 定时器的数量。 使用原子指令修改。
deletedTimers uint32
// Race context used while executing timer functions. ==>执行计时器功能时使用的竞态上下文。
timerRaceCtx uintptr
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
//==>preempt 设置为指示该 P 应尽快进入调度程序(无论 G 在其上运行什么)。
preempt bool
pad cpu.CacheLinePad
}
保存调度器的状态信息、全局的可运行 G 队列等.
schedt 对象只有一份实体,它维护了调度器的所有信息
type schedt struct {
// accessed atomically. keep at top to ensure alignment on 32-bit systems.
goidgen uint64
lastpoll uint64 // time of last network poll, 0 if currently polling
pollUntil uint64 // time to which current poll is sleeping
lock mutex
// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
// sure to call checkdead().
midle muintptr // idle m's waiting for work ==> 由空闲的工作线程组成的链表
nmidle int32 // number of idle m's waiting for work ==>空闲的工作线程数量
nmidlelocked int32 // number of locked m's waiting for work ==> 空闲的且被 lock 的 m 计数
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // maximum number of m's allowed (or die) ==> 表示最多所能创建的工作线程数量
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's
ngsys uint32 // number of system goroutines; updated atomically ==> goroutine 的数量,自动更新
pidle puintptr // idle p's ==> 由空闲的 p 结构体对象组成的链表
npidle uint32 // ==> 空闲的 p 结构体对象的数量
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// Global runnable queue. ==>全局可运行的 G队列
runq gQueue
runqsize int32 // ==> // 元素数量
// disable controls selective disabling of the scheduler.
//
// Use schedEnableUser to control this.
//
// disable is protected by sched.lock.
disable struct {
// user disables scheduling of user goroutines.
user bool
runnable gQueue // pending runnable Gs
n int32 // length of runnable
}
// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// Central cache of sudog structs. ==>/ sudog 结构的集中缓存
sudoglock mutex
sudogcache *sudog
// Central pool of available defer structs of different sizes.
deferlock mutex // ==>不同大小的可用的 defer struct 的集中缓存池
deferpool [5]*_defer
// freem is the list of m's waiting to be freed when their
// m.exited is set. Linked through m.freelink.
freem *m
gcwaiting uint32 // gc is waiting to run
stopwait int32
stopnote note
sysmonwait uint32
sysmonnote note
// safepointFn should be called on each P at the next GC
// safepoint if p.runSafePointFn is set.
safePointFn func(*p)
safePointWait int32
safePointNote note
profilehz int32 // cpu profiling rate
procresizetime int64 // nanotime() of last change to gomaxprocs ==> 上次修改 gomaxprocs 的纳秒时间
totaltime int64 // ∫gomaxprocs dt up to procresizetime
}
线程的几种状态
就绪 、 等待、 运行 ===> goroutine 不管再怎么运行,状态归类也会这么几类
状态 | 解释 |
---|---|
Waiting | 等待状态。线程在等待某件事的发生。例如等待网络数据、硬盘;调用操作系统 API;等待内存同步访问条件 ready,如 atomic, mutexes |
Runnable | 就绪状态。只要给 CPU 资源我就能运行 |
Executing | 运行状态。线程在执行指令,这是我们想要的 |
goroutine几种状态
- 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting、_Gsyscall 和 _Gpreempted 几个状态;
- 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即 _Grunnable;
- 运行中:Goroutine 正在某个线程上运行,即 _Grunning;
goroutine和thread的区别
区分项 | thread | go |
---|---|---|
内存空间 | 1M | 2KB |
耗费时间 | 线程切换会消耗 1000-1500 纳秒 | 200ns |
切换所需寄存器 | 16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc. | Program Counter, Stack Pointer and BP |
go的关键词整理
g0 :Go 必须在每个正在运行的线程上调度和管理 goroutine。这个角色被委托给一个特殊的 goroutine,称为 g0,它是为每个 OS 线程创建的第一个 goroutine
https://medium.com/a-journey-with-go/go-g0-special-goroutine-8c778c6704d8
go schedule 整体流程
go 调度过程中的函数讲解
Go 程序的入口地址在 linux 上运行的,入口文件为 src/runtime/rt0_linux_amd64.s -_rt0_amd64_linux ==>/usr/local/go/src/runtime/asm_amd64.s -_rt0_amd64->TEXT runtime·rt0_go(SB),NOSPLIT,$0
TEXT runtime·rt0_go(SB),NOSPLIT,$0 ===>调度器运行始末
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto
ANDQ $~15, SP
MOVQ AX, 16(SP) //==> argc 放在 SP+16 字节处
MOVQ BX, 24(SP) //==> argv 放在 SP+24 字节处
// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
MOVQ $runtime·g0(SB), DI //===> 把 g0 的地址存入 DI
LEAQ (-64*1024+104)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)
// find out information about the processor we're on
MOVL $0, AX
CPUID
MOVL AX, SI
CMPL AX, $0
JE nocpuinfo
// Figure out how to serialize RDTSC.
// On Intel processors LFENCE is enough. AMD requires MFENCE.
// Don't know about the rest, so let's do MFENCE.
CMPL BX, $0x756E6547 // "Genu"
JNE notintel
CMPL DX, $0x49656E69 // "ineI"
JNE notintel
CMPL CX, $0x6C65746E // "ntel"
JNE notintel
MOVB $1, runtime·isIntel(SB)
MOVB $1, runtime·lfenceBeforeRdtsc(SB)
notintel:
// Load EAX=1 cpuid flags
MOVL $1, AX
CPUID
MOVL AX, runtime·processorVersionInfo(SB)
nocpuinfo:
// if there is an _cgo_init, call it.
MOVQ _cgo_init(SB), AX
TESTQ AX, AX
JZ needtls
// arg 1: g0, already in DI
MOVQ $setg_gcc<>(SB), SI // arg 2: setg_gcc
#ifdef GOOS_android
MOVQ $runtime·tls_g(SB), DX // arg 3: &tls_g
// arg 4: TLS base, stored in slot 0 (Android's TLS_SLOT_SELF).
// Compensate for tls_g (+16).
MOVQ -16(TLS), CX
#else
MOVQ $0, DX // arg 3, 4: not used when using platform's TLS
MOVQ $0, CX
#endif
#ifdef GOOS_windows
// Adjust for the Win64 calling convention.
MOVQ CX, R9 // arg 4
MOVQ DX, R8 // arg 3
MOVQ SI, DX // arg 2
MOVQ DI, CX // arg 1
#endif
CALL AX
// update stackguard after _cgo_init
MOVQ $runtime·g0(SB), CX
MOVQ (g_stack+stack_lo)(CX), AX
ADDQ $const__StackGuard, AX
MOVQ AX, g_stackguard0(CX)
MOVQ AX, g_stackguard1(CX)
#ifndef GOOS_windows
JMP ok
#endif
needtls:
#ifdef GOOS_plan9
// skip TLS setup on Plan 9
JMP ok
#endif
#ifdef GOOS_solaris
// skip TLS setup on Solaris
JMP ok
#endif
#ifdef GOOS_illumos
// skip TLS setup on illumos
JMP ok
#endif
#ifdef GOOS_darwin
// skip TLS setup on Darwin
JMP ok
#endif
//=====> runtime 会启动多个工作线程,每个线程都会绑定一个 m0 初始化m0
//=====> TLS 就是线程本地的私有的全局变量
LEAQ runtime·m0+m_tls(SB), DI
CALL runtime·settls(SB)
// store through it, to make sure it works
get_tls(BX) //===> 获取 fs 段基址并放入 BX 寄存器,其实就是 m0.tls[1] 的地址,get_tls 的代码由编译器生成
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX) //====> 获取 fs 段基址到 BX 寄存器
LEAQ runtime·g0(SB), CX //====> 将 g0 的地址存储到 CX,CX = &g0
MOVQ CX, g(BX) //====> 把 g0 的地址保存在线程本地存储里面,也就是 m0.tls[0]=&g0
LEAQ runtime·m0(SB), AX //====> 将 m0 的地址存储到 AX,AX = &m0
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
CALL runtime·check(SB)
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB) //====> 初始化系统核心数
CALL runtime·schedinit(SB) //====> 调度器初始化
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX //====> newproc 的第二个参数入栈,也就是新的 goroutine 需要执行的函数
PUSHQ $0 // arg size //====> newproc 的第一个参数入栈,该参数表示 runtime.main 函数需要的参数大小, 因为 runtime.main 没有参数,所以这里是 0
//====> // 创建 main goroutine
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M ====>主线程进入调度循环,运行刚刚创建的 goroutine
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RET
// Prevent dead-code elimination of debugCallV1, which is
// intended to be called by debuggers. //====> 永远不会返回,万一返回了,crash 掉
MOVQ $runtime·debugCallV1(SB), AX
RET
go调度的时机
- 主动挂起 — runtime.gopark -> runtime.park_m
- 系统调用 — runtime.exitsyscall -> runtime.exitsyscall0
- 协作式调度 — runtime.Gosched -> runtime.gosched_m -> runtime.goschedImpl
- 系统监控 — runtime.sysmon -> runtime.retake -> runtime.preemptone
reference
用 GODEBUG 看调度跟踪 https://blog.csdn.net/RA681t58CJxsgCkJ31/article/details/99785562
go -scheduler 剖析
上手小知识
线程切换
函数调用过程分析
CALL 指令类似 PUSH IP 和 JMP somefunc 两个指令的组合,首先将当前的 IP 指令寄存器的值压入栈中,然后通过 JMP 指令将要调用函数的地址写入到 IP 寄存器实现跳转。
而 RET 指令则是和 CALL 相反的操作,基本和 POP IP 指令等价,也就是将执行 CALL 指令时保存在 SP 中的返回地址重新载入到 IP 寄存器,实现函数的返回。
goroutine
什么是goroutine
goroutine 是thread的一层抽象,更加轻量级,可以单独执行。线程是调度的基本单位。
goroutine 和 thread的区别
参考资料【How Goroutines Work】https://blog.nindalf.com/posts/how-goroutines-work/)从三个角度进行区别: 内存消耗、 创建和销毁、 切换
内存消耗
创建一个 goroutine 的栈内存消耗为 2 KB,实际运行过程中,如果栈空间不够用,会自动进行扩容。创建一个 thread 则需要消耗 1 MB 栈内存,而且还需要一个被称为 “a guard page” 的区域用于和其他 thread 的栈空间进行隔离。
创建和销毁
Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。
切换
当 threads 切换时,需要保存各种寄存器,以便将来恢复:
16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc.
而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。
一般而言,线程切换会消耗 1000-1500 纳秒,一个纳秒平均可以执行 12-18 条指令。所以由于线程切换,执行指令的条数会减少 12000-18000。
Goroutine 的切换约为 200 ns,相当于 2400-3600 条指令。
因此,goroutines 切换成本比 threads 要小得多。
对比项 | 线程(thread) | 协程(goroutine) |
---|---|---|
内存消耗 | 1M | 2KB |
创建和销毁 | 内核级-使用线程池解决 | 用户级 |
切换 | 16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc. 16个通用寄存器、PC(程序计数器)、SP(堆栈指针)、段寄存器、16个XMM寄存器、FP协处理器状态、16个AVX寄存器、所有MSR等 |
切换消耗 1000-1500纳秒, 一纳秒平均执行
12-18条指令。线程切换执行指令为 121000-181000 | Program Counter, Stack Pointer and BP
goroutine 切换为200ns。 goroutine执行指令为 20012-20018 |
在同一时刻,一个线程上只能跑一个 goroutine。
go-scheduler
go程序执行由两层组成:GO Program,Runtime.及用户程序和运行时。 通过函数调用来实现内存管理、channel通信、goroutines创建等功能。
为什么要scheduler
runtime维护所有的goroutines,通过scheduler进行调度。 goroutines和thread是独立的。但是goroutine要依赖threads才能执行。
scheduler 底层原理
通过G、M、P 结构体来实现goroutine的调度。
g 代表一个 goroutine,它包含:表示 goroutine 栈的一些字段,指示当前 goroutine 的状态,指示当前运行到的指令地址,也就是 PC 值。
m 表示内核线程,包含正在运行的 goroutine 等字段。
p 代表一个虚拟的 Processor,它维护一个处于 Runnable 状态的 g 队列, m 需要获得 p 才能运行 g。
还有一个核心的结构体 sched。用来进行调度。
Runtime 起始时会启动一些 G:垃圾回收的 G,执行调度的 G,运行用户代码的 G;并且会创建一个 M 用来开始 G 的运行。随着时间的推移,更多的 G 会被创建出来,更多的 M 也会被创建出来。
Go scheduler 的核心思想是:
- reuse threads;
- 限制同时运行(不包含阻塞)的线程数为 N,N 等于 CPU 的核心数目;
- 线程私有的 runqueues,并且可以从其他线程 stealing goroutine 来运行,线程阻塞后,可以将 runqueues 传递给其他线程。
为什么要引入P
您现在可能想知道,为什么要有上下文?我们不能把运行队列放在线程上并摆脱上下文吗?并不真地。我们拥有上下文的原因是,如果正在运行的线程由于某种原因需要阻塞,我们可以将它们(P)交给其他线程。
我们需要阻塞的一个例子是当我们调用系统调用时。由于线程不能同时执行代码并在系统调用上被阻塞,我们需要交出上下文以便它可以继续调度。
当一个线程阻塞的时候,将和它绑定的 P 上的 goroutines 转移到其他线程。
Go scheduler 会启动一个后台线程 sysmon,用来检测长时间(超过 10 ms)运行的 goroutine,将其调度到 global runqueues。这是一个全局的 runqueue,优先级比较低,以示惩罚。
go scheduler调度的顺序
runtime.schedule() {
// only 1/61 of the time, check the global runnable queue for a G.
// if not found, check the local queue.
// if not found,
// try to steal from other Ps.
// if not, check the global runnable queue.
// if not found, poll network.
}
runtime.schedule() {
// 只有 1/61 的时间,检查全局可运行队列中的 G。
// 如果没有找到,检查本地队列。
// 如果没有找到,
// 尝试从其他 Ps 中窃取。
// 如果没有,检查全局可运行队列。
// 如果没有找到,轮询网络。
}
汇编流程细节
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...
...
主线程绑定m0
LEAQ runtime·m0+m_tls(SB), DI
CALL runtime·settls(SB)
// store through it, to make sure it works
get_tls(BX)
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
m0 是全局变量,而 m0 又要绑定到工作线程才能执行。runtime 会启动多个工作线程,每个线程都会绑定一个 m0。
TLS 就是线程本地的私有的全局变量。
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
通过 m.tls[0] 可以找到 g0,通过 g0 可以找到 m0(通过 g 结构体的 m 字段)。并且,通过 m 的字段 g0,m0 也可以找到 g0。于是,主线程和 m0,g0 就关联起来了。
保存在主线程本地存储中的值是 g0 的地址,也就是说工作线程的私有全局变量其实是一个指向 g 的指针而不是指向 m 的指针。 目前这个指针指向g0,表示代码正运行在 g0 栈。
初始化m0
src/runtime/proc.go/schedinit()/mcommoninit()
初始化allp
allp是全局的p, 从schedinit()入口进入,然后初始化系统核数个的p,然后调用 procresize() 函数进行 P0和m0关联
procesize()函数中循环遍历所有的allp,将 将第一个p0和m0关联,其他的通过runqempty() 函数判断是否是空闲的P,然后通过放入全局空闲链表pidleput(p)
- 使用 make([]p, nprocs) 初始化全局变量 allp,即 allp = make([]p, nprocs)
- 循环创建并初始化 nprocs 个 p 结构体对象并依次保存在 allp 切片之中
- 把 m0 和 allp[0] 绑定在一起,即 m0.p = allp[0],allp[0].m = m0
- 把除了 allp[0] 之外的所有 p 放入到全局变量 sched 的 pidle 空闲队列之中
newproc() 函数 会创建一个新的g来跑fn。 newproc如果不是初始化构建,而是执行函数比如 go func(){}() 本质也是调用newproc函数,
获取当前工作线程的g0,然后通过g0找到绑定的p0, 然后从p上获取一个没有使用的g.
newproc函数传递两个函数,一个是新创建的 goroutine 需要执行的任务,也就是 fn,它代表一个函数 func;还有一个是 fn 的参数大小。
goroutine 的初始栈比较小,只有 2K。可伸缩性。
main goroutine
// 创建一个新的 goroutine 来启动程序
MOVQ $runtime·mainPC(SB), AX // entry
// newproc 的第二个参数入栈,也就是新的 goroutine 需要执行的函数
// AX = &funcval{runtime·main},
PUSHQ AX
// newproc 的第一个参数入栈,该参数表示 runtime.main 函数需要的参数大小,
// 因为 runtime.main 没有参数,所以这里是 0
PUSHQ $0 // arg size
// 创建 main goroutine
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
// 主线程进入调度循环,运行刚刚创建的 goroutine
CALL runtime·mstart(SB)
// 永远不会返回,万一返回了,crash 掉
MOVL $0xf1, 0xf1 // crash
RET
代码前面几行是在为调用 newproc 函数构“造栈”,执行完 runtime·newproc(SB) 后,就会以一个新的 goroutine 来执行 mainPC 也就是 runtime.main()函数。runtime.main() 函数最终会执行到我们写的 main 函数
g0栈和用户栈如何完成切换
src/runtime/proc.go/newproc1()函数中的
// 把newg.sched结构体成员的所有成员设置为0
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
// 设置newg 的sched成员,调度器需要依靠这些字段才能把goroutine调度到CPU上运行
newg.sched.sp = sp
newg.stktopsp = sp
// newg.sched.pc 表示当 newg 被调度起来运行时从这个地址开始执行指令
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
...
runqput(_p_, newg, true)
...
执行完成之后
src/runtime/proc.go/newproc1()函数中的 runqput() 调用的作用详解如下
runqput() 函数详解
// 将 G 放入 p 的本地待运行队列
// runqput 尝试将 g 放到本地可执行队列里。
// 如果 next 为假,runqput 将 g 添加到可运行队列的尾部
// 如果 next 为真,runqput 将 g 添加到 p.runnext 字段
// 如果 run queue 满了,runnext 将 g 放到全局队列里—> runqputslow()函数放入到全局队列
// runnext 成员中的 goroutine 会被优先调度起来运行
runqputslow() 函数讲解
// 将 g 和 p 本地队列的一半 goroutine 放入全局队列。
// 因为要获取锁,所以会慢
runqputslow()函数中调用—> globrunqputbatch(batch[0], batch[n], int32(n+1))
P 的本地可运行队列的长度为 256,它是一个循环队列,因此最多只能放下 256 个 goroutine
go-scheduler开始调度
// start this M
// 主线程进入调度循环,运行刚刚创建的 goroutine
CALL runtime·mstart(SB)
msart()->mstart1()->schedule()
schedule()方法 : 进入调度循环。永不返回
schedule函数详解:
// 执行一轮调度器的工作:找到一个 runnable 的 goroutine,并且执行它
// 永不返回
func schedule() {
// _g_ = 每个工作线程 m 对应的 g0,初始化时是 m0 的 g0
_g_ := getg()
// ……………………
top:
// ……………………
var gp *g
var inheritTime bool
// ……………………
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 为了公平,每调用 schedule 函数 61 次就要从全局可运行 goroutine 队列中获取
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
// 从全局队列最大获取 1 个 gorutine
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从 P 本地获取 G 任务
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
// 从本地运行队列和全局运行队列都没有找到需要运行的 goroutine,
// 调用 findrunnable 函数从其它工作线程的运行队列中偷取,如果偷不到,则当前工作线程进入睡眠
// 直到获取到 runnable goroutine 之后 findrunnable 函数才会返回。
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}
if gp.lockedm != nil {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
// 执行 goroutine 任务函数
// 当前运行的是 runtime 的代码,函数调用栈使用的是 g0 的栈空间
// 调用 execute 切换到 gp 的代码和栈空间去运行
execute(gp, inheritTime)
}
为了公平,调度器每调度 61 次的时候,都会尝试从全局队列里取出待运行的 goroutine 来运行,调用 globrunqget;
如果还没找到,调用 runqget,从 P 本地可运行队列先选出一个可运行的 goroutine;
如果还没找到,就要去其他 P 里面去偷一些 goroutine 来执行,调用 findrunnable 函数。
schedule()中的execute()函数中调用gogo函数
execute()函数将 gp 的状态改为 _Grunning,将 m 和 gp 相互关联起来。最后,调用 gogo 完成从 g0 到 gp 的切换,CPU 的执行权将从 g0 转让到 gp。 gogo 函数用汇编语言写成
原因如下:
gogo 函数也是通过汇编语言编写的,这里之所以需要使用汇编,是因为 goroutine 的调度涉及不同执行流之间的切换。
前面我们在讨论操作系统切换线程时已经看到过,执行流的切换从本质上来说就是 CPU 寄存器以及函数调用栈的切换,然而不管是 go 还是 c 这种高级语言都无法精确控制 CPU 寄存器,因而高级语言在这里也就无能为力了,只能依靠汇编指令来达成目的。
汇编gogo详解
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $16-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
....
第一行,将 gp.sched.g 保存到 DX 寄存器;第二行,我们见得已经比较多了, get_tls 将 tls 保存到 CX 寄存器,再将 gp.sched.g 放到 tls[0] 处。这样,当下次再调用 get_tls 时,取出的就是 gp,而不再是 g0,这一行完成从 g0 栈切换到 gp。
goroutine 从生到死
src/runtime/proc.go/main() 函数详解
// The main goroutine.
func main() {
// g = main goroutine,不再是 g0 了
g := getg()
...
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
// // 创建监控线程,该线程独立于调度器,不需要跟 p 关联即可运行
newm(sysmon, nil)
})
}
...
// 调用 runtime 包的初始化函数,由编译器实现
doInit(&runtime_inittask) // must be before defer
if nanotime() == 0 {
throw("nanotime returning zero")
}
// Defer unlock so that runtime.Goexit during init does the unlock too.
needUnlock := true
defer func() {
if needUnlock {
unlockOSThread()
}
}()
// Record when the world started.
runtimeInitTime = nanotime()
//// 开启垃圾回收器
gcenable()
main_init_done = make(chan bool)
...
// main 包的初始化,递归的调用我们 import 进来的包的初始化函数
doInit(&main_inittask)
close(main_init_done)
// ……………………
...
// 调用 main.main 函数
fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
fn()
if raceenabled {
racefini()
}
// 进入系统调用,退出进程,可以看出 main goroutine 并未返回,而是直接进入系统调用退出进程了
exit(0)
// 保护性代码,如果 exit 意外返回,下面的代码会让该进程 crash 死掉
for {
var x *int32
*x = 0
}
}
从流程图可知,main goroutine 执行完之后就直接调用 exit(0) 退出了,这会导致整个进程退出,太粗暴了。
不过,main goroutine 实际上就是代表用户的 main 函数,它都执行完了,肯定是用户的任务都执行完了,直接退出就可以了,就算有其他的 goroutine 没执行完,同样会直接退出。
主main.goroutine退出直接调用exit()退出了 。
如果是非main.goroutine退出则直接进行在 src/proc.go/newproc1()函数中调用汇编函数goexit(), 简称gp的退出。
汇编函数goexit()函数中调用src/proc.go/goexit1()函数,goexit1()函数中调用汇编函数mcall()函数。
mcall 函数里边调用goexit0()函数
goexit0函数详解
// goexit continuation on g0.
//// 在 g0 上执行
func goexit0(gp *g) {
//// g0
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
//// 清空 gp 的一些字段
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
......
// Note that gp's stack scan is now "valid" because it has no
// stack.
gp.gcscanvalid = true
//// 解除 g 与 m 的关系
dropg()
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
//// 将 g 放入 free 队列缓存起来
gfput(_g_.m.p.ptr(), gp)
.......
schedule()
}
函数主要步骤解析
- 把 g 的状态从 _Grunning 更新为 _Gdead;
- 清空 g 的一些字段;
- 调用 dropg 函数解除 g 和 m 之间的关系,其实就是设置 g->m = nil, m->currg = nil;
- 把 g 放入 p 的 freeg 队列缓存起来供下次创建 g 时快速获取而不用从内存分配。freeg 就是 g 的一个对象池;
- 调用 schedule 函数再次进行调度。
gp完成之后放入到goroutine 缓存池,待下次任务重新启用。
而工作线程,又继续调用 schedule 函数进行新一轮的调度,整个过程形成了一个循环。
main goroutine 和普通 goroutine 的退出过程:
对于 main goroutine,在执行完用户定义的 main 函数的所有代码后,直接调用 exit(0) 退出整个进程,非常霸道。
对于普通 goroutine 则没这么“舒服”,需要经历一系列的过程。先是跳转到提前设置好的 goexit 函数的第二条指令,然后调用 runtime.goexit1,接着调用 mcall(goexit0),而 mcall 函数会切换到 g0 栈,运行 goexit0 函数,清理 goroutine 的一些字段,并将其添加到 goroutine 缓存池里,然后进入 schedule 调度循环。到这里,普通 goroutine 才算完成使命。
M如何找工作,如何窃取调度其他及全局的队列的goroutine
一从全局队列窃取
二从本地队列获取
三从其他地方找goroutine -其他P中窃取、从本地或全局队列中获取g、轮询网络
src/runtime/proc.go/findrunnable(()函数详解
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
//......
//从本地队列获取
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
// 从全局队列获取
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
// 如果其他的 P 都处于空闲状态,那肯定没有其他工作要做
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
////如果有很多工作线程在找工作,那我就停下休息。避免消耗太多 CPU
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
//// 设置自旋状态为true
_g_.m.spinning = true
////自旋状态数加1
atomic.Xadd(&sched.nmspinning, 1)
}
////从其他P的本地运行队列盗取goroutine
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
//窃取函数
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
//......
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
//// 当前工作线程解除与P之间的绑定,准备去休眠
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
////把p放入空闲队列
pidleput(_p_)
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
//// m即将睡眠,不在处于自旋
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
////休眠之气前再检查一下所有的p,看一下是否有工作要做
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
////......
//// 休眠
stopm()
goto top
}
runsteal窃取函数详解
// 从P2偷走一半的工作放到_p_的本地
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
// 队尾
t := _p_.runqtail
// 从p2偷取工作,放到_p_.runq的队尾
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
//// 找到最后一个g,准备返回
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
//说明只偷了一个g
return gp
}
//// 队列头
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
////判断是否偷太多了
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
////更新队尾,将偷来的工作加入队列
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
调用 runqgrab 从 p2 偷走它一半的工作放到 p 本地。runqgrab 函数将从 p2 偷来的工作放到以 t 为地址的数组里,数组就是 p.runq。
窃取函数findrunnable()函数从其他地方找goroutine来执行。
findrunnable()函数中调用—>globrunqget()函数从全局队列获取—>检查其他的P是否都处于空闲状态,那肯定也没有工作—>如果现在有很多工作线程找工作,那么就停下来休息,避免消耗太多的CPU —>然后从其他P的本地运行队列窃取goroutine(外层循环次数,内层窃取次序)通过runqsteal()函数窃取 —>runsteal()函数中再调用runqgrab函数窃取一半的工作放到p本地—>然后返回findrunnable()函数执行stop代码块->执行releasep()函数接触p与m的关联—>执行stopm()函数 休眠,停止执行工作,直到有新工作需要做为止。
acquirep()函数的作用:
调用 acquirep(p) 绑定获取到的 p 和 m,主要的动作就是设置 p 的 m 字段,更改 p 的工作状态为 _Prunning,并且设置 m 的 p 字段。做完这些之后,再次进入 top 代码段,再走一遍之前找工作的过程。
// Stops execution of the current m until new work is available.
// Returns with acquired P.
////休眠,停止执行工作,直到有新的工作需要做为止
func stopm() {
//// 当前goroutine,g0
_g_ := getg()
// .......
// 将 m 放到全局空闲链表里去
mput(_g_.m)
unlock(&sched.lock)
//进入到睡眠状态
notesleep(&_g_.m.park)
//这里呗其他工作线程唤醒
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
最后m 找不到工作就会休眠,当其他线程发现有工作要做时,就会找到空闲的m,再通过m.park字段唤醒本线程。唤醒之后再回到findrunnable()函数,继续寻找goroutine,然后返回schedule函数。然后就会去运行找到的goroutine。
系统监控sysmon线程做了什么?
在 runtime.main() 函数中,执行 runtime_init() 前,会启动一个 sysmon 的监控线程,执行后台监控任务:
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
////创建监控线程,该线程独立于调度器,不需要跟 p 关联即可运行
newm(sysmon, nil)
})
}
sysmon()函数做了什么
sysmon 执行一个无限循环,一开始每次循环休眠 20us,之后(1 ms 后)每次休眠时间倍增,最终每一轮都会休眠 10ms。
sysmon 中会进行 netpool(获取 fd 事件)、retake(抢占)、forcegc(按时间强制执行 gc),scavenge heap(释放自由列表中多余的项减少内存占用)等处理。
sysmon线程调度总结如下:
- 抢占处于系统调用的 P,让其他 m 接管它,以运行其他的 goroutine。
- 将运行时间过长的 goroutine 调度出去,给其他 goroutine 运行的机会。
调度相关的函数只需要关心retake函数,retake函数中针对处于_Psycall和_Pruning状态的p进行抢占。