go运行时调度器做什么?

Go 调度器可以决定只在它知道内存一致的点进行调度。这意味着当我们停止垃圾收集时,我们只需要等待 CPU 内核上正在运行的线程。

讲解顺序:

小->大->详细
小:只是讲下 go-schedule的发展历史 及 所谓的GMP的结构
大:粗略的讲schedule的整个函数调用流程 函数到函数的调用
详细:具体函数的作用,以及gmp这几个结构的流转

前生今世gmp

gmp的历史

  • 单线程调度器 · 0.x
    • 只包含 40 多行代码;
    • 程序中只能存在一个活跃线程,由 G-M 模型组成;
  • 多线程调度器 · 1.0
    • 允许运行多线程的程序;
    • 全局锁导致竞争严重;
  • 任务窃取调度器 · 1.1
    • 引入了处理器 P,构成了目前的 G-M-P 模型;
    • 在处理器 P 的基础上实现了基于工作窃取的调度器;
    • 在某些情况下,Goroutine 不会让出线程,进而造成饥饿问题;
    • 时间过长的垃圾回收(Stop-the-world,STW)会导致程序长时间无法工作;
  • 抢占式调度器 · 1.2 ~ 至今
    • 基于协作的抢占式调度器 - 1.2 ~ 1.13
      • 通过编译器在函数调用时插入抢占检查指令,在函数调用时检查当前 Goroutine 是否发起了抢占请求,实现基于协作的抢占式调度;
      • Goroutine 可能会因为垃圾回收和循环长时间占用资源导致程序暂停;
    • 基于信号的抢占式调度器 - 1.14 ~ 至今
      • 实现基于信号的真抢占式调度
      • 垃圾回收在扫描栈时会触发抢占调度;
      • 抢占的时间点不够多,还不能覆盖全部的边缘情况;
  • 非均匀存储访问调度器 · 提案
    • 对运行时的各种资源进行分区;
    • 实现非常复杂,到今天还没有提上日程;

image.png

go1.13版本之前的一个bug

  1. func main() {
  2. var x int
  3. threads := runtime.GOMAXPROCS(0)
  4. for i := 0; i < threads; i++ {
  5. go func() {
  6. for { x++ }
  7. }()
  8. }
  9. time.Sleep(time.Second)
  10. fmt.Println("x =", x)
  11. }

GMP 模型

此代码分析来自于go go1.14.15

G

  1. type g struct {
  2. // Stack parameters.
  3. // stack describes the actual stack memory: [stack.lo, stack.hi).
  4. // stackguard0 is the stack pointer compared in the Go stack growth prologue.
  5. // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
  6. // stackguard1 is the stack pointer compared in the C stack growth prologue.
  7. // It is stack.lo+StackGuard on g0 and gsignal stacks.
  8. // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
  9. // ==> 堆栈参数。stack 描述了实际的堆栈内存:[stack.lo, stack.hi).stackguard0 是 Go 堆栈增长序言中比较的堆栈指针。
  10. //正常情况下是stack.lo+StackGuard,但也可以是StackPreempt来触发抢占。
  11. // stackguard1 是在 C 堆栈增长序言中比较的堆栈指针。在 g0 和 gsignal 堆栈上是 stack.lo+StackGuard。它在其他 goroutine 堆栈上为 ~0,以触发对 morestackc 的调用(并崩溃)。
  12. stack stack // offset known to runtime/cgo ==> goroutine 使用的栈
  13. stackguard0 uintptr // offset known to liblink
  14. stackguard1 uintptr // offset known to liblink
  15. _panic *_panic // innermost panic - offset known to liblink
  16. _defer *_defer // innermost defer
  17. m *m // current m; offset known to arm liblink
  18. // goroutine 的运行现场
  19. sched gobuf
  20. syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc ==> if status==Gsyscall, syscallsp = sched.sp 在 gc 期间使用
  21. syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc ==> if status==Gsyscall, syscallpc = sched.pc 在 gc 期间使用
  22. stktopsp uintptr // expected sp at top of stack, to check in traceback ==> 堆栈顶部的预期 sp,以检查回溯
  23. param unsafe.Pointer // passed parameter on wakeup ==> 唤醒时传递参数
  24. atomicstatus uint32
  25. stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
  26. goid int64
  27. schedlink guintptr
  28. waitsince int64 // approx time when the g become blocked ==> g 被阻塞的大约时间
  29. waitreason waitReason // if status==Gwaiting
  30. // 抢占调度标志。这个为 true 时,stackguard0 等于 stackpreempt
  31. preempt bool // preemption signal, duplicates stackguard0 = stackpreempt ==> 抢占信号,复制 stackguard0 = stackpreempt
  32. preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule ==> 抢占时过渡到_Gpreempted;否则,只需取消计划
  33. preemptShrink bool // shrink stack at synchronous safe point ==> 在同步安全点收缩堆栈
  34. // asyncSafePoint is set if g is stopped at an asynchronous
  35. // safe point. This means there are frames on the stack
  36. // without precise pointer information. ==> 如果 g 在异步安全点停止,则设置 asyncSafePoint。 这意味着堆栈中存在没有精确指针信息的帧。
  37. asyncSafePoint bool
  38. paniconfault bool // panic (instead of crash) on unexpected fault address
  39. gcscandone bool // g has scanned stack; protected by _Gscan bit in status
  40. throwsplit bool // must not split stack
  41. // activeStackChans indicates that there are unlocked channels
  42. // pointing into this goroutine's stack. If true, stack
  43. // copying needs to acquire channel locks to protect these
  44. // areas of the stack. ===> activeStackChans 表示有指向此 goroutine 堆栈的未锁定通道。 如果为 true,堆栈复制需要获取通道锁以保护堆栈的这些区域。
  45. activeStackChans bool
  46. // parkingOnChan indicates that the goroutine is about to
  47. // park on a chansend or chanrecv. Used to signal an unsafe point
  48. // for stack shrinking. It's a boolean value, but is updated atomically.
  49. parkingOnChan uint8
  50. raceignore int8 // ignore race detection events
  51. sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
  52. sysexitticks int64 // cputicks when syscall has returned (for tracing)
  53. traceseq uint64 // trace event sequencer
  54. tracelastp puintptr // last P emitted an event for this goroutine
  55. // 如果调用了 LockOsThread,那么这个 g 会绑定到某个 m 上
  56. lockedm muintptr
  57. sig uint32
  58. writebuf []byte
  59. sigcode0 uintptr
  60. sigcode1 uintptr
  61. sigpc uintptr
  62. gopc uintptr // pc of go statement that created this goroutine ===> 创建该 goroutine 的语句的指令地址
  63. ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors)
  64. // goroutine 函数的指令地址
  65. startpc uintptr // pc of goroutine function
  66. racectx uintptr
  67. waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
  68. cgoCtxt []uintptr // cgo traceback context
  69. labels unsafe.Pointer // profiler labels
  70. // time.Sleep 缓存的定时器
  71. timer *timer // cached timer for time.Sleep
  72. selectDone uint32 // are we participating in a select and did someone win the race?
  73. // Per-G GC state
  74. // gcAssistBytes is this G's GC assist credit in terms of
  75. // bytes allocated. If this is positive, then the G has credit
  76. // to allocate gcAssistBytes bytes without assisting. If this
  77. // is negative, then the G must correct this by performing
  78. // scan work. We track this in bytes to make it fast to update
  79. // and check for debt in the malloc hot path. The assist ratio
  80. // determines how this corresponds to scan work debt.
  81. gcAssistBytes int64
  82. }

G结构体下嵌套了两个重要的结构体
stack stack 表示 goroutine 运行时的栈

  1. // Stack describes a Go execution stack.
  2. // The bounds of the stack are exactly [lo, hi),
  3. // with no implicit data structures on either side.\
  4. // ==> Stack 描述了一个 Go 执行栈。栈的边界正好是 [lo, hi),两边都没有隐含的数据结构。
  5. type stack struct {
  6. lo uintptr ==> // 栈顶,低地址
  7. hi uintptr ==> 栈低,高地址
  8. }

