转载请声明出处哦~,本篇文章发布于 luozhiyun 的博客:https://www.luozhiyun.com

本文使用的 go 的源码时 14.4

Pool 介绍

总所周知 Go 是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收。如果你想使用 Go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。因为 Go 在垃圾回收的时候会有一个 STW(stop-the-world,程序暂停)的时间,并且如果对象太多,做标记也需要时间。

所以如果采用对象池来创建对象,增加对象的重复利用率,使用的时候就不必在堆上重新创建对象可以节省开销。

在 Go 中,sync.Pool 提供了对象池的功能。它对外提供了三个方法:New、Get 和 Put。下面用一个简短的例子来说明一下 Pool 使用:

  1. var pool *sync.Pool
  2. type Person struct {
  3. Name string
  4. }
  5. func init() {
  6. pool = &sync.Pool{
  7. New: func() interface{}{
  8. fmt.Println("creating a new person")
  9. return new(Person)
  10. },
  11. }
  12. }
  13. func main() {
  14. person := pool.Get().(*Person)
  15. fmt.Println("Get Pool Object:", person)
  16. person.Name = "first"
  17. pool.Put(person)
  18. fmt.Println("Get Pool Object:",pool.Get().(*Person))
  19. fmt.Println("Get Pool Object:",pool.Get().(*Person))
  20. }

结果:

  1. creating a new person
  2. Get Pool Object &{}
  3. Get Pool Object &{first}
  4. creating a new person
  5. Get Pool Object &{}

这里我用了 init 方法初始化了一个 pool,然后 get 了三次,put 了一次到 pool 中,如果 pool 中没有对象,那么会调用 New 函数创建一个新的对象,否则会重 put 进去的对象中获取。

源码分析

  1. type Pool struct {
  2. noCopy noCopy
  3. local unsafe.Pointer
  4. localSize uintptr
  5. victim unsafe.Pointer
  6. victimSize uintptr
  7. New func() interface{}
  8. }

Pool 结构体里面 noCopy 代表这个结构体是禁止拷贝的,它可以在我们使用 go vet 工具的时候生效;

local 是一个 poolLocal 数组的指针,localSize 代表这个数组的大小;同样 victim 也是一个 poolLocal 数组的指针,每次垃圾回收的时候,Pool 会把 victim 中的对象移除,然后把 local 的数据给 victim;local 和 victim 的逻辑我们下面会详细介绍到。

New 函数是在创建 pool 的时候设置的,当 pool 没有缓存对象的时候,会调用 New 方法生成一个新的对象。

下面我们对照着 pool 的结构图往下讲,避免找不到北:

多图详解Go的sync.Pool源码 - luozhiyun - 博客园 - 图1

  1. type poolLocal struct {
  2. poolLocalInternal
  3. pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
  4. }

local 字段存储的是一个 poolLocal 数组的指针,poolLocal 数组大小是 goroutine 中 P 的数量,访问时,P 的 id 对应 poolLocal 数组下标索引,所以 Pool 的最大个数 runtime.GOMAXPROCS(0)。

通过这样的设计,每个 P 都有了自己的本地空间,多个 goroutine 使用同一个 Pool 时,减少了竞争,提升了性能。如果对 goroutine 的 P、G、M 有疑惑的同学不妨看看这篇文章:The Go scheduler

poolLocal 里面有一个 pad 数组用来占位用,防止在 cache line 上分配多个 poolLocalInternal 从而造成 false sharing,有关于 false sharing 可以看看这篇文章:

What’s false sharing and how to solve it ,文中对于 false sharing 的定义:

That’s what false sharing is: one core update a variable would force other cores to update cache either.

  1. type poolLocalInternal struct {
  2. private interface{}
  3. shared poolChain
  4. }

poolLocalInternal 包含两个字段 private 和 shared。

private 代表缓存的一个元素,只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题;

