对象的创建和销毁会消耗一定的系统资源(内存,gc 等),过多的创建销毁对象会带来内存不稳定与更长的 gc 停顿,因为 go 的 gc 不存在分代,因而更加不擅长处理这种问题。因而 go 早早就推出 Pool 包用于缓解这种情况。Pool 用于核心的功能就是 Put 和 Get。当我们需要一个对象的时候通过 Get 获取一个,创建的对象也可以 Put 放进池子里,通过这种方式可以反复利用现有对象,这样 gc 就不用高频的促发内存 gc 了。
结构
type Pool struct {
noCopy noCopy
local unsafe.Pointer
localSize uintptr
New func() interface{}
}
创建时候指定 New 方法用于创建默认对象,local,localSize 会在随后用到的时候生成. local 是一个 poolLocalInternal 的切片指针。
type poolLocalInternal struct {
private interface{}
shared []interface{}
Mutex
}
当不同的 p 调用 Pool 时, 每个 p 都会在 local 上分配这样一个 poolLocal,索引值就是 p 的 id。 private 存放的对象只能由创建的 p 读写,shared 则会在多个 p 之间共享。
PUT
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
Put 先要通过 pin 函数获取当前 Pool 对应的 pid 位置上的 localPool,然后检查 private 是否存在,存在则设置到 private 上,如果不存在就追加到 shared 尾部。
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize)
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
return p.pinSlow()
}
pin 函数先通过自旋加锁 (可以避免 p 自身发生并发),在检查本地 local 切片的 size,size 大于当前 pid 则使用 pid 去本地 local 切片上索引到 localpool 对象,否则就要走 pinSlow 对象创建本地 localPool 切片了.
func (p *Pool) pinSlow() *poolLocal {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid]
}
pinShow 先要取消自旋锁,因为后面的 lock 内部也会尝试自旋锁,下面可能会操作 allpool 因而这里需要使用互斥锁 allPoolsMu,然后又加上自旋锁,(这里注释说不会发生 poolCleanup,但是查看代码 gcstart 只是查看了当前 m 的 lock 状态,然而避免不了其他 m 触发的 gc,尚存疑),这里会再次尝试之前的操作,因为可能在 unpin,pin 之间有并发产生了 poolocal,确认本地 local 切片是空的才会生成一个新的 pool。后面是创建 Pool 上的 localPool 切片,runtime.GOMAXPROCS 这里的作用是返回 p 的数量,用于确定 pool 的 localpool 的数量.
GET
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
GET 先调用 pin 获取本地 local,这个具体流程和上面一样了,如果当前 private 存在返回 private 上面的对象,如果不存在就从 shared 查找,存在返回尾部对象,反之就要从其他的 p 的 localPool 里面偷了。
func (p *Pool) getSlow() (x interface{}) {
size := atomic.LoadUintptr(&p.localSize)
local := p.local
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
首先就要获取当前 size,用于轮询 p 的 local,这里的查询顺序不是从 0 开始,而是是从当前 p 的位置往后查一圈。查到依次检查每个 p 的 shared 上是否存在对象,如果存在就获取末尾的值。 如果所有 p 的 poollocal 都是空的,那么初始化的 New 函数就起作用了,调用这个 New 函数创建一个新的对象出来。
清理
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
pool 对象的清理是在每次 gc 之前清理,通过 runtime_registerPoolCleanup 函数注册一个上面的 poolCleanup 对象,内部会把这个函数设置到 clearpool 函数上面,然后每次 gc 之前会调用 clearPool 来取消所有 pool 的引用,重置所有的 Pool。代码很简单就是轮询一边设置 nil,然后取消所有 poollocal,pool 引用。方法简单粗暴。由于 clearPool 是在 STW 中调用的,如果 Pool 存在大量对象会拉长 STW 的时间,在已经有提案来修复这个问题了 (CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]
本文发表于 2019 年 08 月 11 日 11:00
(c) 注:本文转载自https://my.oschina.net/hunjixin/blog/3086168,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.
阅读 547 讨论 0 喜欢 0
https://www.chinacion.cn/article/7090.html