gobuf
Goroutine 运行时,光有栈还不行,至少还得包括 PC,SP 等寄存器,gobuf 就保存了这些值:

  1. type gobuf struct {
  2. // The offsets of sp, pc, and g are known to (hard-coded in) libmach.
  3. //
  4. // ctxt is unusual with respect to GC: it may be a
  5. // heap-allocated funcval, so GC needs to track it, but it
  6. // needs to be set and cleared from assembly, where it's
  7. // difficult to have write barriers. However, ctxt is really a
  8. // saved, live register, and we only ever exchange it between
  9. // the real register and the gobuf. Hence, we treat it as a
  10. // root during stack scanning, which means assembly that saves
  11. // and restores it doesn't need write barriers. It's still
  12. // typed as a pointer so that any other writes from Go get
  13. // write barriers.
  14. sp uintptr
  15. pc uintptr
  16. g guintptr
  17. ctxt unsafe.Pointer
  18. ret sys.Uintreg
  19. lr uintptr
  20. bp uintptr // for GOEXPERIMENT=framepointer
  21. }

M

当 M 没有工作可做的时候,在它休眠前,会“自旋”地来找工作:检查全局队列,查看 network poller,试图执行 gc 任务,或者“偷”工作。

  1. type m struct {
  2. g0 *g // goroutine with scheduling stack ==>带有调度堆栈的 goroutine
  3. morebuf gobuf // gobuf arg to morestack ==>gobuf arg 到 morestack
  4. divmod uint32 // div/mod denominator for arm - known to liblink ==>arm 的 div/mod 分母 - liblink 已知
  5. // Fields not known to debuggers. ==>调试器不知道的字段。
  6. procid uint64 // for debuggers, but offset not hard-coded ==>用于调试器,但偏移量不是硬编码的
  7. gsignal *g // signal-handling g ==>信号处理g
  8. goSigStack gsignalStack // Go-allocated signal handling stack ==>Go 分配的信号处理堆栈
  9. sigmask sigset // storage for saved signal mask ==>存储已保存的信号掩码
  10. tls [6]uintptr // thread-local storage (for x86 extern register) ==>线程本地存储(用于 x86 外部寄存器)
  11. mstartfn func()
  12. curg *g // current running goroutine ==>当前运行的 goroutine
  13. caughtsig guintptr // goroutine running during fatal signal ==>goroutine 在致命信号期间运行
  14. p puintptr // attached p for executing go code (nil if not executing go code) ==>附加 p 用于执行 go 代码(如果不执行 go 代码,则为零)
  15. nextp puintptr
  16. oldp puintptr // the p that was attached before executing a syscall ==> 在执行系统调用之前附加的 p
  17. id int64
  18. mallocing int32
  19. throwing int32
  20. preemptoff string // if != "", keep curg running on this m ==> 如果 != "", 继续在这个 m 上运行 curg
  21. locks int32
  22. dying int32
  23. profilehz int32
  24. spinning bool // m is out of work and is actively looking for work ==> m 失业了,正在积极找工作
  25. blocked bool // m is blocked on a note ==> m 在笔记上被屏蔽
  26. newSigstack bool // minit on C thread called sigaltstack ==> C 线程上的 minit 称为 sigaltstack
  27. printlock int8
  28. incgo bool // m is executing a cgo call ==> m 正在执行一个 cgo 调用
  29. freeWait uint32 // if == 0, safe to free g0 and delete m (atomic) ==>if == 0,可以安全地释放 g0 并删除 m(原子)
  30. fastrand [2]uint32
  31. needextram bool
  32. traceback uint8
  33. ncgocall uint64 // number of cgo calls in total ==> cgo 调用总数
  34. ncgo int32 // number of cgo calls currently in progress ==>当前正在进行的 cgo 调用数
  35. cgoCallersUse uint32 // if non-zero, cgoCallers in use temporarily ==>如果非零,则暂时使用 cgoCallers
  36. cgoCallers *cgoCallers // cgo traceback if crashing in cgo call ==>如果在 cgo 调用中崩溃,则 cgo 回溯
  37. park note
  38. alllink *m // on allm
  39. schedlink muintptr
  40. mcache *mcache
  41. lockedg guintptr
  42. createstack [32]uintptr // stack that created this thread. ==> 创建此线程的堆栈。
  43. lockedExt uint32 // tracking for external LockOSThread ==> 跟踪外部 LockOSThread
  44. lockedInt uint32 // tracking for internal lockOSThread ==> 跟踪内部 lockOSThread
  45. nextwaitm muintptr // next m waiting for lock ==> 下一个等待锁定
  46. waitunlockf func(*g, unsafe.Pointer) bool
  47. waitlock unsafe.Pointer
  48. waittraceev byte
  49. waittraceskip int
  50. startingtrace bool
  51. syscalltick uint32
  52. freelink *m // on sched.freem ==> 在 sched.freem 上
  53. // these are here because they are too large to be on the stack
  54. // of low-level NOSPLIT functions. ==> mFixup 用于同步 OS 相关的 m 状态(凭据等)使用互斥量访问。
  55. libcall libcall
  56. libcallpc uintptr // for cpu profiler ==> 用于 CPU 分析器
  57. libcallsp uintptr
  58. libcallg guintptr
  59. syscall libcall // stores syscall parameters on windows ==> 在 Windows 上存储系统调用参数
  60. vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call) ==> 在 VDSO 调用中用于回溯的 SP(如果不在调用中,则为 0)
  61. vdsoPC uintptr // PC for traceback while in VDSO call ==> 在 VDSO 调用中用于回溯的 PC
  62. // preemptGen counts the number of completed preemption
  63. // signals. This is used to detect when a preemption is
  64. // requested, but fails. Accessed atomically.
  65. // ===> preemptGen 统计完成抢占的次数 信号。这用于检测何时发生抢占请求,但失败。以原子方式访问。
  66. preemptGen uint32
  67. // Whether this is a pending preemption signal on this M.
  68. // Accessed atomically. ===>这是否是此 M 上的未决抢占信号。原子访问。
  69. signalPending uint32
  70. dlogPerM
  71. mOS
  72. }

P

