newproc 就是创建一个新的 Goroutine,在 Scheduler 中就是 g,每个 g 会运行一个函数 fn。在 Golang 中创建一个 goroutine 很简单,只需要一个 go 关键字就可以。

  1. go func(message string) {
  2. fmt.Println(message)
  3. }()

newproc 中,首先获取函数 fn 的指针,获取调用方的 goroutine,获取调用方的 PC 寄存器,然后再系统堆栈上运行 newproc1,L24 是将新创建好的 goroutine 放到 P 的运行队列中,并且尝试唤醒一个 P。这里的 newproc1 , runqputwakep 会在后面一一介绍。

  1. func newproc(siz int32, fn *funcval) {
  2. argp := add(unsafe.Pointer(&fn), sys.PtrSize)
  3. gp := getg()
  4. pc := getcallerpc()
  5. systemstack(func() {
  6. newg := newproc1(fn, argp, siz, gp, pc)
  7. _p_ := getg().m.p.ptr()
  8. runqput(_p_, newg, true)
  9. if mainStarted {
  10. wakep()
  11. }
  12. })
  13. }

L1 ~ L6 检查一下 fn 是不是有效,如果 fn 为 nil,就直接抛出错误。接下来处理传递进来的参数大小,根据这个大小分配初始的栈。与 C 不同,Golang 函数调用的参数传递和返回值都是通过栈进行的,所以函数的参数数量没有限制,返回值的数量也可以是多个。

接下来并不直接生成一个新的 goroutine,而是先尝试从 gfree 队列中获取,获取不到的时候才会调用 malg() 生成一个新的 g。

  1. _g_ := getg()
  2. if fn == nil {
  3. _g_.m.throwing = -1 // do not dump full stacks
  4. throw("go of nil func value")
  5. }
  6. acquirem() // disable preemption because it can be holding p in a local var
  7. siz := narg
  8. siz = (siz + 7) &^ 7
  9. // ...
  10. _p_ := _g_.m.p.ptr()
  11. newg := gfget(_p_)
  12. if newg == nil {
  13. newg = malg(_StackMin)
  14. casgstatus(newg, _Gidle, _Gdead)
  15. allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
  16. }

Get Free Goroutine

这里有两个 gFree 队列,一个是当前的 P 中的,另一个是全局变量 sched 中的,通常不会从全局的 sched 中获取空闲的 Goroutine,除非当前 P 里没有空闲的 Goroutine 了。获取到空心的 Goroutine 之后还会查看是否这个 Goroutine 持有栈,如果没有的化会分配新的栈空间。

  1. // Get from gfree list.
  2. // If local list is empty, grab a batch from global list.
  3. func gfget(_p_ *p) *g {
  4. retry:
  5. if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
  6. lock(&sched.gFree.lock)
  7. // Move a batch of free Gs to the P.
  8. for _p_.gFree.n < 32 {
  9. // Prefer Gs with stacks.
  10. gp := sched.gFree.stack.pop()
  11. if gp == nil {
  12. gp = sched.gFree.noStack.pop()
  13. if gp == nil {
  14. break
  15. }
  16. }
  17. sched.gFree.n--
  18. _p_.gFree.push(gp)
  19. _p_.gFree.n++
  20. }
  21. unlock(&sched.gFree.lock)
  22. goto retry
  23. }
  24. gp := _p_.gFree.pop()
  25. if gp == nil {
  26. return nil
  27. }
  28. _p_.gFree.n--
  29. if gp.stack.lo == 0 {
  30. // Stack was deallocated in gfput. Allocate a new one.
  31. systemstack(func() {
  32. gp.stack = stackalloc(_FixedStack)
  33. })
  34. gp.stackguard0 = gp.stack.lo + _StackGuard
  35. } else {
  36. if raceenabled {
  37. racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
  38. }
  39. if msanenabled {
  40. msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
  41. }
  42. }
  43. return gp
  44. }

scheduler-gfree_get.drawio.png
图 1:gfree get

Init Goroutine

接下来的部分就是为新生成的 g 进行初始化。首先是参数的拷贝,需要注意的是每个 goroutine 刚生成时,分配的栈的大小都是 2K,如果我们的参数大小超过了这个限制,运行时并不会分配更大的栈,因为 Go 语言的开发人员认为这种情况大多数都是因为错误导致的,毕竟函数传递中传递 2K 数据并不多见,如果有较大的数据结构,应该直接传递其指针的。

