pool 关键作用:

  1. 减轻 GC 的压力。
  2. 复用对象内存。有时不一定希望复用内存,单纯是想减轻 GC 压力也可主动给 pool 塞对象。

Pool’s purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists.

原理简述

sync.Pool 就是围绕 New 字段、Get 和 Put 方法来使用。用过都懂,比较简单就不介绍了。

Go 是提供 goroutine 进行并发编程,在并发环境下,sync.Pool 的使用不会造成严重性能问题是它的设计考虑点。

容易想到的方法是 Pool 对象为每个 P 都分配一个空间,这样在 P 上运行的 G 进行 Get 和 Put 操作时,就可以在 P 本地的空间上读写。这样方法比 Pool 对象维护一个全局空间有明显好处,全局空间的读写肯定要加锁。

即使每个 P 都有了自己的本地空间,也不是说就可以完全避免锁使用。不要忘了 Pool 提供了内存复用功效,每个 P 上的 G 都使用的是 P 本地的空间的话,那内存复用就有局限性,只能局限在一个 P 上。

而 pool 提供的内存复用是覆盖所有 P。意思是,一个 G 在执行 Get 方法时,发生 G 所在的 P 上,没有可复用的对象。这时就到别的 P 那儿去偷。偷这个动作就要加锁了。因为偷取别人可复用对象时候,别人也可能同时在读写。

前面开始说每个 P 有自己的空间,作用是避免锁,后面又说到别的 P 上偷对象,又要加锁。是不是矛盾了。

不矛盾,让我们来看看 sync.Pool 的实现原理。

sync.Pool 对象底层两个关键字段,local 和 localSize,前者是指向一个数组,数组大小存在 localSize。localSize 的大小跟 P 个数保持一致。数组每个元素就是代表每个 P 自己的本地空间,类型是poolLocal

poolLocal类型有两个关键字段,private 和 shared

  • shared 是一个数组,读写要加锁。
  • private 只能存一个对象,读写不加锁。

来理一下在 Pool 对象上读写的逻辑:

  1. Get 操作时,先返回本地 P 上的 private 上的对象。
  2. 如果 private 为空,继续从本地 P 上的 shared 找,这里要加锁。
  3. 如果 shared 也没有,就到别的 P 那儿,从 shared 里偷。
  4. 所有其它 P 都遍历过了,没有任何对象可偷。就返回 nil 或调用 New 函数。
  5. Put 操作时,优先放 private。
  6. private 已经被放了,那就放到 shared 的最后。

用一张图来表示:

golang标准库sync.Pool原理及源码简析 - 木白的技术私厨 - 图1

sync.Pool 的特性

  • 无大小限制。
  • 自动清理,每次 GC 前会清掉 Pool 里的所有对象。所以不适用于做连接池。
  • 每个 P 都会有一个本地的 poolLocal,Get 和 Put 优先在当前 P 的本地 poolLocal 操作。其次再进行跨 P 操作。
  • 所以 Pool 的最大个数是 runtime.GOMAXPROCS(0)。

sync.Pool 的缺点

pool 的 Get() 并非成本低廉,最坏情况可能会上锁 runtime.GOMAXPROCS(0) 次。

所以,多 Goroutine 与多 P 的情况下,使用 Pool 的效果才会突显。否则要经历无谓的锁成本。

简单的常用场景

bytes.Buffer 作为临时对象放在池子里,这样减轻每次都需要创建的消耗。

| ```
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

  1. |

type Dao struct { bp sync.Pool }

func New(c conf.Config) (d Dao) { d = &Dao{ bp: sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }, } return }

func (d *Dao) Infoc(args …string) (value string, err error) { if len(args) == 0 { return }

  1. // fetch a buf from bufpool buf, ok := d.bp.Get().(*bytes.Buffer)
  2. if !ok {
  3. return "", ErrType
  4. }
  5. // append first arg if _, err := buf.WriteString(args[0]); err != nil {
  6. return "", err
  7. }
  8. for _, arg := range args[1:] {
  9. // append ,arg if _, err := buf.WriteString(defaultSpliter); err != nil {
  10. return "", err
  11. }
  12. if _, err := buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
  13. return "", err
  14. }
  15. }
  16. value = buf.String()
  17. buf.Reset()
  18. d.bp.Put(buf)
  19. return

}

  1. |
  2. <a name="c1f41a64"></a>
  3. ### 带注释的源码
  4. sync.Pool 数据结构
  5. | ```<br />
  6. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