为 M 的执行提供“上下文”,保存 M 执行 G 时的一些资源,例如本地可运行 G 队列,memeory cache 等。
一个 M 只有绑定 P 才能执行 goroutine,当 M 被阻塞时,整个 P 会被传递给其他 M ,或者说整个 P 被接管。

  1. type p struct {
  2. id int32
  3. status uint32 // one of pidle/prunning/...
  4. link puintptr
  5. schedtick uint32 // incremented on every scheduler call ==>在每次调度程序调用时递增
  6. syscalltick uint32 // incremented on every system call ==>在每次系统调用时递增
  7. sysmontick sysmontick // last tick observed by sysmon ==>sysmon 观察到的最后一个滴答声
  8. m muintptr // back-link to associated m (nil if idle) ==>反向链接到关联的 m(如果空闲则为零)
  9. mcache *mcache
  10. pcache pageCache
  11. raceprocctx uintptr
  12. deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) ==>不同大小的可用延迟结构池(参见 panic.go)
  13. deferpoolbuf [5][32]*_defer
  14. // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. ==>goroutine id 的缓存,分摊对 runtime·sched.goidgen 的访问。
  15. goidcache uint64
  16. goidcacheend uint64
  17. // Queue of runnable goroutines. Accessed without lock. ==>可运行的 goroutine 队列。无锁访问。
  18. runqhead uint32
  19. runqtail uint32
  20. runq [256]guintptr
  21. // runnext, if non-nil, is a runnable G that was ready'd by
  22. // the current G and should be run next instead of what's in
  23. // runq if there's time remaining in the running G's time
  24. // slice. It will inherit the time left in the current time
  25. // slice. If a set of goroutines is locked in a
  26. // communicate-and-wait pattern, this schedules that set as a
  27. // unit and eliminates the (potentially large) scheduling
  28. // latency that otherwise arises from adding the ready'd
  29. // goroutines to the end of the run queue.
  30. // runnext,如果非零,是一个可运行的 G,它已由当前 G 准备好,如果在运行 G 的时间片中有剩余时间,则应该运行下一个而不是 runq 中的内容。
  31. //它将继承当前时间片中剩余的时间。 如果一组 goroutine 被锁定在一个通信和等待模式,这种调度设置为一个单元,并消除了(可能很大)调度延迟,
  32. //否则由于将准备好的 goroutine 添加到运行队列的末尾而产生的调度延迟。
  33. runnext guintptr
  34. // Available G's (status == Gdead) ==> 可用 G(状态 == Gdead)
  35. gFree struct {
  36. gList
  37. n int32
  38. }
  39. sudogcache []*sudog
  40. sudogbuf [128]*sudog
  41. // Cache of mspan objects from the heap. ==>从堆缓存 mspan 对象。
  42. mspancache struct {
  43. // We need an explicit length here because this field is used
  44. // in allocation codepaths where write barriers are not allowed,
  45. // and eliminating the write barrier/keeping it eliminated from
  46. // slice updates is tricky, moreso than just managing the length
  47. // ourselves. ==> 我们在这里需要一个明确的长度,因为这个字段用于不允许写屏障的分配代码路径中,并且消除写屏障/保持它从切片更新中消除是棘手的,不仅仅是我们自己管理长度。
  48. len int
  49. buf [128]*mspan
  50. }
  51. tracebuf traceBufPtr
  52. // traceSweep indicates the sweep events should be traced.
  53. // This is used to defer the sweep start event until a span
  54. // has actually been swept. ==> traceSweep 指示应该跟踪扫描事件。这用于推迟扫描开始事件,直到实际扫描了跨度。
  55. traceSweep bool
  56. // traceSwept and traceReclaimed track the number of bytes
  57. // swept and reclaimed by sweeping in the current sweep loop. ==> traceSwept 和 traceReclaimed 跟踪在当前扫描循环中通过扫描扫描和回收的字节数。
  58. traceSwept, traceReclaimed uintptr
  59. palloc persistentAlloc // per-P to avoid mutex ==> per-P 避免互斥
  60. _ uint32 // Alignment for atomic fields below ==> 下面的原子场对齐
  61. // The when field of the first entry on the timer heap.
  62. // This is updated using atomic functions.
  63. // This is 0 if the timer heap is empty. ==> 定时器堆上第一个条目的 when 字段。这是使用原子函数更新的。 如果计时器堆为空,则为 0。
  64. timer0When uint64
  65. // Per-P GC state
  66. gcAssistTime int64 // Nanoseconds in assistAlloc
  67. gcFractionalMarkTime int64 // Nanoseconds in fractional mark worker (atomic)
  68. gcBgMarkWorker guintptr // (atomic)
  69. gcMarkWorkerMode gcMarkWorkerMode
  70. // gcMarkWorkerStartTime is the nanotime() at which this mark
  71. // worker started.
  72. gcMarkWorkerStartTime int64
  73. // gcw is this P's GC work buffer cache. The work buffer is
  74. // filled by write barriers, drained by mutator assists, and
  75. // disposed on certain GC state transitions.
  76. //==> gcw 是这个 P 的 GC 工作缓冲区缓存。工作缓冲区是由写屏障填充,由增变器辅助耗尽,以及处理某些 GC 状态转换。
  77. gcw gcWork
  78. // wbBuf is this P's GC write barrier buffer.
  79. //
  80. // TODO: Consider caching this in the running G.
  81. wbBuf wbBuf
  82. runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
  83. // Lock for timers. We normally access the timers while running
  84. // on this P, but the scheduler can also do it from a different P.
  85. //锁定计时器。我们通常在运行时访问计时器在这个 P 上,但调度器也可以从不同的 P 上做。
  86. timersLock mutex
  87. // Actions to take at some time. This is used to implement the
  88. // standard library's time package.
  89. // Must hold timersLock to access.
  90. // ==> 在某个时间采取的行动。 这是用来实现标准库的时间包的。必须持有timersLock才能访问。
  91. timers []*timer
  92. // Number of timers in P's heap.
  93. // Modified using atomic instructions.
  94. numTimers uint32
  95. // Number of timerModifiedEarlier timers on P's heap.
  96. // This should only be modified while holding timersLock,
  97. // or while the timer status is in a transient state
  98. // such as timerModifying.
  99. //==> P 堆上的 timerModifiedEarlier 定时器的数量。这应该只在持有 timersLock 时修改,或者当定时器状态处于诸如 timerModifying 的瞬态状态时。
  100. adjustTimers uint32
  101. // Number of timerDeleted timers in P's heap.
  102. // Modified using atomic instructions.
  103. //==> P 堆中 timerDeleted 定时器的数量。 使用原子指令修改。
  104. deletedTimers uint32
  105. // Race context used while executing timer functions. ==>执行计时器功能时使用的竞态上下文。
  106. timerRaceCtx uintptr
  107. // preempt is set to indicate that this P should be enter the
  108. // scheduler ASAP (regardless of what G is running on it).
  109. //==>preempt 设置为指示该 P 应尽快进入调度程序(无论 G 在其上运行什么)。
  110. preempt bool
  111. pad cpu.CacheLinePad
  112. }

保存调度器的状态信息、全局的可运行 G 队列等.
schedt 对象只有一份实体,它维护了调度器的所有信息

  1. type schedt struct {
  2. // accessed atomically. keep at top to ensure alignment on 32-bit systems.
  3. goidgen uint64
  4. lastpoll uint64 // time of last network poll, 0 if currently polling
  5. pollUntil uint64 // time to which current poll is sleeping
  6. lock mutex
  7. // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
  8. // sure to call checkdead().
  9. midle muintptr // idle m's waiting for work ==> 由空闲的工作线程组成的链表
  10. nmidle int32 // number of idle m's waiting for work ==>空闲的工作线程数量
  11. nmidlelocked int32 // number of locked m's waiting for work ==> 空闲的且被 lock 的 m 计数
  12. mnext int64 // number of m's that have been created and next M ID
  13. maxmcount int32 // maximum number of m's allowed (or die) ==> 表示最多所能创建的工作线程数量
  14. nmsys int32 // number of system m's not counted for deadlock
  15. nmfreed int64 // cumulative number of freed m's
  16. ngsys uint32 // number of system goroutines; updated atomically ==> goroutine 的数量,自动更新
  17. pidle puintptr // idle p's ==> 由空闲的 p 结构体对象组成的链表
  18. npidle uint32 // ==> 空闲的 p 结构体对象的数量
  19. nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
  20. // Global runnable queue. ==>全局可运行的 G队列
  21. runq gQueue
  22. runqsize int32 // ==> // 元素数量
  23. // disable controls selective disabling of the scheduler.
  24. //
  25. // Use schedEnableUser to control this.
  26. //
  27. // disable is protected by sched.lock.
  28. disable struct {
  29. // user disables scheduling of user goroutines.
  30. user bool
  31. runnable gQueue // pending runnable Gs
  32. n int32 // length of runnable
  33. }
  34. // Global cache of dead G's.
  35. gFree struct {
  36. lock mutex
  37. stack gList // Gs with stacks
  38. noStack gList // Gs without stacks
  39. n int32
  40. }
  41. // Central cache of sudog structs. ==>/ sudog 结构的集中缓存
  42. sudoglock mutex
  43. sudogcache *sudog
  44. // Central pool of available defer structs of different sizes.
  45. deferlock mutex // ==>不同大小的可用的 defer struct 的集中缓存池
  46. deferpool [5]*_defer
  47. // freem is the list of m's waiting to be freed when their
  48. // m.exited is set. Linked through m.freelink.
  49. freem *m
  50. gcwaiting uint32 // gc is waiting to run
  51. stopwait int32
  52. stopnote note
  53. sysmonwait uint32
  54. sysmonnote note
  55. // safepointFn should be called on each P at the next GC
  56. // safepoint if p.runSafePointFn is set.
  57. safePointFn func(*p)
  58. safePointWait int32
  59. safePointNote note
  60. profilehz int32 // cpu profiling rate
  61. procresizetime int64 // nanotime() of last change to gomaxprocs ==> 上次修改 gomaxprocs 的纳秒时间
  62. totaltime int64 // ∫gomaxprocs dt up to procresizetime
  63. }