scheduler.drawio.png
图 2:init args

  1. totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
  2. totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
  3. sp := newg.stack.hi - totalSize
  4. spArg := sp
  5. if usesLR {
  6. // caller's LR
  7. *(*uintptr)(unsafe.Pointer(sp)) = 0
  8. prepGoExitFrame(sp)
  9. spArg += sys.MinFrameSize
  10. }
  11. if narg > 0 {
  12. memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
  13. // This is a stack-to-stack copy. If write barriers
  14. // are enabled and the source stack is grey (the
  15. // destination is always black), then perform a
  16. // barrier copy. We do this *after* the memmove
  17. // because the destination stack may have garbage on
  18. // it.
  19. if writeBarrier.needed && !_g_.m.curg.gcscandone {
  20. f := findfunc(fn.fn)
  21. stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
  22. if stkmap.nbit > 0 {
  23. // We're in the prologue, so it's always stack map index 0.
  24. bv := stackmapdata(stkmap, 0)
  25. bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
  26. }
  27. }
  28. }
  29. memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
  30. newg.sched.sp = sp
  31. newg.stktopsp = sp
  32. newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
  33. newg.sched.g = guintptr(unsafe.Pointer(newg))
  34. gostartcallfn(&newg.sched, fn)
  35. newg.gopc = callerpc
  36. newg.ancestors = saveAncestors(callergp)
  37. newg.startpc = fn.fn
  38. if _g_.m.curg != nil {
  39. newg.labels = _g_.m.curg.labels
  40. }
  41. if isSystemGoroutine(newg, false) {
  42. atomic.Xadd(&sched.ngsys, +1)
  43. }
  44. casgstatus(newg, _Gdead, _Grunnable)
  45. if _p_.goidcache == _p_.goidcacheend {
  46. // Sched.goidgen is the last allocated id,
  47. // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
  48. // At startup sched.goidgen=0, so main goroutine receives goid=1.
  49. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
  50. _p_.goidcache -= _GoidCacheBatch - 1
  51. _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
  52. }
  53. newg.goid = int64(_p_.goidcache)
  54. _p_.goidcache++
  55. if raceenabled {
  56. newg.racectx = racegostart(callerpc)
  57. }
  58. if trace.enabled {
  59. traceGoCreate(newg, newg.startpc)
  60. }
  61. releasem(_g_.m)
  62. return newg
  63. }

Running Queue

接着回到函数 newproc 中,生成好的 g 会被放入当前 p 的运行队列中,并将 next 标记为 true。如果当前的调度模式是 random,新创建的 goroutine 并不一定需要下一个执行,生成一个随机数,如果这个随机数不是 2 的倍数,也就是有二分之一的概率,我们并不在下一个执行它。

首先处理 next == true 的情况,将新来的 g 与 runnext 交换,直到交换成功为止,查看是不是以前也有 next,将这个 next 进行下一步处理,如果原来的 next 就是空的,直接返回就可以了。接下来是将 g 放入到 runq 的末尾,这里的 g 可能是新创建的 groutine,也可能是以前的 runnext 中的 goroutine。

:::danger L25 和 L26 都是获取 p 中的值,为什么 runqhead 需要使用 atomic 进行获取呢?L29 最后的存储 runqtail 为什么也需要 actomic? :::

接下来看看 runq 满没满,通过 t - h 就可以判断,因为这里的 runq 的存取都是使用的 i%len(runq),我们可以将它看作是一个环形队列,h%len(runq) 的位置可能在 t%len(runq) 前面,但是 t 是肯定大于 h 的,只要两者差不大于 len(runq),那这个队列就没满。我们将这个 goroutine 放入到本地的 runq 中。
scheduler-第 2 页.drawio.png
图 3:runq

  1. // runqput tries to put g on the local runnable queue.
  2. // If next is false, runqput adds g to the tail of the runnable queue.
  3. // If next is true, runqput puts g in the _p_.runnext slot.
  4. // If the run queue is full, runnext puts g on the global queue.
  5. // Executed only by the owner P.
  6. func runqput(_p_ *p, gp *g, next bool) {
  7. if randomizeScheduler && next && fastrand()%2 == 0 {
  8. next = false
  9. }
  10. if next {
  11. retryNext:
  12. oldnext := _p_.runnext
  13. if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
  14. goto retryNext
  15. }
  16. if oldnext == 0 {
  17. return
  18. }
  19. // Kick the old runnext out to the regular run queue.
  20. gp = oldnext.ptr()
  21. }
  22. retry:
  23. h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
  24. t := _p_.runqtail
  25. if t-h < uint32(len(_p_.runq)) {
  26. _p_.runq[t%uint32(len(_p_.runq))].set(gp)
  27. atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
  28. return
  29. }
  30. if runqputslow(_p_, gp, h, t) {
  31. return
  32. }
  33. // the queue is not full, now the put above must succeed
  34. goto retry
  35. }

如果当前 p 的 runq 已经满了,就应该尝试将 P 中的 goroutine 清理一些放到全局队列中了,而这个 g 也应该放到全局队列里。关于 runqputslow 的过程见下图。可以看到在 p 中的 runq 是一个数组,而在 sche 中的 runq 变成了一个链表,这是一个单向链表,链表中的每个节点都是 g。

scheduler-第 4 页.drawio.png
图 4:runqputslow

  1. // Put g and a batch of work from local runnable queue on global queue.
  2. // Executed only by the owner P.
  3. func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
  4. var batch [len(_p_.runq)/2 + 1]*g
  5. // First, grab a batch from local queue.
  6. n := t - h
  7. n = n / 2
  8. if n != uint32(len(_p_.runq)/2) {
  9. throw("runqputslow: queue is not full")
  10. }
  11. for i := uint32(0); i < n; i++ {
  12. batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
  13. }
  14. if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
  15. return false
  16. }
  17. batch[n] = gp
  18. if randomizeScheduler {
  19. for i := uint32(1); i <= n; i++ {
  20. j := fastrandn(i + 1)
  21. batch[i], batch[j] = batch[j], batch[i]
  22. }
  23. }
  24. // Link the goroutines.
  25. for i := uint32(0); i < n; i++ {
  26. batch[i].schedlink.set(batch[i+1])
  27. }
  28. var q gQueue
  29. q.head.set(batch[0])
  30. q.tail.set(batch[n])
  31. // Now put the batch on global queue.
  32. lock(&sched.lock)
  33. globrunqputbatch(&q, int32(n+1))
  34. unlock(&sched.lock)
  35. return true
  36. }