对象的创建和销毁会消耗一定的系统资源(内存,gc 等),过多的创建销毁对象会带来内存不稳定与更长的 gc 停顿,因为 go 的 gc 不存在分代,因而更加不擅长处理这种问题。因而 go 早早就推出 Pool 包用于缓解这种情况。Pool 用于核心的功能就是 Put 和 Get。当我们需要一个对象的时候通过 Get 获取一个,创建的对象也可以 Put 放进池子里,通过这种方式可以反复利用现有对象,这样 gc 就不用高频的促发内存 gc 了。

结构

  1. type Pool struct {
  2. noCopy noCopy
  3. local unsafe.Pointer
  4. localSize uintptr
  5. New func() interface{}
  6. }

创建时候指定 New 方法用于创建默认对象,local,localSize 会在随后用到的时候生成. local 是一个 poolLocalInternal 的切片指针。

  1. type poolLocalInternal struct {
  2. private interface{}
  3. shared []interface{}
  4. Mutex
  5. }

当不同的 p 调用 Pool 时, 每个 p 都会在 local 上分配这样一个 poolLocal,索引值就是 p 的 id。 private 存放的对象只能由创建的 p 读写,shared 则会在多个 p 之间共享。 sync.Pool实现原理 技术指南 - 图1

PUT

  1. func (p *Pool) Put(x interface{}) {
  2. if x == nil {
  3. return
  4. }
  5. if race.Enabled {
  6. if fastrand()%4 == 0 {
  7. return
  8. }
  9. race.ReleaseMerge(poolRaceAddr(x))
  10. race.Disable()
  11. }
  12. l := p.pin()
  13. if l.private == nil {
  14. l.private = x
  15. x = nil
  16. }
  17. runtime_procUnpin()
  18. if x != nil {
  19. l.Lock()
  20. l.shared = append(l.shared, x)
  21. l.Unlock()
  22. }
  23. if race.Enabled {
  24. race.Enable()
  25. }
  26. }

Put 先要通过 pin 函数获取当前 Pool 对应的 pid 位置上的 localPool,然后检查 private 是否存在,存在则设置到 private 上,如果不存在就追加到 shared 尾部。

  1. func (p *Pool) pin() *poolLocal {
  2. pid := runtime_procPin()
  3. s := atomic.LoadUintptr(&p.localSize)
  4. l := p.local
  5. if uintptr(pid) < s {
  6. return indexLocal(l, pid)
  7. }
  8. return p.pinSlow()
  9. }

pin 函数先通过自旋加锁 (可以避免 p 自身发生并发),在检查本地 local 切片的 size,size 大于当前 pid 则使用 pid 去本地 local 切片上索引到 localpool 对象,否则就要走 pinSlow 对象创建本地 localPool 切片了.

  1. func (p *Pool) pinSlow() *poolLocal {
  2. runtime_procUnpin()
  3. allPoolsMu.Lock()
  4. defer allPoolsMu.Unlock()
  5. pid := runtime_procPin()
  6. s := p.localSize
  7. l := p.local
  8. if uintptr(pid) < s {
  9. return indexLocal(l, pid)
  10. }
  11. if p.local == nil {
  12. allPools = append(allPools, p)
  13. }
  14. size := runtime.GOMAXPROCS(0)
  15. local := make([]poolLocal, size)
  16. atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
  17. atomic.StoreUintptr(&p.localSize, uintptr(size))
  18. return &local[pid]
  19. }

pinShow 先要取消自旋锁,因为后面的 lock 内部也会尝试自旋锁,下面可能会操作 allpool 因而这里需要使用互斥锁 allPoolsMu,然后又加上自旋锁,(这里注释说不会发生 poolCleanup,但是查看代码 gcstart 只是查看了当前 m 的 lock 状态,然而避免不了其他 m 触发的 gc,尚存疑),这里会再次尝试之前的操作,因为可能在 unpin,pin 之间有并发产生了 poolocal,确认本地 local 切片是空的才会生成一个新的 pool。后面是创建 Pool 上的 localPool 切片,runtime.GOMAXPROCS 这里的作用是返回 p 的数量,用于确定 pool 的 localpool 的数量.

GET

  1. func (p *Pool) Get() interface{} {
  2. if race.Enabled {
  3. race.Disable()
  4. }
  5. l := p.pin()
  6. x := l.private
  7. l.private = nil
  8. runtime_procUnpin()
  9. if x == nil {
  10. l.Lock()
  11. last := len(l.shared) - 1
  12. if last >= 0 {
  13. x = l.shared[last]
  14. l.shared = l.shared[:last]
  15. }
  16. l.Unlock()
  17. if x == nil {
  18. x = p.getSlow()
  19. }
  20. }
  21. if race.Enabled {
  22. race.Enable()
  23. if x != nil {
  24. race.Acquire(poolRaceAddr(x))
  25. }
  26. }
  27. if x == nil && p.New != nil {
  28. x = p.New()
  29. }
  30. return x
  31. }

GET 先调用 pin 获取本地 local,这个具体流程和上面一样了,如果当前 private 存在返回 private 上面的对象,如果不存在就从 shared 查找,存在返回尾部对象,反之就要从其他的 p 的 localPool 里面偷了。

  1. func (p *Pool) getSlow() (x interface{}) {
  2. size := atomic.LoadUintptr(&p.localSize)
  3. local := p.local
  4. pid := runtime_procPin()
  5. runtime_procUnpin()
  6. for i := 0; i < int(size); i++ {
  7. l := indexLocal(local, (pid+i+1)%int(size))
  8. l.Lock()
  9. last := len(l.shared) - 1
  10. if last >= 0 {
  11. x = l.shared[last]
  12. l.shared = l.shared[:last]
  13. l.Unlock()
  14. break
  15. }
  16. l.Unlock()
  17. }
  18. return x
  19. }

首先就要获取当前 size,用于轮询 p 的 local,这里的查询顺序不是从 0 开始,而是是从当前 p 的位置往后查一圈。查到依次检查每个 p 的 shared 上是否存在对象,如果存在就获取末尾的值。 如果所有 p 的 poollocal 都是空的,那么初始化的 New 函数就起作用了,调用这个 New 函数创建一个新的对象出来。

清理

  1. func poolCleanup() {
  2. for i, p := range allPools {
  3. allPools[i] = nil
  4. for i := 0; i < int(p.localSize); i++ {
  5. l := indexLocal(p.local, i)
  6. l.private = nil
  7. for j := range l.shared {
  8. l.shared[j] = nil
  9. }
  10. l.shared = nil
  11. }
  12. p.local = nil
  13. p.localSize = 0
  14. }
  15. allPools = []*Pool{}
  16. }

pool 对象的清理是在每次 gc 之前清理,通过 runtime_registerPoolCleanup 函数注册一个上面的 poolCleanup 对象,内部会把这个函数设置到 clearpool 函数上面,然后每次 gc 之前会调用 clearPool 来取消所有 pool 的引用,重置所有的 Pool。代码很简单就是轮询一边设置 nil,然后取消所有 poollocal,pool 引用。方法简单粗暴。由于 clearPool 是在 STW 中调用的,如果 Pool 存在大量对象会拉长 STW 的时间,在已经有提案来修复这个问题了 (CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]

本文发表于 2019 年 08 月 11 日 11:00
(c) 注:本文转载自https://my.oschina.net/hunjixin/blog/3086168,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 547 讨论 0 喜欢 0
https://www.chinacion.cn/article/7090.html