shared 则可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。

  1. type poolChain struct {
  2. head *poolChainElt
  3. tail *poolChainElt
  4. }
  5. type poolChainElt struct {
  6. poolDequeue
  7. next, prev *poolChainElt
  8. }
  9. type poolDequeue struct {
  10. headTail uint64
  11. vals []eface
  12. }

poolChain 是一个双端队列,里面的 head 和 tail 分别指向队列头尾;poolDequeue 里面存放真正的数据,是一个单生产者、多消费者的固定大小的无锁的环状队列,headTail 是环状队列的首位位置的指针,可以通过位运算解析出首尾的位置,生产者可以从 head 插入、head 删除,而消费者仅可从 tail 删除。

这个双端队列的模型大概是这个样子:

多图详解Go的sync.Pool源码 - luozhiyun - 博客园 - 图2

poolDequeue 里面的环状队列大小是固定的,后面分析源码我们会看到,当环状队列满了的时候会创建一个 size 是原来两倍大小的环状队列。大家这张图好好体会一下,会反复用到。

Get 方法

  1. func (p *Pool) Get() interface{} {
  2. ...
  3. l, pid := p.pin()
  4. x := l.private
  5. l.private = nil
  6. if x == nil {
  7. x, _ = l.shared.popHead()
  8. if x == nil {
  9. x = p.getSlow(pid)
  10. }
  11. }
  12. runtime_procUnpin()
  13. ...
  14. if x == nil && p.New != nil {
  15. x = p.New()
  16. }
  17. return x
  18. }
  • 这一段代码首先会将当前 goroutine 绑定在当前的 P 上返回对应的 local,然后尝试从 local 的 private 中获取,然后需要把 private 字段置空,因为已经拿到了想要的对象;
  • private 中获取不到,那么就去 shared 的头部获取;
  • shared 也没有,那么尝试遍历所有的 local,尝试从它们的 shared 弹出一个元素;
  • 最后如果还是没有,那么就直接调用预先设置好的 New 函数,创建一个出来。

pin

  1. func (p *Pool) pin() (*poolLocal, int) {
  2. pid := runtime_procPin()
  3. s := atomic.LoadUintptr(&p.localSize)
  4. l := p.local
  5. if uintptr(pid) < s {
  6. return indexLocal(l, pid), pid
  7. }
  8. return p.pinSlow()
  9. }

pin 方法里面首先会调用 runtime_procPin 方法会先获取当前 goroutine,然后绑定到对应的 M 上,然后返回 M 目前绑定的 P 的 id,因为这个 pid 后面会用到,防止在使用途中 P 被抢占,具体的细节可以看这篇:https://zhuanlan.zhihu.com/p/99710992。

接下来会使用原子操作取出 localSize,如果当前 pid 大于 localSize,那么就表示 Pool 还没创建对应的 poolLocal,那么调用 pinSlow 进行创建工作,否则调用 indexLocal 取出 pid 对应的 poolLocal 返回。

  1. func indexLocal(l unsafe.Pointer, i int) *poolLocal {
  2. lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
  3. return (*poolLocal)(lp)
  4. }

indexLocal 里面是使用了地址操作,传入的 i 是数组的 index 值,所以需要获取 poolLocal{} 的 size 做一下地址的位移操作,然后再转成转成 poolLocal 地址返回。

pinSlow

  1. func (p *Pool) pinSlow() (*poolLocal, int) {
  2. runtime_procUnpin()
  3. allPoolsMu.Lock()
  4. defer allPoolsMu.Unlock()
  5. pid := runtime_procPin()
  6. s := p.localSize
  7. l := p.local
  8. if uintptr(pid) < s {
  9. return indexLocal(l, pid), pid
  10. }
  11. if p.local == nil {
  12. allPools = append(allPools, p)
  13. }
  14. size := runtime.GOMAXPROCS(0)
  15. local := make([]poolLocal, size)
  16. atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
  17. atomic.StoreUintptr(&p.localSize, uintptr(size))
  18. return &local[pid], pid
  19. }

因为 allPoolsMu 是一个全局 Mutex 锁,因此上锁会比较慢可能被阻塞,所以上锁前调用 runtime_procUnpin 方法解除 pin 的操作;