| ``` // pool 的数据结构 type Pool struct { noCopy noCopy // 指向一个数组,个数与P相等,每个元素的类型为poolLocalInternal local unsafe.Pointer // local数组的大小 localSize uintptr // 创建pool对象时,用户必须提供的new函数 New func() interface{} }

type poolLocalInternal struct { // 私有对象,每个P都有,用于不同g执行get和put可以无锁操作 private interface{} // 共享对象数组,每个P都有一个,同一个P上不同g可以多次执行put方法,需要有地方能存储。并且别的p上的g可能过来偷,所以要加锁 shared []interface{} // 对shared进行加锁,private不用加锁 Mutex }

type poolLocal struct { poolLocalInternal

  1. // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte

}

  1. |
  2. Get 方法
  3. | ```<br />
  4. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

| ``` func (p *Pool) Put(x interface{}) { if x == nil { return } // … // 拿到当前P对应的pool l := p.pin() if l.private == nil { // 私有区有位置的话直接放私有区 l.private = x x = nil } runtime_procUnpin() if x != nil { // 否则放在共享区里 l.Lock() l.shared = append(l.shared, x) l.Unlock() } // … }

func (p Pool) pin() poolLocal { // 拿到当前P的ID pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) l := p.local if uintptr(pid) < s { // 定义pool对象时,s取值为0。只有经过pinSlow后,p.localSize的值才被设置 // 如果local数组已经初始化,就可以把对应P的本地pool返回 return indexLocal(l, pid) } // 否则就得重建local return p.pinSlow() }

func (p Pool) pinSlow() poolLocal { runtime_procUnpin() // 锁上所有的pool对象 allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() s := p.localSize l := p.local if uintptr(pid) < s { // pinSlow是一个创建local的方法。在获得allPoolsMu锁前,可能被别的P先获取,这种情况下local就已经被初始化了 // 所以在获得allPoolsMu锁后需要再检查一次uintptr(pid) < s return indexLocal(l, pid) } if p.local == nil { allPools = append(allPools, p) } // local的大小默认就是P的个数 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // 设置local atomic.StoreUintptr(&p.localSize, uintptr(size)) // 设置localSize return &local[pid] }

  1. |
  2. Put 方法
  3. | ```<br />
  4. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21

| ``` func (p *Pool) Put(x interface{}) { if x == nil { return } // … // 拿到当前P对应的pool l := p.pin() if l.private == nil { // 私有区有位置的话直接放私有区 l.private = x x = nil } runtime_procUnpin() if x != nil { // 否则放在共享区里 l.Lock() l.shared = append(l.shared, x) l.Unlock() } // … }

  1. |
  2. runtime_procPin runtime_procUnpin
  3. | ```<br />
  4. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26

| ``` //go:linkname sync_runtime_procPin sync.runtime_procPin //go:nosplit func sync_runtime_procPin() int { return procPin() }

//go:nosplit func procPin() int { g := getg() mp := g.m

  1. mp.locks++
  2. return int(mp.p.ptr().id)

}

//go:linkname sync_atomic_runtime_procUnpin sync/atomic.runtime_procUnpin //go:nosplit func sync_atomic_runtime_procUnpin() { procUnpin() }

//go:nosplit func procUnpin() { g := getg() g.m.locks— } ```

|
http://cbsheng.github.io/posts/golang%E6%A0%87%E5%87%86%E5%BA%93sync.pool%E5%8E%9F%E7%90%86%E5%8F%8A%E6%BA%90%E7%A0%81%E7%AE%80%E6%9E%90/