golang sync.Pool 在 1.14 中的优化

本文基于 golang 1.14 对 sync.Pool 进行分析;

sync.Pool 在 1.12 中实现的原理简述

参考 Golang 的 sync.Pool 设计思路与原理,这篇文章基于 1.12 版本对 golang 的 sync.Pool 实现原理进行了分析。关于 sync.Pool 的使用场景、基本概念的理解可以参考前面的文章。

在 1.12 中 sync.Pool 的设计思路简单总结一下:

  1. 将 G 和 P 绑定,设置与 P 绑定的 M 禁止抢占以防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
  2. 每个 p 都有独享的缓存队列,当 g 进行 sync.pool 操作时,先找到所属 p 的 private,如果没有对象可用,加锁从 shared 切片里获取数据。如果还没有拿到缓存对象,那么到其他 P 的 poolLocal 进行偷数据,如果偷不到,那么就创建新对象。

1.12 sync.pool 的源码,可以发现 sync.pool 里会有各种的锁逻辑,从自己的 shared 拿数据加锁。getSlow() 偷其他 P 缓存,也是需要给每个 p 加锁。put 归还缓存的时候,还是会 mutex 加一次锁。

go mutex 锁的实现原理简单说,他开始也是 atomic cas 自旋,默认是 4 次尝试,当还没有拿到锁的时候进行 waitqueue gopack 休眠调度处理,等待其他协程释放锁时进行 goready 调度唤醒。

Go 1.13 之后,Go 团队对 sync.Pool 的锁竞争这块进行了很多优化,这里还改变了 shared 的数据结构,以前的版本用切片做缓存,现在换成了 poolChain 双端链表。这个双端链表的设计很有意思,你看 sync.pool 源代码会发现跟 redis quicklist 相似,都是链表加数组的设计。

1.14 Pool 数据结构

  1. type Pool struct {
  2. noCopy noCopy
  3. local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
  4. localSize uintptr // size of the local array
  5. victim unsafe.Pointer // local from previous cycle
  6. victimSize uintptr // size of victims array
  7. // New optionally specifies a function to generate
  8. // a value when Get would otherwise return nil.
  9. // It may not be changed concurrently with calls to Get.
  10. New func() interface{}
  11. }
  12. // Local per-P Pool appendix.
  13. type poolLocalInternal struct {
  14. private interface{} // Can be used only by the respective P.
  15. shared poolChain // Local P can pushHead/popHead; any P can popTail.
  16. }
  17. type poolLocal struct {
  18. poolLocalInternal
  19. // Prevents false sharing on widespread platforms with
  20. // 128 mod (cache line size) = 0 .
  21. pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
  22. }

Pool.local 实际上是一个类型 [P]poolLocal 数组,数组长度是调度器中 P 的数量,也就是说每一个 P 有自己独立的 poolLocal。通过 P.id 来获取每个 P 自己独立的 poolLocal。在 poolLocal 中有一个 poolChain。

这里我们先忽略其余的机构,重点关注poolLocalInternal.shared 这个字段。poolChain 是一个双端队列链,缓存对象。 1.12 版本中对于这个字段的并发安全访问是通过 mutex 加锁实现的;1.14 优化后通过 poolChain(无锁化) 实现的。

这里我们先重点分析一下 poolChain 是怎么实现并发无锁编程的。

poolChain

  1. type poolChain struct {
  2. // head is the poolDequeue to push to. This is only accessed
  3. // by the producer, so doesn't need to be synchronized.
  4. head *poolChainElt
  5. // tail is the poolDequeue to popTail from. This is accessed
  6. // by consumers, so reads and writes must be atomic.
  7. tail *poolChainElt
  8. }
  9. type poolChainElt struct {
  10. poolDequeue
  11. // next and prev link to the adjacent poolChainElts in this
  12. // poolChain.
  13. //
  14. // next is written atomically by the producer and read
  15. // atomically by the consumer. It only transitions from nil to
  16. // non-nil.
  17. //
  18. // prev is written atomically by the consumer and read
  19. // atomically by the producer. It only transitions from
  20. // non-nil to nil.
  21. next, prev *poolChainElt
  22. }

poolChain是一个动态大小的双向链接列表的双端队列。每个出站队列的大小是前一个队列的两倍。也就是说 poolChain 里面每个元素 poolChainElt 都是一个双端队列