线程的几种状态

就绪 、 等待、 运行 ===> goroutine 不管再怎么运行,状态归类也会这么几类

状态 解释
Waiting 等待状态。线程在等待某件事的发生。例如等待网络数据、硬盘;调用操作系统 API;等待内存同步访问条件 ready,如 atomic, mutexes
Runnable 就绪状态。只要给 CPU 资源我就能运行
Executing 运行状态。线程在执行指令,这是我们想要的

goroutine几种状态

  • 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting、_Gsyscall 和 _Gpreempted 几个状态;
  • 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即 _Grunnable;
  • 运行中:Goroutine 正在某个线程上运行,即 _Grunning;

goroutine和thread的区别

区分项 thread go
内存空间 1M 2KB
耗费时间 线程切换会消耗 1000-1500 纳秒 200ns
切换所需寄存器 16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc. Program Counter, Stack Pointer and BP

go的关键词整理

g0 :Go 必须在每个正在运行的线程上调度和管理 goroutine。这个角色被委托给一个特殊的 goroutine,称为 g0,它是为每个 OS 线程创建的第一个 goroutine

https://medium.com/a-journey-with-go/go-g0-special-goroutine-8c778c6704d8

go schedule 整体流程

image.png

go 调度过程中的函数讲解

Go 程序的入口地址在 linux 上运行的,入口文件为 src/runtime/rt0_linux_amd64.s -_rt0_amd64_linux ==>/usr/local/go/src/runtime/asm_amd64.s -_rt0_amd64->TEXT runtime·rt0_go(SB),NOSPLIT,$0

TEXT runtime·rt0_go(SB),NOSPLIT,$0 ===>调度器运行始末

  1. TEXT runtime·rt0_go(SB),NOSPLIT,$0
  2. // copy arguments forward on an even stack
  3. MOVQ DI, AX // argc
  4. MOVQ SI, BX // argv
  5. SUBQ $(4*8+7), SP // 2args 2auto
  6. ANDQ $~15, SP
  7. MOVQ AX, 16(SP) //==> argc 放在 SP+16 字节处
  8. MOVQ BX, 24(SP) //==> argv 放在 SP+24 字节处
  9. // create istack out of the given (operating system) stack.
  10. // _cgo_init may update stackguard.
  11. MOVQ $runtime·g0(SB), DI //===> 把 g0 的地址存入 DI
  12. LEAQ (-64*1024+104)(SP), BX
  13. MOVQ BX, g_stackguard0(DI)
  14. MOVQ BX, g_stackguard1(DI)
  15. MOVQ BX, (g_stack+stack_lo)(DI)
  16. MOVQ SP, (g_stack+stack_hi)(DI)
  17. // find out information about the processor we're on
  18. MOVL $0, AX
  19. CPUID
  20. MOVL AX, SI
  21. CMPL AX, $0
  22. JE nocpuinfo
  23. // Figure out how to serialize RDTSC.
  24. // On Intel processors LFENCE is enough. AMD requires MFENCE.
  25. // Don't know about the rest, so let's do MFENCE.
  26. CMPL BX, $0x756E6547 // "Genu"
  27. JNE notintel
  28. CMPL DX, $0x49656E69 // "ineI"
  29. JNE notintel
  30. CMPL CX, $0x6C65746E // "ntel"
  31. JNE notintel
  32. MOVB $1, runtime·isIntel(SB)
  33. MOVB $1, runtime·lfenceBeforeRdtsc(SB)
  34. notintel:
  35. // Load EAX=1 cpuid flags
  36. MOVL $1, AX
  37. CPUID
  38. MOVL AX, runtime·processorVersionInfo(SB)
  39. nocpuinfo:
  40. // if there is an _cgo_init, call it.
  41. MOVQ _cgo_init(SB), AX
  42. TESTQ AX, AX
  43. JZ needtls
  44. // arg 1: g0, already in DI
  45. MOVQ $setg_gcc<>(SB), SI // arg 2: setg_gcc
  46. #ifdef GOOS_android
  47. MOVQ $runtime·tls_g(SB), DX // arg 3: &tls_g
  48. // arg 4: TLS base, stored in slot 0 (Android's TLS_SLOT_SELF).
  49. // Compensate for tls_g (+16).
  50. MOVQ -16(TLS), CX
  51. #else
  52. MOVQ $0, DX // arg 3, 4: not used when using platform's TLS
  53. MOVQ $0, CX
  54. #endif
  55. #ifdef GOOS_windows
  56. // Adjust for the Win64 calling convention.
  57. MOVQ CX, R9 // arg 4
  58. MOVQ DX, R8 // arg 3
  59. MOVQ SI, DX // arg 2
  60. MOVQ DI, CX // arg 1
  61. #endif
  62. CALL AX
  63. // update stackguard after _cgo_init
  64. MOVQ $runtime·g0(SB), CX
  65. MOVQ (g_stack+stack_lo)(CX), AX
  66. ADDQ $const__StackGuard, AX
  67. MOVQ AX, g_stackguard0(CX)
  68. MOVQ AX, g_stackguard1(CX)
  69. #ifndef GOOS_windows
  70. JMP ok
  71. #endif
  72. needtls:
  73. #ifdef GOOS_plan9
  74. // skip TLS setup on Plan 9
  75. JMP ok
  76. #endif
  77. #ifdef GOOS_solaris
  78. // skip TLS setup on Solaris
  79. JMP ok
  80. #endif
  81. #ifdef GOOS_illumos
  82. // skip TLS setup on illumos
  83. JMP ok
  84. #endif
  85. #ifdef GOOS_darwin
  86. // skip TLS setup on Darwin
  87. JMP ok
  88. #endif
  89. //=====> runtime 会启动多个工作线程,每个线程都会绑定一个 m0 初始化m0
  90. //=====> TLS 就是线程本地的私有的全局变量
  91. LEAQ runtime·m0+m_tls(SB), DI
  92. CALL runtime·settls(SB)
  93. // store through it, to make sure it works
  94. get_tls(BX) //===> 获取 fs 段基址并放入 BX 寄存器,其实就是 m0.tls[1] 的地址,get_tls 的代码由编译器生成
  95. MOVQ $0x123, g(BX)
  96. MOVQ runtime·m0+m_tls(SB), AX
  97. CMPQ AX, $0x123
  98. JEQ 2(PC)
  99. CALL runtime·abort(SB)
  100. ok:
  101. // set the per-goroutine and per-mach "registers"
  102. get_tls(BX) //====> 获取 fs 段基址到 BX 寄存器
  103. LEAQ runtime·g0(SB), CX //====> 将 g0 的地址存储到 CX,CX = &g0
  104. MOVQ CX, g(BX) //====> 把 g0 的地址保存在线程本地存储里面,也就是 m0.tls[0]=&g0
  105. LEAQ runtime·m0(SB), AX //====> 将 m0 的地址存储到 AX,AX = &m0
  106. // save m->g0 = g0
  107. MOVQ CX, m_g0(AX)
  108. // save m0 to g0->m
  109. MOVQ AX, g_m(CX)
  110. CLD // convention is D is always left cleared
  111. CALL runtime·check(SB)
  112. MOVL 16(SP), AX // copy argc
  113. MOVL AX, 0(SP)
  114. MOVQ 24(SP), AX // copy argv
  115. MOVQ AX, 8(SP)
  116. CALL runtime·args(SB)
  117. CALL runtime·osinit(SB) //====> 初始化系统核心数
  118. CALL runtime·schedinit(SB) //====> 调度器初始化
  119. // create a new goroutine to start program
  120. MOVQ $runtime·mainPC(SB), AX // entry
  121. PUSHQ AX //====> newproc 的第二个参数入栈,也就是新的 goroutine 需要执行的函数
  122. PUSHQ $0 // arg size //====> newproc 的第一个参数入栈,该参数表示 runtime.main 函数需要的参数大小, 因为 runtime.main 没有参数,所以这里是 0
  123. //====> // 创建 main goroutine
  124. CALL runtime·newproc(SB)
  125. POPQ AX
  126. POPQ AX
  127. // start this M ====>主线程进入调度循环,运行刚刚创建的 goroutine
  128. CALL runtime·mstart(SB)
  129. CALL runtime·abort(SB) // mstart should never return
  130. RET
  131. // Prevent dead-code elimination of debugCallV1, which is
  132. // intended to be called by debuggers. //====> 永远不会返回,万一返回了,crash 掉
  133. MOVQ $runtime·debugCallV1(SB), AX
  134. RET