在解除绑定后,pinSlow 可能被其他的线程调用过了,p.local 可能会发生变化。因此这时候需要再次对 pid 进行检查。

最后初始化 local,并使用原子操作对 local 和 localSize 设值,返回当前 P 对应的 local。

到这里 pin 方法终于讲完了。画一个简单的图描述一下这整个流程:

多图详解Go的sync.Pool源码 - luozhiyun - 博客园 - 图3

下面我们再回到 Get 方法中往下走,代码我再贴一遍,以便阅读:

  1. func (p *Pool) Get() interface{} {
  2. ...
  3. x := l.private
  4. l.private = nil
  5. if x == nil {
  6. x, _ = l.shared.popHead()
  7. if x == nil {
  8. x = p.getSlow(pid)
  9. }
  10. }
  11. ...
  12. return x
  13. }

如果 private 中没有值,那么会调用 shared 的 popHead 方法获取值。

popHead

  1. func (c *poolChain) popHead() (interface{}, bool) {
  2. d := c.head
  3. for d != nil {
  4. if val, ok := d.popHead(); ok {
  5. return val, ok
  6. }
  7. d = loadPoolChainElt(&d.prev)
  8. }
  9. return nil, false
  10. }

popHead 方法里面会获取到 poolChain 的头结点,不记得 poolChain 数据结构的同学建议往上面翻一下再回来。

接着有个 for 循环会挨个从 poolChain 的头结点往下遍历,直到获取对象返回。

  1. func (d *poolDequeue) popHead() (interface{}, bool) {
  2. var slot *eface
  3. for {
  4. ptrs := atomic.LoadUint64(&d.headTail)
  5. head, tail := d.unpack(ptrs)
  6. if tail == head {
  7. return nil, false
  8. }
  9. head--
  10. ptrs2 := d.pack(head, tail)
  11. if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
  12. slot = &d.vals[head&uint32(len(d.vals)-1)]
  13. break
  14. }
  15. }
  16. val := *(*interface{})(unsafe.Pointer(slot))
  17. if val == dequeueNil(nil) {
  18. val = nil
  19. }
  20. *slot = eface{}
  21. return val, true
  22. }
  • poolDequeue 的 popHead 方法首先会获取到 headTail 的值,然后调用 unpack 解包,headTail 是一个 64 位的值,高 32 位表示 head,低 32 位表示 tail。
  • 判断 head 和 tail 是否相等,相等那么这个队列就是空的;
  • 如果队列不是空的,那么将 head 减一之后再使用,因为 head 当前指的位置是空值,表示下一个新对象存放的位置;
  • CAS 重新设值新的 headTail,成功之后获取 slot,这里因为 vals 大小是 2 的 n 次幂,因此len(d.vals)-1)之后低 n 位全是 1,和 head 取与之后可以获取到 head 的低 n 位的值;
  • 如果 slot 所对应的对象是 dequeueNil,那么表示是空值,直接返回,否则将 slot 指针对应位置的值置空,返回 val。

如果 shared 的 popHead 方法也没获取到值,那么就需要调用 getSlow 方法获取了。

getSlow

  1. func (p *Pool) getSlow(pid int) interface{} {
  2. size := atomic.LoadUintptr(&p.localSize)
  3. locals := p.local
  4. for i := 0; i < int(size); i++ {
  5. l := indexLocal(locals, (pid+i+1)%int(size))
  6. if x, _ := l.shared.popTail(); x != nil {
  7. return x
  8. }
  9. }
  10. size = atomic.LoadUintptr(&p.victimSize)
  11. if uintptr(pid) >= size {
  12. return nil
  13. }
  14. locals = p.victim
  15. l := indexLocal(locals, pid)
  16. if x := l.private; x != nil {
  17. l.private = nil
  18. return x
  19. }
  20. for i := 0; i < int(size); i++ {
  21. l := indexLocal(locals, (pid+i)%int(size))
  22. if x, _ := l.shared.popTail(); x != nil {
  23. return x
  24. }
  25. }
  26. atomic.StoreUintptr(&p.victimSize, 0)
  27. return nil
  28. }