head 指向的 poolChainElt,是用于 Producer 去 Push 元素的,不需要做同步处理。
tail 指向的 poolChainElt,是用于 Consumer 从 tail 去 pop 元素的,这里的读写需要保证原子性。

简单来说,poolChain是一个单 Producer,多 Consumer 并发访问的双端队列链。

对于poolChain中的每一个双端队列 poolChainElt,包含了双端队列实体poolDequeue 一起前后链接的指针。

poolChain 主要方法有:

  1. popHead() (interface{}, bool);
  2. pushHead(val interface{})
  3. popTail() (interface{}, bool)

popHeadpushHead函数是给 Producer 调用的;popTail是给 Consumer 并发调用的。

poolChain.popHead()

前面我们说了,poolChain的 head 指针的操作是单 Producer 的。

  1. func (c *poolChain) popHead() (interface{}, bool) {
  2. d := c.head
  3. for d != nil {
  4. if val, ok := d.popHead(); ok {
  5. return val, ok
  6. }
  7. // There may still be unconsumed elements in the
  8. // previous dequeue, so try backing up.
  9. d = loadPoolChainElt(&d.prev)
  10. }
  11. return nil, false
  12. }

poolChain 要求,popHead函数只能被 Producer 调用。看一下逻辑:

  1. 获取头结点 head;
  2. 如果头结点非空就从头节点所代表的双端队列poolDequeue中调用popHead函数。注意这里poolDequeuepopHead函数和poolChainpopHead函数并不一样。poolDequeue是一个固定 size 的 ring buffer。
  3. 如果从 head 中拿到了 value,就直接返回;
  4. 如果从 head 中拿不到 value,就从 head.prev 再次尝试获取;
  5. 最后都获取不到,就返回 nil。

poolChain.pushHead()

  1. func (c *poolChain) pushHead(val interface{}) {
  2. d := c.head
  3. if d == nil {
  4. // Initialize the chain.
  5. const initSize = 8 // Must be a power of 2
  6. d = new(poolChainElt)
  7. d.vals = make([]eface, initSize)
  8. c.head = d
  9. storePoolChainElt(&c.tail, d)
  10. }
  11. if d.pushHead(val) {
  12. return
  13. }
  14. // The current dequeue is full. Allocate a new one of twice
  15. // the size.
  16. newSize := len(d.vals) * 2
  17. if newSize >= dequeueLimit {
  18. // Can't make it any bigger.
  19. newSize = dequeueLimit
  20. }
  21. d2 := &poolChainElt{prev: d}
  22. d2.vals = make([]eface, newSize)
  23. c.head = d2
  24. storePoolChainElt(&d.next, d2)
  25. d2.pushHead(val)
  26. }

poolChain 要求,pushHead函数同样只能被 Producer 调用。看一下逻辑:

  1. 首先还是获取头结点 head;
  2. 如果头结点为空,需要初始化 chain
    1. 创建poolChainElt 节点,作为 head, 当然也是 tail。
    2. poolChainElt 其实也是固定 size 的双端队列poolDequeue,size 必须是 2 的 n 次幂。
  3. 调用poolDequeuepushHead函数将 val push 进 head 的双端队列poolDequeue
  4. 如果 push 失败了,说明双端队列满了,需要重新创建一个双端队列 d2,新的双端队列的 size 是前一个双端队列 size 的 2 倍;
  5. 更新 poolChain 的 head 指向最新的双端队列,并且建立双链关系;
  6. 然后将 val push 到最新的双端队列。

这里需要注意一点的是 head 其实是指向最后 chain 中最后一个结点 (poolDequeue),chain 执行 push 操作是往最后一个节点 push。 所以这里的 head 的语义不是针对链表结构,而是针对队列结构。

poolChain.popTail()

  1. func (c *poolChain) popTail() (interface{}, bool) {
  2. d := loadPoolChainElt(&c.tail)
  3. if d == nil {
  4. return nil, false
  5. }
  6. for {
  7. // It's important that we load the next pointer
  8. // *before* popping the tail. In general, d may be
  9. // transiently empty, but if next is non-nil before
  10. // the pop and the pop fails, then d is permanently
  11. // empty, which is the only condition under which it's
  12. // safe to drop d from the chain.
  13. d2 := loadPoolChainElt(&d.next)
  14. if val, ok := d.popTail(); ok {
  15. return val, ok
  16. }
  17. if d2 == nil {
  18. // This is the only dequeue. It's empty right
  19. // now, but could be pushed to in the future.
  20. return nil, false
  21. }
  22. // The tail of the chain has been drained, so move on
  23. // to the next dequeue. Try to drop it from the chain
  24. // so the next pop doesn't have to look at the empty
  25. // dequeue again.
  26. if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
  27. // We won the race. Clear the prev pointer so
  28. // the garbage collector can collect the empty
  29. // dequeue and so popHead doesn't back up
  30. // further than necessary.
  31. storePoolChainElt(&d2.prev, nil)
  32. }
  33. d = d2
  34. }
  35. }