go调度的时机










reference
用 GODEBUG 看调度跟踪 https://blog.csdn.net/RA681t58CJxsgCkJ31/article/details/99785562

go -scheduler 剖析

上手小知识

线程切换

线程有三种状态: 等待、就绪、运行

函数调用过程分析

CALL 指令类似 PUSH IP 和 JMP somefunc 两个指令的组合,首先将当前的 IP 指令寄存器的值压入栈中,然后通过 JMP 指令将要调用函数的地址写入到 IP 寄存器实现跳转。
而 RET 指令则是和 CALL 相反的操作,基本和 POP IP 指令等价,也就是将执行 CALL 指令时保存在 SP 中的返回地址重新载入到 IP 寄存器,实现函数的返回。

goroutine

什么是goroutine

goroutine 是thread的一层抽象,更加轻量级,可以单独执行。线程是调度的基本单位。

goroutine 和 thread的区别

参考资料【How Goroutines Work】https://blog.nindalf.com/posts/how-goroutines-work/)从三个角度进行区别: 内存消耗、 创建和销毁、 切换

内存消耗

创建一个 goroutine 的栈内存消耗为 2 KB,实际运行过程中,如果栈空间不够用,会自动进行扩容。创建一个 thread 则需要消耗 1 MB 栈内存,而且还需要一个被称为 “a guard page” 的区域用于和其他 thread 的栈空间进行隔离。

创建和销毁

Thread 创建和销毀都会有巨大的消耗,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池。而 goroutine 因为是由 Go runtime 负责管理的,创建和销毁的消耗非常小,是用户级。

切换

当 threads 切换时,需要保存各种寄存器,以便将来恢复:

16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc.

而 goroutines 切换只需保存三个寄存器:Program Counter, Stack Pointer and BP。
一般而言,线程切换会消耗 1000-1500 纳秒,一个纳秒平均可以执行 12-18 条指令。所以由于线程切换,执行指令的条数会减少 12000-18000。
Goroutine 的切换约为 200 ns,相当于 2400-3600 条指令。
因此,goroutines 切换成本比 threads 要小得多。

对比项 线程(thread) 协程(goroutine)
内存消耗 1M 2KB
创建和销毁 内核级-使用线程池解决 用户级
切换 16 general purpose registers, PC (Program Counter), SP (Stack Pointer), segment registers, 16 XMM registers, FP coprocessor state, 16 AVX registers, all MSRs etc.

16个通用寄存器、PC(程序计数器)、SP(堆栈指针)、段寄存器、16个XMM寄存器、FP协处理器状态、16个AVX寄存器、所有MSR等

切换消耗 1000-1500纳秒, 一纳秒平均执行
12-18条指令。线程切换执行指令为 121000-181000 | Program Counter, Stack Pointer and BP

goroutine 切换为200ns。 goroutine执行指令为 20012-20018 |

在同一时刻,一个线程上只能跑一个 goroutine。

go-scheduler

go程序执行由两层组成:GO Program,Runtime.及用户程序和运行时。 通过函数调用来实现内存管理、channel通信、goroutines创建等功能。

image.png

为什么要scheduler

runtime维护所有的goroutines,通过scheduler进行调度。 goroutines和thread是独立的。但是goroutine要依赖threads才能执行。

scheduler 底层原理

通过G、M、P 结构体来实现goroutine的调度。
g 代表一个 goroutine,它包含:表示 goroutine 栈的一些字段,指示当前 goroutine 的状态,指示当前运行到的指令地址,也就是 PC 值。
m 表示内核线程,包含正在运行的 goroutine 等字段。
p 代表一个虚拟的 Processor,它维护一个处于 Runnable 状态的 g 队列, m 需要获得 p 才能运行 g。
还有一个核心的结构体 sched。用来进行调度。

Runtime 起始时会启动一些 G:垃圾回收的 G,执行调度的 G,运行用户代码的 G;并且会创建一个 M 用来开始 G 的运行。随着时间的推移,更多的 G 会被创建出来,更多的 M 也会被创建出来。

Go scheduler 的核心思想是:

  1. reuse threads;
  2. 限制同时运行(不包含阻塞)的线程数为 N,N 等于 CPU 的核心数目;
  3. 线程私有的 runqueues,并且可以从其他线程 stealing goroutine 来运行,线程阻塞后,可以将 runqueues 传递给其他线程。

为什么要引入P

您现在可能想知道,为什么要有上下文?我们不能把运行队列放在线程上并摆脱上下文吗?并不真地。我们拥有上下文的原因是,如果正在运行的线程由于某种原因需要阻塞,我们可以将它们(P)交给其他线程。
我们需要阻塞的一个例子是当我们调用系统调用时。由于线程不能同时执行代码并在系统调用上被阻塞,我们需要交出上下文以便它可以继续调度。

当一个线程阻塞的时候,将和它绑定的 P 上的 goroutines 转移到其他线程。

Go scheduler 会启动一个后台线程 sysmon,用来检测长时间(超过 10 ms)运行的 goroutine,将其调度到 global runqueues。这是一个全局的 runqueue,优先级比较低,以示惩罚。

go scheduler调度的顺序

  1. runtime.schedule() {
  2. // only 1/61 of the time, check the global runnable queue for a G.
  3. // if not found, check the local queue.
  4. // if not found,
  5. // try to steal from other Ps.
  6. // if not, check the global runnable queue.
  7. // if not found, poll network.
  8. }
  9. runtime.schedule() {
  10. // 只有 1/61 的时间,检查全局可运行队列中的 G。
  11. // 如果没有找到,检查本地队列。
  12. // 如果没有找到,
  13. // 尝试从其他 Ps 中窃取。
  14. // 如果没有,检查全局可运行队列。
  15. // 如果没有找到,轮询网络。
  16. }

汇编流程细节

  1. TEXT runtime·rt0_go(SB),NOSPLIT,$0
  2. ...
  3. ...
  4. 主线程绑定m0
  5. LEAQ runtime·m0+m_tls(SB), DI
  6. CALL runtime·settls(SB)
  7. // store through it, to make sure it works
  8. get_tls(BX)
  9. MOVQ $0x123, g(BX)
  10. MOVQ runtime·m0+m_tls(SB), AX
  11. CMPQ AX, $0x123
  12. JEQ 2(PC)
  13. CALL runtime·abort(SB)

