调度相关的数据结构有三个,M(线程),P(调度器),G(goroutine)
M 表示线程,P 作为调度器用来帮助每个线程管理自己的 goroutine,G 就是 golang 的协程。我们可以通过 runtime.GOMAXPROCS(n int) 函数设置 P 的个数,注意P 的个数并不代表 M 的个数,例如程序启动时 runtime 代码会出实话 procs 个 P,但开始的时候只会启动一个 M,就是 M0 和一个栈为 64K(其他 goroutine 默认初始栈大小 2K) 来执行 runtime 代码。
那其他线程是什么时候创建的呐?
当 goroutine 被唤醒时,要在 M 上运行 (恢复 goroutine 的上下文),P 是帮助 M 管理 goroutine 的,恢复上下文的操作也由 P 来完成。如果被唤醒时发现还有空闲的 P,并且没有其他 M 在窃取 goroutine(M 发现本地 goroutine 队列和全局 goroutine 队列都没有 goroutine 的时候,会去其他线程窃取 goroutine),说明其他 M 都在忙,就会创建一个 M 让这个空闲的 P 帮他来管理 goroutine。
总之一句话,开始的时候创建一个 M,当发现调度不过来且还有空闲 P 没有工作就在创建新的,直到创建 procs 个 M(procs 通过 runtime.GOMAXPROCS 设置)

golang goroutine源码阅读 - 勿忘初心0924 - 博客园 - 图1

G

golang 用结构体 g 表示 goroutine

g

  1. type g struct {
  2. stack stack
  3. stackguard0 uintptr
  4. stackguard1 uintptr
  5. _panic *_panic
  6. _defer *_defer
  7. m *m
  8. sched gobuf
  9. ...
  10. schedlink guintptr
  11. ...
  12. preempt bool
  13. ...
  14. }

gobuf

gobuf 保存 goroutine 的调度信息,当一个 goroutine 被调度的时,本质上就是把这个 goroutine 放到 cpu,恢复各个寄存器的值,然后运行

  1. type gobuf struct {
  2. sp uintptr
  3. pc uintptr
  4. g guintptr
  5. ctxt unsafe.Pointer
  6. ret sys.Uintreg
  7. lr uintptr
  8. bp uintptr
  9. }

M

golang 中 M 表示实际操作系统的线程

m

  1. type m struct {
  2. g0 *g
  3. ...
  4. gsignal *g
  5. tls [6]uintptr
  6. mstartfn func()
  7. curg *g
  8. ...
  9. p puintptr
  10. nextp puintptr
  11. oldp puintptr
  12. ...
  13. spinning bool
  14. blocked bool
  15. ...
  16. park note
  17. alllink *m
  18. ...
  19. thread uintptr
  20. ...
  21. }

P

golang 中 P 表示一个调度器,为 M 提供上下文环境,使得 M 可以执行多个 goroutine

p

  1. type p struct {
  2. m muintptr
  3. ...
  4. runqhead uint32
  5. runqtail uint32
  6. runq [256]guintptr
  7. ...
  8. sudogcache []*sudog
  9. sudogbuf [128]*sudog
  10. ...
  11. pad cpu.CacheLinePad
  12. }

schedt

schedt 结构体用来保存 P 的状态信息和 goroutine 的全局运行队列

  1. type schedt struct {
  2. ...
  3. lock mutex
  4. midle muintptr
  5. nmidle int32
  6. nmidlelocked int32
  7. mnext int64
  8. maxmcount int32
  9. nmsys int32
  10. nmfreed int64
  11. ngsys uint32
  12. pidle puintptr
  13. npidle uint32
  14. nmspinning uint32
  15. runq gQueue
  16. runqsize int32
  17. ...
  18. gFree struct {
  19. lock mutex
  20. stack gList
  21. noStack gList
  22. n int32
  23. }
  24. ...
  25. }

重要的全局变量

  1. allgs []*g
  2. allm *m
  3. allp []*p
  4. ncpu int32
  5. gomaxprocs int32
  6. sched schedt
  7. m0 m
  8. g0 g

下面是用 go 实现的 hello world,代码里并没有关于调度的初始化,所以程序的入口并非是 main.main,下面通过 gdb 一步步找到 go 是如何初始化调度的。

  1. package main
  2. func main() {
  3. println("hello, world!")
  4. }

编译

  1. go build -gcflags "-N -l" test.go

使用 OS X 的同学注意,go1.11 之后压缩的 debug 信息,OS X 的同学需要同时做以下设置参考Debug Go Program With Gdb On Macos

  1. export GOFLAGS="-ldflags=-compressdwarf=false"