poolChain 要求,popTail函数能被任何 P 调用,也就是所有的 P 都是 Consumer。这里总结下,当前 G 所对应的 P 在 Pool 里面是 Producer 角色,任何 P 都是 Consumer 角色。

popTail函数是并发调用的,所以需要特别注意。

  1. 首先需要原子的 load chain 的 tail 指向的双端队列 d(poolDequeue);
  2. 如果 d 为空,pool 还是空,所以直接 return nil
  3. 下面就是典型的无锁原子化编程:进入一个 for 循环
    1. 首先就是获取 tail 的 next 结点 d2。这里需要强调一下为什么需要在 tail 执行 popTail 之前先 load tail 的 next 结点。
      1. tail 有可能存在短暂性为空的场景。比如 head 和 tail 实际指向同一个结点 (双端队列) 时候,可能 tail 为空只是暂时的,因为存在有线程往 head push 数据的情况。
      2. 如果因为 tail 执行 popTail() 时因为 tail 为空而失败了,然后再 load tail.next,发现 tail.next 非空,再将 tail 原子切换到 tail.next,这个时候就会出现错误了。假设 tail 和 head 指向同一个结点,在判断 tail 是空之后,head 往里面插入了很多个数据,直接将 tail 结点打满,然后 head 指向下一个结点了。这时候 tail.next 也非空了。然后就将 tail 更新到 tail.next,就会导致丢数据了。
      3. 所以必须在:1)tail 执行 popTail 之前 tail.next 是非空的,2)tail 执行 popTail 时发现 tail 是空的。满足这两个条件才能说明 tail 是永久性是空的。也就是需要提前 load tail.next 指针。
    2. 如果从 tail 里面 pop 数据成功,就直接返回 val。
    3. 如果从 tail 里面 pop 数据失败,并且 d2 也是空,说明当前 chain 里面只有一个结点,并且是空。直接返回 nil
    4. 如果从 tail 里面 pop 数据失败并且 d2 非空,说明 tail 已经被 drain 干净了,原子的 tail 到 tail.next,并清除双向链表关系。
    5. 从 d2 开始新的一轮 for 循环。

上面的流程是典型的的无锁并发编程。

poolDequeue

poolChain 中每一个结点都是一个双端队列poolDequeue

poolDequeue是一个无锁的、固定 size 的、单 Producer、多 Consumer 的 deque。只有一个 Producer 可以从 head 去 push 或则 pop;多个 Consumer 可以从 tail 去 pop。

数据结构

  1. type poolDequeue struct {
  2. // 用高32位和低32位分别表示head和tail
  3. // head是下一个fill的slot的index;
  4. // tail是deque中最老的一个元素的index
  5. // 队列中有效元素是[tail, head)
  6. headTail uint64
  7. vals []eface
  8. }
  9. type eface struct {
  10. typ, val unsafe.Pointer
  11. }

这里通过一个字段 headTail 来表示 head 和 tail 的 index。headTail是 8 个字节 64 位。

  1. 高 32 位表示 head;
  2. 低 32 位表示 tail。
  3. head 和 tail 自加溢出时是安全的。

vals是一个固定 size 的 slice,其实也就是一个 ring buffer,size 必须是 2 的次幂 (为了做位运算);

pack/unpack

一个字段 headTail 来表示 head 和 tail 的 index,所以需要有具体的 pack 和 unpack 逻辑:

  1. const dequeueBits = 32
  2. func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
  3. const mask = 1<<dequeueBits - 1
  4. head = uint32((ptrs >> dequeueBits) & mask)
  5. tail = uint32(ptrs & mask)
  6. return
  7. }
  8. func (d *poolDequeue) pack(head, tail uint32) uint64 {
  9. const mask = 1<<dequeueBits - 1
  10. return (uint64(head) << dequeueBits) |
  11. uint64(tail&mask)
  12. }

