Golang 从 1.5 开始引入了三色 GC, 经过多次改进, 当前的 1.9 版本的 GC 停顿时间已经可以做到极短.
停顿时间的减少意味着 “最大响应时间” 的缩短, 这也让 go 更适合编写网络服务程序.
这篇文章将通过分析 golang 的源代码来讲解 go 中的三色 GC 的实现原理.
这个系列分析的 golang 源代码是 Google 官方的实现的 1.9.2 版本, 不适用于其他版本和 gccgo 等其他实现,
运行环境是 Ubuntu 16.04 LTS 64bit.
首先会讲解基础概念, 然后讲解分配器, 再讲解收集器的实现.
内存结构
go 在程序启动时会分配一块虚拟内存地址是连续的内存, 结构如下:
这一块内存分为了 3 个区域, 在 X64 上大小分别是 512M, 16G 和 512G, 它们的作用如下:
arena
arena 区域就是我们通常说的heap, go 从 heap 分配的内存都在这个区域中.
bitmap
bitmap 区域用于表示 arena 区域中哪些地址保存了对象, 并且对象中哪些地址包含了指针.
bitmap 区域中一个 byte(8 bit) 对应了 arena 区域中的四个指针大小的内存, 也就是 2 bit 对应一个指针大小的内存.
所以 bitmap 区域的大小是 512GB / 指针大小 (8 byte) / 4 = 16GB.
bitmap 区域中的一个 byte 对应 arena 区域的四个指针大小的内存的结构如下,
每一个指针大小的内存都会有两个 bit 分别表示是否应该继续扫描和是否包含指针:
bitmap 中的 byte 和 arena 的对应关系从末尾开始, 也就是随着内存分配会向两边扩展:
spans
spans 区域用于表示 arena 区中的某一页 (Page) 属于哪个 span, 什么是 span 将在下面介绍.
spans 区域中一个指针 (8 byte) 对应了 arena 区域中的一页(在 go 中一页 = 8KB).
所以 spans 的大小是 512GB / 页大小 (8KB) * 指针大小 (8 byte) = 512MB.
spans 区域的一个指针对应 arena 区域的一页的结构如下, 和 bitmap 不一样的是对应关系会从开头开始:
什么时候从 Heap 分配对象
很多讲解 go 的文章和书籍中都提到过, go 会自动确定哪些对象应该放在栈上, 哪些对象应该放在堆上.
简单的来说, 当一个对象的内容可能在生成该对象的函数结束后被访问, 那么这个对象就会分配在堆上.
在堆上分配对象的情况包括:
- 返回对象的指针
- 传递了对象的指针到其他函数
- 在闭包中使用了对象并且需要修改对象
- 使用 new
在 C 语言中函数返回在栈上的对象的指针是非常危险的事情, 但在 go 中却是安全的, 因为这个对象会自动在堆上分配.
go 决定是否使用堆分配对象的过程也叫 “逃逸分析”.
GC Bitmap
GC 在标记时需要知道哪些地方包含了指针, 例如上面提到的 bitmap 区域涵盖了 arena 区域中的指针信息.
除此之外, GC 还需要知道栈空间上哪些地方包含了指针,
因为栈空间不属于 arena 区域, 栈空间的指针信息将会在函数信息里面.
另外, GC 在分配对象时也需要根据对象的类型设置 bitmap 区域, 来源的指针信息将会在类型信息里面.
总结起来 go 中有以下的 GC Bitmap:
- bitmap 区域: 涵盖了 arena 区域, 使用 2 bit 表示一个指针大小的内存
- 函数信息: 涵盖了函数的栈空间, 使用 1 bit 表示一个指针大小的内存 (位于 stackmap.bytedata)
- 类型信息: 在分配对象时会复制到 bitmap 区域, 使用 1 bit 表示一个指针大小的内存 (位于_type.gcdata)
Span
span 是用于分配对象的区块, 下图是简单说明了 Span 的内部结构:
通常一个 span 包含了多个大小相同的元素, 一个元素会保存一个对象, 除非:
- span 用于保存大对象, 这种情况 span 只有一个元素
- span 用于保存极小对象且不包含指针的对象 (tiny object), 这种情况 span 会用一个元素保存多个对象
span 中有一个 freeindex 标记下一次分配对象时应该开始搜索的地址, 分配后 freeindex 会增加,
在 freeindex 之前的元素都是已分配的, 在 freeindex 之后的元素有可能已分配, 也有可能未分配.
span 每次 GC 以后都可能会回收掉一些元素, allocBits 用于标记哪些元素是已分配的, 哪些元素是未分配的.
使用 freeindex + allocBits 可以在分配时跳过已分配的元素, 把对象设置在未分配的元素中,
但因为每次都去访问 allocBits 效率会比较慢, span 中有一个整数型的 allocCache 用于缓存 freeindex 开始的 bitmap, 缓存的 bit 值与原值相反.
gcmarkBits 用于在 gc 时标记哪些对象存活, 每次 gc 以后 gcmarkBits 会变为 allocBits.
需要注意的是 span 结构本身的内存是从系统分配的, 上面提到的 spans 区域和 bitmap 区域都只是一个索引.
Span 的类型
span 根据大小可以分为 67 个类型, 如下:
// class bytes/obj bytes/span objects tail waste max waste
// 1 8 8192 1024 0 87.50%
// 2 16 8192 512 0 43.75%
// 3 32 8192 256 0 46.88%
// 4 48 8192 170 32 31.52%
// 5 64 8192 128 0 23.44%
// 6 80 8192 102 32 19.07%
// 7 96 8192 85 32 15.95%
// 8 112 8192 73 16 13.56%
// 9 128 8192 64 0 11.72%
// 10 144 8192 56 128 11.82%
// 11 160 8192 51 32 9.73%
// 12 176 8192 46 96 9.59%
// 13 192 8192 42 128 9.25%
// 14 208 8192 39 80 8.12%
// 15 224 8192 36 128 8.15%
// 16 240 8192 34 32 6.62%
// 17 256 8192 32 0 5.86%
// 18 288 8192 28 128 12.16%
// 19 320 8192 25 192 11.80%
// 20 352 8192 23 96 9.88%
// 21 384 8192 21 128 9.51%
// 22 416 8192 19 288 10.71%
// 23 448 8192 18 128 8.37%
// 24 480 8192 17 32 6.82%
// 25 512 8192 16 0 6.05%
// 26 576 8192 14 128 12.33%
// 27 640 8192 12 512 15.48%
// 28 704 8192 11 448 13.93%
// 29 768 8192 10 512 13.94%
// 30 896 8192 9 128 15.52%
// 31 1024 8192 8 0 12.40%
// 32 1152 8192 7 128 12.41%
// 33 1280 8192 6 512 15.55%
// 34 1408 16384 11 896 14.00%
// 35 1536 8192 5 512 14.00%
// 36 1792 16384 9 256 15.57%
// 37 2048 8192 4 0 12.45%
// 38 2304 16384 7 256 12.46%
// 39 2688 8192 3 128 15.59%
// 40 3072 24576 8 0 12.47%
// 41 3200 16384 5 384 6.22%
// 42 3456 24576 7 384 8.83%
// 43 4096 8192 2 0 15.60%
// 44 4864 24576 5 256 16.65%
// 45 5376 16384 3 256 10.92%
// 46 6144 24576 4 0 12.48%
// 47 6528 32768 5 128 6.23%
// 48 6784 40960 6 256 4.36%
// 49 6912 49152 7 768 3.37%
// 50 8192 8192 1 0 15.61%
// 51 9472 57344 6 512 14.28%
// 52 9728 49152 5 512 3.64%
// 53 10240 40960 4 0 4.99%
// 54 10880 32768 3 128 6.24%
// 55 12288 24576 2 0 11.45%
// 56 13568 40960 3 256 9.99%
// 57 14336 57344 4 0 5.35%
// 58 16384 16384 1 0 12.49%
// 59 18432 73728 4 0 11.11%
// 60 19072 57344 3 128 3.57%
// 61 20480 40960 2 0 6.87%
// 62 21760 65536 3 256 6.25%
// 63 24576 24576 1 0 11.45%
// 64 27264 81920 3 128 10.00%
// 65 28672 57344 2 0 4.91%
// 66 32768 32768 1 0 12.50%
以类型 (class) 为 1 的 span 为例,
span 中的元素大小是 8 byte, span 本身占 1 页也就是 8K, 一共可以保存 1024 个对象.
在分配对象时, 会根据对象的大小决定使用什么类型的 span,
例如 16 byte 的对象会使用 span 2, 17 byte 的对象会使用 span 3, 32 byte 的对象会使用 span 3.
从这个例子也可以看到, 分配 17 和 32 byte 的对象都会使用 span 3, 也就是说部分大小的对象在分配时会浪费一定的空间.
有人可能会注意到, 上面最大的 span 的元素大小是 32K, 那么分配超过 32K 的对象会在哪里分配呢?
超过 32K 的对象称为 “大对象”, 分配大对象时, 会直接从 heap 分配一个特殊的 span,
这个特殊的 span 的类型 (class) 是 0, 只包含了一个大对象, span 的大小由对象的大小决定.
特殊的 span 加上的 66 个标准的 span, 一共组成了 67 个 span 类型.
Span 的位置
在前一篇中我提到了 P 是一个虚拟的资源, 同一时间只能有一个线程访问同一个 P, 所以 P 中的数据不需要锁.
为了分配对象时有更好的性能, 各个 P 中都有 span 的缓存 (也叫 mcache), 缓存的结构如下:
各个 P 中按 span 类型的不同, 有 67*2=134 个 span 的缓存,
其中 scan 和 noscan 的区别在于,
如果对象包含了指针, 分配对象时会使用 scan 的 span,
如果对象不包含指针, 分配对象时会使用 noscan 的 span.
把 span 分为 scan 和 noscan 的意义在于,
GC 扫描对象的时候对于 noscan 的 span 可以不去查看 bitmap 区域来标记子对象, 这样可以大幅提升标记的效率.
在分配对象时将会从以下的位置获取适合的 span 用于分配:
- 首先从 P 的缓存 (mcache) 获取, 如果有缓存的 span 并且未满则使用, 这个步骤不需要锁
- 然后从全局缓存 (mcentral) 获取, 如果获取成功则设置到 P, 这个步骤需要锁
- 最后从 mheap 获取, 获取后设置到全局缓存, 这个步骤需要锁
在 P 中缓存 span 的做法跟 CoreCLR 中线程缓存分配上下文 (Allocation Context) 的做法相似,
都可以让分配对象时大部分时候不需要线程锁, 改进分配的性能.
分配对象的流程
go 从堆分配对象时会调用 newobject 函数, 这个函数的流程大致如下:
首先会检查 GC 是否在工作中, 如果 GC 在工作中并且当前的 G 分配了一定大小的内存则需要协助 GC 做一定的工作,
这个机制叫 GC Assist, 用于防止分配内存太快导致 GC 回收跟不上的情况发生.
之后会判断是小对象还是大对象, 如果是大对象则直接调用 largeAlloc 从堆中分配,
如果是小对象分 3 个阶段获取可用的 span, 然后从 span 中分配对象:
- 首先从 P 的缓存 (mcache) 获取
- 然后从全局缓存 (mcentral) 获取, 全局缓存中有可用的 span 的列表
- 最后从 mheap 获取, mheap 中也有 span 的自由列表, 如果都获取失败则从 arena 区域分配
这三个阶段的详细结构如下图:
数据类型的定义
分配对象涉及的数据类型包含:
p: 前一篇提到过, P 是协程中的用于运行 go 代码的虚拟资源
m: 前一篇提到过, M 目前代表系统线程
g: 前一篇提到过, G 就是 goroutine
mspan: 用于分配对象的区块
mcentral: 全局的 mspan 缓存, 一共有 67*2=134 个
mheap: 用于管理 heap 的对象, 全局只有一个
源代码分析
go 从堆分配对象时会调用newobject函数, 先从这个函数看起:
func newobject(typ *_type) unsafe.Pointer {
return mallocgc(typ.size, typ, true)
}
// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
if gcphase == _GCmarktermination {
throw("mallocgc called with gcphase == _GCmarktermination")
}
if size == 0 {
return unsafe.Pointer(&zerobase)
}
if debug.sbrk != 0 {
align := uintptr(16)
if typ != nil {
align = uintptr(typ.align)
}
return persistentalloc(size, align, &memstats.other_sys)
}
// 判断是否要辅助GC工作
// gcBlackenEnabled在GC的标记阶段会开启
// assistG is the G to charge for this allocation, or nil if
// GC is not currently active.
var assistG *g
if gcBlackenEnabled != 0 {
// Charge the current user G for this allocation.
assistG = getg()
if assistG.m.curg != nil {
assistG = assistG.m.curg
}
// Charge the allocation against the G. We'll account
// for internal fragmentation at the end of mallocgc.
assistG.gcAssistBytes -= int64(size)
// 会按分配的大小判断需要协助GC完成多少工作
// 具体的算法将在下面讲解收集器时说明
if assistG.gcAssistBytes < 0 {
// This G is in debt. Assist the GC to correct
// this before allocating. This must happen
// before disabling preemption.
gcAssistAlloc(assistG)
}
}
// 增加当前G对应的M的lock计数, 防止这个G被抢占
// Set mp.mallocing to keep from being preempted by GC.
mp := acquirem()
if mp.mallocing != 0 {
throw("malloc deadlock")
}
if mp.gsignal == getg() {
throw("malloc during signal")
}
mp.mallocing = 1
shouldhelpgc := false
dataSize := size
// 获取当前G对应的M对应的P的本地span缓存(mcache)
// 因为M在拥有P后会把P的mcache设到M中, 这里返回的是getg().m.mcache
c := gomcache()
var x unsafe.Pointer
noscan := typ == nil || typ.kind&kindNoPointers != 0
// 判断是否小对象, maxSmallSize当前的值是32K
if size <= maxSmallSize {
// 如果对象不包含指针, 并且对象的大小小于16 bytes, 可以做特殊处理
// 这里是针对非常小的对象的优化, 因为span的元素最小只能是8 byte, 如果对象更小那么很多空间都会被浪费掉
// 非常小的对象可以整合在"class 2 noscan"的元素(大小为16 byte)中
if noscan && size < maxTinySize {
// Tiny allocator.
//
// Tiny allocator combines several tiny allocation requests
// into a single memory block. The resulting memory block
// is freed when all subobjects are unreachable. The subobjects
// must be noscan (don't have pointers), this ensures that
// the amount of potentially wasted memory is bounded.
//
// Size of the memory block used for combining (maxTinySize) is tunable.
// Current setting is 16 bytes, which relates to 2x worst case memory
// wastage (when all but one subobjects are unreachable).
// 8 bytes would result in no wastage at all, but provides less
// opportunities for combining.
// 32 bytes provides more opportunities for combining,
// but can lead to 4x worst case wastage.
// The best case winning is 8x regardless of block size.
//
// Objects obtained from tiny allocator must not be freed explicitly.
// So when an object will be freed explicitly, we ensure that
// its size >= maxTinySize.
//
// SetFinalizer has a special case for objects potentially coming
// from tiny allocator, it such case it allows to set finalizers
// for an inner byte of a memory block.
//
// The main targets of tiny allocator are small strings and
// standalone escaping variables. On a json benchmark
// the allocator reduces number of allocations by ~12% and
// reduces heap size by ~20%.
off := c.tinyoffset
// Align tiny pointer for required (conservative) alignment.
if size&7 == 0 {
off = round(off, 8)
} else if size&3 == 0 {
off = round(off, 4)
} else if size&1 == 0 {
off = round(off, 2)
}
if off+size <= maxTinySize && c.tiny != 0 {
// The object fits into existing tiny block.
x = unsafe.Pointer(c.tiny + off)
c.tinyoffset = off + size
c.local_tinyallocs++
mp.mallocing = 0
releasem(mp)
return x
}
// Allocate a new maxTinySize block.
span := c.alloc[tinySpanClass]
v := nextFreeFast(span)
if v == 0 {
v, _, shouldhelpgc = c.nextFree(tinySpanClass)
}
x = unsafe.Pointer(v)
(*[2]uint64)(x)[0] = 0
(*[2]uint64)(x)[1] = 0
// See if we need to replace the existing tiny block with the new one
// based on amount of remaining free space.
if size < c.tinyoffset || c.tiny == 0 {
c.tiny = uintptr(x)
c.tinyoffset = size
}
size = maxTinySize
} else {
// 否则按普通的小对象分配
// 首先获取对象的大小应该使用哪个span类型
var sizeclass uint8
if size <= smallSizeMax-8 {
sizeclass = size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]
} else {
sizeclass = size_to_class128[(size-smallSizeMax+largeSizeDiv-1)/largeSizeDiv]
}
size = uintptr(class_to_size[sizeclass])
// 等于sizeclass * 2 + (noscan ? 1 : 0)
spc := makeSpanClass(sizeclass, noscan)
span := c.alloc[spc]
// 尝试快速的从这个span中分配
v := nextFreeFast(span)
if v == 0 {
// 分配失败, 可能需要从mcentral或者mheap中获取
// 如果从mcentral或者mheap获取了新的span, 则shouldhelpgc会等于true
// shouldhelpgc会等于true时会在下面判断是否要触发GC
v, span, shouldhelpgc = c.nextFree(spc)
}
x = unsafe.Pointer(v)
if needzero && span.needzero != 0 {
memclrNoHeapPointers(unsafe.Pointer(v), size)
}
}
} else {
// 大对象直接从mheap分配, 这里的s是一个特殊的span, 它的class是0
var s *mspan
shouldhelpgc = true
systemstack(func() {
s = largeAlloc(size, needzero, noscan)
})
s.freeindex = 1
s.allocCount = 1
x = unsafe.Pointer(s.base())
size = s.elemsize
}
// 设置arena对应的bitmap, 记录哪些位置包含了指针, GC会使用bitmap扫描所有可到达的对象
var scanSize uintptr
if !noscan {
// If allocating a defer+arg block, now that we've picked a malloc size
// large enough to hold everything, cut the "asked for" size down to
// just the defer header, so that the GC bitmap will record the arg block
// as containing nothing at all (as if it were unused space at the end of
// a malloc block caused by size rounding).
// The defer arg areas are scanned as part of scanstack.
if typ == deferType {
dataSize = unsafe.Sizeof(_defer{})
}
// 这个函数非常的长, 有兴趣的可以看
// https://github.com/golang/go/blob/go1.9.2/src/runtime/mbitmap.go#L855
// 虽然代码很长但是设置的内容跟上面说过的bitmap区域的结构一样
// 根据类型信息设置scan bit跟pointer bit, scan bit成立表示应该继续扫描, pointer bit成立表示该位置是指针
// 需要注意的地方有
// - 如果一个类型只有开头的地方包含指针, 例如[ptr, ptr, large non-pointer data]
// 那么后面的部分的scan bit将会为0, 这样可以大幅提升标记的效率
// - 第二个slot的scan bit用途比较特殊, 它并不用于标记是否继续scan, 而是标记checkmark
// 什么是checkmark
// - 因为go的并行GC比较复杂, 为了检查实现是否正确, go需要在有一个检查所有应该被标记的对象是否被标记的机制
// 这个机制就是checkmark, 在开启checkmark时go会在标记阶段的最后停止整个世界然后重新执行一次标记
// 上面的第二个slot的scan bit就是用于标记对象在checkmark标记中是否被标记的
// - 有的人可能会发现第二个slot要求对象最少有两个指针的大小, 那么只有一个指针的大小的对象呢?
// 只有一个指针的大小的对象可以分为两种情况
// 对象就是指针, 因为大小刚好是1个指针所以并不需要看bitmap区域, 这时第一个slot就是checkmark
// 对象不是指针, 因为有tiny alloc的机制, 不是指针且只有一个指针大小的对象会分配在两个指针的span中
// 这时候也不需要看bitmap区域, 所以和上面一样第一个slot就是checkmark
heapBitsSetType(uintptr(x), size, dataSize, typ)
if dataSize > typ.size {
// Array allocation. If there are any
// pointers, GC has to scan to the last
// element.
if typ.ptrdata != 0 {
scanSize = dataSize - typ.size + typ.ptrdata
}
} else {
scanSize = typ.ptrdata
}
c.local_scan += scanSize
}
// 内存屏障, 因为x86和x64的store不会乱序所以这里只是个针对编译器的屏障, 汇编中是ret
// Ensure that the stores above that initialize x to
// type-safe memory and set the heap bits occur before
// the caller can make x observable to the garbage
// collector. Otherwise, on weakly ordered machines,
// the garbage collector could follow a pointer to x,
// but see uninitialized memory or stale heap bits.
publicationBarrier()
// 如果当前在GC中, 需要立刻标记分配后的对象为"黑色", 防止它被回收
// Allocate black during GC.
// All slots hold nil so no scanning is needed.
// This may be racing with GC so do it atomically if there can be
// a race marking the bit.
if gcphase != _GCoff {
gcmarknewobject(uintptr(x), size, scanSize)
}
// Race Detector的处理(用于检测线程冲突问题)
if raceenabled {
racemalloc(x, size)
}
// Memory Sanitizer的处理(用于检测危险指针等内存问题)
if msanenabled {
msanmalloc(x, size)
}
// 重新允许当前的G被抢占
mp.mallocing = 0
releasem(mp)
// 除错记录
if debug.allocfreetrace != 0 {
tracealloc(x, size, typ)
}
// Profiler记录
if rate := MemProfileRate; rate > 0 {
if size < uintptr(rate) && int32(size) < c.next_sample {
c.next_sample -= int32(size)
} else {
mp := acquirem()
profilealloc(mp, x, size)
releasem(mp)
}
}
// gcAssistBytes减去"实际分配大小 - 要求分配大小", 调整到准确值
if assistG != nil {
// Account for internal fragmentation in the assist
// debt now that we know it.
assistG.gcAssistBytes -= int64(size - dataSize)
}
// 如果之前获取了新的span, 则判断是否需要后台启动GC
// 这里的判断逻辑(gcTrigger)会在下面详细说明
if shouldhelpgc {
if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {
gcStart(gcBackgroundMode, t)
}
}
return x
}
接下来看看如何从 span 里面分配对象, 首先会调用nextFreeFast尝试快速分配:
func nextFreeFast(s *mspan) gclinkptr {
theBit := sys.Ctz64(s.allocCache)
if theBit < 64 {
result := s.freeindex + uintptr(theBit)
if result < s.nelems {
freeidx := result + 1
if freeidx%64 == 0 && freeidx != s.nelems {
return 0
}
s.allocCache >>= uint(theBit + 1)
s.freeindex = freeidx
v := gclinkptr(result*s.elemsize + s.base())
s.allocCount++
return v
}
}
return 0
}
如果在 freeindex 后无法快速找到未分配的元素, 就需要调用nextFree做出更复杂的处理:
func (c *mcache) nextFree(spc spanClass) (v gclinkptr, s *mspan, shouldhelpgc bool) {
s = c.alloc[spc]
shouldhelpgc = false
freeIndex := s.nextFreeIndex()
if freeIndex == s.nelems {
if uintptr(s.allocCount) != s.nelems {
println("runtime: s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
throw("s.allocCount != s.nelems && freeIndex == s.nelems")
}
systemstack(func() {
c.refill(spc)
})
shouldhelpgc = true
s = c.alloc[spc]
freeIndex = s.nextFreeIndex()
}
if freeIndex >= s.nelems {
throw("freeIndex is not valid")
}
v = gclinkptr(freeIndex*s.elemsize + s.base())
s.allocCount++
if uintptr(s.allocCount) > s.nelems {
println("s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
throw("s.allocCount > s.nelems")
}
return
}
如果 mcache 中指定类型的 span 已满, 就需要调用refill函数申请新的 span:
func (c *mcache) refill(spc spanClass) *mspan {
_g_ := getg()
_g_.m.locks++
s := c.alloc[spc]
if uintptr(s.allocCount) != s.nelems {
throw("refill of span with free space remaining")
}
if s != &emptymspan {
s.incache = false
}
s = mheap_.central[spc].mcentral.cacheSpan()
if s == nil {
throw("out of memory")
}
if uintptr(s.allocCount) == s.nelems {
throw("span has no free space")
}
c.alloc[spc] = s
_g_.m.locks--
return s
}
向 mcentral 申请一个新的 span 会通过cacheSpan函数:
mcentral 首先尝试从内部的链表复用原有的 span, 如果复用失败则向 mheap 申请.
func (c *mcentral) cacheSpan() *mspan {
spanBytes := uintptr(class_to_allocnpages[c.spanclass.sizeclass()]) * _PageSize
deductSweepCredit(spanBytes, 0)
lock(&c.lock)
traceDone := false
if trace.enabled {
traceGCSweepStart()
}
sg := mheap_.sweepgen
retry:
var s *mspan
for s = c.nonempty.first; s != nil; s = s.next {
if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {
c.nonempty.remove(s)
c.empty.insertBack(s)
unlock(&c.lock)
s.sweep(true)
goto havespan
}
if s.sweepgen == sg-1 {
continue
}
c.nonempty.remove(s)
c.empty.insertBack(s)
unlock(&c.lock)
goto havespan
}
for s = c.empty.first; s != nil; s = s.next {
if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {
c.empty.remove(s)
c.empty.insertBack(s)
unlock(&c.lock)
s.sweep(true)
freeIndex := s.nextFreeIndex()
if freeIndex != s.nelems {
s.freeindex = freeIndex
goto havespan
}
lock(&c.lock)
goto retry
}
if s.sweepgen == sg-1 {
continue
}
break
}
if trace.enabled {
traceGCSweepDone()
traceDone = true
}
unlock(&c.lock)
s = c.grow()
if s == nil {
return nil
}
lock(&c.lock)
c.empty.insertBack(s)
unlock(&c.lock)
havespan:
if trace.enabled && !traceDone {
traceGCSweepDone()
}
cap := int32((s.npages << _PageShift) / s.elemsize)
n := cap - int32(s.allocCount)
if n == 0 || s.freeindex == s.nelems || uintptr(s.allocCount) == s.nelems {
throw("span has no free objects")
}
atomic.Xadd64(&c.nmalloc, int64(n))
usedBytes := uintptr(s.allocCount) * s.elemsize
atomic.Xadd64(&memstats.heap_live, int64(spanBytes)-int64(usedBytes))
if trace.enabled {
traceHeapAlloc()
}
if gcBlackenEnabled != 0 {
gcController.revise()
}
s.incache = true
freeByteBase := s.freeindex &^ (64 - 1)
whichByte := freeByteBase / 8
s.refillAllocCache(whichByte)
s.allocCache >>= s.freeindex % 64
return s
}
mcentral 向 mheap 申请一个新的 span 会使用grow函数:
func (c *mcentral) grow() *mspan {
npages := uintptr(class_to_allocnpages[c.spanclass.sizeclass()])
size := uintptr(class_to_size[c.spanclass.sizeclass()])
n := (npages << _PageShift) / size
s := mheap_.alloc(npages, c.spanclass, false, true)
if s == nil {
return nil
}
p := s.base()
s.limit = p + size*n
heapBitsForSpan(s.base()).initSpan(s)
return s
}
mheap 分配 span 的函数是alloc:
func (h *mheap) alloc(npage uintptr, spanclass spanClass, large bool, needzero bool) *mspan {
var s *mspan
systemstack(func() {
s = h.alloc_m(npage, spanclass, large)
})
if s != nil {
if needzero && s.needzero != 0 {
memclrNoHeapPointers(unsafe.Pointer(s.base()), s.npages<<_PageShift)
}
s.needzero = 0
}
return s
}
alloc 函数会在 g0 的栈空间中调用alloc_m函数:
func (h *mheap) alloc_m(npage uintptr, spanclass spanClass, large bool) *mspan {
_g_ := getg()
if _g_ != _g_.m.g0 {
throw("_mheap_alloc not on g0 stack")
}
lock(&h.lock)
if h.sweepdone == 0 {
if trace.enabled {
traceGCSweepStart()
}
h.reclaim(npage)
if trace.enabled {
traceGCSweepDone()
}
}
memstats.heap_scan += uint64(_g_.m.mcache.local_scan)
_g_.m.mcache.local_scan = 0
memstats.tinyallocs += uint64(_g_.m.mcache.local_tinyallocs)
_g_.m.mcache.local_tinyallocs = 0
s := h.allocSpanLocked(npage, &memstats.heap_inuse)
if s != nil {
atomic.Store(&s.sweepgen, h.sweepgen)
h.sweepSpans[h.sweepgen/2%2].push(s)
s.state = _MSpanInUse
s.allocCount = 0
s.spanclass = spanclass
if sizeclass := spanclass.sizeclass(); sizeclass == 0 {
s.elemsize = s.npages << _PageShift
s.divShift = 0
s.divMul = 0
s.divShift2 = 0
s.baseMask = 0
} else {
s.elemsize = uintptr(class_to_size[sizeclass])
m := &class_to_divmagic[sizeclass]
s.divShift = m.shift
s.divMul = m.mul
s.divShift2 = m.shift2
s.baseMask = m.baseMask
}
h.pagesInUse += uint64(npage)
if large {
memstats.heap_objects++
mheap_.largealloc += uint64(s.elemsize)
mheap_.nlargealloc++
atomic.Xadd64(&memstats.heap_live, int64(npage<<_PageShift))
if s.npages < uintptr(len(h.busy)) {
h.busy[s.npages].insertBack(s)
} else {
h.busylarge.insertBack(s)
}
}
}
if gcBlackenEnabled != 0 {
gcController.revise()
}
if trace.enabled {
traceHeapAlloc()
}
unlock(&h.lock)
return s
}
继续查看allocSpanLocked函数:
func (h *mheap) allocSpanLocked(npage uintptr, stat *uint64) *mspan {
var list *mSpanList
var s *mspan
for i := int(npage); i < len(h.free); i++ {
list = &h.free[i]
if !list.isEmpty() {
s = list.first
list.remove(s)
goto HaveSpan
}
}
s = h.allocLarge(npage)
if s == nil {
if !h.grow(npage) {
return nil
}
s = h.allocLarge(npage)
if s == nil {
return nil
}
}
HaveSpan:
if s.state != _MSpanFree {
throw("MHeap_AllocLocked - MSpan not free")
}
if s.npages < npage {
throw("MHeap_AllocLocked - bad npages")
}
if s.npreleased > 0 {
sysUsed(unsafe.Pointer(s.base()), s.npages<<_PageShift)
memstats.heap_released -= uint64(s.npreleased << _PageShift)
s.npreleased = 0
}
if s.npages > npage {
t := (*mspan)(h.spanalloc.alloc())
t.init(s.base()+npage<<_PageShift, s.npages-npage)
s.npages = npage
p := (t.base() - h.arena_start) >> _PageShift
if p > 0 {
h.spans[p-1] = s
}
h.spans[p] = t
h.spans[p+t.npages-1] = t
t.needzero = s.needzero
s.state = _MSpanManual
t.state = _MSpanManual
h.freeSpanLocked(t, false, false, s.unusedsince)
s.state = _MSpanFree
}
s.unusedsince = 0
p := (s.base() - h.arena_start) >> _PageShift
for n := uintptr(0); n < npage; n++ {
h.spans[p+n] = s
}
*stat += uint64(npage << _PageShift)
memstats.heap_idle -= uint64(npage << _PageShift)
if s.inList() {
throw("still in list")
}
return s
}
继续查看allocLarge函数:
func (h *mheap) allocLarge(npage uintptr) *mspan {
return h.freelarge.remove(npage)
}
freelarge 的类型是 mTreap, 调用remove函数会在树里面搜索一个至少 npage 且在树中的最小的 span 返回:
func (root *mTreap) remove(npages uintptr) *mspan {
t := root.treap
for t != nil {
if t.spanKey == nil {
throw("treap node with nil spanKey found")
}
if t.npagesKey < npages {
t = t.right
} else if t.left != nil && t.left.npagesKey >= npages {
t = t.left
} else {
result := t.spanKey
root.removeNode(t)
return result
}
}
return nil
}
向 arena 区域申请新 span 的函数是 mheap 类的grow函数:
func (h *mheap) grow(npage uintptr) bool {
npage = round(npage, (64<<10)/_PageSize)
ask := npage << _PageShift
if ask < _HeapAllocChunk {
ask = _HeapAllocChunk
}
v := h.sysAlloc(ask)
if v == nil {
if ask > npage<<_PageShift {
ask = npage << _PageShift
v = h.sysAlloc(ask)
}
if v == nil {
print("runtime: out of memory: cannot allocate ", ask, "-byte block (", memstats.heap_sys, " in use)n")
return false
}
}
s := (*mspan)(h.spanalloc.alloc())
s.init(uintptr(v), ask>>_PageShift)
p := (s.base() - h.arena_start) >> _PageShift
for i := p; i < p+s.npages; i++ {
h.spans[i] = s
}
atomic.Store(&s.sweepgen, h.sweepgen)
s.state = _MSpanInUse
h.pagesInUse += uint64(s.npages)
h.freeSpanLocked(s, false, true, 0)
return true
}
继续查看 mheap 的sysAlloc函数:
func (h *mheap) sysAlloc(n uintptr) unsafe.Pointer {
const strandLimit = 16 << 20
if n > h.arena_end-h.arena_alloc {
p_size := round(n+_PageSize, 256<<20)
new_end := h.arena_end + p_size
if h.arena_end <= new_end && new_end-h.arena_start-1 <= _MaxMem {
var reserved bool
p := uintptr(sysReserve(unsafe.Pointer(h.arena_end), p_size, &reserved))
if p == 0 {
goto reservationFailed
}
if p == h.arena_end {
h.arena_end = new_end
h.arena_reserved = reserved
} else if h.arena_start <= p && p+p_size-h.arena_start-1 <= _MaxMem && h.arena_end-h.arena_alloc < strandLimit {
h.arena_end = p + p_size
p = round(p, _PageSize)
h.arena_alloc = p
h.arena_reserved = reserved
} else {
stat := uint64(p_size)
sysFree(unsafe.Pointer(p), p_size, &stat)
}
}
}
if n <= h.arena_end-h.arena_alloc {
p := h.arena_alloc
sysMap(unsafe.Pointer(p), n, h.arena_reserved, &memstats.heap_sys)
h.arena_alloc += n
if h.arena_alloc > h.arena_used {
h.setArenaUsed(h.arena_alloc, true)
}
if p&(_PageSize-1) != 0 {
throw("misrounded allocation in MHeap_SysAlloc")
}
return unsafe.Pointer(p)
}
reservationFailed:
if sys.PtrSize != 4 {
return nil
}
p_size := round(n, _PageSize) + _PageSize
p := uintptr(sysAlloc(p_size, &memstats.heap_sys))
if p == 0 {
return nil
}
if p < h.arena_start || p+p_size-h.arena_start > _MaxMem {
top := uint64(h.arena_start) + _MaxMem
print("runtime: memory allocated by OS (", hex(p), ") not in usable range [", hex(h.arena_start), ",", hex(top), ")n")
sysFree(unsafe.Pointer(p), p_size, &memstats.heap_sys)
return nil
}
p += -p & (_PageSize - 1)
if p+n > h.arena_used {
h.setArenaUsed(p+n, true)
}
if p&(_PageSize-1) != 0 {
throw("misrounded allocation in MHeap_SysAlloc")
}
return unsafe.Pointer(p)
}
以上就是分配对象的完整流程了, 接下来分析 GC 标记和回收对象的处理.
回收对象的流程
GO 的 GC 是并行 GC, 也就是 GC 的大部分处理和普通的 go 代码是同时运行的, 这让 GO 的 GC 流程比较复杂.
首先 GC 有四个阶段, 它们分别是:
- Sweep Termination: 对未清扫的 span 进行清扫, 只有上一轮的 GC 的清扫工作完成才可以开始新一轮的 GC
- Mark: 扫描所有根对象, 和根对象可以到达的所有对象, 标记它们不被回收
- Mark Termination: 完成标记工作, 重新扫描部分根对象 (要求 STW)
- Sweep: 按标记结果清扫 span
下图是比较完整的 GC 流程, 并按颜色对这四个阶段进行了分类:
在 GC 过程中会有两种后台任务 (G), 一种是标记用的后台任务, 一种是清扫用的后台任务.
标记用的后台任务会在需要时启动, 可以同时工作的后台任务数量大约是 P 的数量的 25%, 也就是 go 所讲的让 25% 的 cpu 用在 GC 上的根据.
清扫用的后台任务在程序启动时会启动一个, 进入清扫阶段时唤醒.
目前整个 GC 流程会进行两次 STW(Stop The World), 第一次是 Mark 阶段的开始, 第二次是 Mark Termination 阶段.
第一次 STW 会准备根对象的扫描, 启动写屏障 (Write Barrier) 和辅助 GC(mutator assist).
第二次 STW 会重新扫描部分根对象, 禁用写屏障 (Write Barrier) 和辅助 GC(mutator assist).
需要注意的是, 不是所有根对象的扫描都需要 STW, 例如扫描栈上的对象只需要停止拥有该栈的 G.
从 go 1.9 开始, 写屏障的实现使用了 Hybrid Write Barrier, 大幅减少了第二次 STW 的时间.
GC 的触发条件
GC 在满足一定条件后会被触发, 触发条件有以下几种:
- gcTriggerAlways: 强制触发 GC
- gcTriggerHeap: 当前分配的内存达到一定值就触发 GC
- gcTriggerTime: 当一定时间没有执行过 GC 就触发 GC
- gcTriggerCycle: 要求启动新一轮的 GC, 已启动则跳过, 手动触发 GC 的
runtime.GC()
会使用这个条件
触发条件的判断在 gctrigger 的test函数.
其中 gcTriggerHeap 和 gcTriggerTime 这两个条件是自然触发的, gcTriggerHeap 的判断代码如下:
return memstats.heap_live >= memstats.gc_trigger
heap_live 的增加在上面对分配器的代码分析中可以看到, 当值达到 gc_trigger 就会触发 GC, 那么 gc_trigger 是如何决定的?
gc_trigger 的计算在gcSetTriggerRatio函数中, 公式是:
trigger = uint64(float64(memstats.heap_marked) * (1 + triggerRatio))
当前标记存活的大小乘以 1 + 系数 triggerRatio, 就是下次出发 GC 需要的分配量.
triggerRatio 在每次 GC 后都会调整, 计算 triggerRatio 的函数是encCycle, 公式是:
const triggerGain = 0.5
// 目标Heap增长率, 默认是1.0
goalGrowthRatio := float64(gcpercent) / 100
// 实际Heap增长率, 等于总大小/存活大小-1
actualGrowthRatio := float64(memstats.heap_live)/float64(memstats.heap_marked) - 1
// GC标记阶段的使用时间(因为endCycle是在Mark Termination阶段调用的)
assistDuration := nanotime() - c.markStartTime
// GC标记阶段的CPU占用率, 目标值是0.25
utilization := gcGoalUtilization
if assistDuration > 0 {
// assistTime是G辅助GC标记对象所使用的时间合计
// (nanosecnds spent in mutator assists during this cycle)
// 额外的CPU占用率 = 辅助GC标记对象的总时间 / (GC标记使用时间 * P的数量)
utilization += float64(c.assistTime) / float64(assistDuration*int64(gomaxprocs))
}
// 触发系数偏移值 = 目标增长率 - 原触发系数 - CPU占用率 / 目标CPU占用率 * (实际增长率 - 原触发系数)
// 参数的分析:
// 实际增长率越大, 触发系数偏移值越小, 小于0时下次触发GC会提早
// CPU占用率越大, 触发系数偏移值越小, 小于0时下次触发GC会提早
// 原触发系数越大, 触发系数偏移值越小, 小于0时下次触发GC会提早
triggerError := goalGrowthRatio - memstats.triggerRatio - utilization/gcGoalUtilization*(actualGrowthRatio-memstats.triggerRatio)
// 根据偏移值调整触发系数, 每次只调整偏移值的一半(渐进式调整)
triggerRatio := memstats.triggerRatio + triggerGain*triggerError
公式中的 “目标 Heap 增长率” 可以通过设置环境变量 “GOGC” 调整, 默认值是 100, 增加它的值可以减少 GC 的触发.
设置 “GOGC=off” 可以彻底关掉 GC.
gcTriggerTime 的判断代码如下:
lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
return lastgc != 0 && t.now-lastgc > forcegcperiod
forcegcperiod 的定义是 2 分钟, 也就是 2 分钟内没有执行过 GC 就会强制触发.
三色的定义 (黑, 灰, 白)
我看过的对三色 GC 的 “三色” 这个概念解释的最好的文章就是这一篇了, 强烈建议先看这一篇中的讲解.
“三色” 的概念可以简单的理解为:
- 黑色: 对象在这次 GC 中已标记, 且这个对象包含的子对象也已标记
- 灰色: 对象在这次 GC 中已标记, 但这个对象包含的子对象未标记
- 白色: 对象在这次 GC 中未标记
在 go 内部对象并没有保存颜色的属性, 三色只是对它们的状态的描述,
白色的对象在它所在的 span 的 gcmarkBits 中对应的 bit 为 0,
灰色的对象在它所在的 span 的 gcmarkBits 中对应的 bit 为 1, 并且对象在标记队列中,
黑色的对象在它所在的 span 的 gcmarkBits 中对应的 bit 为 1, 并且对象已经从标记队列中取出并处理.
gc 完成后, gcmarkBits 会移动到 allocBits 然后重新分配一个全部为 0 的 bitmap, 这样黑色的对象就变为了白色.
写屏障 (Write Barrier)
因为 go 支持并行 GC, GC 的扫描和 go 代码可以同时运行, 这样带来的问题是 GC 扫描的过程中 go 代码有可能改变了对象的依赖树,
例如开始扫描时发现根对象 A 和 B, B 拥有 C 的指针, GC 先扫描 A, 然后 B 把 C 的指针交给 A, GC 再扫描 B, 这时 C 就不会被扫描到.
为了避免这个问题, go 在 GC 的标记阶段会启用写屏障 (Write Barrier).
启用了写屏障 (Write Barrier) 后, 当 B 把 C 的指针交给 A 时, GC 会认为在这一轮的扫描中 C 的指针是存活的,
即使 A 可能会在稍后丢掉 C, 那么 C 就在下一轮回收.
写屏障只针对指针启用, 而且只在 GC 的标记阶段启用, 平时会直接把值写入到目标地址.
go 在 1.9 开始启用了混合写屏障 (Hybrid Write Barrier), 伪代码如下:
writePointer(slot, ptr):
shade(*slot)
if any stack is grey:
shade(ptr)
*slot = ptr
混合写屏障会同时标记指针写入目标的 “原指针” 和 “新指针 “.
标记原指针的原因是, 其他运行中的线程有可能会同时把这个指针的值复制到寄存器或者栈上的本地变量,
因为复制指针到寄存器或者栈上的本地变量不会经过写屏障, 所以有可能会导致指针不被标记, 试想下面的情况:
[go] b = obj
[go] oldx = nil
[gc] scan oldx...
[go] oldx = b.x
[go] b.x = ptr
[gc] scan b...
如果写屏障不标记原值, 那么oldx就不会被扫描到.
标记新指针的原因是, 其他运行中的线程有可能会转移指针的位置, 试想下面的情况:
[go] a = ptr
[go] b = obj
[gc] scan b...
[go] b.x = a
[go] a = nil
[gc] scan a...
如果写屏障不标记新值, 那么ptr就不会被扫描到.
混合写屏障可以让 GC 在并行标记结束后不需要重新扫描各个 G 的堆栈, 可以减少 Mark Termination 中的 STW 时间.
除了写屏障外, 在 GC 的过程中所有新分配的对象都会立刻变为黑色, 在上面的 mallocgc 函数中可以看到.
辅助 GC(mutator assist)
为了防止 heap 增速太快, 在 GC 执行的过程中如果同时运行的 G 分配了内存, 那么这个 G 会被要求辅助 GC 做一部分的工作.
在 GC 的过程中同时运行的 G 称为 “mutator”, “mutator assist” 机制就是 G 辅助 GC 做一部分工作的机制.
辅助 GC 做的工作有两种类型, 一种是标记 (Mark), 另一种是清扫 (Sweep).
辅助标记的触发可以查看上面的 mallocgc 函数, 触发时 G 会帮助扫描 “工作量” 个对象, 工作量的计算公式是:
debtBytes * assistWorkPerByte
意思是分配的大小乘以系数 assistWorkPerByte, assistWorkPerByte 的计算在函数revise中, 公式是:
scanWorkExpected := int64(memstats.heap_scan) - c.scanWork
if scanWorkExpected < 1000 {
scanWorkExpected = 1000
}
heapDistance := int64(memstats.next_gc) - int64(atomic.Load64(&memstats.heap_live))
if heapDistance <= 0 {
heapDistance = 1
}
c.assistWorkPerByte = float64(scanWorkExpected) / float64(heapDistance)
c.assistBytesPerWork = float64(heapDistance) / float64(scanWorkExpected)
和辅助标记不一样的是, 辅助清扫申请新 span 时才会检查, 而辅助标记是每次分配对象时都会检查.
辅助清扫的触发可以看上面的 cacheSpan 函数, 触发时 G 会帮助回收 “工作量” 页的对象, 工作量的计算公式是:
spanBytes * sweepPagesPerByte // 不完全相同, 具体看deductSweepCredit函数
意思是分配的大小乘以系数 sweepPagesPerByte, sweepPagesPerByte 的计算在函数gcSetTriggerRatio中, 公式是:
heapLiveBasis := atomic.Load64(&memstats.heap_live)
heapDistance := int64(trigger) - int64(heapLiveBasis)
heapDistance -= 1024 * 1024
if heapDistance < _PageSize {
heapDistance = _PageSize
}
pagesSwept := atomic.Load64(&mheap_.pagesSwept)
sweepDistancePages := int64(mheap_.pagesInUse) - int64(pagesSwept)
if sweepDistancePages <= 0 {
mheap_.sweepPagesPerByte = 0
} else {
mheap_.sweepPagesPerByte = float64(sweepDistancePages) / float64(heapDistance)
}
根对象
在 GC 的标记阶段首先需要标记的就是 “根对象”, 从根对象开始可到达的所有对象都会被认为是存活的.
根对象包含了全局变量, 各个 G 的栈上的变量等, GC 会先扫描根对象然后再扫描根对象可到达的所有对象.
扫描根对象包含了一系列的工作, 它们定义在[https://github.com/golang/go/blob/go1.9.2/src/runtime/mgcmark.go#L54]函数:
Fixed Roots: 特殊的扫描工作
- fixedRootFinalizers: 扫描析构器队列
- fixedRootFreeGStacks: 释放已中止的 G 的栈
Flush Cache Roots: 释放 mcache 中的所有 span, 要求 STW
Data Roots: 扫描可读写的全局变量
BSS Roots: 扫描只读的全局变量
Span Roots: 扫描各个 span 中特殊对象 (析构器列表)
Stack Roots: 扫描各个 G 的栈
标记阶段 (Mark) 会做其中的 “Fixed Roots”, “Data Roots”, “BSS Roots”, “Span Roots”, “Stack Roots”.
完成标记阶段 (Mark Termination) 会做其中的 “Fixed Roots”, “Flush Cache Roots”.
标记队列
GC 的标记阶段会使用 “标记队列” 来确定所有可从根对象到达的对象都已标记, 上面提到的 “灰色” 的对象就是在标记队列中的对象.
举例来说, 如果当前有[A, B, C]这三个根对象, 那么扫描根对象时就会把它们放到标记队列:
work queue: [A, B, C]
后台标记任务从标记队列中取出 A, 如果 A 引用了 D, 则把 D 放入标记队列:
work queue: [B, C, D]
后台标记任务从标记队列取出 B, 如果 B 也引用了 D, 这时因为 D 在 gcmarkBits 中对应的 bit 已经是 1 所以会跳过:
work queue: [C, D]
如果并行运行的 go 代码分配了一个对象 E, 对象 E 会被立刻标记, 但不会进入标记队列 (因为确定 E 没有引用其他对象).
然后并行运行的 go 代码把对象 F 设置给对象 E 的成员, 写屏障会标记对象 F 然后把对象 F 加到运行队列:
work queue: [C, D, F]
后台标记任务从标记队列取出 C, 如果 C 没有引用其他对象, 则不需要处理:
work queue: [D, F]
后台标记任务从标记队列取出 D, 如果 D 引用了 X, 则把 X 放入标记队列:
work queue: [F, X]
后台标记任务从标记队列取出 F, 如果 F 没有引用其他对象, 则不需要处理.
后台标记任务从标记队列取出 X, 如果 X 没有引用其他对象, 则不需要处理.
最后标记队列为空, 标记完成, 存活的对象有[A, B, C, D, E, F, X].
实际的状况会比上面介绍的状况稍微复杂一点.
标记队列会分为全局标记队列和各个 P 的本地标记队列, 这点和协程中的运行队列相似.
并且标记队列为空以后, 还需要停止整个世界并禁止写屏障, 然后再次检查是否为空.
源代码分析
go 触发 gc 会从gcStart函数开始:
func gcStart(mode gcMode, trigger gcTrigger) {
mp := acquirem()
if gp := getg(); gp == mp.g0 || mp.locks > 1 || mp.preemptoff != "" {
releasem(mp)
return
}
releasem(mp)
mp = nil
for trigger.test() && gosweepone() != ^uintptr(0) {
sweep.nbgsweep++
}
semacquire(&work.startSema)
if !trigger.test() {
semrelease(&work.startSema)
return
}
work.userForced = trigger.kind == gcTriggerAlways || trigger.kind == gcTriggerCycle
if mode == gcBackgroundMode {
if debug.gcstoptheworld == 1 {
mode = gcForceMode
} else if debug.gcstoptheworld == 2 {
mode = gcForceBlockMode
}
}
semacquire(&worldsema)
if trace.enabled {
traceGCStart()
}
if mode == gcBackgroundMode {
gcBgMarkStartWorkers()
}
gcResetMarkState()
work.stwprocs, work.maxprocs = gcprocs(), gomaxprocs
work.heap0 = atomic.Load64(&memstats.heap_live)
work.pauseNS = 0
work.mode = mode
now := nanotime()
work.tSweepTerm = now
work.pauseStart = now
systemstack(stopTheWorldWithSema)
systemstack(func() {
finishsweep_m()
})
clearpools()
work.cycles++
if mode == gcBackgroundMode {
gcController.startCycle()
work.heapGoal = memstats.next_gc
setGCPhase(_GCmark)
gcBgMarkPrepare()
gcMarkRootPrepare()
gcMarkTinyAllocs()
atomic.Store(&gcBlackenEnabled, 1)
gcController.markStartTime = now
systemstack(startTheWorldWithSema)
now = nanotime()
work.pauseNS += now - work.pauseStart
work.tMark = now
} else {
t := nanotime()
work.tMark, work.tMarkTerm = t, t
work.heapGoal = work.heap0
gcMarkTermination(memstats.triggerRatio)
}
semrelease(&work.startSema)
}
接下来一个个分析 gcStart 调用的函数, 建议配合上面的 “回收对象的流程” 中的图理解.
函数gcBgMarkStartWorkers用于启动后台标记任务, 先分别对每个 P 启动一个:
func gcBgMarkStartWorkers() {
for _, p := range &allp {
if p == nil || p.status == _Pdead {
break
}
if p.gcBgMarkWorker == 0 {
go gcBgMarkWorker(p)
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
这里虽然为每个 P 启动了一个后台标记任务, 但是可以同时工作的只有 25%, 这个逻辑在协程 M 获取 G 时调用的findRunnableGCWorker中:
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g {
if gcBlackenEnabled == 0 {
throw("gcControllerState.findRunnable: blackening not enabled")
}
if _p_.gcBgMarkWorker == 0 {
return nil
}
if !gcMarkWorkAvailable(_p_) {
return nil
}
decIfPositive := func(ptr *int64) bool {
if *ptr > 0 {
if atomic.Xaddint64(ptr, -1) >= 0 {
return true
}
atomic.Xaddint64(ptr, +1)
}
return false
}
if decIfPositive(&c.dedicatedMarkWorkersNeeded) {
_p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
} else {
if !decIfPositive(&c.fractionalMarkWorkersNeeded) {
return nil
}
const gcForcePreemptNS = 0
now := nanotime() - gcController.markStartTime
then := now + gcForcePreemptNS
timeUsed := c.fractionalMarkTime + gcForcePreemptNS
if then > 0 && float64(timeUsed)/float64(then) > c.fractionalUtilizationGoal {
atomic.Xaddint64(&c.fractionalMarkWorkersNeeded, +1)
return nil
}
_p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
}
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp
}
gcResetMarkState函数会重置标记相关的状态:
func gcResetMarkState() {
lock(&allglock)
for _, gp := range allgs {
gp.gcscandone = false
gp.gcscanvalid = false
gp.gcAssistBytes = 0
}
unlock(&allglock)
work.bytesMarked = 0
work.initialHeapLive = atomic.Load64(&memstats.heap_live)
work.markrootDone = false
}
stopTheWorldWithSema函数会停止整个世界, 这个函数必须在 g0 中运行:
func stopTheWorldWithSema() {
_g_ := getg()
if _g_.m.locks > 0 {
throw("stopTheWorld: holding locks")
}
lock(&sched.lock)
sched.stopwait = gomaxprocs
atomic.Store(&sched.gcwaiting, 1)
preemptall()
_g_.m.p.ptr().status = _Pgcstop
sched.stopwait--
for i := 0; i < int(gomaxprocs); i++ {
p := allp[i]
s := p.status
if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
if trace.enabled {
traceGoSysBlock(p)
traceProcStop(p)
}
p.syscalltick++
sched.stopwait--
}
}
for {
p := pidleget()
if p == nil {
break
}
p.status = _Pgcstop
sched.stopwait--
}
wait := sched.stopwait > 0
unlock(&sched.lock)
if wait {
for {
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}
bad := ""
if sched.stopwait != 0 {
bad = "stopTheWorld: not stopped (stopwait != 0)"
} else {
for i := 0; i < int(gomaxprocs); i++ {
p := allp[i]
if p.status != _Pgcstop {
bad = "stopTheWorld: not stopped (status != _Pgcstop)"
}
}
}
if atomic.Load(&freezing) != 0 {
lock(&deadlock)
lock(&deadlock)
}
if bad != "" {
throw(bad)
}
}
finishsweep_m函数会清扫上一轮 GC 未清扫的 span, 确保上一轮 GC 已完成:
func finishsweep_m() {
for sweepone() != ^uintptr(0) {
sweep.npausesweep++
}
nextMarkBitArenaEpoch()
}
clearpools函数会清理 sched.sudogcache 和 sched.deferpool, 让它们的内存可以被回收:
func clearpools() {
if poolcleanup != nil {
poolcleanup()
}
lock(&sched.sudoglock)
var sg, sgnext *sudog
for sg = sched.sudogcache; sg != nil; sg = sgnext {
sgnext = sg.next
sg.next = nil
}
sched.sudogcache = nil
unlock(&sched.sudoglock)
lock(&sched.deferlock)
for i := range sched.deferpool {
var d, dlink *_defer
for d = sched.deferpool[i]; d != nil; d = dlink {
dlink = d.link
d.link = nil
}
sched.deferpool[i] = nil
}
unlock(&sched.deferlock)
}
startCycle标记开始了新一轮的 GC:
func (c *gcControllerState) startCycle() {
c.scanWork = 0
c.bgScanCredit = 0
c.assistTime = 0
c.dedicatedMarkTime = 0
c.fractionalMarkTime = 0
c.idleMarkTime = 0
if memstats.gc_trigger <= heapminimum {
memstats.heap_marked = uint64(float64(memstats.gc_trigger) / (1 + memstats.triggerRatio))
}
memstats.next_gc = memstats.heap_marked + memstats.heap_marked*uint64(gcpercent)/100
if gcpercent < 0 {
memstats.next_gc = ^uint64(0)
}
if memstats.next_gc < memstats.heap_live+1024*1024 {
memstats.next_gc = memstats.heap_live + 1024*1024
}
totalUtilizationGoal := float64(gomaxprocs) * gcGoalUtilization
c.dedicatedMarkWorkersNeeded = int64(totalUtilizationGoal)
c.fractionalUtilizationGoal = totalUtilizationGoal - float64(c.dedicatedMarkWorkersNeeded)
if c.fractionalUtilizationGoal > 0 {
c.fractionalMarkWorkersNeeded = 1
} else {
c.fractionalMarkWorkersNeeded = 0
}
for _, p := range &allp {
if p == nil {
break
}
p.gcAssistTime = 0
}
c.revise()
if debug.gcpacertrace > 0 {
print("pacer: assist ratio=", c.assistWorkPerByte,
" (scan ", memstats.heap_scan>>20, " MB in ",
work.initialHeapLive>>20, "->",
memstats.next_gc>>20, " MB)",
" workers=", c.dedicatedMarkWorkersNeeded,
"+", c.fractionalMarkWorkersNeeded, "n")
}
}
setGCPhase函数会修改表示当前 GC 阶段的全局变量和是否开启写屏障的全局变量:
func setGCPhase(x uint32) {
atomic.Store(&gcphase, x)
writeBarrier.needed = gcphase == _GCmark || gcphase == _GCmarktermination
writeBarrier.enabled = writeBarrier.needed || writeBarrier.cgo
}
gcBgMarkPrepare函数会重置后台标记任务的计数:
func gcBgMarkPrepare() {
work.nproc = ^uint32(0)
work.nwait = ^uint32(0)
}
gcMarkRootPrepare函数会计算扫描根对象的任务数量:
func gcMarkRootPrepare() {
if gcphase == _GCmarktermination {
work.nFlushCacheRoots = int(gomaxprocs)
} else {
work.nFlushCacheRoots = 0
}
nBlocks := func(bytes uintptr) int {
return int((bytes + rootBlockBytes - 1) / rootBlockBytes)
}
work.nDataRoots = 0
work.nBSSRoots = 0
if !work.markrootDone {
for _, datap := range activeModules() {
nDataRoots := nBlocks(datap.edata - datap.data)
if nDataRoots > work.nDataRoots {
work.nDataRoots = nDataRoots
}
}
for _, datap := range activeModules() {
nBSSRoots := nBlocks(datap.ebss - datap.bss)
if nBSSRoots > work.nBSSRoots {
work.nBSSRoots = nBSSRoots
}
}
}
if !work.markrootDone {
work.nSpanRoots = mheap_.sweepSpans[mheap_.sweepgen/2%2].numBlocks()
work.nStackRoots = int(atomic.Loaduintptr(&allglen))
} else {
work.nSpanRoots = 0
work.nStackRoots = 0
if debug.gcrescanstacks > 0 {
work.nStackRoots = int(atomic.Loaduintptr(&allglen))
}
}
work.markrootNext = 0
work.markrootJobs = uint32(fixedRootCount + work.nFlushCacheRoots + work.nDataRoots + work.nBSSRoots + work.nSpanRoots + work.nStackRoots)
}
gcMarkTinyAllocs函数会标记所有 tiny alloc 等待合并的对象:
func gcMarkTinyAllocs() {
for _, p := range &allp {
if p == nil || p.status == _Pdead {
break
}
c := p.mcache
if c == nil || c.tiny == 0 {
continue
}
_, hbits, span, objIndex := heapBitsForObject(c.tiny, 0, 0)
gcw := &p.gcw
greyobject(c.tiny, 0, 0, hbits, span, gcw, objIndex)
if gcBlackenPromptly {
gcw.dispose()
}
}
}
startTheWorldWithSema函数会重新启动世界:
func startTheWorldWithSema() {
_g_ := getg()
_g_.m.locks++
gp := netpoll(false)
injectglist(gp)
add := needaddgcproc()
lock(&sched.lock)
procs := gomaxprocs
if newprocs != 0 {
procs = newprocs
newprocs = 0
}
p1 := procresize(procs)
sched.gcwaiting = 0
if sched.sysmonwait != 0 {
sched.sysmonwait = 0
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
for p1 != nil {
p := p1
p1 = p1.link.ptr()
if p.m != 0 {
mp := p.m.ptr()
p.m = 0
if mp.nextp != 0 {
throw("startTheWorld: inconsistent mp->nextp")
}
mp.nextp.set(p)
notewakeup(&mp.park)
} else {
newm(nil, p)
add = false
}
}
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
if add {
newm(mhelpgc, nil)
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt {
_g_.stackguard0 = stackPreempt
}
}
重启世界后各个 M 会重新开始调度, 调度时会优先使用上面提到的 findRunnableGCWorker 函数查找任务, 之后就有大约 25% 的 P 运行后台标记任务.
后台标记任务的函数是gcBgMarkWorker:
func gcBgMarkWorker(_p_ *p) {
gp := getg()
type parkInfo struct {
m muintptr
attach puintptr
}
gp.m.preemptoff = "GC worker init"
park := new(parkInfo)
gp.m.preemptoff = ""
park.m.set(acquirem())
park.attach.set(_p_)
notewakeup(&work.bgMarkReady)
for {
gopark(func(g *g, parkp unsafe.Pointer) bool {
park := (*parkInfo)(parkp)
releasem(park.m.ptr())
if park.attach != 0 {
p := park.attach.ptr()
park.attach.set(nil)
if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) {
return false
}
}
return true
}, unsafe.Pointer(park), "GC worker (idle)", traceEvGoBlock, 0)
if _p_.gcBgMarkWorker.ptr() != gp {
break
}
park.m.set(acquirem())
if gcBlackenEnabled == 0 {
throw("gcBgMarkWorker: blackening not enabled")
}
startTime := nanotime()
decnwait := atomic.Xadd(&work.nwait, -1)
if decnwait == work.nproc {
println("runtime: work.nwait=", decnwait, "work.nproc=", work.nproc)
throw("work.nwait was > work.nproc")
}
systemstack(func() {
casgstatus(gp, _Grunning, _Gwaiting)
switch _p_.gcMarkWorkerMode {
default:
throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
case gcMarkWorkerDedicatedMode:
gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
if gp.preempt {
lock(&sched.lock)
for {
gp, _ := runqget(_p_)
if gp == nil {
break
}
globrunqput(gp)
}
unlock(&sched.lock)
}
gcDrain(&_p_.gcw, gcDrainNoBlock|gcDrainFlushBgCredit)
case gcMarkWorkerFractionalMode:
gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
case gcMarkWorkerIdleMode:
gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
}
casgstatus(gp, _Gwaiting, _Grunning)
})
if gcBlackenPromptly {
_p_.gcw.dispose()
}
duration := nanotime() - startTime
switch _p_.gcMarkWorkerMode {
case gcMarkWorkerDedicatedMode:
atomic.Xaddint64(&gcController.dedicatedMarkTime, duration)
atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1)
case gcMarkWorkerFractionalMode:
atomic.Xaddint64(&gcController.fractionalMarkTime, duration)
atomic.Xaddint64(&gcController.fractionalMarkWorkersNeeded, 1)
case gcMarkWorkerIdleMode:
atomic.Xaddint64(&gcController.idleMarkTime, duration)
}
incnwait := atomic.Xadd(&work.nwait, +1)
if incnwait > work.nproc {
println("runtime: p.gcMarkWorkerMode=", _p_.gcMarkWorkerMode,
"work.nwait=", incnwait, "work.nproc=", work.nproc)
throw("work.nwait > work.nproc")
}
if incnwait == work.nproc && !gcMarkWorkAvailable(nil) {
_p_.gcBgMarkWorker.set(nil)
releasem(park.m.ptr())
gcMarkDone()
park.m.set(acquirem())
park.attach.set(_p_)
}
}
}
gcDrain函数用于执行标记:
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
if !writeBarrier.needed {
throw("gcDrain phase incorrect")
}
gp := getg().m.curg
preemptible := flags&gcDrainUntilPreempt != 0
blocking := flags&(gcDrainUntilPreempt|gcDrainIdle|gcDrainNoBlock) == 0
flushBgCredit := flags&gcDrainFlushBgCredit != 0
idle := flags&gcDrainIdle != 0
initScanWork := gcw.scanWork
idleCheck := initScanWork + idleCheckThreshold
if work.markrootNext < work.markrootJobs {
for !(preemptible && gp.preempt) {
job := atomic.Xadd(&work.markrootNext, +1) - 1
if job >= work.markrootJobs {
break
}
markroot(gcw, job)
if idle && pollWork() {
goto done
}
}
}
for !(preemptible && gp.preempt) {
if work.full == 0 {
gcw.balance()
}
var b uintptr
if blocking {
b = gcw.get()
} else {
b = gcw.tryGetFast()
if b == 0 {
b = gcw.tryGet()
}
}
if b == 0 {
break
}
scanobject(b, gcw)
if gcw.scanWork >= gcCreditSlack {
atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
if flushBgCredit {
gcFlushBgCredit(gcw.scanWork - initScanWork)
initScanWork = 0
}
idleCheck -= gcw.scanWork
gcw.scanWork = 0
if idle && idleCheck <= 0 {
idleCheck += idleCheckThreshold
if pollWork() {
break
}
}
}
}
done:
if gcw.scanWork > 0 {
atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
if flushBgCredit {
gcFlushBgCredit(gcw.scanWork - initScanWork)
}
gcw.scanWork = 0
}
}
markroot函数用于执行根对象扫描工作:
func markroot(gcw *gcWork, i uint32) {
baseFlushCache := uint32(fixedRootCount)
baseData := baseFlushCache + uint32(work.nFlushCacheRoots)
baseBSS := baseData + uint32(work.nDataRoots)
baseSpans := baseBSS + uint32(work.nBSSRoots)
baseStacks := baseSpans + uint32(work.nSpanRoots)
end := baseStacks + uint32(work.nStackRoots)
switch {
case baseFlushCache <= i && i < baseData:
flushmcache(int(i - baseFlushCache))
case baseData <= i && i < baseBSS:
for _, datap := range activeModules() {
markrootBlock(datap.data, datap.edata-datap.data, datap.gcdatamask.bytedata, gcw, int(i-baseData))
}
case baseBSS <= i && i < baseSpans:
for _, datap := range activeModules() {
markrootBlock(datap.bss, datap.ebss-datap.bss, datap.gcbssmask.bytedata, gcw, int(i-baseBSS))
}
case i == fixedRootFinalizers:
if work.markrootDone {
break
}
for fb := allfin; fb != nil; fb = fb.alllink {
cnt := uintptr(atomic.Load(&fb.cnt))
scanblock(uintptr(unsafe.Pointer(&fb.fin[0])), cnt*unsafe.Sizeof(fb.fin[0]), &finptrmask[0], gcw)
}
case i == fixedRootFreeGStacks:
if !work.markrootDone {
systemstack(markrootFreeGStacks)
}
case baseSpans <= i && i < baseStacks:
markrootSpans(gcw, int(i-baseSpans))
default:
var gp *g
if baseStacks <= i && i < end {
gp = allgs[i-baseStacks]
} else {
throw("markroot: bad index")
}
status := readgstatus(gp)
if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 {
gp.waitsince = work.tstart
}
systemstack(func() {
userG := getg().m.curg
selfScan := gp == userG && readgstatus(userG) == _Grunning
if selfScan {
casgstatus(userG, _Grunning, _Gwaiting)
userG.waitreason = "garbage collection scan"
}
scang(gp, gcw)
if selfScan {
casgstatus(userG, _Gwaiting, _Grunning)
}
})
}
}
scang函数负责扫描 G 的栈:
func scang(gp *g, gcw *gcWork) {
gp.gcscandone = false
const yieldDelay = 10 * 1000
var nextYield int64
loop:
for i := 0; !gp.gcscandone; i++ {
switch s := readgstatus(gp); s {
default:
dumpgstatus(gp)
throw("stopg: invalid status")
case _Gdead:
gp.gcscandone = true
break loop
case _Gcopystack:
case _Grunnable, _Gsyscall, _Gwaiting:
if castogscanstatus(gp, s, s|_Gscan) {
if !gp.gcscandone {
scanstack(gp, gcw)
gp.gcscandone = true
}
restartg(gp)
break loop
}
case _Gscanwaiting:
case _Grunning:
if gp.preemptscan && gp.preempt && gp.stackguard0 == stackPreempt {
break
}
if castogscanstatus(gp, _Grunning, _Gscanrunning) {
if !gp.gcscandone {
gp.preemptscan = true
gp.preempt = true
gp.stackguard0 = stackPreempt
}
casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
}
}
if i == 0 {
nextYield = nanotime() + yieldDelay
}
if nanotime() < nextYield {
procyield(10)
} else {
osyield()
nextYield = nanotime() + yieldDelay/2
}
}
gp.preemptscan = false
}
设置 preemptscan 后, 在抢占 G 成功时会调用 scanstack 扫描它自己的栈, 具体代码在这里.
扫描栈用的函数是scanstack:
func scanstack(gp *g, gcw *gcWork) {
if gp.gcscanvalid {
return
}
if readgstatus(gp)&_Gscan == 0 {
print("runtime:scanstack: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", hex(readgstatus(gp)), "n")
throw("scanstack - bad status")
}
switch readgstatus(gp) &^ _Gscan {
default:
print("runtime: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", readgstatus(gp), "n")
throw("mark - bad status")
case _Gdead:
return
case _Grunning:
print("runtime: gp=", gp, ", goid=", gp.goid, ", gp->atomicstatus=", readgstatus(gp), "n")
throw("scanstack: goroutine not stopped")
case _Grunnable, _Gsyscall, _Gwaiting:
}
if gp == getg() {
throw("can't scan our own stack")
}
mp := gp.m
if mp != nil && mp.helpgc != 0 {
throw("can't scan gchelper stack")
}
if !work.markrootDone {
shrinkstack(gp)
}
var cache pcvalueCache
scanframe := func(frame *stkframe, unused unsafe.Pointer) bool {
scanframeworker(frame, &cache, gcw)
return true
}
gentraceback(^uintptr(0), ^uintptr(0), 0, gp, 0, nil, 0x7fffffff, scanframe, nil, 0)
tracebackdefers(gp, scanframe, nil)
gp.gcscanvalid = true
}
scanblock函数是一个通用的扫描函数, 扫描全局变量和栈空间都会用它, 和 scanobject 不同的是 bitmap 需要手动传入:
func scanblock(b0, n0 uintptr, ptrmask *uint8, gcw *gcWork) {
b := b0
n := n0
arena_start := mheap_.arena_start
arena_used := mheap_.arena_used
for i := uintptr(0); i < n; {
bits := uint32(*addb(ptrmask, i/(sys.PtrSize*8)))
if bits == 0 {
i += sys.PtrSize * 8
continue
}
for j := 0; j < 8 && i < n; j++ {
if bits&1 != 0 {
obj := *(*uintptr)(unsafe.Pointer(b + i))
if obj != 0 && arena_start <= obj && obj < arena_used {
if obj, hbits, span, objIndex := heapBitsForObject(obj, b, i); obj != 0 {
greyobject(obj, b, i, hbits, span, gcw, objIndex)
}
}
}
bits >>= 1
i += sys.PtrSize
}
}
}
greyobject用于标记一个对象存活, 并把它加到标记队列 (该对象变为灰色):
func greyobject(obj, base, off uintptr, hbits heapBits, span *mspan, gcw *gcWork, objIndex uintptr) {
if obj&(sys.PtrSize-1) != 0 {
throw("greyobject: obj not pointer-aligned")
}
mbits := span.markBitsForIndex(objIndex)
if useCheckmark {
if !mbits.isMarked() {
printlock()
print("runtime:greyobject: checkmarks finds unexpected unmarked object obj=", hex(obj), "n")
print("runtime: found obj at *(", hex(base), "+", hex(off), ")n")
gcDumpObject("base", base, off)
gcDumpObject("obj", obj, ^uintptr(0))
getg().m.traceback = 2
throw("checkmark found unmarked object")
}
if hbits.isCheckmarked(span.elemsize) {
return
}
hbits.setCheckmarked(span.elemsize)
if !hbits.isCheckmarked(span.elemsize) {
throw("setCheckmarked and isCheckmarked disagree")
}
} else {
if debug.gccheckmark > 0 && span.isFree(objIndex) {
print("runtime: marking free object ", hex(obj), " found at *(", hex(base), "+", hex(off), ")n")
gcDumpObject("base", base, off)
gcDumpObject("obj", obj, ^uintptr(0))
getg().m.traceback = 2
throw("marking free object")
}
if mbits.isMarked() {
return
}
atomic.Or8(mbits.bytep, mbits.mask)
if span.spanclass.noscan() {
gcw.bytesMarked += uint64(span.elemsize)
return
}
}
if !gcw.putFast(obj) {
gcw.put(obj)
}
}
gcDrain 函数扫描完根对象, 就会开始消费标记队列, 对从标记队列中取出的对象调用scanobject函数:
func scanobject(b uintptr, gcw *gcWork) {
arena_start := mheap_.arena_start
arena_used := mheap_.arena_used
hbits := heapBitsForAddr(b)
s := spanOfUnchecked(b)
n := s.elemsize
if n == 0 {
throw("scanobject n == 0")
}
if n > maxObletBytes {
if b == s.base() {
if s.spanclass.noscan() {
gcw.bytesMarked += uint64(n)
return
}
for oblet := b + maxObletBytes; oblet < s.base()+s.elemsize; oblet += maxObletBytes {
if !gcw.putFast(oblet) {
gcw.put(oblet)
}
}
}
n = s.base() + s.elemsize - b
if n > maxObletBytes {
n = maxObletBytes
}
}
var i uintptr
for i = 0; i < n; i += sys.PtrSize {
if i != 0 {
hbits = hbits.next()
}
bits := hbits.bits()
if i != 1*sys.PtrSize && bits&bitScan == 0 {
break
}
if bits&bitPointer == 0 {
continue
}
obj := *(*uintptr)(unsafe.Pointer(b + i))
if obj != 0 && arena_start <= obj && obj < arena_used && obj-b >= n {
if obj, hbits, span, objIndex := heapBitsForObject(obj, b, i); obj != 0 {
greyobject(obj, b, i, hbits, span, gcw, objIndex)
}
}
}
gcw.bytesMarked += uint64(n)
gcw.scanWork += int64(i)
}
在所有后台标记任务都把标记队列消费完毕时, 会执行gcMarkDone函数准备进入完成标记阶段 (mark termination):
在并行 GC 中 gcMarkDone 会被执行两次, 第一次会禁止本地标记队列然后重新开始后台标记任务, 第二次会进入完成标记阶段 (mark termination)。
func gcMarkDone() {
top:
semacquire(&work.markDoneSema)
if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
semrelease(&work.markDoneSema)
return
}
atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, -0xffffffff)
atomic.Xaddint64(&gcController.fractionalMarkWorkersNeeded, -0xffffffff)
if !gcBlackenPromptly {
gcBlackenPromptly = true
atomic.Xadd(&work.nwait, -1)
semrelease(&work.markDoneSema)
systemstack(func() {
forEachP(func(_p_ *p) {
_p_.gcw.dispose()
})
})
gcMarkRootCheck()
atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 0xffffffff)
atomic.Xaddint64(&gcController.fractionalMarkWorkersNeeded, 0xffffffff)
incnwait := atomic.Xadd(&work.nwait, +1)
if incnwait == work.nproc && !gcMarkWorkAvailable(nil) {
goto top
}
} else {
now := nanotime()
work.tMarkTerm = now
work.pauseStart = now
getg().m.preemptoff = "gcing"
systemstack(stopTheWorldWithSema)
work.markrootDone = true
atomic.Store(&gcBlackenEnabled, 0)
gcWakeAllAssists()
semrelease(&work.markDoneSema)
nextTriggerRatio := gcController.endCycle()
gcMarkTermination(nextTriggerRatio)
}
}
gcMarkTermination函数会进入完成标记阶段:
func gcMarkTermination(nextTriggerRatio float64) {
atomic.Store(&gcBlackenEnabled, 0)
gcBlackenPromptly = false
setGCPhase(_GCmarktermination)
work.heap1 = memstats.heap_live
startTime := nanotime()
mp := acquirem()
mp.preemptoff = "gcing"
_g_ := getg()
_g_.m.traceback = 2
gp := _g_.m.curg
casgstatus(gp, _Grunning, _Gwaiting)
gp.waitreason = "garbage collection"
systemstack(func() {
gcMark(startTime)
})
systemstack(func() {
work.heap2 = work.bytesMarked
if debug.gccheckmark > 0 {
gcResetMarkState()
initCheckmarks()
gcMark(startTime)
clearCheckmarks()
}
setGCPhase(_GCoff)
gcSweep(work.mode)
if debug.gctrace > 1 {
startTime = nanotime()
gcResetMarkState()
finishsweep_m()
setGCPhase(_GCmarktermination)
gcMark(startTime)
setGCPhase(_GCoff)
gcSweep(work.mode)
}
})
_g_.m.traceback = 0
casgstatus(gp, _Gwaiting, _Grunning)
if trace.enabled {
traceGCDone()
}
mp.preemptoff = ""
if gcphase != _GCoff {
throw("gc done but gcphase != _GCoff")
}
gcSetTriggerRatio(nextTriggerRatio)
now := nanotime()
sec, nsec, _ := time_now()
unixNow := sec*1e9 + int64(nsec)
work.pauseNS += now - work.pauseStart
work.tEnd = now
atomic.Store64(&memstats.last_gc_unix, uint64(unixNow))
atomic.Store64(&memstats.last_gc_nanotime, uint64(now))
memstats.pause_ns[memstats.numgc%uint32(len(memstats.pause_ns))] = uint64(work.pauseNS)
memstats.pause_end[memstats.numgc%uint32(len(memstats.pause_end))] = uint64(unixNow)
memstats.pause_total_ns += uint64(work.pauseNS)
sweepTermCpu := int64(work.stwprocs) * (work.tMark - work.tSweepTerm)
markCpu := gcController.assistTime + gcController.dedicatedMarkTime + gcController.fractionalMarkTime
markTermCpu := int64(work.stwprocs) * (work.tEnd - work.tMarkTerm)
cycleCpu := sweepTermCpu + markCpu + markTermCpu
work.totaltime += cycleCpu
totalCpu := sched.totaltime + (now-sched.procresizetime)*int64(gomaxprocs)
memstats.gc_cpu_fraction = float64(work.totaltime) / float64(totalCpu)
sweep.nbgsweep = 0
sweep.npausesweep = 0
if work.userForced {
memstats.numforcedgc++
}
lock(&work.sweepWaiters.lock)
memstats.numgc++
injectglist(work.sweepWaiters.head.ptr())
work.sweepWaiters.head = 0
unlock(&work.sweepWaiters.lock)
mProf_NextCycle()
systemstack(startTheWorldWithSema)
mProf_Flush()
prepareFreeWorkbufs()
systemstack(freeStackSpans)
if debug.gctrace > 0 {
util := int(memstats.gc_cpu_fraction * 100)
var sbuf [24]byte
printlock()
print("gc ", memstats.numgc,
" @", string(itoaDiv(sbuf[:], uint64(work.tSweepTerm-runtimeInitTime)/1e6, 3)), "s ",
util, "%: ")
prev := work.tSweepTerm
for i, ns := range []int64{work.tMark, work.tMarkTerm, work.tEnd} {
if i != 0 {
print("+")
}
print(string(fmtNSAsMS(sbuf[:], uint64(ns-prev))))
prev = ns
}
print(" ms clock, ")
for i, ns := range []int64{sweepTermCpu, gcController.assistTime, gcController.dedicatedMarkTime + gcController.fractionalMarkTime, gcController.idleMarkTime, markTermCpu} {
if i == 2 || i == 3 {
print("/")
} else if i != 0 {
print("+")
}
print(string(fmtNSAsMS(sbuf[:], uint64(ns))))
}
print(" ms cpu, ",
work.heap0>>20, "->", work.heap1>>20, "->", work.heap2>>20, " MB, ",
work.heapGoal>>20, " MB goal, ",
work.maxprocs, " P")
if work.userForced {
print(" (forced)")
}
print("n")
printunlock()
}
semrelease(&worldsema)
releasem(mp)
mp = nil
if !concurrentSweep {
Gosched()
}
}
gcSweep函数会唤醒后台清扫任务:
后台清扫任务会在程序启动时调用的gcenable函数中启动.
func gcSweep(mode gcMode) {
if gcphase != _GCoff {
throw("gcSweep being done but phase is not GCoff")
}
lock(&mheap_.lock)
mheap_.sweepgen += 2
mheap_.sweepdone = 0
if mheap_.sweepSpans[mheap_.sweepgen/2%2].index != 0 {
throw("non-empty swept list")
}
mheap_.pagesSwept = 0
unlock(&mheap_.lock)
if !_ConcurrentSweep || mode == gcForceBlockMode {
lock(&mheap_.lock)
mheap_.sweepPagesPerByte = 0
unlock(&mheap_.lock)
for sweepone() != ^uintptr(0) {
sweep.npausesweep++
}
prepareFreeWorkbufs()
for freeSomeWbufs(false) {
}
mProf_NextCycle()
mProf_Flush()
return
}
lock(&sweep.lock)
if sweep.parked {
sweep.parked = false
ready(sweep.g, 0, true)
}
unlock(&sweep.lock)
}
后台清扫任务的函数是bgsweep:
func bgsweep(c chan int) {
sweep.g = getg()
lock(&sweep.lock)
sweep.parked = true
c <- 1
goparkunlock(&sweep.lock, "GC sweep wait", traceEvGoBlock, 1)
for {
for gosweepone() != ^uintptr(0) {
sweep.nbgsweep++
Gosched()
}
for freeSomeWbufs(true) {
Gosched()
}
lock(&sweep.lock)
if !gosweepdone() {
unlock(&sweep.lock)
continue
}
sweep.parked = true
goparkunlock(&sweep.lock, "GC sweep wait", traceEvGoBlock, 1)
}
}
gosweepone函数会从 sweepSpans 中取出单个 span 清扫:
func gosweepone() uintptr {
var ret uintptr
systemstack(func() {
ret = sweepone()
})
return ret
}
sweepone函数如下:
func sweepone() uintptr {
_g_ := getg()
sweepRatio := mheap_.sweepPagesPerByte
_g_.m.locks++
if atomic.Load(&mheap_.sweepdone) != 0 {
_g_.m.locks--
return ^uintptr(0)
}
atomic.Xadd(&mheap_.sweepers, +1)
npages := ^uintptr(0)
sg := mheap_.sweepgen
for {
s := mheap_.sweepSpans[1-sg/2%2].pop()
if s == nil {
atomic.Store(&mheap_.sweepdone, 1)
break
}
if s.state != mSpanInUse {
if s.sweepgen != sg {
print("runtime: bad span s.state=", s.state, " s.sweepgen=", s.sweepgen, " sweepgen=", sg, "n")
throw("non in-use span in unswept list")
}
continue
}
if s.sweepgen != sg-2 || !atomic.Cas(&s.sweepgen, sg-2, sg-1) {
continue
}
npages = s.npages
if !s.sweep(false) {
npages = 0
}
break
}
if atomic.Xadd(&mheap_.sweepers, -1) == 0 && atomic.Load(&mheap_.sweepdone) != 0 {
if debug.gcpacertrace > 0 {
print("pacer: sweep done at heap size ", memstats.heap_live>>20, "MB; allocated ", (memstats.heap_live-mheap_.sweepHeapLiveBasis)>>20, "MB during sweep; swept ", mheap_.pagesSwept, " pages at ", sweepRatio, " pages/byten")
}
}
_g_.m.locks--
return npages
}
span 的sweep函数用于清扫单个 span:
func (s *mspan) sweep(preserve bool) bool {
_g_ := getg()
if _g_.m.locks == 0 && _g_.m.mallocing == 0 && _g_ != _g_.m.g0 {
throw("MSpan_Sweep: m is not locked")
}
sweepgen := mheap_.sweepgen
if s.state != mSpanInUse || s.sweepgen != sweepgen-1 {
print("MSpan_Sweep: state=", s.state, " sweepgen=", s.sweepgen, " mheap.sweepgen=", sweepgen, "n")
throw("MSpan_Sweep: bad span state")
}
if trace.enabled {
traceGCSweepSpan(s.npages * _PageSize)
}
atomic.Xadd64(&mheap_.pagesSwept, int64(s.npages))
spc := s.spanclass
size := s.elemsize
res := false
c := _g_.m.mcache
freeToHeap := false
specialp := &s.specials
special := *specialp
for special != nil {
objIndex := uintptr(special.offset) / size
p := s.base() + objIndex*size
mbits := s.markBitsForIndex(objIndex)
if !mbits.isMarked() {
hasFin := false
endOffset := p - s.base() + size
for tmp := special; tmp != nil && uintptr(tmp.offset) < endOffset; tmp = tmp.next {
if tmp.kind == _KindSpecialFinalizer {
mbits.setMarkedNonAtomic()
hasFin = true
break
}
}
for special != nil && uintptr(special.offset) < endOffset {
p := s.base() + uintptr(special.offset)
if special.kind == _KindSpecialFinalizer || !hasFin {
y := special
special = special.next
*specialp = special
freespecial(y, unsafe.Pointer(p), size)
} else {
specialp = &special.next
special = *specialp
}
}
} else {
specialp = &special.next
special = *specialp
}
}
if debug.allocfreetrace != 0 || raceenabled || msanenabled {
mbits := s.markBitsForBase()
abits := s.allocBitsForIndex(0)
for i := uintptr(0); i < s.nelems; i++ {
if !mbits.isMarked() && (abits.index < s.freeindex || abits.isMarked()) {
x := s.base() + i*s.elemsize
if debug.allocfreetrace != 0 {
tracefree(unsafe.Pointer(x), size)
}
if raceenabled {
racefree(unsafe.Pointer(x), size)
}
if msanenabled {
msanfree(unsafe.Pointer(x), size)
}
}
mbits.advance()
abits.advance()
}
}
nalloc := uint16(s.countAlloc())
if spc.sizeclass() == 0 && nalloc == 0 {
s.needzero = 1
freeToHeap = true
}
nfreed := s.allocCount - nalloc
if nalloc > s.allocCount {
print("runtime: nelems=", s.nelems, " nalloc=", nalloc, " previous allocCount=", s.allocCount, " nfreed=", nfreed, "n")
throw("sweep increased allocation count")
}
s.allocCount = nalloc
wasempty := s.nextFreeIndex() == s.nelems
s.freeindex = 0
if trace.enabled {
getg().m.p.ptr().traceReclaimed += uintptr(nfreed) * s.elemsize
}
s.allocBits = s.gcmarkBits
s.gcmarkBits = newMarkBits(s.nelems)
s.refillAllocCache(0)
if freeToHeap || nfreed == 0 {
if s.state != mSpanInUse || s.sweepgen != sweepgen-1 {
print("MSpan_Sweep: state=", s.state, " sweepgen=", s.sweepgen, " mheap.sweepgen=", sweepgen, "n")
throw("MSpan_Sweep: bad span state after sweep")
}
atomic.Store(&s.sweepgen, sweepgen)
}
if nfreed > 0 && spc.sizeclass() != 0 {
c.local_nsmallfree[spc.sizeclass()] += uintptr(nfreed)
res = mheap_.central[spc].mcentral.freeSpan(s, preserve, wasempty)
} else if freeToHeap {
if debug.efence > 0 {
s.limit = 0
sysFault(unsafe.Pointer(s.base()), size)
} else {
mheap_.freeSpan(s, 1)
}
c.local_nlargefree++
c.local_largefree += size
res = true
}
if !res {
mheap_.sweepSpans[sweepgen/2%2].push(s)
}
return res
}
从 bgsweep 和前面的分配器可以看出扫描阶段的工作是十分懒惰 (lazy) 的,
实际可能会出现前一阶段的扫描还未完成, 就需要开始新一轮的 GC 的情况,
所以每一轮 GC 开始之前都需要完成前一轮 GC 的扫描工作 (Sweep Termination 阶段).
GC 的整个流程都分析完毕了, 最后贴上写屏障函数writebarrierptr的实现:
func writebarrierptr(dst *uintptr, src uintptr) {
if writeBarrier.cgo {
cgoCheckWriteBarrier(dst, src)
}
if !writeBarrier.needed {
*dst = src
return
}
if src != 0 && src < minPhysPageSize {
systemstack(func() {
print("runtime: writebarrierptr *", dst, " = ", hex(src), "n")
throw("bad pointer in write barrier")
})
}
writebarrierptr_prewrite1(dst, src)
*dst = src
}
writebarrierptr_prewrite1函数如下:
func writebarrierptr_prewrite1(dst *uintptr, src uintptr) {
mp := acquirem()
if mp.inwb || mp.dying > 0 {
releasem(mp)
return
}
systemstack(func() {
if mp.p == 0 && memstats.enablegc && !mp.inwb && inheap(src) {
throw("writebarrierptr_prewrite1 called with mp.p == nil")
}
mp.inwb = true
gcmarkwb_m(dst, src)
})
mp.inwb = false
releasem(mp)
}
gcmarkwb_m函数如下:
func gcmarkwb_m(slot *uintptr, ptr uintptr) {
if writeBarrier.needed {
if slot1 := uintptr(unsafe.Pointer(slot)); slot1 >= minPhysPageSize {
if optr := *slot; optr != 0 {
shade(optr)
}
}
if ptr != 0 && inheap(ptr) {
shade(ptr)
}
}
}
shade函数如下:
func shade(b uintptr) {
if obj, hbits, span, objIndex := heapBitsForObject(b, 0, 0); obj != 0 {
gcw := &getg().m.p.ptr().gcw
greyobject(obj, 0, 0, hbits, span, gcw, objIndex)
if gcphase == _GCmarktermination || gcBlackenPromptly {
gcw.dispose()
}
}
}
https://github.com/golang/go
https://making.pusher.com/golangs-real-time-gc-in-theory-and-practice
https://github.com/golang/proposal/blob/master/design/17503-eliminate-rescan.md
https://golang.org/s/go15gcpacing
https://golang.org/ref/mem
https://talks.golang.org/2015/go-gc.pdf
https://docs.google.com/document/d/1ETuA2IOmnaQ4j81AtTGT40Y4_Jr6_IDASEKg0t0dBR8/edit#heading=h.x4kziklnb8fr
https://go-review.googlesource.com/c/go/+/21503
http://www.cnblogs.com/diegodu/p/5803202.html
http://legendtkl.com/2017/04/28/golang-gc
https://lengzzz.com/note/gc-in-golang
因为我之前已经对 CoreCLR 的 GC 做过分析 (看这一篇和这一篇), 这里我可以简单的对比一下 CoreCLR 和 GO 的 GC 实现:
CoreCLR 的对象带有类型信息, GO 的对象不带, 而是通过 bitmap 区域记录哪些地方包含指针
CoreCLR 分配对象的速度明显更快, GO 分配对象需要查找 span 和写入 bitmap 区域
CoreCLR 的收集器需要做的工作比 GO 多很多
- CoreCLR 不同大小的对象都会放在一个 segment 中, 只能线性扫描
- CoreCLR 判断对象引用要访问类型信息, 而 go 只需要访问 bitmap
- CoreCLR 清扫时要一个个去标记为自由对象, 而 go 只需要切换 allocBits
CoreCLR 的停顿时间比 GO 要长
- 虽然 CoreCLR 支持并行 GC, 但是没有 GO 彻底, GO 连扫描根对象都不需要完全停顿
CoreCLR 支持分代 GC
- 虽然 Full GC 时 CoreCLR 的效率不如 GO, 但是 CoreCLR 可以在大部分时候只扫描第 0 和第 1 代的对象
- 因为支持分代 GC, 通常 CoreCLR 花在 GC 上的 CPU 时间会比 GO 要少
CoreCLR 的分配器和收集器通常比 GO 要高效, 也就是说 CoreCLR 会有更高的吞吐量.
但 CoreCLR 的最大停顿时间不如 GO 短, 这是因为 GO 的 GC 整个设计都是为了减少停顿时间.
现在分布式计算和横向扩展越来越流行,
比起追求单机吞吐量, 追求低延迟然后让分布式解决吞吐量问题无疑是更明智的选择,
GO 的设计目标使得它比其他语言都更适合编写网络服务程序.
https://segmentfault.com/a/1190000037770285?utm_source=tag-newest