调试

  • 利用断点可以找出目标文件的信息,在入口处打一个断点,找到程序入口在 rt0_darwin_amd64.s 的第 8 行
  1. sudo gdb test
  2. (gdb) info files
  3. Symbols from "/Users/journey/workspace/src/tool/gdb/test".
  4. Local exec file:
  5. `/Users/journey/workspace/src/tool/gdb/test', file type mach-o-x86-64.
  6. Entry point: 0x104cd00
  7. 0x0000000001001000 - 0x00000000010515b1 is .text
  8. 0x00000000010515c0 - 0x000000000108162a is __TEXT.__rodata
  9. 0x0000000001081640 - 0x0000000001081706 is __TEXT.__symbol_stub1
  10. 0x0000000001081720 - 0x0000000001081e80 is __TEXT.__typelink
  11. 0x0000000001081e80 - 0x0000000001081e88 is __TEXT.__itablink
  12. 0x0000000001081e88 - 0x0000000001081e88 is __TEXT.__gosymtab
  13. 0x0000000001081ea0 - 0x00000000010bfacd is __TEXT.__gopclntab
  14. 0x00000000010c0000 - 0x00000000010c0020 is __DATA.__go_buildinfo
  15. 0x00000000010c0020 - 0x00000000010c0128 is __DATA.__nl_symbol_ptr
  16. 0x00000000010c0140 - 0x00000000010c0d08 is __DATA.__noptrdata
  17. 0x00000000010c0d20 - 0x00000000010c27f0 is .data
  18. 0x00000000010c2800 - 0x00000000010ddc90 is .bss
  19. 0x00000000010ddca0 - 0x00000000010e01e8 is __DATA.__noptrbss
  20. (gdb) b *0x104cd00
  21. Breakpoint 1 at 0x104cd00: file /usr/local/go/src/runtime/rt0_darwin_amd64.s, line 8.
  • 进入上面找到的文件 rt0_darwin_amd64.s(不同的架构文件是不同的)
  1. runtime ls rt0_*
  2. rt0_aix_ppc64.s rt0_darwin_amd64.s rt0_freebsd_arm.s rt0_linux_arm64.s rt0_nacl_386.s rt0_netbsd_arm64.s rt0_plan9_amd64.s
  3. rt0_android_386.s rt0_darwin_arm.s rt0_illumos_amd64.s rt0_linux_mips64x.s rt0_nacl_amd64p32.s rt0_openbsd_386.s rt0_plan9_arm.s
  4. rt0_android_amd64.s rt0_darwin_arm64.s rt0_js_wasm.s rt0_linux_mipsx.s rt0_nacl_arm.s rt0_openbsd_amd64.s rt0_solaris_amd64.s
  5. rt0_android_arm.s rt0_dragonfly_amd64.s rt0_linux_386.s rt0_linux_ppc64.s rt0_netbsd_386.s rt0_openbsd_arm.s rt0_windows_386.s
  6. rt0_android_arm64.s rt0_freebsd_386.s rt0_linux_amd64.s rt0_linux_ppc64le.s rt0_netbsd_amd64.s rt0_openbsd_arm64.s rt0_windows_amd64.s
  7. rt0_darwin_386.s rt0_freebsd_amd64.s rt0_linux_arm.s rt0_linux_s390x.s rt0_netbsd_arm.s rt0_plan9_386.s rt0_windows_arm.s
  • 打开文件 go/src/runtime/rt0_darwin_amd64.s:8
    这里没有做什么就调了函数_rt0_amd64
  1. TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8
  2. JMP _rt0_amd64(SB)
  • 然后在打断点看看_rt0_amd64 在哪
    在 ams_amd64.s 第 15 行
  1. (gdb) b _rt0_amd64
  2. Breakpoint 2 at 0x1049350: file /usr/local/go/src/runtime/asm_amd64.s, line 15.

这里首先把参数放到 DI,SI 寄存器中,然后调用 runtime.rt0_go,这就是进程初始化主要函数了
参数 0 放在 DI 通用寄存器
参数 1 放在 SI 通用寄存器
参数 2 放在 DX 通用寄存器
参数 3 放在 CX 通用寄存器

  1. TEXT _rt0_amd64(SB),NOSPLIT,$-8
  2. MOVQ 0(SP), DI
  3. LEAQ 8(SP), SI
  4. JMP runtime·rt0_go(SB)
  • 然后跳转到 runtime.rt0_go
  1. (gdb) b runtime.rt0_go
  2. Breakpoint 3 at 0x1049360: file /usr/local/go/src/runtime/asm_amd64.s, line 89.

初始化

这个函数有点长,下面我们分段来看 rt0_go 这个函数

初始化参数以及创建 g0

  1. 首先将之前放入通用寄存器的参数放入 AX,BX 寄存器,然后调整栈顶指针 (真 SP 寄存器) 的位置,SP 指针先减 39,关于 16 字节向下对齐(因为 CPU 有一组 SSE 指令,这些指令中出现的内存地址必须是 16 的倍数),然后把参数放到 SP+16 字节和 SP+24 字节处
    golang 的汇编有抽象出来的寄存器,通过是否有前缀变量区分真假寄存器,例如 a+8(SP) 就是 golang 的寄存器,8(SP) 就是真的寄存器
  2. 创建 g0,并初始化 g.stackgruard0,g.stackguard1 以及 g.stack.lo,g.stack.hi 的值 (实际上是分配一段内存,然后分割成小段,约定哪小段表示哪个变量)
  1. TEXT runtime·rt0_go(SB),NOSPLIT,$0
  2. MOVQ DI, AX
  3. MOVQ SI, BX
  4. SUBQ $(4*8+7), SP
  5. ANDQ $~15, SP
  6. MOVQ AX, 16(SP)
  7. MOVQ BX, 24(SP)
  8. MOVQ $runtime·g0(SB), DI
  9. LEAQ (-64*1024+104)(SP), BX
  10. MOVQ BX, g_stackguard0(DI)
  11. MOVQ BX, g_stackguard1(DI)
  12. MOVQ BX, (g_stack+stack_lo)(DI)
  13. MOVQ SP, (g_stack+stack_hi)(DI)

创建完 g0 的内存分布

golang goroutine源码阅读 - 勿忘初心0924 - 博客园 - 图2

然后略过一段 CPU 型号检测和 CGO 初始化的代码

  1. ...

创建 m0

  1. 创建将 m0.tls 放入 DI 寄存器,然后调用 runtime.settls 将 m0 设置为线程私有变量 (mac 下什么也没干),将 m0 与主线程绑定,然后对 m0.tls 进行存取操作验证是否能用,不能用就直接退出
  2. 绑定 m0 和 g0 的关系,m0.g0 = g0,g0.m = m0
  1. LEAQ runtime·m0+m_tls(SB), DI
  2. CALL runtime·settls(SB)
  3. get_tls(BX)
  4. MOVQ $0x123, g(BX)
  5. MOVQ runtime·m0+m_tls(SB), AX
  6. CMPQ AX, $0x123
  7. JEQ 2(PC)
  8. CALL runtime·abort(SB)
  9. ok:
  10. get_tls(BX)
  11. LEAQ runtime·g0(SB), CX
  12. MOVQ CX, g(BX)
  13. LEAQ runtime·m0(SB), AX
  14. MOVQ CX, m_g0(AX)
  15. MOVQ AX, g_m(CX)
  16. CLD
  17. CALL runtime·check(SB)

创建完 m0 之后的内存分布

golang goroutine源码阅读 - 勿忘初心0924 - 博客园 - 图3

m0 和 g0 的关系

  1. m0 表示主线程,g0 表示主线程的第一个 goroutine
  2. g0 主要是记录主线程的栈信息,执行调度函数 (schedule 后边会讲) 时会用,而用户 goroutine 有自己的栈,执行的时候会从 g0 栈切换到用户 goroutine 栈

初始化调度

g0 和 m0 都创建并初始化好了,下面就该进行调度初始化了

  1. 将参数放入 AX(初始化 g0 时将参数放入 SP+16 和 SP+24 的位置
  2. runtime.args 初始化参数的
  3. runtime.osinit 是初始化 CPU 核数的
  4. 重点看 runtime.schedinit
  1. MOVL 16(SP), AX
  2. MOVL AX, 0(SP)
  3. MOVQ 24(SP), AX
  4. MOVQ AX, 8(SP)
  5. CALL runtime·args(SB)
  6. CALL runtime·osinit(SB)
  7. CALL runtime·schedinit(SB)

runtime.schedinit

下面函数省略了调度无关的代码,大概流程:

  1. 设置最大线程数
  2. 根据 GOMAXPROCS 设置 procs(P 的数量)
  3. 调用 procresizeprocs 调整 P 的数量
  1. func schedinit() {
  2. _g_ := getg()
  3. if raceenabled {
  4. _g_.racectx, raceprocctx0 = raceinit()
  5. }
  6. sched.maxmcount = 10000
  7. ...
  8. mcommoninit(_g_.m)
  9. ...
  10. sched.lastpoll = uint64(nanotime())
  11. procs := ncpu
  12. if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
  13. procs = n
  14. }
  15. if procresize(procs) != nil {
  16. throw("unknown runnable goroutine during bootstrap")
  17. }
  18. ...
  19. }

runtime.procresize

  • 调度初始化最后一步
  1. 更新最后一次修改 P 数量动作的时间戳并累加花费时间
  2. 根据 nprocs 调整 P 的数量 (加锁)

    1. nprocs > 现有 P 数量,就扩展 allp(p 的全局数组) 的长度为 nprocs
    2. nprocs < 现有 P 数量,就缩容 allp 的长度为 nprocs
  3. 如果上一步是扩容了,就从堆中创建新 P,并把 P 放入扩容出来的位置
  4. 通过 g0 找到 m0,然后将 allp[0]和 m0 绑定
  5. 如果 allp 缩容了,就将多余的 p 销毁
  6. 将空闲的 p 加入空闲链表
    到目前为止,创建了 m0,g0,和 nprocs 个 P,但是还是没有让调度真正的跑起来
  1. func procresize(nprocs int32) *p {
  2. old := gomaxprocs
  3. if old < 0 || nprocs <= 0 {
  4. throw("procresize: invalid arg")
  5. }
  6. if trace.enabled {
  7. traceGomaxprocs(nprocs)
  8. }
  9. now := nanotime()
  10. if sched.procresizetime != 0 {
  11. sched.totaltime += int64(old) * (now - sched.procresizetime)
  12. }
  13. sched.procresizetime = now
  14. if nprocs > int32(len(allp)) {
  15. lock(&allpLock)
  16. if nprocs <= int32(cap(allp)) {
  17. allp = allp[:nprocs]
  18. } else {
  19. nallp := make([]*p, nprocs)
  20. copy(nallp, allp[:cap(allp)])
  21. allp = nallp
  22. }
  23. unlock(&allpLock)
  24. }
  25. for i := old; i < nprocs; i++ {
  26. pp := allp[i]
  27. if pp == nil {
  28. pp = new(p)
  29. }
  30. pp.init(i)
  31. atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
  32. }
  33. _g_ := getg()
  34. if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
  35. _g_.m.p.ptr().status = _Prunning
  36. _g_.m.p.ptr().mcache.prepareForSweep()
  37. } else {
  38. if _g_.m.p != 0 {
  39. if trace.enabled {
  40. traceGoSched()
  41. traceProcStop(_g_.m.p.ptr())
  42. }
  43. _g_.m.p.ptr().m = 0
  44. }
  45. _g_.m.p = 0
  46. _g_.m.mcache = nil
  47. p := allp[0]
  48. p.m = 0
  49. p.status = _Pidle
  50. acquirep(p)
  51. if trace.enabled {
  52. traceGoStart()
  53. }
  54. }
  55. for i := nprocs; i < old; i++ {
  56. p := allp[i]
  57. p.destroy()
  58. }
  59. if int32(len(allp)) != nprocs {
  60. lock(&allpLock)
  61. allp = allp[:nprocs]
  62. unlock(&allpLock)
  63. }
  64. var runnablePs *p
  65. for i := nprocs - 1; i >= 0; i-- {
  66. p := allp[i]
  67. if _g_.m.p.ptr() == p {
  68. continue
  69. }
  70. p.status = _Pidle
  71. if runqempty(p) {
  72. pidleput(p)
  73. } else {
  74. p.m.set(mget())
  75. p.link.set(runnablePs)
  76. runnablePs = p
  77. }
  78. }
  79. stealOrder.reset(uint32(nprocs))
  80. var int32p *int32 = &gomaxprocs
  81. atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
  82. return runnablePs
  83. }

创建 “第一个”goroutine

我们返回 runtime·rt0_go 接着看

  1. 将 runtime.main 地址放入 AX
  2. 参数 AX, 0 入栈 (函数参数入栈由右向左)
  3. 然后调用 runtime.newproc 创建 goroutine
  1. MOVQ $runtime·mainPC(SB), AX
  2. PUSHQ AX
  3. PUSHQ $0
  4. CALL runtime·newproc(SB)
  5. POPQ AX
  6. POPQ AX

newproc

  1. 首先获取参数地址
  2. 获取当前所在 goroutine(初始化时 runtime 代码都在 g0 执行)
  3. 获取要执行指令地址
  4. 在 gp 的栈上执行 runtime.newproc1(在 g0 栈上执行)
  1. func newproc(siz int32, fn *funcval) {
  2. argp := add(unsafe.Pointer(&fn), sys.PtrSize)
  3. gp := getg()
  4. pc := getcallerpc()
  5. systemstack(func() {
  6. newproc1(fn, (*uint8)(argp), siz, gp, pc)
  7. })
  8. }

newproc1 函数主要的工作

这个函数有点长分段来看

  1. 首先获得当前所在 goroutine(g0)
  2. 禁止抢占
  3. 计算参数位置
  4. 计算下参数是否过大
  5. 获取当前 goroutine 所在 m 的 p,前边讲过 g0 对应的 m 是 m0,m0 对应的 p 是 allp[0]
  6. 创建一个 goroutine(先从 p 的缓存里找,找不到就 new 一个),并且确认 goroutine 栈边界是初始化好的 (方式 p 缓存里的 goroutine 参数没初始化)
  7. 计算栈顶的地址,如果有参数就将参数放到新创建的这个 goroutine 上
  1. func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
  2. _g_ := getg()
  3. if fn == nil {
  4. _g_.m.throwing = -1
  5. throw("go of nil func value")
  6. }
  7. acquirem()
  8. siz := narg
  9. siz = (siz + 7) &^ 7
  10. if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
  11. throw("newproc: function arguments too large for new goroutine")
  12. }
  13. _p_ := _g_.m.p.ptr()
  14. newg := gfget(_p_)
  15. if newg == nil {
  16. newg = malg(_StackMin)
  17. casgstatus(newg, _Gidle, _Gdead)
  18. allgadd(newg)
  19. }
  20. if newg.stack.hi == 0 {
  21. throw("newproc1: newg missing stack")
  22. }
  23. if readgstatus(newg) != _Gdead {
  24. throw("newproc1: new g is not Gdead")
  25. }
  26. ...

设置各个寄存器的值 (在 cpu 上恢复上下文时使用)

  1. 清理 sched
  2. 设置栈顶置针位置
  3. 设置 pc 寄存器值 (goexit 函数第二条指令,常理应该是 goroutine 本身函数的第一条指令,这个妙用后边说)
  4. 设置 goroutine 地址
  5. 调用 gostartcallfn,参数是 sched 和 goroutine 的参数
  1. memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
  2. newg.sched.sp = sp
  3. newg.stktopsp = sp
  4. newg.sched.pc = funcPC(goexit) + sys.PCQuantum
  5. newg.sched.g = guintptr(unsafe.Pointer(newg))
  6. gostartcallfn(&newg.sched, fn)

判断一下 goroutine 的函数是否为空,然后调用 gostartcall

  1. func gostartcallfn(gobuf *gobuf, fv *funcval) {
  2. var fn unsafe.Pointer
  3. if fv != nil {
  4. fn = unsafe.Pointer(fv.fn)
  5. } else {
  6. fn = unsafe.Pointer(funcPC(nilfunc))
  7. }
  8. gostartcall(gobuf, fn, unsafe.Pointer(fv))
  9. }
  1. 获取 sp,现在新 goroutine 的栈上之后本身的函数,sp 指向函数的第一个参数
  2. 将 sp 指向 pc 里面的指令地址,也就是 goexit 的第二条指令,然后重新设置新 goroutinesp 地址
  3. 这时候 pc 才指向 goroutine 自己的函数

gostartcall 的主要作用就是将 goexit 入栈,然后设置 goroutine 的 pc 指向自身函数,伪装成是 goexit 调用的自身函数,当自身函数执行完时返回 goexit 清理线程,大概就是下面这样

  1. func goexit() {
  2. goroutine自身函数()
  3. 清理现场()
  4. }
  1. func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
  2. sp := buf.sp
  3. if sys.RegSize > sys.PtrSize {
  4. sp -= sys.PtrSize
  5. *(*uintptr)(unsafe.Pointer(sp)) = 0
  6. }
  7. sp -= sys.PtrSize
  8. *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
  9. buf.sp = sp
  10. buf.pc = uintptr(fn)
  11. buf.ctxt = ctxt
  12. }

然后再回到 newproc 函数,剩下的就是设置 goroutine 的状态,然后把 goroutine 放入 p 的待执行队列中

  1. newg.gopc = callerpc
  2. newg.ancestors = saveAncestors(callergp)
  3. newg.startpc = fn.fn
  4. if _g_.m.curg != nil {
  5. newg.labels = _g_.m.curg.labels
  6. }
  7. if isSystemGoroutine(newg, false) {
  8. atomic.Xadd(&sched.ngsys, +1)
  9. }
  10. newg.gcscanvalid = false
  11. casgstatus(newg, _Gdead, _Grunnable)
  12. if _p_.goidcache == _p_.goidcacheend {
  13. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
  14. _p_.goidcache -= _GoidCacheBatch - 1
  15. _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
  16. }
  17. newg.goid = int64(_p_.goidcache)
  18. _p_.goidcache++
  19. if raceenabled {
  20. newg.racectx = racegostart(callerpc)
  21. }
  22. if trace.enabled {
  23. traceGoCreate(newg, newg.startpc)
  24. }
  25. runqput(_p_, newg, true)
  26. if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
  27. wakep()
  28. }
  29. releasem(_g_.m)
  30. }

总结一下创建第一个 goroutine 执行 runtime.main 的过程 (只是创建啊,整个调度这时候还是没有跑起来)

golang goroutine源码阅读 - 勿忘初心0924 - 博客园 - 图4

调度循环

我们再返回 runtime·rt0_go 继续看,总结一下到目前为止已经准备好的事情

  1. 将 m0 与主线程绑定了 (将 m0 结构体设为主线程的私有变量)
  2. 创建了 g0,并且与 m0 绑定
  3. 创建了 procs 个 p 并且初始化,将 allp[0]与 m0 绑定,形成初步的 GMP 模型 (g0,m0,p0)
  4. 创建了一个执行 runtime.main(不是代码里的 main.main,runtime.main 会做加载 init 函数等操作然后调用 main.main) 的 goroutine 并且放入了 p0 的待运行队列

接下来就是调度循环了,调用 runtime.mstart,这个函数就是调度循环,除非程序退出否则永远阻塞住

  1. CALL runtime·mstart(SB)
  2. CALL runtime·abort(SB)
  3. RET
  4. MOVQ $runtime·debugCallV1(SB), AX
  5. RET

runtime.mstart

  1. 获取了当前所在 goroutine(初始化时代码都是在 g0 上执行的)
  2. 初始化栈保护
  3. 调用 mstart1
    go/src/runtime/proc.go, line 1146
  1. func mstart() {
  2. _g_ := getg()
  3. osStack := _g_.stack.lo == 0
  4. if osStack {
  5. size := _g_.stack.hi
  6. if size == 0 {
  7. size = 8192 * sys.StackGuardMultiplier
  8. }
  9. _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
  10. _g_.stack.lo = _g_.stack.hi - size + 1024
  11. }
  12. _g_.stackguard0 = _g_.stack.lo + _StackGuard
  13. _g_.stackguard1 = _g_.stackguard0
  14. mstart1()
  15. if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
  16. osStack = true
  17. }
  18. mexit(osStack)
  19. }

runtime.mstart1

  1. 保存 g0 的指令指针和栈指针,保存这两个值是理解调度循环的关键,mstart1 执行完之后,g0 继续执行指令,不会再返回来了,保存了指令和栈指针之后,g0 要继续执行指令的时候,就会又从上面开始执行
  2. 做一些初始化工作
  3. 调用 schedule 开始调度
  1. func mstart1() {
  2. _g_ := getg()
  3. if _g_ != _g_.m.g0 {
  4. throw("bad runtime·mstart")
  5. }
  6. save(getcallerpc(), getcallersp())
  7. asminit()
  8. minit()
  9. if _g_.m == &m0 {
  10. mstartm0()
  11. }
  12. if fn := _g_.m.mstartfn; fn != nil {
  13. fn()
  14. }
  15. if _g_.m != &m0 {
  16. acquirep(_g_.m.nextp.ptr())
  17. _g_.m.nextp = 0
  18. }
  19. schedule()
  20. }

runtime.schedule

调度开始了,m 要找 gorutine 放到 cpu 上执行了

  1. 每调度 61 次 (具体为啥是 61 有待思考),就从全局的 goroutine 列表中选 goroutine
  2. 如果上一步没找到,就从 m 对应的 p 的缓存里找
  3. 如果上一步还没有找到,就调 findrunnable 从其他线程窃取 goroutine,如果发现有就窃取一半放到自己的 p 缓存中,如果都没有就说明真的没有待运行的 goroutine 了,就陷入睡眠一直阻塞在 findrunnable 函数,等待被唤醒
  4. 直到有 goroutine 需要执行了,就调用 execute 执行 goroutine
  1. func schedule() {
  2. _g_ := getg()
  3. if _g_.m.locks != 0 {
  4. throw("schedule: holding locks")
  5. }
  6. if _g_.m.lockedg != 0 {
  7. stoplockedm()
  8. execute(_g_.m.lockedg.ptr(), false)
  9. }
  10. if _g_.m.incgo {
  11. throw("schedule: in cgo")
  12. }
  13. top:
  14. if sched.gcwaiting != 0 {
  15. gcstopm()
  16. goto top
  17. }
  18. if _g_.m.p.ptr().runSafePointFn != 0 {
  19. runSafePointFn()
  20. }
  21. var gp *g
  22. var inheritTime bool
  23. tryWakeP := false
  24. if trace.enabled || trace.shutdown {
  25. gp = traceReader()
  26. if gp != nil {
  27. casgstatus(gp, _Gwaiting, _Grunnable)
  28. traceGoUnpark(gp, 0)
  29. tryWakeP = true
  30. }
  31. }
  32. if gp == nil && gcBlackenEnabled != 0 {
  33. gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
  34. tryWakeP = tryWakeP || gp != nil
  35. }
  36. if gp == nil {
  37. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
  38. lock(&sched.lock)
  39. gp = globrunqget(_g_.m.p.ptr(), 1)
  40. unlock(&sched.lock)
  41. }
  42. }
  43. if gp == nil {
  44. gp, inheritTime = runqget(_g_.m.p.ptr())
  45. if gp != nil && _g_.m.spinning {
  46. throw("schedule: spinning with local work")
  47. }
  48. }
  49. if gp == nil {
  50. gp, inheritTime = findrunnable()
  51. }
  52. if _g_.m.spinning {
  53. resetspinning()
  54. }
  55. if sched.disable.user && !schedEnabled(gp) {
  56. lock(&sched.lock)
  57. if schedEnabled(gp) {
  58. unlock(&sched.lock)
  59. } else {
  60. sched.disable.runnable.pushBack(gp)
  61. sched.disable.n++
  62. unlock(&sched.lock)
  63. goto top
  64. }
  65. }
  66. if tryWakeP {
  67. if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
  68. wakep()
  69. }
  70. }
  71. if gp.lockedm != 0 {
  72. startlockedm(gp)
  73. goto top
  74. }
  75. execute(gp, inheritTime)
  76. }

触发调度

触发调度地方大致有:

  1. 主动挂起
  2. 系统调用
  3. 协作式调度
  4. 正常退出
  • proc.go:1208 runtime.mstart1(调度开始)

主动挂起

  • proc.go:2610 runtime.park_m
    在上一章内容里讲过golang channel 源码阅读,当 goroutine 接收一个 channel 为空且为阻塞的时候,goroutine 会调用 goparkunlock 使 goroutine 陷入睡眠,等待 send 端调用 goready 函数唤醒函数,主动挂起就是这种情况,当 goroutine 由于某些条件在等待时,就会主动挂起,不放回待运行队列,等待被唤醒

各种阻塞条件 -> runtime.gopark() -> runtime.park_m() -> runtime.schedule

  1. 获取当前所在 m,并且固定 m
  2. 获取当前程序所在 goroutine
  3. 设置锁状态以及阻塞原因
  4. 调用 runtime.park_m 挂起 goroutine
  1. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
  2. if reason != waitReasonSleep {
  3. checkTimeouts()
  4. }
  5. mp := acquirem()
  6. gp := mp.curg
  7. status := readgstatus(gp)
  8. if status != _Grunning && status != _Gscanrunning {
  9. throw("gopark: bad g status")
  10. }
  11. mp.waitlock = lock
  12. mp.waitunlockf = unlockf
  13. gp.waitreason = reason
  14. mp.waittraceev = traceEv
  15. mp.waittraceskip = traceskip
  16. releasem(mp)
  17. mcall(park_m)
  18. }
  1. 获取当前 goroutine
  2. 将 goroutine 状态设置为 Gwaiting
  3. 重新调度
  1. func park_m(gp *g) {
  2. _g_ := getg()
  3. if trace.enabled {
  4. traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
  5. }
  6. casgstatus(gp, _Grunning, _Gwaiting)
  7. dropg()
  8. if fn := _g_.m.waitunlockf; fn != nil {
  9. ok := fn(gp, _g_.m.waitlock)
  10. _g_.m.waitunlockf = nil
  11. _g_.m.waitlock = nil
  12. if !ok {
  13. if trace.enabled {
  14. traceGoUnpark(gp, 2)
  15. }
  16. casgstatus(gp, _Gwaiting, _Grunnable)
  17. execute(gp, true)
  18. }
  19. }
  20. schedule()
  21. }

协作式调度

  • proc.go:2625 runtime.goschedImpl(协作式调度)
  1. 主动让出 cpu,这个情况不会挂起 goroutine,而是放回队列,等待下次调度,这个函数 (GoSched) 被暴露出去,可以调用,例如,线上有这种情况,写 log 是异步的,但由于机器磁盘老旧性能不佳,所以当 log goroutine 运行时还是会过多的占用 cpu,这时候可以调用 GoSched 适当降低当前 goroutine 优先级

runtime.Gosched -> runtime.gosched_m -> runtime.goschedImpl runtime.schedule

  1. func gosched_m(gp *g) {
  2. if trace.enabled {
  3. traceGoSched()
  4. }
  5. goschedImpl(gp)
  6. }
  1. 调度保护,当调度器发现 goroutine 处于禁止的状态时就会主动调度让出 cpu
  1. func goschedguarded_m(gp *g) {
  2. if gp.m.locks != 0 || gp.m.mallocing != 0 || gp.m.preemptoff != "" || gp.m.p.ptr().status != _Prunning {
  3. gogo(&gp.sched)
  4. }
  5. if trace.enabled {
  6. traceGoSched()
  7. }
  8. goschedImpl(gp)
  9. }
  1. 发生抢占,例如当一个 goroutine 运行时间过长但不像等待 channel 那样阻塞,一直有事情做时,其他 goroutine 可能会抢占 cpu
  1. func gopreempt_m(gp *g) {
  2. if trace.enabled {
  3. traceGoPreempt()
  4. }
  5. goschedImpl(gp)
  6. }
  1. func goschedImpl(gp *g) {
  2. status := readgstatus(gp)
  3. if status&^_Gscan != _Grunning {
  4. dumpgstatus(gp)
  5. throw("bad g status")
  6. }
  7. casgstatus(gp, _Grunning, _Grunnable)
  8. dropg()
  9. lock(&sched.lock)
  10. globrunqput(gp)
  11. unlock(&sched.lock)
  12. schedule()
  13. }

非 main goroutine 结束

  • proc.go:2704,2727 runtime.goexit0(goroutine 正常执行完)
    非 main goroutine 结束后会继续调度,这个是正常继续下一次调度不做过多介绍

系统调用

  • proc.go:3141 runtime.exitsyscall0(系统调用)

runtime·exitsyscall -> runtime·exitsyscall0 -> runtime.schedule

我们来看下系统调用的过程

  1. func syscall_syscall(fn, a1, a2, a3 uintptr) (r1, r2, err uintptr) {
  2. entersyscall()
  3. libcCall(unsafe.Pointer(funcPC(syscall)), unsafe.Pointer(&fn))
  4. exitsyscall()
  5. return
  6. }
  7. func syscall()

首先会调用 runtime.entersyscall 获取当前的指令位置和栈指针,然后调用 reentersyscall 做 goroutine 进入系统调用之前的准备

  1. func entersyscall() {
  2. reentersyscall(getcallerpc(), getcallersp())
  3. }
  1. 禁止线程抢占防止出现栈不一致的情况
  2. 保证当前函数不会触发栈调整 (golang 进程的栈初始 2k,然后动态调整)
  3. 设置 goroutine 状态为 Gsyscall
  4. 将 goroutine 的 P 暂时和 M 分离,并且设置 P 状态为 Psyscall
  5. 释放锁
  1. func reentersyscall(pc, sp uintptr) {
  2. _g_ := getg()
  3. _g_.m.locks++
  4. _g_.stackguard0 = stackPreempt
  5. _g_.throwsplit = true
  6. save(pc, sp)
  7. _g_.syscallsp = sp
  8. _g_.syscallpc = pc
  9. casgstatus(_g_, _Grunning, _Gsyscall)
  10. if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
  11. systemstack(func() {
  12. print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
  13. throw("entersyscall")
  14. })
  15. }
  16. if trace.enabled {
  17. systemstack(traceGoSysCall)
  18. save(pc, sp)
  19. }
  20. if atomic.Load(&sched.sysmonwait) != 0 {
  21. systemstack(entersyscall_sysmon)
  22. save(pc, sp)
  23. }
  24. if _g_.m.p.ptr().runSafePointFn != 0 {
  25. systemstack(runSafePointFn)
  26. save(pc, sp)
  27. }
  28. _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
  29. _g_.sysblocktraced = true
  30. _g_.m.mcache = nil
  31. pp := _g_.m.p.ptr()
  32. pp.m = 0
  33. _g_.m.oldp.set(pp)
  34. _g_.m.p = 0
  35. atomic.Store(&pp.status, _Psyscall)
  36. if sched.gcwaiting != 0 {
  37. systemstack(entersyscall_gcwait)
  38. save(pc, sp)
  39. }
  40. _g_.m.locks--
  41. }

然后就进入系统调用

  1. ...
  1. 获得 goroutine
  2. 线程加锁
  3. 调 exitsyscallfast 替当前 goroutine 找一个 P

    1. 如果原 P 处于 Psyscall 就让这个 P 接管,否则的话进行 2)
    2. 否则的话就找空闲的 P,有的话就调用 exitsyscall0 继续调度,否则的话进行 3)
    3. 将 goroutine 设置为 Grunning,加入全局队列,调用 Gosched() 继续调度
  1. func exitsyscall() {
  2. _g_ := getg()
  3. _g_.m.locks++
  4. if getcallersp() > _g_.syscallsp {
  5. throw("exitsyscall: syscall frame is no longer valid")
  6. }
  7. _g_.waitsince = 0
  8. oldp := _g_.m.oldp.ptr()
  9. _g_.m.oldp = 0
  10. if exitsyscallfast(oldp) {
  11. if _g_.m.mcache == nil {
  12. throw("lost mcache")
  13. }
  14. if trace.enabled {
  15. if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
  16. systemstack(traceGoStart)
  17. }
  18. }
  19. _g_.m.p.ptr().syscalltick++
  20. casgstatus(_g_, _Gsyscall, _Grunning)
  21. _g_.syscallsp = 0
  22. _g_.m.locks--
  23. if _g_.preempt {
  24. _g_.stackguard0 = stackPreempt
  25. } else {
  26. _g_.stackguard0 = _g_.stack.lo + _StackGuard
  27. }
  28. _g_.throwsplit = false
  29. if sched.disable.user && !schedEnabled(_g_) {
  30. Gosched()
  31. }
  32. return
  33. }
  34. _g_.sysexitticks = 0
  35. if trace.enabled {
  36. for oldp != nil && oldp.syscalltick == _g_.m.syscalltick {
  37. osyield()
  38. }
  39. _g_.sysexitticks = cputicks()
  40. }
  41. _g_.m.locks--
  42. mcall(exitsyscall0)
  43. if _g_.m.mcache == nil {
  44. throw("lost mcache")
  45. }
  46. _g_.syscallsp = 0
  47. _g_.m.p.ptr().syscalltick++
  48. _g_.throwsplit = false
  49. }

参考资料 go 语言调度器源代码情景分析

我每天都在努力, 只是想证明我是认真的活着.
https://www.cnblogs.com/wuwangchuxin0924/p/13264054.html