pack:

  1. 首先拿到 mask,这里实际上就是 0xffffffff(2^32-1)
  2. head 左移 32 位 | tail&0xffffffff 就可以得到 head 和 tail pack 之后的值。

unpack:

  1. 首先拿到 mask,这里实际上就是 0xffffffff(2^32-1)
  2. ptrs 右移 32 位拿到高 32 位然后 & mask 就可以得到 head;
  3. ptrs 直接 & mask 就可以得到低 32 位,也就是 tail。

poolDequeue.pushHead

pushHead 将 val push 到 head 指向的位置,如果 deque 满了,就返回 false。

  1. func (d *poolDequeue) pushHead(val interface{}) bool {
  2. ptrs := atomic.LoadUint64(&d.headTail)
  3. head, tail := d.unpack(ptrs)
  4. if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  5. // Queue is full.
  6. return false
  7. }
  8. slot := &d.vals[head&uint32(len(d.vals)-1)]
  9. // Check if the head slot has been released by popTail.
  10. typ := atomic.LoadPointer(&slot.typ)
  11. if typ != nil {
  12. // Another goroutine is still cleaning up the tail, so
  13. // the queue is actually still full.
  14. return false
  15. }
  16. // The head slot is free, so we own it.
  17. if val == nil {
  18. val = dequeueNil(nil)
  19. }
  20. *(*interface{})(unsafe.Pointer(slot)) = val
  21. // Increment head. This passes ownership of slot to popTail
  22. // and acts as a store barrier for writing the slot.
  23. atomic.AddUint64(&d.headTail, 1<<dequeueBits)
  24. return true
  25. }

主要逻辑:

  1. 原子 load head 和 tail,
  2. 如果 tail + len(vals) == head 说明 deque 已经满了。
  3. 拿到 head 在 vals 中 index 的 slot
  4. 如果 slot 的 type 非空,说明该 slot 还没有被 popTail release,实际上 deque 还是满的;所以直接 return false;
  5. 更新 val 到 slot 的指针指向的值。
  6. 原子的自加 head

需要注意的是,pushHead 不是并发安全的,只能有一个 Producer 去执行;只有 slot 的的 type 指针为空时候 slot 才是空。

poolDequeue.popHead

popHead 将 head 指向的前一个位置弹出,如果 deque 是空,就返回 false。

  1. func (d *poolDequeue) popHead() (interface{}, bool) {
  2. var slot *eface
  3. for {
  4. ptrs := atomic.LoadUint64(&d.headTail)
  5. head, tail := d.unpack(ptrs)
  6. if tail == head {
  7. // Queue is empty.
  8. return nil, false
  9. }
  10. // Confirm tail and decrement head. We do this before
  11. // reading the value to take back ownership of this
  12. // slot.
  13. head--
  14. ptrs2 := d.pack(head, tail)
  15. if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
  16. // We successfully took back slot.
  17. slot = &d.vals[head&uint32(len(d.vals)-1)]
  18. break
  19. }
  20. }
  21. val := *(*interface{})(unsafe.Pointer(slot))
  22. if val == dequeueNil(nil) {
  23. val = nil
  24. }
  25. // Zero the slot. Unlike popTail, this isn't racing with
  26. // pushHead, so we don't need to be careful here.
  27. *slot = eface{}
  28. return val, true
  29. }

主要逻辑:

  1. 由于从 head 前一个位置 pop 元素,可能会与 tail 位置 pop 冲突,所以不可避免的需要 cas 操作。所以最开始进入就是一个 for 循环;
  2. 原子 load poolDequeue.headTail然后 unpack 拿到 head 和 tail
  3. 如果 head == tail,表示 deque 是空,直接 return nil.
  4. head –
  5. 根据新的 head 和老的 tail, 重新 pack 出 ptrs2;
  6. 原子 cas 更新poolDequeue.headTailatomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
  7. 如果更新成功,就拿到 head 执行的 slot,并获取到实际的 value,并 return;
  8. 如果原子更新失败了,重新进入 for 循环再次执行。

poolDequeue.popTail

