并发 & 并行

  • 并发 (concurrent): 逻辑上处理同时的任务的能力
  • 并行 (parallel): 物理上同一时间处理不同任务

一般来说,并发对应在单个处理器,通过串行的时间片分配(time slice)来执行任务。 而并发,对应多个处理器,来执行不同的任务。

Golang 中,执行多个任务时,Goroutine 会创建不同的线程,也会将任务单元分配给其他线程来执行,这像是并发和并行的结合,能够最大化执行效率。

Go 调度器组成

G

G 是 Goroutine 的缩写,相当于操作系统的进程控制块 (process control block)。 它包含:函数执行的指令和参数,任务对象,线程上下文切换,字段保护,和字段的寄存器。

下面代码来自 runtime/runtime2.go, 可以看到,每个 Goroutine 都有一个不导出的 goid。

  1. type g struct {
  2. m *m // current m; offset known to arm liblink
  3. sched gobuf
  4. ...
  5. param unsafe.Pointer // passed parameter on wakeup
  6. goid int64
  7. ...
  8. vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
  9. vdsoPC uintptr // PC for traceback while in VDSO call
  10. }

不同版本的 Go 语言,Goroutine 的栈空间的默认值不一样。下面代码来自 runtime/proc.go。

  1. const (
  2. _StackMin = 2048
  3. )
  4. // Create a new g in state _Grunnable, starting at fn, with narg bytes
  5. // of arguments starting at argp. callerpc is the address of the go
  6. // statement that created this. The caller is responsible for adding
  7. // the new g to the scheduler.
  8. //
  9. // This must run on the system stack because it's the continuation of
  10. // newproc, which cannot split the stack.
  11. //
  12. //go:systemstack
  13. func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
  14. _g_ := getg()
  15. if fn == nil {
  16. _g_.m.throwing = -1 // do not dump full stacks
  17. throw("go of nil func value")
  18. }
  19. acquirem() // disable preemption because it can be holding p in a local var
  20. siz := narg
  21. siz = (siz + 7) &^ 7
  22. // We could allocate a larger initial stack if necessary.
  23. // Not worth it: this is almost always an error.
  24. // 4*sizeof(uintreg): extra space added below
  25. // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
  26. if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
  27. throw("newproc: function arguments too large for new goroutine")
  28. }
  29. ...
  30. }

M

M 是一个线程,每个 M 都有一个线程的栈。如果没有给线程的栈分配内存,操作系统会给线程的栈分配默认的内存。

当线程的栈制定,M.stack->G.stack, M 的 PC 寄存器会执行 G 提供的函数。

  1. type m struct {
  2. /*
  3. g0的线程栈与M相关
  4. */
  5. g0 *g
  6. Curg *g //M 现在绑定的G
  7. // SP, PC registers for on-site protection and on-site recovery
  8. vdsoSP uintptr
  9. vdsoPC uintptr
  10. ...
  11. }

P

P(处理器,Processor) 是一个抽象的概念,不是物理上的 CPU。当一个 P 有任务,需要创建或者唤醒一个系统线程去处理它队列中的任务。

P 决定同时执行的任务的数量,GOMAXPROCS 限制系统线程执行用户层面的任务的数量。

  1. // GOMAXPROCS sets the maximum number of CPUs that can be executing
  2. // simultaneously and returns the previous setting. If n < 1, it does not
  3. // change the current setting.
  4. // The number of logical CPUs on the local machine can be queried with NumCPU.
  5. // This call will go away when the scheduler improves.
  6. func GOMAXPROCS(n int) int {
  7. if GOARCH == "wasm" && n > 1 {
  8. n = 1 // WebAssembly has no threads yet, so only one CPU is possible.
  9. }
  10. lock(&sched.lock)
  11. ret := int(gomaxprocs)
  12. unlock(&sched.lock)
  13. if n <= 0 || n == ret {
  14. return ret
  15. }
  16. stopTheWorldGC("GOMAXPROCS")
  17. // newprocs will be processed by startTheWorld
  18. newprocs = int32(n)
  19. startTheWorldGC()
  20. return ret
  21. }

Go 调度器的调度过程

首先创建一个 G 对象,然后 G 被保存在 P 的本地队列或者全局队列(global queue)。

这时 P 会唤醒一个 M。P 按照它的执行顺序继续执行任务。M 寻找一个空闲的 P,如果找得到,将 G 移动到它自己。

然后 M 执行一个调度循环:调用 G 对象 -> 执行 -> 清理线程 -> 继续寻找 Goroutine。

在 M 的执行过程中,上下文切换随时发生。当切换发生,任务的执行现场需要被保护,这样在下一次调度执行可以进行现场恢复。M 的栈保存在 G 对象,只有现场恢复需要的寄存器 (SP,PC 等),需要被保存到 G 对象。

如果 G 对象还没有被执行,M 可以将 G 重新放到 P 的调度队列,等待下一次的调度执行。当调度执行时,M 可以通过 G 的 vdsoSP, vdsoPC 寄存器进行现场恢复。

  • P 队列 P 有 2 种类型的队列:

    • 本地队列:本地的队列是无锁的,没有数据竞争问题,处理速度比较高。
    • 全局队列:是用来平衡不同的 P 的任务数量,所有的 M 共享 P 的全局队列。
  • 线程清理 G 的调度是为了实现 P/M 的绑定,所以线程清理就是释放 P 上的 G,让其他的 G 能够被调度

    • 主动释放 (active release):典型的例子是,执行 G 任务时,发生了系统调用 (system call),这时 M 会处于阻塞(Block)状态。调度器会设置一个超时时间,来释放 P。
    • 被动释放 (passive release):如果系统调用发生,监控程序需要扫描处于阻塞状态的 P/M。 这时,超时之后,P 资源会回收,程序被安排给队列中的其他 G 任务。