m0 是全局变量,而 m0 又要绑定到工作线程才能执行。runtime 会启动多个工作线程,每个线程都会绑定一个 m0。
TLS 就是线程本地的私有的全局变量。

  1. ok:
  2. // set the per-goroutine and per-mach "registers"
  3. get_tls(BX)
  4. LEAQ runtime·g0(SB), CX
  5. MOVQ CX, g(BX)
  6. LEAQ runtime·m0(SB), AX
  7. // save m->g0 = g0
  8. MOVQ CX, m_g0(AX)
  9. // save m0 to g0->m
  10. MOVQ AX, g_m(CX)

通过 m.tls[0] 可以找到 g0,通过 g0 可以找到 m0(通过 g 结构体的 m 字段)。并且,通过 m 的字段 g0,m0 也可以找到 g0。于是,主线程和 m0,g0 就关联起来了。

保存在主线程本地存储中的值是 g0 的地址,也就是说工作线程的私有全局变量其实是一个指向 g 的指针而不是指向 m 的指针。 目前这个指针指向g0,表示代码正运行在 g0 栈。

初始化m0

src/runtime/proc.go/schedinit()/mcommoninit()

初始化allp

allp是全局的p, 从schedinit()入口进入,然后初始化系统核数个的p,然后调用 procresize() 函数进行 P0和m0关联

procesize()函数中循环遍历所有的allp,将 将第一个p0和m0关联,其他的通过runqempty() 函数判断是否是空闲的P,然后通过放入全局空闲链表pidleput(p)

  1. 使用 make([]p, nprocs) 初始化全局变量 allp,即 allp = make([]p, nprocs)
  2. 循环创建并初始化 nprocs 个 p 结构体对象并依次保存在 allp 切片之中
  3. 把 m0 和 allp[0] 绑定在一起,即 m0.p = allp[0],allp[0].m = m0
  4. 把除了 allp[0] 之外的所有 p 放入到全局变量 sched 的 pidle 空闲队列之中

newproc() 函数 会创建一个新的g来跑fn。 newproc如果不是初始化构建,而是执行函数比如 go func(){}() 本质也是调用newproc函数,
获取当前工作线程的g0,然后通过g0找到绑定的p0, 然后从p上获取一个没有使用的g.
newproc函数传递两个函数,一个是新创建的 goroutine 需要执行的任务,也就是 fn,它代表一个函数 func;还有一个是 fn 的参数大小。

goroutine 的初始栈比较小,只有 2K。可伸缩性。

main goroutine

  1. // 创建一个新的 goroutine 来启动程序
  2. MOVQ $runtime·mainPC(SB), AX // entry
  3. // newproc 的第二个参数入栈,也就是新的 goroutine 需要执行的函数
  4. // AX = &funcval{runtime·main},
  5. PUSHQ AX
  6. // newproc 的第一个参数入栈,该参数表示 runtime.main 函数需要的参数大小,
  7. // 因为 runtime.main 没有参数,所以这里是 0
  8. PUSHQ $0 // arg size
  9. // 创建 main goroutine
  10. CALL runtime·newproc(SB)
  11. POPQ AX
  12. POPQ AX
  13. // start this M
  14. // 主线程进入调度循环,运行刚刚创建的 goroutine
  15. CALL runtime·mstart(SB)
  16. // 永远不会返回,万一返回了,crash 掉
  17. MOVL $0xf1, 0xf1 // crash
  18. RET

代码前面几行是在为调用 newproc 函数构“造栈”,执行完 runtime·newproc(SB) 后,就会以一个新的 goroutine 来执行 mainPC 也就是 runtime.main()函数。runtime.main() 函数最终会执行到我们写的 main 函数

g0栈和用户栈如何完成切换

src/runtime/proc.go/newproc1()函数中的

  1. // 把newg.sched结构体成员的所有成员设置为0
  2. memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
  3. // 设置newg 的sched成员,调度器需要依靠这些字段才能把goroutine调度到CPU上运行
  4. newg.sched.sp = sp
  5. newg.stktopsp = sp
  6. // newg.sched.pc 表示当 newg 被调度起来运行时从这个地址开始执行指令
  7. newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
  8. newg.sched.g = guintptr(unsafe.Pointer(newg))
  9. gostartcallfn(&newg.sched, fn)
  10. newg.gopc = callerpc
  11. newg.ancestors = saveAncestors(callergp)
  12. newg.startpc = fn.fn
  13. if _g_.m.curg != nil {
  14. newg.labels = _g_.m.curg.labels
  15. }
  16. ...
  17. runqput(_p_, newg, true)
  18. ...

执行完成之后
src/runtime/proc.go/newproc1()函数中的 runqput() 调用的作用详解如下
runqput() 函数详解
// 将 G 放入 p 的本地待运行队列
// runqput 尝试将 g 放到本地可执行队列里。
// 如果 next 为假,runqput 将 g 添加到可运行队列的尾部
// 如果 next 为真,runqput 将 g 添加到 p.runnext 字段
// 如果 run queue 满了,runnext 将 g 放到全局队列里—> runqputslow()函数放入到全局队列
// runnext 成员中的 goroutine 会被优先调度起来运行

runqputslow() 函数讲解
// 将 g 和 p 本地队列的一半 goroutine 放入全局队列。
// 因为要获取锁,所以会慢
runqputslow()函数中调用—> globrunqputbatch(batch[0], batch[n], int32(n+1))
P 的本地可运行队列的长度为 256,它是一个循环队列,因此最多只能放下 256 个 goroutine

go-scheduler开始调度

  1. // start this M
  2. // 主线程进入调度循环,运行刚刚创建的 goroutine
  3. CALL runtime·mstart(SB)

msart()->mstart1()->schedule()
schedule()方法 : 进入调度循环。永不返回
schedule函数详解:

  1. // 执行一轮调度器的工作:找到一个 runnable 的 goroutine,并且执行它
  2. // 永不返回
  3. func schedule() {
  4. // _g_ = 每个工作线程 m 对应的 g0,初始化时是 m0 的 g0
  5. _g_ := getg()
  6. // ……………………
  7. top:
  8. // ……………………
  9. var gp *g
  10. var inheritTime bool
  11. // ……………………
  12. if gp == nil {
  13. // Check the global runnable queue once in a while to ensure fairness.
  14. // Otherwise two goroutines can completely occupy the local runqueue
  15. // by constantly respawning each other.
  16. // 为了公平,每调用 schedule 函数 61 次就要从全局可运行 goroutine 队列中获取
  17. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
  18. lock(&sched.lock)
  19. // 从全局队列最大获取 1 个 gorutine
  20. gp = globrunqget(_g_.m.p.ptr(), 1)
  21. unlock(&sched.lock)
  22. }
  23. }
  24. // 从 P 本地获取 G 任务
  25. if gp == nil {
  26. gp, inheritTime = runqget(_g_.m.p.ptr())
  27. if gp != nil && _g_.m.spinning {
  28. throw("schedule: spinning with local work")
  29. }
  30. }
  31. if gp == nil {
  32. // 从本地运行队列和全局运行队列都没有找到需要运行的 goroutine,
  33. // 调用 findrunnable 函数从其它工作线程的运行队列中偷取,如果偷不到,则当前工作线程进入睡眠
  34. // 直到获取到 runnable goroutine 之后 findrunnable 函数才会返回。
  35. gp, inheritTime = findrunnable() // blocks until work is available
  36. }
  37. // This thread is going to run a goroutine and is not spinning anymore,
  38. // so if it was marked as spinning we need to reset it now and potentially
  39. // start a new spinning M.
  40. if _g_.m.spinning {
  41. resetspinning()
  42. }
  43. if gp.lockedm != nil {
  44. // Hands off own p to the locked m,
  45. // then blocks waiting for a new p.
  46. startlockedm(gp)
  47. goto top
  48. }
  49. // 执行 goroutine 任务函数
  50. // 当前运行的是 runtime 的代码,函数调用栈使用的是 g0 的栈空间
  51. // 调用 execute 切换到 gp 的代码和栈空间去运行
  52. execute(gp, inheritTime)
  53. }