这个函数是可以被 Consumer 并发访问的。

  1. func (d *poolDequeue) popTail() (interface{}, bool) {
  2. var slot *eface
  3. for {
  4. ptrs := atomic.LoadUint64(&d.headTail)
  5. head, tail := d.unpack(ptrs)
  6. if tail == head {
  7. // Queue is empty.
  8. return nil, false
  9. }
  10. // Confirm head and tail (for our speculative check
  11. // above) and increment tail. If this succeeds, then
  12. // we own the slot at tail.
  13. ptrs2 := d.pack(head, tail+1)
  14. if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
  15. // Success.
  16. slot = &d.vals[tail&uint32(len(d.vals)-1)]
  17. break
  18. }
  19. }
  20. // We now own slot.
  21. val := *(*interface{})(unsafe.Pointer(slot))
  22. if val == dequeueNil(nil) {
  23. val = nil
  24. }
  25. // Tell pushHead that we're done with this slot. Zeroing the
  26. // slot is also important so we don't leave behind references
  27. // that could keep this object live longer than necessary.
  28. //
  29. // We write to val first and then publish that we're done with
  30. // this slot by atomically writing to typ.
  31. slot.val = nil
  32. atomic.StorePointer(&slot.typ, nil)
  33. // At this point pushHead owns the slot.
  34. return val, true
  35. }

主要逻辑:

  1. 并发访问,所以与 cas 相关的 for 循环不可少;
  2. 原子 load,拿到 head 和 tail 值;
  3. 将 (tail+1) 和 head 重新 pack 成 ptrs2;
  4. CAS:atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2); 如果更新成功,就拿到 vals[tail]t 的指针。如果失败就再次返回 1 的 for 循环。
  5. 拿到 slot 对应的 val。
  6. 将 slot 的 val 和 type 都清为 nil, 告诉 pushHead, slot 我们已经使用完了,pushHead 可以往里面填充数据了。

数据结构总结

用一张图完整描述 sync.Pool 的数据结构:
golang sync.Pool在1.14中的优化_惜暮-CSDN博客 - 图1

强调一点:

  1. head 的操作只能是 local P;
  2. tail 的操作是任意 P;

参考网上一张图来看更加清晰:
golang sync.Pool在1.14中的优化_惜暮-CSDN博客 - 图2

Pool 并没有直接使用 poolDequeue,因为它是 fixed size 的,而 Pool 的大小是没有限制的。因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长。

Pool.Put

  1. func (p *Pool) Put(x interface{}) {
  2. if x == nil {
  3. return
  4. }
  5. l, _ := p.pin()
  6. if l.private == nil {
  7. l.private = x
  8. x = nil
  9. }
  10. if x != nil {
  11. l.shared.pushHead(x)
  12. }
  13. runtime_procUnpin()
  14. }
  15. func (p *Pool) pin() (*poolLocal, int) {
  16. pid := runtime_procPin()
  17. // In pinSlow we store to local and then to localSize, here we load in opposite order.
  18. // Since we've disabled preemption, GC cannot happen in between.
  19. // Thus here we must observe local at least as large localSize.
  20. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
  21. s := atomic.LoadUintptr(&p.localSize) // load-acquire
  22. l := p.local // load-consume
  23. if uintptr(pid) < s {
  24. return indexLocal(l, pid), pid
  25. }
  26. return p.pinSlow()
  27. }
  28. func (p *Pool) pinSlow() (*poolLocal, int) {
  29. // Retry under the mutex.
  30. // Can not lock the mutex while pinned.
  31. runtime_procUnpin()
  32. allPoolsMu.Lock()
  33. defer allPoolsMu.Unlock()
  34. pid := runtime_procPin()
  35. // poolCleanup won't be called while we are pinned.
  36. s := p.localSize
  37. l := p.local
  38. if uintptr(pid) < s {
  39. return indexLocal(l, pid), pid
  40. }
  41. if p.local == nil {
  42. allPools = append(allPools, p)
  43. }
  44. // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
  45. size := runtime.GOMAXPROCS(0)
  46. local := make([]poolLocal, size)
  47. atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
  48. atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
  49. return &local[pid], pid
  50. }

