0.5342019.06.06 20:25:03 字数 3,826 阅读 3,286
前言
随着服务器硬件迭代升级,配置也越来越高。为充分利用服务器资源,并发编程也变的越来越重要。在开始之前,需要了解一下并发 (concurrency) 和并行 (parallesim) 的区别。
并发: 逻辑上具有处理多个同时性任务的能力。
并行: 物理上同一时刻执行多个并发任务。
通常所说的并发编程,也就是说它允许多个任务同时执行,但实际上并不一定在同一时刻被执行。在单核处理器上,通过多线程共享 CPU 时间片串行执行 (并发非并行)。而并行则依赖于多核处理器等物理资源,让多个任务可以实现并行执行 (并发且并行)。
多线程或多进程是并行的基本条件,但单线程也可以用协程 (coroutine) 做到并发。简单将 Goroutine 归纳为协程并不合适,因为它运行时会创建多个线程来执行并发任务,且任务单元可被调度到其它线程执行。这更像是多线程和协程的结合体,能最大限度提升执行效率,发挥多核处理器能力。
Go 编写一个并发编程程序很简单,只需要在函数之前使用一个 Go 关键字就可以实现并发编程。
func main() { go func(){
fmt.Println("Hello,World!")
}()
}
Go 调度器组成
Go 语言虽然使用一个 Go 关键字即可实现并发编程,但 Goroutine 被调度到后端之后,具体的实现比较复杂。先看看调度器有哪几部分组成。
1、G
G 是 Goroutine 的缩写,相当于操作系统中的进程控制块,在这里就是 Goroutine 的控制结构,是对 Goroutine 的抽象。其中包括执行的函数指令及参数;G 保存的任务对象;线程上下文切换,现场保护和现场恢复需要的寄存器 (SP、IP) 等信息。
Go 不同版本 Goroutine 默认栈大小不同。
_StackMin = 2048
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
_g_.m.locks++
siz := narg
siz = (siz + 7) &^ 7
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
2、M
M 是一个线程或称为 Machine,所有 M 是有线程栈的。如果不对该线程栈提供内存的话,系统会给该线程栈提供内存 (不同操作系统提供的线程栈大小不同)。当指定了线程栈,则 M.stack→G.stack,M 的 PC 寄存器指向 G 提供的函数,然后去执行。
type m struct {
g0 *g
curg *g
vdsoSP uintptr
vdsoPC uintptr
3、P
P(Processor) 是一个抽象的概念,并不是真正的物理 CPU。所以当 P 有任务时需要创建或者唤醒一个系统线程来执行它队列里的任务。所以 P/M 需要进行绑定,构成一个执行单元。
P 决定了同时可以并发任务的数量,可通过 GOMAXPROCS 限制同时执行用户级任务的操作系统线程。可以通过 runtime.GOMAXPROCS 进行指定。在 Go1.5 之后 GOMAXPROCS 被默认设置可用的核数,而之前则默认为 1。
func GOMAXPROCS(n int) int {
ret := int(gomaxprocs)
stopTheWorld("GOMAXPROCS")
newprocs = int32(n)
startTheWorld()
return ret
}
func getproccount() int32 {
const maxCPUs = 64 * 1024
var buf [maxCPUs / 8]byte
r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])
n := int32(0)
for _, v := range buf[:r] {
for v != 0 {
n += int32(v & 1)
v >>= 1
}
}
if n == 0 {
n = 1
}
return n
}
sys_linux_amd64.s:
TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
MOVQ pid+0(FP), DI
MOVQ len+8(FP), SI
MOVQ buf+16(FP), DX
MOVL $SYS_sched_getaffinity, AX
SYSCALL
MOVL AX, ret+24(FP)
RET
Go 调度器调度过程
首先创建一个 G 对象,G 对象保存到 P 本地队列或者是全局队列。P 此时去唤醒一个 M。P 继续执行它的执行序。M 寻找是否有空闲的 P,如果有则将该 G 对象移动到它本身。接下来 M 执行一个调度循环 (调用 G 对象 -> 执行 ->清理线程→继续找新的 Goroutine 执行)。
M 执行过程中,随时会发生上下文切换。当发生上线文切换时,需要对执行现场进行保护,以便下次被调度执行时进行现场恢复。Go 调度器 M 的栈保存在 G 对象上,只需要将 M 所需要的寄存器 (SP、PC 等) 保存到 G 对象上就可以实现现场保护。当这些寄存器数据被保护起来,就随时可以做上下文切换了,在中断之前把现场保存起来。如果此时 G 任务还没有执行完,M 可以将任务重新丢到 P 的任务队列,等待下一次被调度执行。当再次被调度执行时,M 通过访问 G 的 vdsoSP、vdsoPC 寄存器进行现场恢复(从上次中断位置继续执行)。
1、P 队列
通过上图可以发现,P 有两种队列:本地队列和全局队列。
- 本地队列: 当前 P 的队列,本地队列是 Lock-Free,没有数据竞争问题,无需加锁处理,可以提升处理速度。
- 全局队列:全局队列为了保证多个 P 之间任务的平衡。所有 M 共享 P 全局队列,为保证数据竞争问题,需要加锁处理。相比本地队列处理速度要低于全局队列。
2、上线文切换
简单理解为当时的环境即可,环境可以包括当时程序状态以及变量状态。例如线程切换的时候在内核会发生上下文切换,这里的上下文就包括了当时寄存器的值,把寄存器的值保存起来,等下次该线程又得到 cpu 时间的时候再恢复寄存器的值,这样线程才能正确运行。
对于代码中某个值说,上下文是指这个值所在的局部 (全局) 作用域对象。相对于进程而言,上下文就是进程执行时的环境,具体来说就是各个变量和数据,包括所有的寄存器变量、进程打开的文件、内存 (堆栈) 信息等。
3、线程清理
Goroutine 被调度执行必须保证 P/M 进行绑定,所以线程清理只需要将 P 释放就可以实现线程的清理。什么时候 P 会释放,保证其它 G 可以被执行。P 被释放主要有两种情况。
- 主动释放:最典型的例子是,当执行 G 任务时有系统调用,当发生系统调用时 M 会处于 Block 状态。调度器会设置一个超时时间,当超时时会将 P 释放。
- 被动释放:如果发生系统调用,有一个专门监控程序,进行扫描当前处于阻塞的 P/M 组合。当超过系统程序设置的超时时间,会自动将 P 资源抢走。去执行队列的其它 G 任务。
终于要来说说 Golang 中最吸引人的 goroutine 了,这也是 Golang 能够横空出世的主要原因。不同于 Python 基于进程的并发模型,以及 C++、Java 等基于线程的并发模型。Golang 采用轻量级的 goroutine 来实现并发,可以大大减少 CPU 的切换。现在已经有太多的文章来介绍 goroutine 的用法,在这里,我们从源码的角度来看看其内部实现。
重申一下重点:goroutine 中的三个实体
goroutine 中最主要的是三个实体为 GMP,其中:
G
: 代表一个 goroutine 对象,每次 go 调用的时候,都会创建一个 G 对象,它包括栈、指令指针以及对于调用 goroutines 很重要的其它信息,比如阻塞它的任何 channel,其主要数据结构:
type g struct {
stack stack
m *m
sched gobuf
param unsafe.Pointer
atomicstatus uint32
stackLock uint32
goid int64
waitsince int64
lockedm *m
}
其中最主要的当然是 sched 了,保存了 goroutine 的上下文。goroutine 切换的时候不同于线程有 OS 来负责这部分数据,而是由一个 gobuf 对象来保存,这样能够更加轻量级,再来看看 gobuf 的结构:
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret sys.Uintreg
lr uintptr
bp uintptr
}
其实就是保存了当前的栈指针,计数器,当然还有 g 自身,这里记录自身 g 的指针是为了能快速的访问到 goroutine 中的信息。
M
:代表一个线程,每次创建一个 M 的时候,都会有一个底层线程创建;所有的 G 任务,最终还是在 M 上执行,其主要数据结构:
type m struct {
g0 *g
gsignal *g
tls [6]uintptr
mstartfn func()
curg *g
caughtsig guintptr
p puintptr
nextp puintptr
id int32
mallocing int32
spinning bool
blocked bool
inwb bool
printlock int8
incgo bool
fastrand uint32
ncgocall uint64
ncgo int32
park note
alllink *m
schedlink muintptr
mcache *mcache
lockedg *g
createstack [32]uintptr
}
结构体 M 中有两个 G 是需要关注一下的,一个是 curg,代表结构体 M 当前绑定的结构体 G。另一个是 g0,是带有调度栈的 goroutine,这是一个比较特殊的 goroutine。普通的 goroutine 的栈是在堆上分配的可增长的栈,而 g0 的栈是 M 对应的线程的栈。所有调度相关的代码,会先切换到该 goroutine 的栈中再执行。也就是说线程的栈也是用的 g 实现,而不是使用的 OS 的。
P
:代表一个处理器,每一个运行的 M 都必须绑定一个 P,就像线程必须在么一个 CPU 核上执行一样,由 P 来调度 G 在 M 上的运行,P 的个数就是 GOMAXPROCS(最大 256),启动时固定的,一般不修改;M 的个数和 P 的个数不一定一样多(会有休眠的 M 或者不需要太多的 M)(最大 10000);每一个 P 保存着本地 G 任务队列,也有一个全局 G 任务队列。P 的数据结构:
type p struct {
lock mutex
id int32
status uint32
link puintptr
schedtick uint32
syscalltick uint32
sysmontick sysmontick
m muintptr
mcache *mcache
racectx uintptr
goidcache uint64
goidcacheend uint64
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
sudogcache []*sudog
sudogbuf [128]*sudog
palloc persistentAlloc
pad [sys.CacheLineSize]byte
其中 P 的状态有 Pidle, Prunning, Psyscall, Pgcstop, Pdead;在其内部队列 runqhead 里面有可运行的 goroutine,P 优先从内部获取执行的 g,这样能够提高效率。
除此之外,还有一个数据结构需要在这里提及,就是 schedt,可以看做是一个全局的调度者:
type schedt struct {
goidgen uint64
lastpoll uint64
lock mutex
midle muintptr
nmidle int32
nmidlelocked int32
mcount int32
maxmcount int32
ngsys uint32
pidle puintptr
npidle uint32
nmspinning uint32
runqhead guintptr
runqtail guintptr
runqsize int32
gflock mutex
gfreeStack *g
gfreeNoStack *g
ngfree int32
sudoglock mutex
sudogcache *sudog
}
大多数需要的信息都已放在了结构体 M、G 和 P 中,schedt 结构体只是一个壳。可以看到,其中有 M 的 idle 队列,P 的 idle 队列,以及一个全局的就绪的 G 队列。schedt 结构体中的 Lock 是非常必须的,如果 M 或 P 等做一些非局部的操作,它们一般需要先锁住调度器。
goroutine 的运行过程
所有的 goroutine 都是由函数newproc
来创建的,但是由于该函数不能调用分段栈,最后真正调用的是newproc1
。在newproc1
中主要进行如下动作:
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.startpc = fn.fn
......
}
分配一个 g 的结构体
初始化这个结构体的一些域
将 g 挂在就绪队列
绑定 g 到一个 m 上
这个绑定只要 m 没有突破上限 GOMAXPROCS, 就拿一个 m 绑定一个 g。如果 m 的 waiting 队列中有就从队列中拿, 否则就要新建一个 m, 调用newm
。
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
mp.nextp.set(_p_)
mp.sigmask = initSigmask
execLock.rlock()
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
execLock.runlock()
}
该函数其实就是创建一个 m,跟newproc
有些相似,之前也说了 m 在底层就是一个线程的创建,也即是newosproc
函数,在往下挖可以看到会根据不同的 OS 来执行不同的bsdthread_create
函数,而底层就是调用的runtime.clone
:
clone(cloneFlags,stk,unsafe.Pointer(mp),unsafe.Pointer(mp.g0),unsafe.Pointer(funcPC(mstart)))
m 创建好之后,线程的入口是mstart
,最后调用的即是mstart1
:
func mstart1() {
_g_ := getg()
gosave(&_g_.m.g0.sched)
_g_.m.g0.sched.pc = ^uintptr(0)
asminit()
minit()
if _g_.m == &m0 {
initsig(false)
}
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
schedule()
}
里面最重要的就是 schedule 了,在 schedule 中的动作大体就是找到一个等待运行的 g,然后然后搬到 m 上,设置其状态为Grunning
, 直接切换到 g 的上下文环境, 恢复 g 的执行。
func schedule() {
_g_ := getg()
if _g_.m.lockedg != nil {
stoplockedm()
execute(_g_.m.lockedg, false) // Never returns.
}
}
schedule
的执行可以大体总结为:
schedule 函数获取 g => [必要时休眠] => [唤醒后继续获取] => execute 函数执行 g => 执行后返回到 goexit => 重新执行 schedule 函数
简单来说 g 所经历的几个主要的过程就是:Gwaiting->Grunnable->Grunning。经历了创建, 到挂在就绪队列, 到从就绪队列拿出并运行整个过程。
casgstatus(gp, _Gwaiting, _Grunnable)
casgstatus(gp, _Grunnable, _Grunning)
引入了 struct M 这层抽象。m 就是这里的 worker, 但不是线程。处理系统调用中的 m 不会占用 mcpu 数量, 只有干事的 m 才会对应到线程. 当 mcpu 数量少于 GOMAXPROCS 时可以一直开新的线程干活. 而 goroutine 的执行则是在 m 和 g 都满足之后通过 schedule 切换上下文进入的.
抢占式调度
当有很多 goroutine 需要执行的时候,是怎么调度的了,上面说的 P 还没有出场呢,在runtime.main
中会创建一个额外 m 运行sysmon
函数,抢占就是在 sysmon 中实现的。
sysmon 会进入一个无限循环, 第一轮回休眠 20us, 之后每次休眠时间倍增, 最终每一轮都会休眠 10ms. sysmon 中有 netpool(获取 fd 事件), retake(抢占), forcegc(按时间强制执行 gc), scavenge heap(释放自由列表中多余的项减少内存占用) 等处理.
func sysmon() {
lasttrace := int64(0)
idle := 0
delay := uint32(0)
for {
if idle == 0 {
delay = 20
} else if idle > 50 {
delay *= 2
}
if delay > 10*1000 {
delay = 10 * 1000
}
usleep(delay)
......
}
}
里面的函数retake
负责抢占:
func retake(now int64) uint32 {
n := 0
for i := int32(0); i < gomaxprocs; i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
} else if s == _Prunning {
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
}
}
return uint32(n)
}
枚举所有的 P 如果 P 在系统调用中 (_Psyscall), 且经过了一次 sysmon 循环 (20us~10ms), 则抢占这个 P, 调用 handoffp 解除 M 和 P 之间的关联, 如果 P 在运行中 (_Prunning), 且经过了一次 sysmon 循环并且 G 运行时间超过 forcePreemptNS(10ms), 则抢占这个 P
并设置 g.preempt = true,g.stackguard0 = stackPreempt。
为什么设置了 stackguard 就可以实现抢占?
因为这个值用于检查当前栈空间是否足够, go 函数的开头会比对这个值判断是否需要扩张栈。
newstack 函数判断 g.stackguard0 等于 stackPreempt, 就知道这是抢占触发的, 这时会再检查一遍是否要抢占。
抢占机制保证了不会有一个 G 长时间的运行导致其他 G 无法运行的情况发生。
总结
相比大多数并行设计模型,Go 比较优势的设计就是 P 上下文这个概念的出现,如果只有 G 和 M 的对应关系,那么当 G 阻塞在 IO 上的时候,M 是没有实际在工作的,这样造成了资源的浪费,没有了 P,那么所有 G 的列表都放在全局,这样导致临界区太大,对多核调度造成极大影响。
而 goroutine 在使用上面的特点,感觉既可以用来做密集的多核计算,又可以做高并发的 IO 应用,做 IO 应用的时候,写起来感觉和对程序员最友好的同步阻塞一样,而实际上由于 runtime 的调度,底层是以同步非阻塞的方式在运行(即 IO 多路复用)。
所以说保护现场的抢占式调度和 G 被阻塞后传递给其他 m 调用的核心思想,使得 goroutine 的产生。
更多精彩内容,就在简书 APP
“没有绝 “对” 的事情,争论之后,我们觉得合理那就行了。”
还没有人赞赏,支持一下
云爬虫技术研究笔记公众号:《云爬虫技术研究笔记》
CSDN 博客专家
华为云享专家
总资产 31 共写了 9.8W 字获得 237 个赞共 330 个粉丝
推荐阅读更多精彩内容
[TOC] [转载]也谈 goroutine 调度器 本文转载:https://tonybai.com/2017/06...
从源码角度看 Golang 的调度 本章主要从源码角度针对 Go 调度相关进行分析。仅关注 linux 系统下的逻辑。代码版本…
介绍 上一篇文章我对操作系统级别的调度进行了讲解,这对理解 Go 语言的调度器是很重要的。这篇文章,我将解释下 G…
达菲格阅读 6,072 评论 1 赞 28
http://skoo.me/go/2013/11/29/golang-schedule?hmsr=studygo...
有看见过什么眼睛就合不起来此前我用光泽端详静物屏息,它们不朽 必要点燃一些痒留在以前的巷子都撞散我我卡在喉咙在唇齿…
惊蛰夕阅读 263 评论 3 赞 8
- 什么是[NoSQL]数据库 数据库:进行高效的、有规则的进行数据持久化存储的软件 NoSQL 数据库:Not o…
冬 gua阅读 731 评论 0 赞 1
- 寻一处存在的不存在时空恍惚在记忆的错觉 错觉的记忆,风景蒙上岁月的轮廓峥嵘,蹉跎,平凡而又陈旧 那不是朝圣徒步穿越…
青语书生阅读 895 评论 33 赞 26
https://www.jianshu.com/p/abe79d86ff27