为了公平,调度器每调度 61 次的时候,都会尝试从全局队列里取出待运行的 goroutine 来运行,调用 globrunqget;
如果还没找到,调用 runqget,从 P 本地可运行队列先选出一个可运行的 goroutine;
如果还没找到,就要去其他 P 里面去偷一些 goroutine 来执行,调用 findrunnable 函数。

schedule()中的execute()函数中调用gogo函数
execute()函数将 gp 的状态改为 _Grunning,将 m 和 gp 相互关联起来。最后,调用 gogo 完成从 g0 到 gp 的切换,CPU 的执行权将从 g0 转让到 gp。 gogo 函数用汇编语言写成
原因如下:

gogo 函数也是通过汇编语言编写的,这里之所以需要使用汇编,是因为 goroutine 的调度涉及不同执行流之间的切换。

前面我们在讨论操作系统切换线程时已经看到过,执行流的切换从本质上来说就是 CPU 寄存器以及函数调用栈的切换,然而不管是 go 还是 c 这种高级语言都无法精确控制 CPU 寄存器,因而高级语言在这里也就无能为力了,只能依靠汇编指令来达成目的。

汇编gogo详解

  1. // func gogo(buf *gobuf)
  2. // restore state from Gobuf; longjmp
  3. TEXT runtime·gogo(SB), NOSPLIT, $16-8
  4. MOVQ buf+0(FP), BX // gobuf
  5. MOVQ gobuf_g(BX), DX
  6. MOVQ 0(DX), CX // make sure g != nil
  7. get_tls(CX)
  8. ....

第一行,将 gp.sched.g 保存到 DX 寄存器;第二行,我们见得已经比较多了, get_tls 将 tls 保存到 CX 寄存器,再将 gp.sched.g 放到 tls[0] 处。这样,当下次再调用 get_tls 时,取出的就是 gp,而不再是 g0,这一行完成从 g0 栈切换到 gp。
image.png

goroutine 从生到死

src/runtime/proc.go/main() 函数详解

  1. // The main goroutine.
  2. func main() {
  3. // g = main goroutine,不再是 g0 了
  4. g := getg()
  5. ...
  6. if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
  7. systemstack(func() {
  8. // // 创建监控线程,该线程独立于调度器,不需要跟 p 关联即可运行
  9. newm(sysmon, nil)
  10. })
  11. }
  12. ...
  13. // 调用 runtime 包的初始化函数,由编译器实现
  14. doInit(&runtime_inittask) // must be before defer
  15. if nanotime() == 0 {
  16. throw("nanotime returning zero")
  17. }
  18. // Defer unlock so that runtime.Goexit during init does the unlock too.
  19. needUnlock := true
  20. defer func() {
  21. if needUnlock {
  22. unlockOSThread()
  23. }
  24. }()
  25. // Record when the world started.
  26. runtimeInitTime = nanotime()
  27. //// 开启垃圾回收器
  28. gcenable()
  29. main_init_done = make(chan bool)
  30. ...
  31. // main 包的初始化,递归的调用我们 import 进来的包的初始化函数
  32. doInit(&main_inittask)
  33. close(main_init_done)
  34. // ……………………
  35. ...
  36. // 调用 main.main 函数
  37. fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
  38. fn()
  39. if raceenabled {
  40. racefini()
  41. }
  42. // 进入系统调用,退出进程,可以看出 main goroutine 并未返回,而是直接进入系统调用退出进程了
  43. exit(0)
  44. // 保护性代码,如果 exit 意外返回,下面的代码会让该进程 crash 死掉
  45. for {
  46. var x *int32
  47. *x = 0
  48. }
  49. }

image.png

从流程图可知,main goroutine 执行完之后就直接调用 exit(0) 退出了,这会导致整个进程退出,太粗暴了。
不过,main goroutine 实际上就是代表用户的 main 函数,它都执行完了,肯定是用户的任务都执行完了,直接退出就可以了,就算有其他的 goroutine 没执行完,同样会直接退出。

主main.goroutine退出直接调用exit()退出了 。
如果是非main.goroutine退出则直接进行在 src/proc.go/newproc1()函数中调用汇编函数goexit(), 简称gp的退出。
汇编函数goexit()函数中调用src/proc.go/goexit1()函数,goexit1()函数中调用汇编函数mcall()函数。
mcall 函数里边调用goexit0()函数
goexit0函数详解

  1. // goexit continuation on g0.
  2. //// 在 g0 上执行
  3. func goexit0(gp *g) {
  4. //// g0
  5. _g_ := getg()
  6. casgstatus(gp, _Grunning, _Gdead)
  7. if isSystemGoroutine(gp, false) {
  8. atomic.Xadd(&sched.ngsys, -1)
  9. }
  10. //// 清空 gp 的一些字段
  11. gp.m = nil
  12. locked := gp.lockedm != 0
  13. gp.lockedm = 0
  14. _g_.m.lockedg = 0
  15. gp.paniconfault = false
  16. gp._defer = nil // should be true already but just in case.
  17. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
  18. gp.writebuf = nil
  19. gp.waitreason = 0
  20. gp.param = nil
  21. gp.labels = nil
  22. gp.timer = nil
  23. ......
  24. // Note that gp's stack scan is now "valid" because it has no
  25. // stack.
  26. gp.gcscanvalid = true
  27. //// 解除 g 与 m 的关系
  28. dropg()
  29. if _g_.m.lockedInt != 0 {
  30. print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
  31. throw("internal lockOSThread error")
  32. }
  33. //// 将 g 放入 free 队列缓存起来
  34. gfput(_g_.m.p.ptr(), gp)
  35. .......
  36. schedule()
  37. }

函数主要步骤解析

  1. 把 g 的状态从 _Grunning 更新为 _Gdead;
  2. 清空 g 的一些字段;
  3. 调用 dropg 函数解除 g 和 m 之间的关系,其实就是设置 g->m = nil, m->currg = nil;
  4. 把 g 放入 p 的 freeg 队列缓存起来供下次创建 g 时快速获取而不用从内存分配。freeg 就是 g 的一个对象池;
  5. 调用 schedule 函数再次进行调度。

gp完成之后放入到goroutine 缓存池,待下次任务重新启用。
而工作线程,又继续调用 schedule 函数进行新一轮的调度,整个过程形成了一个循环。

main goroutine 和普通 goroutine 的退出过程:

对于 main goroutine,在执行完用户定义的 main 函数的所有代码后,直接调用 exit(0) 退出整个进程,非常霸道。
对于普通 goroutine 则没这么“舒服”,需要经历一系列的过程。先是跳转到提前设置好的 goexit 函数的第二条指令,然后调用 runtime.goexit1,接着调用 mcall(goexit0),而 mcall 函数会切换到 g0 栈,运行 goexit0 函数,清理 goroutine 的一些字段,并将其添加到 goroutine 缓存池里,然后进入 schedule 调度循环。到这里,普通 goroutine 才算完成使命。

M如何找工作,如何窃取调度其他及全局的队列的goroutine