调度示意图

tortoise/gmp.md at main · githuberic/tortoise - 图1

P 的数量由 GOMAXPROCS 环境变量,或者 runtime 中 GOMAXPROCS() 函数决定的。

M 的数量在 runtime/debug 包的 SetMaxThreads() 决定。如果当前的 M 阻塞,就会新建一个新的线程。

M 的数量和 P 的数量没有关系。如果当前的 M 阻塞,P 的 goroutine 会运行在其他的 M 上,或者新建一个 M。

所以可能出现有很多个 M,只有 1 个 P 的情况。

调度策略

调度策略是为了尽可能地复用线程,避免频繁地创建,销毁线程。有 2 中策略:

  • Work Stealing: 当没有运行的 G 时,从其他 P 的队列上获得 G
  • Hand Off: 当 M 阻塞时,将 P 转移到其他空闲的 M。

全局的 Goroutine 队列,当 Work Stealing 失败,M 可以从这个队列获取 G 任务。

抢占式调度 (Preemptive scheduling)

考虑到有大量的 G 任务时,为了让每个 G 任务都有时间运行,runtime.main 会创建一个额外的 M,来运行 sysmon 函数。抢占 (preemption) 在 sysmon 中实现。

sysmon 会进入一个无限循环,第一轮休眠 20us,然后休眠时间倍乘,最后每次休眠时间达到 10ms。

sysmon 有 netpoll, retake(抢占),forcegc, scavenge heap 等其他处理。

  1. // Always runs without a P, so write barriers are not allowed.
  2. //
  3. //go:nowritebarrierrec
  4. func sysmon() {
  5. lock(&sched.lock)
  6. sched.nmsys++
  7. checkdead()
  8. unlock(&sched.lock)
  9. lasttrace := int64(0)
  10. idle := 0 // how many cycles in succession we had not wokeup somebody
  11. delay := uint32(0)
  12. for {
  13. if idle == 0 { // start with 20us sleep...
  14. delay = 20
  15. } else if idle > 50 { // start doubling the sleep after 1ms...
  16. delay *= 2
  17. }
  18. if delay > 10*1000 { // up to 10ms
  19. delay = 10 * 1000
  20. }
  21. usleep(delay)
  22. now := nanotime()
  23. next, _ := timeSleepUntil()
  24. ...
  25. if atomic.Load(&scavenge.sysmonWake) != 0 {
  26. // Kick the scavenger awake if someone requested it.
  27. wakeScavenger()
  28. }
  29. // retake P's blocked in syscalls
  30. // and preempt long running G's
  31. if retake(now) != 0 {
  32. idle = 0
  33. } else {
  34. idle++
  35. }
  36. // check if we need to force a GC
  37. if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
  38. lock(&forcegc.lock)
  39. forcegc.idle = 0
  40. var list gList
  41. list.push(forcegc.g)
  42. injectglist(&list)
  43. unlock(&forcegc.lock)
  44. }
  45. ...
  46. unlock(&sched.sysmonlock)
  47. }
  48. }

go func(){} 之后

tortoise/gmp.md at main · githuberic/tortoise - 图2

    1. go func(){} 创建一个新的 goroutine
    1. G 保存在 P 的本地队列,如果本地队列满了,保存在全局队列
    1. G 在 M 上运行,每个 M 绑定一个 P。如果 P 的本地队列没有 G,M 会从其他 P 的本地队列,挥着 G 的全局队列,窃取 G
    1. 当 M 阻塞时,会将 M 从 P 解除。把 G 运行在其他空闲的 M 或者创建新的 M。
    1. 当 M 恢复时,会尝试获得一个空闲的 P。如果没有 P 空闲,M 会休眠,G 会放到全局队列。

生命周期

tortoise/gmp.md at main · githuberic/tortoise - 图3

M0: M0 是首先创建的线程。它就像系统初始化,启动第一个 G,然后变成普通的 M。 G0: 当 M 创建时,G0 就创建了。G0 用来调度其他的 G。每个 M 都有自己的 G0。当系统调用或者 goroutine 调度,G0 的内存栈就会被占用。

看下面的例子,

  1. package main
  2. import "fmt"
  3. func main() {
  4. fmt.Println("Hello world")
  5. }

上面代码的流程图:

    1. runtime 创建 M0,G0 然后绑定他们
    1. 调度器初始化:初始化 M0,栈,垃圾回收,创建初始的长度为 GOMAXPROCS 的 P 列表
    1. runtime.main 创建代码的 main.main,创建主 gorourine, 然后放到 P 的本地队列
    1. 启动 M0, M0 绑定 P
    1. 根据 goroutine 的栈和调度信息,M0 设置运行环境
    1. 在 M 中运行 G
    1. G 退出,runtime.main 调用 defer,panic, 最后调用 runtime.exit
      https://github.com/githuberic/tortoise/blob/main/go_interview/gpm/gmp.md