Put 函数主要逻辑:

  1. 先调用p.pin() 函数,这个函数会将当前 goroutine 与 P 绑定,并设置当前 g 不可被抢占 (也就不会出现多个协程并发读写当前 P 上绑定的数据);
    1. p.pin() 函数里面还会 check per P 的[P]poolLocal 数组是否发生了扩容 (P 扩张)。
    2. 如果发生了扩容,需要调用pinSlow()来执行具体扩容。扩容获取一个调度器全局大锁allPoolsMu,然后根据当前最新的 P 的数量去执行新的扩容。这里的成本很高,所以尽可能避免手动增加 P 的数量。
  2. 拿到 per P 的 poolLocal 后,优先将 val put 到 private,如果 private 已经存在,就通过调用shared.pushHead(x) 塞到 poolLocal 里面的无锁双端队列的 chain 中。Put 函数对于双端队列来说是作为一个 Producer 角色,所以这里的调用是无锁的。
  3. 最后解除当前 goroutine 的禁止抢占。

Pool.Get

  1. func (p *Pool) Get() interface{} {
  2. l, pid := p.pin()
  3. x := l.private
  4. l.private = nil
  5. if x == nil {
  6. // Try to pop the head of the local shard. We prefer
  7. // the head over the tail for temporal locality of
  8. // reuse.
  9. x, _ = l.shared.popHead()
  10. if x == nil {
  11. x = p.getSlow(pid)
  12. }
  13. }
  14. runtime_procUnpin()
  15. if x == nil && p.New != nil {
  16. x = p.New()
  17. }
  18. return x
  19. }
  20. func (p *Pool) getSlow(pid int) interface{} {
  21. // See the comment in pin regarding ordering of the loads.
  22. size := atomic.LoadUintptr(&p.localSize) // load-acquire
  23. locals := p.local // load-consume
  24. // Try to steal one element from other procs.
  25. for i := 0; i < int(size); i++ {
  26. l := indexLocal(locals, (pid+i+1)%int(size))
  27. if x, _ := l.shared.popTail(); x != nil {
  28. return x
  29. }
  30. }
  31. // Try the victim cache. We do this after attempting to steal
  32. // from all primary caches because we want objects in the
  33. // victim cache to age out if at all possible.
  34. size = atomic.LoadUintptr(&p.victimSize)
  35. if uintptr(pid) >= size {
  36. return nil
  37. }
  38. locals = p.victim
  39. l := indexLocal(locals, pid)
  40. if x := l.private; x != nil {
  41. l.private = nil
  42. return x
  43. }
  44. for i := 0; i < int(size); i++ {
  45. l := indexLocal(locals, (pid+i)%int(size))
  46. if x, _ := l.shared.popTail(); x != nil {
  47. return x
  48. }
  49. }
  50. // Mark the victim cache as empty for future gets don't bother
  51. // with it.
  52. atomic.StoreUintptr(&p.victimSize, 0)
  53. return nil
  54. }

Get 函数主要逻辑:

  1. 设置当前 goroutine 禁止抢占;
  2. 从 poolLocal 的 private 取,如果 private 不为空直接 return;
  3. 从 poolLocal.shared 这个双端队列 chain 里面无锁调用去取,如果取得到也直接 return;
  4. 上面都去不到,调用getSlow(pid)去取
    1. 首先会通过 steal 算法,去别的 P 里面的 poolLocal 去取,这里的实现是无锁的 cas。如果能够 steal 一个过来,就直接 return;
    2. 如果 steal 不到,则从 victim 里找,和 poolLocal 的逻辑类似。最后,实在没找到,就把 victimSize 置 0,防止后来的 “人” 再到 victim 里找。
  5. 最后还拿不到,就通过 New 函数来创建一个新的对象。

这里是一个很明显的多层级缓存优化 + GPM 调度结合起来。

private -> shared -> steal from other P -> victim cache -> New

victim cache 优化与 GC

对于 Pool 来说并不能够无上限的扩展,否则对象占用内存太多了,会引起内存溢出。

几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?

这里是使用 GC。在 pool.go 里面的init 函数 会注册清理函数:

  1. func init() {
  2. runtime_registerPoolCleanup(poolCleanup)
  3. }
  4. // mgc.go
  5. //go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
  6. func sync_runtime_registerPoolCleanup(f func()) {
  7. poolcleanup = f
  8. }

编译器会把 runtime_registerPoolCleanup 函数调用链接到 mgc.go 里面的 sync_runtime_registerPoolCleanup函数调用,实际上注册到 poolcleanup 函数。整个调用链如下:

gcStart() -> clearpools() -> poolcleanup()

