Pool 是可伸缩、并发安全的临时对象池,用来存放已经分配但暂时不用的临时对象,通过对象重用机制,缓解 GC 压力,提高程序性能。
一个比较好的例子是 fmt 包,fmt 包总是需要使用一些 []byte 之类的对象,Golang 建立了一个临时对象池,存放着这些对象,如果需要使用一个 []byte,就去 Pool 中取,如果拿不到就分配一个。这比起不停生成新的[]byte,用完了再等待 GC 回收要高效得多。
注意:
sync.Pool 是一个临时的对象池,适用于储存一些会在 goroutine 间共享的临时对象,其中保存的任何项都可能随时不做通知地释放掉。
临时对象是它的特点,对于数据库等长连接就不合适,因为它保存的对象会在未来某个时刻被移除掉。而且如果没有别的对象引用这个被移除的对象的话,那么这个被移除的对象就会被 GC 回收掉。
所以不适合用于存放诸如 socket 长连接或数据库连接的对象。
Pool 的数据结构
type Pool struct {
// 用于检测 Pool 池是否被 copy,因为 Pool 不希望被 copy。用这个字段可以在 go vet 工具中检测出被 copy(在编译期间就发现问题)
noCopy noCopy // A Pool must not be copied after first use.
// 实际指向 []poolLocal,数组大小等于 P 的数量;每个 P 一一对应一个 poolLocal
local unsafe.Pointer
localSize uintptr // []poolLocal 的大小
// GC 时,victim 和 victimSize 会分别接管 local 和 localSize;
// victim 的目的是为了减少 GC 后冷启动导致的性能抖动,让分配对象更平滑;
victim unsafe.Pointer
victimSize uintptr
// 对象初始化构造方法,使用方定义
New func() interface{}
}
//从 Pool 中获取元素,元素数量 -1,当 Pool 中没有元素时,会调用 New 生成元素,新元素不会放入 Pool 中,若 New 未定义,则返回 nil
func (p *Pool) Get() interface{}
//往 Pool 中添加元素 x
func (p *Pool) Put(x interface{})
sync.Pool 的基本使用
type UserInfo struct {
Name string
Age int64
}
var userInfoBuf, _ = json.Marshal(UserInfo{Name: "Hello", Age: 18})
func main() {
// 声明对象池
pool := sync.Pool{New: func() any {
return new(UserInfo)
}}
// 从对象池中获取对象,返回 interface{},避免了频繁创建对象
userInfo := pool.Get().(*UserInfo)
// 将 userInfoBuf 反序列化到 userInfo
err := json.Unmarshal(userInfoBuf, userInfo)
if err != nil {
return
}
fmt.Printf("%v\n", *userInfo)
// 使用完的对象,重新返回对象池
pool.Put(userInfo)
}
上面代码声明了一个对象池,如果对象池不存在,将会使用 New 函数创建。
通过 Get 方法从对象池获取对象,再将反序列化的内容赋值到对象中,一旦对象使用完毕,通过 Put 方法将对象返回对象池中。
使用 sync.Pool 有两个知识点:
import ( “bytes” “io” “os” “sync” “time” )
var bufPool = sync.Pool { New: func() interface{} { return new(bytes.Buffer) }, }
func PoolTest(w io.Writer, key, val string) { b, _ := bufPool.Get().(*bytes.Buffer) b.Reset() b.WriteString(time.Now().UTC().Format(“2006-01-02 15:04:05”)) b.WriteByte(‘|’) b.WriteString(key) b.WriteByte(‘=’) b.WriteString(val) w.Write(b.Bytes()) w.Write([]byte(“\n”)) bufPool.Put(b) }
func main() { PoolTest(os.Stdout, “dablelv”, “monkey”) // 2023-04-17 15:23:56|dablelv=monkey }
```go
package main
import (
"errors"
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
//"flysnow.org/hello/common"
)
//一个安全的资源池,被管理的资源必须都实现io.Close接口
type Pool struct {
m sync.Mutex
res chan io.Closer
factory func() (io.Closer, error)
closed bool
}
var ErrPoolClosed = errors.New("资源池已经被关闭。")
//创建一个资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size的值太小了。")
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil
}
//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.res:
log.Println("Acquire:共享资源")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
log.Println("Acquire:新生成资源")
return p.factory()
}
}
//关闭资源池,释放资源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
//关闭通道,不让写入了
close(p.res)
//关闭通道里的资源
for r := range p.res {
r.Close()
}
}
func (p *Pool) Release(r io.Closer) {
//保证该操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
//资源池都关闭了,就省这一个没有释放的资源了,释放即可
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println("资源释放到池子里了")
default:
log.Println("资源池满了,释放这个资源吧")
r.Close()
}
}
const (
//模拟的最大goroutine
maxGoroutine = 5
//资源池的大小
poolRes = 2
)
func main() {
//等待任务完成
var wg sync.WaitGroup
wg.Add(maxGoroutine)
p, err := New(createConnection, poolRes)
if err != nil {
log.Println(err)
return
}
// 模拟好几个goroutine同时使用资源池查询数据
for query := 0; query < maxGoroutine; query++ {
go func(q int) {
dbQuery(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("开始关闭资源池")
p.Close()
}
//模拟数据库查询
func dbQuery(query int, pool *Pool) {
conn, err := pool.Acquire()
if err != nil {
log.Println(err)
return
}
// 防止忘记释放资源,
defer pool.Release(conn)
//模拟查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)
}
//数据库连接
type dbConnection struct {
ID int32//连接的标志
}
// 实现io.Closer接口
func (db *dbConnection) Close() error {
log.Println("关闭连接", db.ID)
return nil
}
var idCounter int32
// 生成数据库连接的方法,以供资源池使用,这个函数符合Pool中的factory的类型,
func createConnection() (io.Closer, error) {
//并发安全,给数据库连接生成唯一标志
id := atomic.AddInt32(&idCounter, 1)
return &dbConnection{id}, nil
}