getSlow 方法会遍历 locals 列表,这里需要注意的是,遍历是从索引为 pid+1 的 poolLocal 处开始,尝试调用 shared 的 popTail 方法获取对象;如果没有拿到,则从 victim 里找。如果都没找到,那么就将 victimSize 置为 0,下次就不找 victim 了。

poolChain&popTail

  1. func (c *poolChain) popTail() (interface{}, bool) {
  2. d := loadPoolChainElt(&c.tail)
  3. if d == nil {
  4. return nil, false
  5. }
  6. for {
  7. d2 := loadPoolChainElt(&d.next)
  8. if val, ok := d.popTail(); ok {
  9. return val, ok
  10. }
  11. if d2 == nil {
  12. return nil, false
  13. }
  14. if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
  15. storePoolChainElt(&d2.prev, nil)
  16. }
  17. d = d2
  18. }
  19. }
  • 判断 poolChain,如果最后一个节点是空的,那么直接返回;
  • 进入 for 循环,获取 tail 的 next 节点,这里需要注意的是这个双向链表与一般的链表是反向的,不清楚的可以再去看看第一张图;
  • 调用 popTail 获取 poolDequeue 列表的对象,有对象直接返回;
  • d2 为空则表示已经遍历完整个 poolChain 双向列表了,都为空,那么直接返回;
  • 通过 CAS 将 tail 重置为 d2,因为 d 已经没有数据了,并将 d2 的 prev 节点置为 nil,然后将 d 置为 d2,进入下一个循环;

poolDequeue&popTail

  1. func (d *poolDequeue) popTail() (interface{}, bool) {
  2. var slot *eface
  3. for {
  4. ptrs := atomic.LoadUint64(&d.headTail)
  5. head, tail := d.unpack(ptrs)
  6. if tail == head {
  7. return nil, false
  8. }
  9. ptrs2 := d.pack(head, tail+1)
  10. if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
  11. slot = &d.vals[tail&uint32(len(d.vals)-1)]
  12. break
  13. }
  14. }
  15. val := *(*interface{})(unsafe.Pointer(slot))
  16. if val == dequeueNil(nil) {
  17. val = nil
  18. }
  19. slot.val = nil
  20. atomic.StorePointer(&slot.typ, nil)
  21. return val, true
  22. }

如果看懂了 popHead,那么这个 popTail 方法是和它非常的相近的。

popTail 简单来说也是从队列尾部移除一个元素,如果队列为空,返回 false。但是需要注意的是,这个 popTail 可能会被多个消费者调用,所以需要循环 CAS 获取对象;在 poolDequeue 环状列表中 tail 是有数据的,不必像 popHead 中head--

最后,需要将 slot 置空。

大家可以再对照一下图回顾一下代码:

多图详解Go的sync.Pool源码 - luozhiyun - 博客园 - 图4

Put 方法

  1. func (p *Pool) Put(x interface{}) {
  2. if x == nil {
  3. return
  4. }
  5. ...
  6. l, _ := p.pin()
  7. if l.private == nil {
  8. l.private = x
  9. x = nil
  10. }
  11. if x != nil {
  12. l.shared.pushHead(x)
  13. }
  14. runtime_procUnpin()
  15. ...
  16. }

看完了 Get 方法,看 Put 方法就容易多了。同样 Put 方法首先会去 Pin 住当前 goroutine 和 P,然后尝试将 x 赋值给 private 字段。如果 private 不为空,那么就调用 pushHead 将其放入到 shared 队列中。

