golang sync.Pool 在 1.14 中的优化
本文基于 golang 1.14 对 sync.Pool 进行分析;
sync.Pool 在 1.12 中实现的原理简述
参考 Golang 的 sync.Pool 设计思路与原理,这篇文章基于 1.12 版本对 golang 的 sync.Pool 实现原理进行了分析。关于 sync.Pool 的使用场景、基本概念的理解可以参考前面的文章。
在 1.12 中 sync.Pool 的设计思路简单总结一下:
- 将 G 和 P 绑定,设置与 P 绑定的 M 禁止抢占以防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
 - 每个 p 都有独享的缓存队列,当 g 进行 sync.pool 操作时,先找到所属 p 的 private,如果没有对象可用,加锁从 shared 切片里获取数据。如果还没有拿到缓存对象,那么到其他 P 的 poolLocal 进行偷数据,如果偷不到,那么就创建新对象。
 
1.12 sync.pool 的源码,可以发现 sync.pool 里会有各种的锁逻辑,从自己的 shared 拿数据加锁。getSlow() 偷其他 P 缓存,也是需要给每个 p 加锁。put 归还缓存的时候,还是会 mutex 加一次锁。
go mutex 锁的实现原理简单说,他开始也是 atomic cas 自旋,默认是 4 次尝试,当还没有拿到锁的时候进行 waitqueue gopack 休眠调度处理,等待其他协程释放锁时进行 goready 调度唤醒。
Go 1.13 之后,Go 团队对 sync.Pool 的锁竞争这块进行了很多优化,这里还改变了 shared 的数据结构,以前的版本用切片做缓存,现在换成了 poolChain 双端链表。这个双端链表的设计很有意思,你看 sync.pool 源代码会发现跟 redis quicklist 相似,都是链表加数组的设计。
1.14 Pool 数据结构
type Pool struct {noCopy noCopylocal unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocallocalSize uintptr // size of the local arrayvictim unsafe.Pointer // local from previous cyclevictimSize uintptr // size of victims array// New optionally specifies a function to generate// a value when Get would otherwise return nil.// It may not be changed concurrently with calls to Get.New func() interface{}}// Local per-P Pool appendix.type poolLocalInternal struct {private interface{} // Can be used only by the respective P.shared poolChain // Local P can pushHead/popHead; any P can popTail.}type poolLocal struct {poolLocalInternal// Prevents false sharing on widespread platforms with// 128 mod (cache line size) = 0 .pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte}
Pool.local 实际上是一个类型 [P]poolLocal 数组,数组长度是调度器中 P 的数量,也就是说每一个 P 有自己独立的 poolLocal。通过 P.id 来获取每个 P 自己独立的 poolLocal。在 poolLocal 中有一个 poolChain。
这里我们先忽略其余的机构,重点关注poolLocalInternal.shared 这个字段。poolChain 是一个双端队列链,缓存对象。 1.12 版本中对于这个字段的并发安全访问是通过 mutex 加锁实现的;1.14 优化后通过 poolChain(无锁化) 实现的。
这里我们先重点分析一下 poolChain 是怎么实现并发无锁编程的。
poolChain
type poolChain struct {// head is the poolDequeue to push to. This is only accessed// by the producer, so doesn't need to be synchronized.head *poolChainElt// tail is the poolDequeue to popTail from. This is accessed// by consumers, so reads and writes must be atomic.tail *poolChainElt}type poolChainElt struct {poolDequeue// next and prev link to the adjacent poolChainElts in this// poolChain.//// next is written atomically by the producer and read// atomically by the consumer. It only transitions from nil to// non-nil.//// prev is written atomically by the consumer and read// atomically by the producer. It only transitions from// non-nil to nil.next, prev *poolChainElt}
poolChain是一个动态大小的双向链接列表的双端队列。每个出站队列的大小是前一个队列的两倍。也就是说 poolChain 里面每个元素 poolChainElt 都是一个双端队列。
head 指向的 poolChainElt,是用于 Producer 去 Push 元素的,不需要做同步处理。
tail 指向的 poolChainElt,是用于 Consumer 从 tail 去 pop 元素的,这里的读写需要保证原子性。
简单来说,poolChain是一个单 Producer,多 Consumer 并发访问的双端队列链。
对于poolChain中的每一个双端队列 poolChainElt,包含了双端队列实体poolDequeue 一起前后链接的指针。
poolChain 主要方法有:
popHead() (interface{}, bool);pushHead(val interface{})popTail() (interface{}, bool)
popHead和pushHead函数是给 Producer 调用的;popTail是给 Consumer 并发调用的。
poolChain.popHead()
前面我们说了,poolChain的 head 指针的操作是单 Producer 的。
func (c *poolChain) popHead() (interface{}, bool) {d := c.headfor d != nil {if val, ok := d.popHead(); ok {return val, ok}// There may still be unconsumed elements in the// previous dequeue, so try backing up.d = loadPoolChainElt(&d.prev)}return nil, false}
poolChain 要求,popHead函数只能被 Producer 调用。看一下逻辑:
- 获取头结点 head;
 - 如果头结点非空就从头节点所代表的双端队列
poolDequeue中调用popHead函数。注意这里poolDequeue的popHead函数和poolChain的popHead函数并不一样。poolDequeue是一个固定 size 的 ring buffer。 - 如果从 head 中拿到了 value,就直接返回;
 - 如果从 head 中拿不到 value,就从 head.prev 再次尝试获取;
 - 最后都获取不到,就返回 nil。
 
poolChain.pushHead()
func (c *poolChain) pushHead(val interface{}) {d := c.headif d == nil {// Initialize the chain.const initSize = 8 // Must be a power of 2d = new(poolChainElt)d.vals = make([]eface, initSize)c.head = dstorePoolChainElt(&c.tail, d)}if d.pushHead(val) {return}// The current dequeue is full. Allocate a new one of twice// the size.newSize := len(d.vals) * 2if newSize >= dequeueLimit {// Can't make it any bigger.newSize = dequeueLimit}d2 := &poolChainElt{prev: d}d2.vals = make([]eface, newSize)c.head = d2storePoolChainElt(&d.next, d2)d2.pushHead(val)}
poolChain 要求,pushHead函数同样只能被 Producer 调用。看一下逻辑:
- 首先还是获取头结点 head;
 - 如果头结点为空,需要初始化 chain
- 创建
poolChainElt节点,作为 head, 当然也是 tail。 poolChainElt其实也是固定 size 的双端队列poolDequeue,size 必须是 2 的 n 次幂。
 - 创建
 - 调用
poolDequeue的pushHead函数将 val push 进 head 的双端队列poolDequeue。 - 如果 push 失败了,说明双端队列满了,需要重新创建一个双端队列 d2,新的双端队列的 size 是前一个双端队列 size 的 2 倍;
 - 更新 poolChain 的 head 指向最新的双端队列,并且建立双链关系;
 - 然后将 val push 到最新的双端队列。
 
这里需要注意一点的是 head 其实是指向最后 chain 中最后一个结点 (poolDequeue),chain 执行 push 操作是往最后一个节点 push。 所以这里的 head 的语义不是针对链表结构,而是针对队列结构。
poolChain.popTail()
func (c *poolChain) popTail() (interface{}, bool) {d := loadPoolChainElt(&c.tail)if d == nil {return nil, false}for {// It's important that we load the next pointer// *before* popping the tail. In general, d may be// transiently empty, but if next is non-nil before// the pop and the pop fails, then d is permanently// empty, which is the only condition under which it's// safe to drop d from the chain.d2 := loadPoolChainElt(&d.next)if val, ok := d.popTail(); ok {return val, ok}if d2 == nil {// This is the only dequeue. It's empty right// now, but could be pushed to in the future.return nil, false}// The tail of the chain has been drained, so move on// to the next dequeue. Try to drop it from the chain// so the next pop doesn't have to look at the empty// dequeue again.if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {// We won the race. Clear the prev pointer so// the garbage collector can collect the empty// dequeue and so popHead doesn't back up// further than necessary.storePoolChainElt(&d2.prev, nil)}d = d2}}
poolChain 要求,popTail函数能被任何 P 调用,也就是所有的 P 都是 Consumer。这里总结下,当前 G 所对应的 P 在 Pool 里面是 Producer 角色,任何 P 都是 Consumer 角色。
popTail函数是并发调用的,所以需要特别注意。
- 首先需要原子的 load chain 的 tail 指向的双端队列 d(
poolDequeue); - 如果 d 为空,pool 还是空,所以直接 return nil
 - 下面就是典型的无锁原子化编程:进入一个 for 循环
- 首先就是获取 tail 的 next 结点 d2。这里需要强调一下为什么需要在 tail 执行 popTail 之前先 load tail 的 next 结点。
- tail 有可能存在短暂性为空的场景。比如 head 和 tail 实际指向同一个结点 (双端队列) 时候,可能 tail 为空只是暂时的,因为存在有线程往 head push 数据的情况。
 - 如果因为 tail 执行 popTail() 时因为 tail 为空而失败了,然后再 load tail.next,发现 tail.next 非空,再将 tail 原子切换到 tail.next,这个时候就会出现错误了。假设 tail 和 head 指向同一个结点,在判断 tail 是空之后,head 往里面插入了很多个数据,直接将 tail 结点打满,然后 head 指向下一个结点了。这时候 tail.next 也非空了。然后就将 tail 更新到 tail.next,就会导致丢数据了。
 - 所以必须在:1)tail 执行 popTail 之前 tail.next 是非空的,2)tail 执行 popTail 时发现 tail 是空的。满足这两个条件才能说明 tail 是永久性是空的。也就是需要提前 load tail.next 指针。
 
 - 如果从 tail 里面 pop 数据成功,就直接返回 val。
 - 如果从 tail 里面 pop 数据失败,并且 d2 也是空,说明当前 chain 里面只有一个结点,并且是空。直接返回 nil
 - 如果从 tail 里面 pop 数据失败并且 d2 非空,说明 tail 已经被 drain 干净了,原子的 tail 到 tail.next,并清除双向链表关系。
 - 从 d2 开始新的一轮 for 循环。
 
 - 首先就是获取 tail 的 next 结点 d2。这里需要强调一下为什么需要在 tail 执行 popTail 之前先 load tail 的 next 结点。
 
上面的流程是典型的的无锁并发编程。
poolDequeue
poolChain 中每一个结点都是一个双端队列poolDequeue。
poolDequeue是一个无锁的、固定 size 的、单 Producer、多 Consumer 的 deque。只有一个 Producer 可以从 head 去 push 或则 pop;多个 Consumer 可以从 tail 去 pop。
数据结构
type poolDequeue struct {// 用高32位和低32位分别表示head和tail// head是下一个fill的slot的index;// tail是deque中最老的一个元素的index// 队列中有效元素是[tail, head)headTail uint64vals []eface}type eface struct {typ, val unsafe.Pointer}
这里通过一个字段 headTail 来表示 head 和 tail 的 index。headTail是 8 个字节 64 位。
- 高 32 位表示 head;
 - 低 32 位表示 tail。
 - head 和 tail 自加溢出时是安全的。
 
vals是一个固定 size 的 slice,其实也就是一个 ring buffer,size 必须是 2 的次幂 (为了做位运算);
pack/unpack
一个字段 headTail 来表示 head 和 tail 的 index,所以需要有具体的 pack 和 unpack 逻辑:
const dequeueBits = 32func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {const mask = 1<<dequeueBits - 1head = uint32((ptrs >> dequeueBits) & mask)tail = uint32(ptrs & mask)return}func (d *poolDequeue) pack(head, tail uint32) uint64 {const mask = 1<<dequeueBits - 1return (uint64(head) << dequeueBits) |uint64(tail&mask)}
pack:
- 首先拿到 mask,这里实际上就是 0xffffffff(2^32-1)
 - head 左移 32 位 | tail&0xffffffff 就可以得到 head 和 tail pack 之后的值。
 
unpack:
- 首先拿到 mask,这里实际上就是 0xffffffff(2^32-1)
 - ptrs 右移 32 位拿到高 32 位然后 & mask 就可以得到 head;
 - ptrs 直接 & mask 就可以得到低 32 位,也就是 tail。
 
poolDequeue.pushHead
pushHead 将 val push 到 head 指向的位置,如果 deque 满了,就返回 false。
func (d *poolDequeue) pushHead(val interface{}) bool {ptrs := atomic.LoadUint64(&d.headTail)head, tail := d.unpack(ptrs)if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {// Queue is full.return false}slot := &d.vals[head&uint32(len(d.vals)-1)]// Check if the head slot has been released by popTail.typ := atomic.LoadPointer(&slot.typ)if typ != nil {// Another goroutine is still cleaning up the tail, so// the queue is actually still full.return false}// The head slot is free, so we own it.if val == nil {val = dequeueNil(nil)}*(*interface{})(unsafe.Pointer(slot)) = val// Increment head. This passes ownership of slot to popTail// and acts as a store barrier for writing the slot.atomic.AddUint64(&d.headTail, 1<<dequeueBits)return true}
主要逻辑:
- 原子 load head 和 tail,
 - 如果 tail + len(vals) == head 说明 deque 已经满了。
 - 拿到 head 在 vals 中 index 的 slot
 - 如果 slot 的 type 非空,说明该 slot 还没有被 popTail release,实际上 deque 还是满的;所以直接 return false;
 - 更新 val 到 slot 的指针指向的值。
 - 原子的自加 head
 
需要注意的是,pushHead 不是并发安全的,只能有一个 Producer 去执行;只有 slot 的的 type 指针为空时候 slot 才是空。
poolDequeue.popHead
popHead 将 head 指向的前一个位置弹出,如果 deque 是空,就返回 false。
func (d *poolDequeue) popHead() (interface{}, bool) {var slot *efacefor {ptrs := atomic.LoadUint64(&d.headTail)head, tail := d.unpack(ptrs)if tail == head {// Queue is empty.return nil, false}// Confirm tail and decrement head. We do this before// reading the value to take back ownership of this// slot.head--ptrs2 := d.pack(head, tail)if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {// We successfully took back slot.slot = &d.vals[head&uint32(len(d.vals)-1)]break}}val := *(*interface{})(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// Zero the slot. Unlike popTail, this isn't racing with// pushHead, so we don't need to be careful here.*slot = eface{}return val, true}
主要逻辑:
- 由于从 head 前一个位置 pop 元素,可能会与 tail 位置 pop 冲突,所以不可避免的需要 cas 操作。所以最开始进入就是一个 for 循环;
 - 原子 load 
poolDequeue.headTail然后 unpack 拿到 head 和 tail - 如果 head == tail,表示 deque 是空,直接 return nil.
 - head –
 - 根据新的 head 和老的 tail, 重新 pack 出 ptrs2;
 - 原子 cas 更新
poolDequeue.headTail,atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2), - 如果更新成功,就拿到 head 执行的 slot,并获取到实际的 value,并 return;
 - 如果原子更新失败了,重新进入 for 循环再次执行。
 
poolDequeue.popTail
这个函数是可以被 Consumer 并发访问的。
func (d *poolDequeue) popTail() (interface{}, bool) {var slot *efacefor {ptrs := atomic.LoadUint64(&d.headTail)head, tail := d.unpack(ptrs)if tail == head {// Queue is empty.return nil, false}// Confirm head and tail (for our speculative check// above) and increment tail. If this succeeds, then// we own the slot at tail.ptrs2 := d.pack(head, tail+1)if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {// Success.slot = &d.vals[tail&uint32(len(d.vals)-1)]break}}// We now own slot.val := *(*interface{})(unsafe.Pointer(slot))if val == dequeueNil(nil) {val = nil}// Tell pushHead that we're done with this slot. Zeroing the// slot is also important so we don't leave behind references// that could keep this object live longer than necessary.//// We write to val first and then publish that we're done with// this slot by atomically writing to typ.slot.val = nilatomic.StorePointer(&slot.typ, nil)// At this point pushHead owns the slot.return val, true}
主要逻辑:
- 并发访问,所以与 cas 相关的 for 循环不可少;
 - 原子 load,拿到 head 和 tail 值;
 - 将 (tail+1) 和 head 重新 pack 成 ptrs2;
 - CAS:
atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2); 如果更新成功,就拿到 vals[tail]t 的指针。如果失败就再次返回 1 的 for 循环。 - 拿到 slot 对应的 val。
 - 将 slot 的 val 和 type 都清为 nil, 告诉 pushHead, slot 我们已经使用完了,pushHead 可以往里面填充数据了。
 
数据结构总结
用一张图完整描述 sync.Pool 的数据结构:
强调一点:
- head 的操作只能是 local P;
 - tail 的操作是任意 P;
 
参考网上一张图来看更加清晰:
Pool 并没有直接使用 poolDequeue,因为它是 fixed size 的,而 Pool 的大小是没有限制的。因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长。
Pool.Put
func (p *Pool) Put(x interface{}) {if x == nil {return}l, _ := p.pin()if l.private == nil {l.private = xx = nil}if x != nil {l.shared.pushHead(x)}runtime_procUnpin()}func (p *Pool) pin() (*poolLocal, int) {pid := runtime_procPin()// In pinSlow we store to local and then to localSize, here we load in opposite order.// Since we've disabled preemption, GC cannot happen in between.// Thus here we must observe local at least as large localSize.// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).s := atomic.LoadUintptr(&p.localSize) // load-acquirel := p.local // load-consumeif uintptr(pid) < s {return indexLocal(l, pid), pid}return p.pinSlow()}func (p *Pool) pinSlow() (*poolLocal, int) {// Retry under the mutex.// Can not lock the mutex while pinned.runtime_procUnpin()allPoolsMu.Lock()defer allPoolsMu.Unlock()pid := runtime_procPin()// poolCleanup won't be called while we are pinned.s := p.localSizel := p.localif uintptr(pid) < s {return indexLocal(l, pid), pid}if p.local == nil {allPools = append(allPools, p)}// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.size := runtime.GOMAXPROCS(0)local := make([]poolLocal, size)atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-releaseatomic.StoreUintptr(&p.localSize, uintptr(size)) // store-releasereturn &local[pid], pid}
Put 函数主要逻辑:
- 先调用
p.pin()函数,这个函数会将当前 goroutine 与 P 绑定,并设置当前 g 不可被抢占 (也就不会出现多个协程并发读写当前 P 上绑定的数据);- 在
p.pin()函数里面还会 check per P 的[P]poolLocal 数组是否发生了扩容 (P 扩张)。 - 如果发生了扩容,需要调用
pinSlow()来执行具体扩容。扩容获取一个调度器全局大锁allPoolsMu,然后根据当前最新的 P 的数量去执行新的扩容。这里的成本很高,所以尽可能避免手动增加 P 的数量。 
 - 在
 - 拿到 per P 的 poolLocal 后,优先将 val put 到 private,如果 private 已经存在,就通过调用
shared.pushHead(x)塞到 poolLocal 里面的无锁双端队列的 chain 中。Put 函数对于双端队列来说是作为一个 Producer 角色,所以这里的调用是无锁的。 - 最后解除当前 goroutine 的禁止抢占。
 
Pool.Get
func (p *Pool) Get() interface{} {l, pid := p.pin()x := l.privatel.private = nilif x == nil {// Try to pop the head of the local shard. We prefer// the head over the tail for temporal locality of// reuse.x, _ = l.shared.popHead()if x == nil {x = p.getSlow(pid)}}runtime_procUnpin()if x == nil && p.New != nil {x = p.New()}return x}func (p *Pool) getSlow(pid int) interface{} {// See the comment in pin regarding ordering of the loads.size := atomic.LoadUintptr(&p.localSize) // load-acquirelocals := p.local // load-consume// Try to steal one element from other procs.for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i+1)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// Try the victim cache. We do this after attempting to steal// from all primary caches because we want objects in the// victim cache to age out if at all possible.size = atomic.LoadUintptr(&p.victimSize)if uintptr(pid) >= size {return nil}locals = p.victiml := indexLocal(locals, pid)if x := l.private; x != nil {l.private = nilreturn x}for i := 0; i < int(size); i++ {l := indexLocal(locals, (pid+i)%int(size))if x, _ := l.shared.popTail(); x != nil {return x}}// Mark the victim cache as empty for future gets don't bother// with it.atomic.StoreUintptr(&p.victimSize, 0)return nil}
Get 函数主要逻辑:
- 设置当前 goroutine 禁止抢占;
 - 从 poolLocal 的 private 取,如果 private 不为空直接 return;
 - 从 poolLocal.shared 这个双端队列 chain 里面无锁调用去取,如果取得到也直接 return;
 - 上面都去不到,调用
getSlow(pid)去取- 首先会通过 steal 算法,去别的 P 里面的 poolLocal 去取,这里的实现是无锁的 cas。如果能够 steal 一个过来,就直接 return;
 - 如果 steal 不到,则从 victim 里找,和 poolLocal 的逻辑类似。最后,实在没找到,就把 victimSize 置 0,防止后来的 “人” 再到 victim 里找。
 
 - 最后还拿不到,就通过 New 函数来创建一个新的对象。
 
这里是一个很明显的多层级缓存优化 + GPM 调度结合起来。
private -> shared -> steal from other P -> victim cache -> New
victim cache 优化与 GC
对于 Pool 来说并不能够无上限的扩展,否则对象占用内存太多了,会引起内存溢出。
几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?
这里是使用 GC。在 pool.go 里面的init 函数 会注册清理函数:
func init() {runtime_registerPoolCleanup(poolCleanup)}// mgc.go//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanupfunc sync_runtime_registerPoolCleanup(f func()) {poolcleanup = f}
编译器会把 runtime_registerPoolCleanup 函数调用链接到 mgc.go 里面的 sync_runtime_registerPoolCleanup函数调用,实际上注册到 poolcleanup 函数。整个调用链如下:
gcStart() -> clearpools() -> poolcleanup()
也就是每一轮 GC 开始都会执行 pool 的清除操作。
func poolCleanup() {// This function is called with the world stopped, at the beginning of a garbage collection.// It must not allocate and probably should not call any runtime functions.// Because the world is stopped, no pool user can be in a// pinned section (in effect, this has all Ps pinned).// Drop victim caches from all pools.for _, p := range oldPools {p.victim = nilp.victimSize = 0}// Move primary cache to victim cache.for _, p := range allPools {p.victim = p.localp.victimSize = p.localSizep.local = nilp.localSize = 0}// The pools with non-empty primary caches now have non-empty// victim caches and no pools have primary caches.oldPools, allPools = allPools, nil}
poolCleanup 会在 STW 阶段被调用。整体看起来,比较简洁。主要是将 local 和 victim 作交换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在 “兜底”。
重点:如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。如果获取的速度下降了,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。
在 Go1.13 之前的 poolCleanup 比较粗暴,直接清空了所有 Pool 的 p.local 和 poolLocal.shared。
通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。
所以 p.victim 的作用其实就是次级缓存。
sync.Pool 总结
- 关键思想是对象的复用,避免重复创建、销毁。将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力。
 - sync.Pool 是协程安全的,使用起来非常方便。设置好 New 函数后,调用 Get 获取,调用 Put 归还对象。
 - 不要对 Get 得到的对象有任何假设,默认 Get 到对象是一个空对象,Get 之后手动初始化。
 - 好的实践是:Put 操作执行前将对象 “清空”,并且确保对象被 Put 进去之后不要有任何的指针引用再次使用,不然极大概率导致 data race。
 - 第三和第四也就是考虑清楚复用对象的生命周期
 - Pool 里对象的生命周期受 GC 影响,不适合于做连接池,因为连接池需要自己管理对象的生命周期。
 - Pool 不可以指定⼤⼩,⼤⼩只受制于 GC 临界值。
 - procPin 将 G 和 P 绑定,防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
 - 在加入 victim 机制前,sync.Pool 里对象的最⼤缓存时间是一个 GC 周期,当 GC 开始时,没有被引⽤的对象都会被清理掉;加入 victim 机制后,最大缓存时间为两个 GC 周期。
 - Victim Cache 本来是计算机架构里面的一个概念,是 CPU 硬件处理缓存的一种技术,sync.Pool 引入的意图在于降低 GC 压力的同时提高命中率。
 - sync.Pool 的最底层使用切片加链表来实现双端队列,并将缓存的对象存储在切片中。
 - sync.Pool 的设计理念,包括:无锁、操作对象隔离、原子操作代替锁、行为隔离——链表、Victim Cache 降低 GC 开销。
 
参考文档:
理解 Go 1.13 中 sync.Pool 的设计与实现
golang 新版如何优化 sync.pool 锁竞争消耗?
深度解密 Go 语言之 sync.Pool
最新评论
- etcd Backend 存储引擎实现原理
Kur623: 膜拜大佬 - Golang map 实践以及实现原理
dota 爱好者: 讲的详细又通俗易懂,一级棒! - Golang-gopark 函数和 goready 函数原理分析
东泽 XD: 谁知道传进来的那把锁有啥用,作用在哪 - golang unsafe.Pointer 使用原则以及 uintptr 隐藏的坑
hcb0825: [code=golang] func foo() { p0, y, z := createInt(), createInt(), createInt() var p1 = unsafe.Pointer(y) // 和 y 一样引用着同一个值 var p2 = uintptr(unsafe.Pointer(z)) // 此时,即使 z 指针值所引用的 int 值的地址仍旧存储 // 在 p2 值中,但是此 int 值已经不再被使用了,所以垃圾 // 回收器认为可以回收它所占据的内存块了。另一方面, // p0 和 p1 各自所引用的 int 值仍旧将在下面被使用。 // uintptr 值可以参与算术运算。 p2 += 2; p2—; p2— p0 = 1 // okay (int)(p1) = 2 // okay (int)(unsafe.Pointer(p2)) = 3 // 危险操作! fmt.Println((*int)(unsafe.Pointer(p2))) } [/code] 这段代码我在 go1.14.3 版本下能正常运行 - golang 逃逸分析与栈、堆分配分析
hack0072005: 学习,收藏了 
最新文章

目录
- golang sync.Pool 在 1.14 中的优化
 - sync.Pool 在 1.12 中实现的原理简述
 - 1.14 Pool 数据结构
 
- Pool.Put
 - Pool.Get
 - victim cache 优化与 GC
 - sync.Pool 总结
 
