0.5342019.06.06 20:25:03 字数 3,826 阅读 3,286
前言
随着服务器硬件迭代升级,配置也越来越高。为充分利用服务器资源,并发编程也变的越来越重要。在开始之前,需要了解一下并发 (concurrency) 和并行 (parallesim) 的区别。
并发: 逻辑上具有处理多个同时性任务的能力。
并行: 物理上同一时刻执行多个并发任务。
通常所说的并发编程,也就是说它允许多个任务同时执行,但实际上并不一定在同一时刻被执行。在单核处理器上,通过多线程共享 CPU 时间片串行执行 (并发非并行)。而并行则依赖于多核处理器等物理资源,让多个任务可以实现并行执行 (并发且并行)。
多线程或多进程是并行的基本条件,但单线程也可以用协程 (coroutine) 做到并发。简单将 Goroutine 归纳为协程并不合适,因为它运行时会创建多个线程来执行并发任务,且任务单元可被调度到其它线程执行。这更像是多线程和协程的结合体,能最大限度提升执行效率,发挥多核处理器能力。
Go 编写一个并发编程程序很简单,只需要在函数之前使用一个 Go 关键字就可以实现并发编程。
func main() { go func(){fmt.Println("Hello,World!")}()}
Go 调度器组成
Go 语言虽然使用一个 Go 关键字即可实现并发编程,但 Goroutine 被调度到后端之后,具体的实现比较复杂。先看看调度器有哪几部分组成。
1、G
G 是 Goroutine 的缩写,相当于操作系统中的进程控制块,在这里就是 Goroutine 的控制结构,是对 Goroutine 的抽象。其中包括执行的函数指令及参数;G 保存的任务对象;线程上下文切换,现场保护和现场恢复需要的寄存器 (SP、IP) 等信息。
Go 不同版本 Goroutine 默认栈大小不同。
_StackMin = 2048func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {_g_ := getg()_g_.m.locks++siz := nargsiz = (siz + 7) &^ 7_p_ := _g_.m.p.ptr()newg := gfget(_p_)if newg == nil {newg = malg(_StackMin)casgstatus(newg, _Gidle, _Gdead)allgadd(newg)}
2、M
M 是一个线程或称为 Machine,所有 M 是有线程栈的。如果不对该线程栈提供内存的话,系统会给该线程栈提供内存 (不同操作系统提供的线程栈大小不同)。当指定了线程栈,则 M.stack→G.stack,M 的 PC 寄存器指向 G 提供的函数,然后去执行。
type m struct {g0 *gcurg *gvdsoSP uintptrvdsoPC uintptr
3、P
P(Processor) 是一个抽象的概念,并不是真正的物理 CPU。所以当 P 有任务时需要创建或者唤醒一个系统线程来执行它队列里的任务。所以 P/M 需要进行绑定,构成一个执行单元。
P 决定了同时可以并发任务的数量,可通过 GOMAXPROCS 限制同时执行用户级任务的操作系统线程。可以通过 runtime.GOMAXPROCS 进行指定。在 Go1.5 之后 GOMAXPROCS 被默认设置可用的核数,而之前则默认为 1。
func GOMAXPROCS(n int) int {ret := int(gomaxprocs)stopTheWorld("GOMAXPROCS")newprocs = int32(n)startTheWorld()return ret}func getproccount() int32 {const maxCPUs = 64 * 1024var buf [maxCPUs / 8]byter := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])n := int32(0)for _, v := range buf[:r] {for v != 0 {n += int32(v & 1)v >>= 1}}if n == 0 {n = 1}return n}sys_linux_amd64.s:TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0MOVQ pid+0(FP), DIMOVQ len+8(FP), SIMOVQ buf+16(FP), DXMOVL $SYS_sched_getaffinity, AXSYSCALLMOVL AX, ret+24(FP)RET
Go 调度器调度过程
首先创建一个 G 对象,G 对象保存到 P 本地队列或者是全局队列。P 此时去唤醒一个 M。P 继续执行它的执行序。M 寻找是否有空闲的 P,如果有则将该 G 对象移动到它本身。接下来 M 执行一个调度循环 (调用 G 对象 -> 执行 ->清理线程→继续找新的 Goroutine 执行)。
M 执行过程中,随时会发生上下文切换。当发生上线文切换时,需要对执行现场进行保护,以便下次被调度执行时进行现场恢复。Go 调度器 M 的栈保存在 G 对象上,只需要将 M 所需要的寄存器 (SP、PC 等) 保存到 G 对象上就可以实现现场保护。当这些寄存器数据被保护起来,就随时可以做上下文切换了,在中断之前把现场保存起来。如果此时 G 任务还没有执行完,M 可以将任务重新丢到 P 的任务队列,等待下一次被调度执行。当再次被调度执行时,M 通过访问 G 的 vdsoSP、vdsoPC 寄存器进行现场恢复(从上次中断位置继续执行)。
1、P 队列
通过上图可以发现,P 有两种队列:本地队列和全局队列。
- 本地队列: 当前 P 的队列,本地队列是 Lock-Free,没有数据竞争问题,无需加锁处理,可以提升处理速度。
- 全局队列:全局队列为了保证多个 P 之间任务的平衡。所有 M 共享 P 全局队列,为保证数据竞争问题,需要加锁处理。相比本地队列处理速度要低于全局队列。
2、上线文切换
简单理解为当时的环境即可,环境可以包括当时程序状态以及变量状态。例如线程切换的时候在内核会发生上下文切换,这里的上下文就包括了当时寄存器的值,把寄存器的值保存起来,等下次该线程又得到 cpu 时间的时候再恢复寄存器的值,这样线程才能正确运行。
对于代码中某个值说,上下文是指这个值所在的局部 (全局) 作用域对象。相对于进程而言,上下文就是进程执行时的环境,具体来说就是各个变量和数据,包括所有的寄存器变量、进程打开的文件、内存 (堆栈) 信息等。
3、线程清理
Goroutine 被调度执行必须保证 P/M 进行绑定,所以线程清理只需要将 P 释放就可以实现线程的清理。什么时候 P 会释放,保证其它 G 可以被执行。P 被释放主要有两种情况。
- 主动释放:最典型的例子是,当执行 G 任务时有系统调用,当发生系统调用时 M 会处于 Block 状态。调度器会设置一个超时时间,当超时时会将 P 释放。
- 被动释放:如果发生系统调用,有一个专门监控程序,进行扫描当前处于阻塞的 P/M 组合。当超过系统程序设置的超时时间,会自动将 P 资源抢走。去执行队列的其它 G 任务。
终于要来说说 Golang 中最吸引人的 goroutine 了,这也是 Golang 能够横空出世的主要原因。不同于 Python 基于进程的并发模型,以及 C++、Java 等基于线程的并发模型。Golang 采用轻量级的 goroutine 来实现并发,可以大大减少 CPU 的切换。现在已经有太多的文章来介绍 goroutine 的用法,在这里,我们从源码的角度来看看其内部实现。
重申一下重点:goroutine 中的三个实体
goroutine 中最主要的是三个实体为 GMP,其中:
G: 代表一个 goroutine 对象,每次 go 调用的时候,都会创建一个 G 对象,它包括栈、指令指针以及对于调用 goroutines 很重要的其它信息,比如阻塞它的任何 channel,其主要数据结构:
type g struct {stack stackm *msched gobufparam unsafe.Pointeratomicstatus uint32stackLock uint32goid int64waitsince int64lockedm *m}
其中最主要的当然是 sched 了,保存了 goroutine 的上下文。goroutine 切换的时候不同于线程有 OS 来负责这部分数据,而是由一个 gobuf 对象来保存,这样能够更加轻量级,再来看看 gobuf 的结构:
type gobuf struct {sp uintptrpc uintptrg guintptrctxt unsafe.Pointerret sys.Uintreglr uintptrbp uintptr}
其实就是保存了当前的栈指针,计数器,当然还有 g 自身,这里记录自身 g 的指针是为了能快速的访问到 goroutine 中的信息。
M:代表一个线程,每次创建一个 M 的时候,都会有一个底层线程创建;所有的 G 任务,最终还是在 M 上执行,其主要数据结构:
type m struct {g0 *ggsignal *gtls [6]uintptrmstartfn func()curg *gcaughtsig guintptrp puintptrnextp puintptrid int32mallocing int32spinning boolblocked boolinwb boolprintlock int8incgo boolfastrand uint32ncgocall uint64ncgo int32park notealllink *mschedlink muintptrmcache *mcachelockedg *gcreatestack [32]uintptr}
结构体 M 中有两个 G 是需要关注一下的,一个是 curg,代表结构体 M 当前绑定的结构体 G。另一个是 g0,是带有调度栈的 goroutine,这是一个比较特殊的 goroutine。普通的 goroutine 的栈是在堆上分配的可增长的栈,而 g0 的栈是 M 对应的线程的栈。所有调度相关的代码,会先切换到该 goroutine 的栈中再执行。也就是说线程的栈也是用的 g 实现,而不是使用的 OS 的。
P:代表一个处理器,每一个运行的 M 都必须绑定一个 P,就像线程必须在么一个 CPU 核上执行一样,由 P 来调度 G 在 M 上的运行,P 的个数就是 GOMAXPROCS(最大 256),启动时固定的,一般不修改;M 的个数和 P 的个数不一定一样多(会有休眠的 M 或者不需要太多的 M)(最大 10000);每一个 P 保存着本地 G 任务队列,也有一个全局 G 任务队列。P 的数据结构:
type p struct {lock mutexid int32status uint32link puintptrschedtick uint32syscalltick uint32sysmontick sysmontickm muintptrmcache *mcacheracectx uintptrgoidcache uint64goidcacheend uint64runqhead uint32runqtail uint32runq [256]guintptrrunnext guintptrsudogcache []*sudogsudogbuf [128]*sudogpalloc persistentAllocpad [sys.CacheLineSize]byte
其中 P 的状态有 Pidle, Prunning, Psyscall, Pgcstop, Pdead;在其内部队列 runqhead 里面有可运行的 goroutine,P 优先从内部获取执行的 g,这样能够提高效率。
除此之外,还有一个数据结构需要在这里提及,就是 schedt,可以看做是一个全局的调度者:
type schedt struct {goidgen uint64lastpoll uint64lock mutexmidle muintptrnmidle int32nmidlelocked int32mcount int32maxmcount int32ngsys uint32pidle puintptrnpidle uint32nmspinning uint32runqhead guintptrrunqtail guintptrrunqsize int32gflock mutexgfreeStack *ggfreeNoStack *gngfree int32sudoglock mutexsudogcache *sudog}
大多数需要的信息都已放在了结构体 M、G 和 P 中,schedt 结构体只是一个壳。可以看到,其中有 M 的 idle 队列,P 的 idle 队列,以及一个全局的就绪的 G 队列。schedt 结构体中的 Lock 是非常必须的,如果 M 或 P 等做一些非局部的操作,它们一般需要先锁住调度器。
goroutine 的运行过程
所有的 goroutine 都是由函数newproc来创建的,但是由于该函数不能调用分段栈,最后真正调用的是newproc1。在newproc1中主要进行如下动作:
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {newg = malg(_StackMin)casgstatus(newg, _Gidle, _Gdead)allgadd(newg)newg.sched.sp = spnewg.stktopsp = spnewg.sched.pc = funcPC(goexit) + sys.PCQuantumnewg.sched.g = guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched, fn)newg.gopc = callerpcnewg.startpc = fn.fn......}
分配一个 g 的结构体
初始化这个结构体的一些域
将 g 挂在就绪队列
绑定 g 到一个 m 上
这个绑定只要 m 没有突破上限 GOMAXPROCS, 就拿一个 m 绑定一个 g。如果 m 的 waiting 队列中有就从队列中拿, 否则就要新建一个 m, 调用newm。
func newm(fn func(), _p_ *p) {mp := allocm(_p_, fn)mp.nextp.set(_p_)mp.sigmask = initSigmaskexecLock.rlock()newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))execLock.runlock()}
该函数其实就是创建一个 m,跟newproc有些相似,之前也说了 m 在底层就是一个线程的创建,也即是newosproc函数,在往下挖可以看到会根据不同的 OS 来执行不同的bsdthread_create函数,而底层就是调用的runtime.clone:
clone(cloneFlags,stk,unsafe.Pointer(mp),unsafe.Pointer(mp.g0),unsafe.Pointer(funcPC(mstart)))
m 创建好之后,线程的入口是mstart,最后调用的即是mstart1:
func mstart1() {_g_ := getg()gosave(&_g_.m.g0.sched)_g_.m.g0.sched.pc = ^uintptr(0)asminit()minit()if _g_.m == &m0 {initsig(false)}if fn := _g_.m.mstartfn; fn != nil {fn()}schedule()}
里面最重要的就是 schedule 了,在 schedule 中的动作大体就是找到一个等待运行的 g,然后然后搬到 m 上,设置其状态为Grunning, 直接切换到 g 的上下文环境, 恢复 g 的执行。
func schedule() {_g_ := getg()if _g_.m.lockedg != nil {stoplockedm()execute(_g_.m.lockedg, false) // Never returns.}}
schedule的执行可以大体总结为:
schedule 函数获取 g => [必要时休眠] => [唤醒后继续获取] => execute 函数执行 g => 执行后返回到 goexit => 重新执行 schedule 函数
简单来说 g 所经历的几个主要的过程就是:Gwaiting->Grunnable->Grunning。经历了创建, 到挂在就绪队列, 到从就绪队列拿出并运行整个过程。
casgstatus(gp, _Gwaiting, _Grunnable)casgstatus(gp, _Grunnable, _Grunning)
引入了 struct M 这层抽象。m 就是这里的 worker, 但不是线程。处理系统调用中的 m 不会占用 mcpu 数量, 只有干事的 m 才会对应到线程. 当 mcpu 数量少于 GOMAXPROCS 时可以一直开新的线程干活. 而 goroutine 的执行则是在 m 和 g 都满足之后通过 schedule 切换上下文进入的.
抢占式调度
当有很多 goroutine 需要执行的时候,是怎么调度的了,上面说的 P 还没有出场呢,在runtime.main中会创建一个额外 m 运行sysmon函数,抢占就是在 sysmon 中实现的。
sysmon 会进入一个无限循环, 第一轮回休眠 20us, 之后每次休眠时间倍增, 最终每一轮都会休眠 10ms. sysmon 中有 netpool(获取 fd 事件), retake(抢占), forcegc(按时间强制执行 gc), scavenge heap(释放自由列表中多余的项减少内存占用) 等处理.
func sysmon() {lasttrace := int64(0)idle := 0delay := uint32(0)for {if idle == 0 {delay = 20} else if idle > 50 {delay *= 2}if delay > 10*1000 {delay = 10 * 1000}usleep(delay)......}}
里面的函数retake负责抢占:
func retake(now int64) uint32 {n := 0for i := int32(0); i < gomaxprocs; i++ {_p_ := allp[i]if _p_ == nil {continue}pd := &_p_.sysmonticks := _p_.statusif s == _Psyscall {t := int64(_p_.syscalltick)if int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)pd.syscallwhen = nowcontinue}if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {continue}incidlelocked(-1)if atomic.Cas(&_p_.status, s, _Pidle) {if trace.enabled {traceGoSysBlock(_p_)traceProcStop(_p_)}n++_p_.syscalltick++handoffp(_p_)}incidlelocked(1)} else if s == _Prunning {t := int64(_p_.schedtick)if int64(pd.schedtick) != t {pd.schedtick = uint32(t)pd.schedwhen = nowcontinue}if pd.schedwhen+forcePreemptNS > now {continue}preemptone(_p_)}}return uint32(n)}
枚举所有的 P 如果 P 在系统调用中 (_Psyscall), 且经过了一次 sysmon 循环 (20us~10ms), 则抢占这个 P, 调用 handoffp 解除 M 和 P 之间的关联, 如果 P 在运行中 (_Prunning), 且经过了一次 sysmon 循环并且 G 运行时间超过 forcePreemptNS(10ms), 则抢占这个 P
并设置 g.preempt = true,g.stackguard0 = stackPreempt。
为什么设置了 stackguard 就可以实现抢占?
因为这个值用于检查当前栈空间是否足够, go 函数的开头会比对这个值判断是否需要扩张栈。
newstack 函数判断 g.stackguard0 等于 stackPreempt, 就知道这是抢占触发的, 这时会再检查一遍是否要抢占。
抢占机制保证了不会有一个 G 长时间的运行导致其他 G 无法运行的情况发生。
总结
相比大多数并行设计模型,Go 比较优势的设计就是 P 上下文这个概念的出现,如果只有 G 和 M 的对应关系,那么当 G 阻塞在 IO 上的时候,M 是没有实际在工作的,这样造成了资源的浪费,没有了 P,那么所有 G 的列表都放在全局,这样导致临界区太大,对多核调度造成极大影响。
而 goroutine 在使用上面的特点,感觉既可以用来做密集的多核计算,又可以做高并发的 IO 应用,做 IO 应用的时候,写起来感觉和对程序员最友好的同步阻塞一样,而实际上由于 runtime 的调度,底层是以同步非阻塞的方式在运行(即 IO 多路复用)。
所以说保护现场的抢占式调度和 G 被阻塞后传递给其他 m 调用的核心思想,使得 goroutine 的产生。
更多精彩内容,就在简书 APP
“没有绝 “对” 的事情,争论之后,我们觉得合理那就行了。”
还没有人赞赏,支持一下
云爬虫技术研究笔记公众号:《云爬虫技术研究笔记》
CSDN 博客专家
华为云享专家
总资产 31 共写了 9.8W 字获得 237 个赞共 330 个粉丝
推荐阅读更多精彩内容
[TOC] [转载]也谈 goroutine 调度器 本文转载:https://tonybai.com/2017/06...
从源码角度看 Golang 的调度 本章主要从源码角度针对 Go 调度相关进行分析。仅关注 linux 系统下的逻辑。代码版本…

介绍 上一篇文章我对操作系统级别的调度进行了讲解,这对理解 Go 语言的调度器是很重要的。这篇文章,我将解释下 G…

达菲格阅读 6,072 评论 1 赞 28
http://skoo.me/go/2013/11/29/golang-schedule?hmsr=studygo...
有看见过什么眼睛就合不起来此前我用光泽端详静物屏息,它们不朽 必要点燃一些痒留在以前的巷子都撞散我我卡在喉咙在唇齿…

惊蛰夕阅读 263 评论 3 赞 8
- 什么是[NoSQL]数据库 数据库:进行高效的、有规则的进行数据持久化存储的软件 NoSQL 数据库:Not o…

冬 gua阅读 731 评论 0 赞 1
- 寻一处存在的不存在时空恍惚在记忆的错觉 错觉的记忆,风景蒙上岁月的轮廓峥嵘,蹉跎,平凡而又陈旧 那不是朝圣徒步穿越…

青语书生阅读 895 评论 33 赞 26

https://www.jianshu.com/p/abe79d86ff27