也就是每一轮 GC 开始都会执行 pool 的清除操作。

  1. func poolCleanup() {
  2. // This function is called with the world stopped, at the beginning of a garbage collection.
  3. // It must not allocate and probably should not call any runtime functions.
  4. // Because the world is stopped, no pool user can be in a
  5. // pinned section (in effect, this has all Ps pinned).
  6. // Drop victim caches from all pools.
  7. for _, p := range oldPools {
  8. p.victim = nil
  9. p.victimSize = 0
  10. }
  11. // Move primary cache to victim cache.
  12. for _, p := range allPools {
  13. p.victim = p.local
  14. p.victimSize = p.localSize
  15. p.local = nil
  16. p.localSize = 0
  17. }
  18. // The pools with non-empty primary caches now have non-empty
  19. // victim caches and no pools have primary caches.
  20. oldPools, allPools = allPools, nil
  21. }

poolCleanup 会在 STW 阶段被调用。整体看起来,比较简洁。主要是将 local 和 victim 作交换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在 “兜底”。

重点:如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。如果获取的速度下降了,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。

在 Go1.13 之前的 poolCleanup 比较粗暴,直接清空了所有 Pool 的 p.local 和 poolLocal.shared。

通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。

所以 p.victim 的作用其实就是次级缓存。

sync.Pool 总结

  1. 关键思想是对象的复用,避免重复创建、销毁。将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力。
  2. sync.Pool 是协程安全的,使用起来非常方便。设置好 New 函数后,调用 Get 获取,调用 Put 归还对象。
  3. 不要对 Get 得到的对象有任何假设,默认 Get 到对象是一个空对象,Get 之后手动初始化。
  4. 好的实践是:Put 操作执行前将对象 “清空”,并且确保对象被 Put 进去之后不要有任何的指针引用再次使用,不然极大概率导致 data race。
  5. 第三和第四也就是考虑清楚复用对象的生命周期
  6. Pool 里对象的生命周期受 GC 影响,不适合于做连接池,因为连接池需要自己管理对象的生命周期。
  7. Pool 不可以指定⼤⼩,⼤⼩只受制于 GC 临界值。
  8. procPin 将 G 和 P 绑定,防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
  9. 在加入 victim 机制前,sync.Pool 里对象的最⼤缓存时间是一个 GC 周期,当 GC 开始时,没有被引⽤的对象都会被清理掉;加入 victim 机制后,最大缓存时间为两个 GC 周期。
  10. Victim Cache 本来是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入的意图在于降低 GC 压力的同时提高命中率。
  11. sync.Pool 的最底层使用切片加链表来实现双端队列,并将缓存的对象存储在切片中。
  12. sync.Pool 的设计理念,包括:无锁、操作对象隔离、原子操作代替锁、行为隔离——链表、Victim Cache 降低 GC 开销。

参考文档:
理解 Go 1.13 中 sync.Pool 的设计与实现
golang 新版如何优化 sync.pool 锁竞争消耗?
深度解密 Go 语言之 sync.Pool

最新评论

最新文章

2021 年 2 篇

2020 年 17 篇

2019 年 29 篇

2018 年 17 篇

2017 年 180 篇

2016 年 90 篇

2014 年 3 篇

golang sync.Pool在1.14中的优化_惜暮-CSDN博客 - 图3

目录

  1. golang sync.Pool 在 1.14 中的优化
  2. sync.Pool 在 1.12 中实现的原理简述
  3. 1.14 Pool 数据结构

    1. poolChain

      1. poolChain.popHead()
      2. poolChain.pushHead()
      3. poolChain.popTail()
    2. poolDequeue

      1. 数据结构
      2. pack/unpack
      3. poolDequeue.pushHead
      4. poolDequeue.popHead
      5. poolDequeue.popTail
    3. 数据结构总结
  4. Pool.Put
  5. Pool.Get
  6. victim cache 优化与 GC
  7. sync.Pool 总结

目录

  1. golang sync.Pool 在 1.14 中的优化
  2. sync.Pool 在 1.12 中实现的原理简述
  3. 1.14 Pool 数据结构

    1. poolChain

      1. poolChain.popHead()
      2. poolChain.pushHead()
      3. poolChain.popTail()
    2. poolDequeue

      1. 数据结构
      2. pack/unpack
      3. poolDequeue.pushHead
      4. poolDequeue.popHead
      5. poolDequeue.popTail
    3. 数据结构总结
  4. Pool.Put
  5. Pool.Get
  6. victim cache 优化与 GC
  7. sync.Pool 总结

https://blog.csdn.net/u010853261/article/details/106156091