poolChain&pushHead

  1. func (c *poolChain) pushHead(val interface{}) {
  2. d := c.head
  3. if d == nil {
  4. const initSize = 8
  5. d = new(poolChainElt)
  6. d.vals = make([]eface, initSize)
  7. c.head = d
  8. storePoolChainElt(&c.tail, d)
  9. }
  10. if d.pushHead(val) {
  11. return
  12. }
  13. newSize := len(d.vals) * 2
  14. if newSize >= dequeueLimit {
  15. newSize = dequeueLimit
  16. }
  17. d2 := &poolChainElt{prev: d}
  18. d2.vals = make([]eface, newSize)
  19. c.head = d2
  20. storePoolChainElt(&d.next, d2)
  21. d2.pushHead(val)
  22. }

如果头节点为空,那么需要创建一个新的 poolChainElt 对象作为头节点,大小为 8;然后调用 pushHead 放入到环状队列中;

如果放置失败,那么创建一个 poolChainElt 节点,并且双端队列的长度翻倍,当然长度也不能超过 dequeueLimit,即 2 的 30 次方;

然后将新的节点 d2 和 d 互相绑定一下,并将 d2 设值为头节点,将传入的对象 push 到 d2 中;

poolDequeue&pushHead

  1. func (d *poolDequeue) pushHead(val interface{}) bool {
  2. ptrs := atomic.LoadUint64(&d.headTail)
  3. head, tail := d.unpack(ptrs)
  4. if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  5. return false
  6. }
  7. slot := &d.vals[head&uint32(len(d.vals)-1)]
  8. typ := atomic.LoadPointer(&slot.typ)
  9. if typ != nil {
  10. return false
  11. }
  12. if val == nil {
  13. val = dequeueNil(nil)
  14. }
  15. *(*interface{})(unsafe.Pointer(slot)) = val
  16. atomic.AddUint64(&d.headTail, 1<<dequeueBits)
  17. return true
  18. }

首先通过位运算判断队列是否已满,也就是将尾部指针加上 len(d.vals) ,因为 head 指向的是将要被填充的位置,所以 head 和 tail 位置是相隔len(d.vals),然后再取低 31 位,看它是否和 head 相等。如果队列满了,直接返回 false;

然后找到找到 head 的槽位 slot,并判断 typ 是否为空,因为 popTail 是先设置 val,再将 typ 设置为 nil,所以如果有冲突,那么直接返回;

最后设值 slot,并将 head 加 1 返回;

GC

在 pool.go 文件的 init 函数里,注册了 GC 发生时,如何清理 Pool 的函数:

  1. func init() {
  2. runtime_registerPoolCleanup(poolCleanup)
  3. }
  4. func poolCleanup() {
  5. for _, p := range oldPools {
  6. p.victim = nil
  7. p.victimSize = 0
  8. }
  9. for _, p := range allPools {
  10. p.victim = p.local
  11. p.victimSize = p.localSize
  12. p.local = nil
  13. p.localSize = 0
  14. }
  15. oldPools, allPools = allPools, nil
  16. }

poolCleanup 会在 STW 阶段被调用。主要是将 local 和 victim 作交换,那么不至于 GC 把所有的 Pool 都清空了,而是需要两个 GC 周期才会被释放。如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。

总结

Pool 这个概念在后台优化中是一个非常重要的手段,比如说在使用 Http 的时候会使用 Http 连接池,使用数据库的时候,也会用到数据库连接池。这些通过对象重用和预先分配可以减少服务器的压力。

当我们在后期的项目开发中,如果发现 GC 耗时很高,有大量临时对象时不妨可以考虑使用 Pool。

例如发现现系统中的 goroutine 数量非常多,由于一个 goroutine 初始栈是 2048 字节,所以一个服务器上运行数十万的 goroutine 也是非常耗时的;这时候就可以考虑使用 Worker Pool 来减少 goroutine 的使用。

Reference

https://medium.com/@genchilu/whats-false-sharing-and-how-to-solve-it-using-golang-as-example-ef978a305e10

https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat_redirect

https://zhuanlan.zhihu.com/p/99710992

https://github.com/golang/go/commit/d5fd2dd6a17a816b7dfd99d4df70a85f1bf0de31#diff-491b0013c82345bf6cfa937bd78b690d

https://morsmachine.dk/go-scheduler
https://www.cnblogs.com/luozhiyun/p/14194872.html