一从全局队列窃取
二从本地队列获取
三从其他地方找goroutine -其他P中窃取、从本地或全局队列中获取g、轮询网络
src/runtime/proc.go/findrunnable(()函数详解

  1. func findrunnable() (gp *g, inheritTime bool) {
  2. _g_ := getg()
  3. top:
  4. _p_ := _g_.m.p.ptr()
  5. //......
  6. //从本地队列获取
  7. // local runq
  8. if gp, inheritTime := runqget(_p_); gp != nil {
  9. return gp, inheritTime
  10. }
  11. // global runq
  12. // 从全局队列获取
  13. if sched.runqsize != 0 {
  14. lock(&sched.lock)
  15. gp := globrunqget(_p_, 0)
  16. unlock(&sched.lock)
  17. if gp != nil {
  18. return gp, false
  19. }
  20. }
  21. // Poll network.
  22. // This netpoll is only an optimization before we resort to stealing.
  23. // We can safely skip it if there are no waiters or a thread is blocked
  24. // in netpoll already. If there is any kind of logical race with that
  25. // blocked thread (e.g. it has already returned from netpoll, but does
  26. // not set lastpoll yet), this thread will do blocking netpoll below
  27. // anyway.
  28. if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
  29. if list := netpoll(false); !list.empty() { // non-blocking
  30. gp := list.pop()
  31. injectglist(&list)
  32. casgstatus(gp, _Gwaiting, _Grunnable)
  33. if trace.enabled {
  34. traceGoUnpark(gp, 0)
  35. }
  36. return gp, false
  37. }
  38. }
  39. // Steal work from other P's.
  40. // 如果其他的 P 都处于空闲状态,那肯定没有其他工作要做
  41. procs := uint32(gomaxprocs)
  42. if atomic.Load(&sched.npidle) == procs-1 {
  43. // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
  44. // New work can appear from returning syscall/cgocall, network or timers.
  45. // Neither of that submits to local run queues, so no point in stealing.
  46. goto stop
  47. }
  48. ////如果有很多工作线程在找工作,那我就停下休息。避免消耗太多 CPU
  49. // If number of spinning M's >= number of busy P's, block.
  50. // This is necessary to prevent excessive CPU consumption
  51. // when GOMAXPROCS>>1 but the program parallelism is low.
  52. if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
  53. goto stop
  54. }
  55. if !_g_.m.spinning {
  56. //// 设置自旋状态为true
  57. _g_.m.spinning = true
  58. ////自旋状态数加1
  59. atomic.Xadd(&sched.nmspinning, 1)
  60. }
  61. ////从其他P的本地运行队列盗取goroutine
  62. for i := 0; i < 4; i++ {
  63. for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
  64. if sched.gcwaiting != 0 {
  65. goto top
  66. }
  67. stealRunNextG := i > 2 // first look for ready queues with more than 1 g
  68. //窃取函数
  69. if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
  70. return gp, false
  71. }
  72. }
  73. }
  74. stop:
  75. //......
  76. // return P and block
  77. lock(&sched.lock)
  78. if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
  79. unlock(&sched.lock)
  80. goto top
  81. }
  82. if sched.runqsize != 0 {
  83. gp := globrunqget(_p_, 0)
  84. unlock(&sched.lock)
  85. return gp, false
  86. }
  87. //// 当前工作线程解除与P之间的绑定,准备去休眠
  88. if releasep() != _p_ {
  89. throw("findrunnable: wrong p")
  90. }
  91. ////把p放入空闲队列
  92. pidleput(_p_)
  93. unlock(&sched.lock)
  94. wasSpinning := _g_.m.spinning
  95. if _g_.m.spinning {
  96. //// m即将睡眠,不在处于自旋
  97. _g_.m.spinning = false
  98. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
  99. throw("findrunnable: negative nmspinning")
  100. }
  101. }
  102. // check all runqueues once again
  103. ////休眠之气前再检查一下所有的p,看一下是否有工作要做
  104. for _, _p_ := range allpSnapshot {
  105. if !runqempty(_p_) {
  106. lock(&sched.lock)
  107. _p_ = pidleget()
  108. unlock(&sched.lock)
  109. if _p_ != nil {
  110. acquirep(_p_)
  111. if wasSpinning {
  112. _g_.m.spinning = true
  113. atomic.Xadd(&sched.nmspinning, 1)
  114. }
  115. goto top
  116. }
  117. break
  118. }
  119. }
  120. ////......
  121. //// 休眠
  122. stopm()
  123. goto top
  124. }

runsteal窃取函数详解

  1. // 从P2偷走一半的工作放到_p_的本地
  2. func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
  3. // 队尾
  4. t := _p_.runqtail
  5. // 从p2偷取工作,放到_p_.runq的队尾
  6. n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
  7. if n == 0 {
  8. return nil
  9. }
  10. n--
  11. //// 找到最后一个g,准备返回
  12. gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
  13. if n == 0 {
  14. //说明只偷了一个g
  15. return gp
  16. }
  17. //// 队列头
  18. h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
  19. ////判断是否偷太多了
  20. if t-h+n >= uint32(len(_p_.runq)) {
  21. throw("runqsteal: runq overflow")
  22. }
  23. ////更新队尾,将偷来的工作加入队列
  24. atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
  25. return gp
  26. }

调用 runqgrab 从 p2 偷走它一半的工作放到 p 本地。runqgrab 函数将从 p2 偷来的工作放到以 t 为地址的数组里,数组就是 p.runq。

窃取函数findrunnable()函数从其他地方找goroutine来执行。
findrunnable()函数中调用—>globrunqget()函数从全局队列获取—>检查其他的P是否都处于空闲状态,那肯定也没有工作—>如果现在有很多工作线程找工作,那么就停下来休息,避免消耗太多的CPU —>然后从其他P的本地运行队列窃取goroutine(外层循环次数,内层窃取次序)通过runqsteal()函数窃取 —>runsteal()函数中再调用runqgrab函数窃取一半的工作放到p本地—>然后返回findrunnable()函数执行stop代码块->执行releasep()函数接触p与m的关联—>执行stopm()函数 休眠,停止执行工作,直到有新工作需要做为止。

acquirep()函数的作用:

调用 acquirep(p) 绑定获取到的 p 和 m,主要的动作就是设置 p 的 m 字段,更改 p 的工作状态为 _Prunning,并且设置 m 的 p 字段。做完这些之后,再次进入 top 代码段,再走一遍之前找工作的过程。

  1. // Stops execution of the current m until new work is available.
  2. // Returns with acquired P.
  3. ////休眠,停止执行工作,直到有新的工作需要做为止
  4. func stopm() {
  5. //// 当前goroutine,g0
  6. _g_ := getg()
  7. // .......
  8. // 将 m 放到全局空闲链表里去
  9. mput(_g_.m)
  10. unlock(&sched.lock)
  11. //进入到睡眠状态
  12. notesleep(&_g_.m.park)
  13. //这里呗其他工作线程唤醒
  14. noteclear(&_g_.m.park)
  15. acquirep(_g_.m.nextp.ptr())
  16. _g_.m.nextp = 0
  17. }

最后m 找不到工作就会休眠,当其他线程发现有工作要做时,就会找到空闲的m,再通过m.park字段唤醒本线程。唤醒之后再回到findrunnable()函数,继续寻找goroutine,然后返回schedule函数。然后就会去运行找到的goroutine。

系统监控sysmon线程做了什么?

在 runtime.main() 函数中,执行 runtime_init() 前,会启动一个 sysmon 的监控线程,执行后台监控任务:

  1. if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
  2. systemstack(func() {
  3. ////创建监控线程,该线程独立于调度器,不需要跟 p 关联即可运行
  4. newm(sysmon, nil)
  5. })
  6. }

sysmon()函数做了什么

sysmon 执行一个无限循环,一开始每次循环休眠 20us,之后(1 ms 后)每次休眠时间倍增,最终每一轮都会休眠 10ms。

sysmon 中会进行 netpool(获取 fd 事件)、retake(抢占)、forcegc(按时间强制执行 gc),scavenge heap(释放自由列表中多余的项减少内存占用)等处理。

sysmon线程调度总结如下:
  1. 抢占处于系统调用的 P,让其他 m 接管它,以运行其他的 goroutine。
  2. 将运行时间过长的 goroutine 调度出去,给其他 goroutine 运行的机会。

调度相关的函数只需要关心retake函数,retake函数中针对处于_Psycall和_Pruning状态的p进行抢占。