Pool 是可伸缩、并发安全的临时对象池,用来存放已经分配但暂时不用的临时对象,通过对象重用机制,缓解 GC 压力,提高程序性能。

一个比较好的例子是 fmt 包,fmt 包总是需要使用一些 []byte 之类的对象,Golang 建立了一个临时对象池,存放着这些对象,如果需要使用一个 []byte,就去 Pool 中取,如果拿不到就分配一个。这比起不停生成新的[]byte,用完了再等待 GC 回收要高效得多。

注意:
sync.Pool 是一个临时的对象池,适用于储存一些会在 goroutine 间共享的临时对象,其中保存的任何项都可能随时不做通知地释放掉。
临时对象是它的特点,对于数据库等长连接就不合适,因为它保存的对象会在未来某个时刻被移除掉。而且如果没有别的对象引用这个被移除的对象的话,那么这个被移除的对象就会被 GC 回收掉。
所以不适合用于存放诸如 socket 长连接或数据库连接的对象。

Pool 的数据结构

  1. type Pool struct {
  2. // 用于检测 Pool 池是否被 copy,因为 Pool 不希望被 copy。用这个字段可以在 go vet 工具中检测出被 copy(在编译期间就发现问题)
  3. noCopy noCopy // A Pool must not be copied after first use.
  4. // 实际指向 []poolLocal,数组大小等于 P 的数量;每个 P 一一对应一个 poolLocal
  5. local unsafe.Pointer
  6. localSize uintptr // []poolLocal 的大小
  7. // GC 时,victim 和 victimSize 会分别接管 local 和 localSize;
  8. // victim 的目的是为了减少 GC 后冷启动导致的性能抖动,让分配对象更平滑;
  9. victim unsafe.Pointer
  10. victimSize uintptr
  11. // 对象初始化构造方法,使用方定义
  12. New func() interface{}
  13. }
  14. //从 Pool 中获取元素,元素数量 -1,当 Pool 中没有元素时,会调用 New 生成元素,新元素不会放入 Pool 中,若 New 未定义,则返回 nil
  15. func (p *Pool) Get() interface{}
  16. //往 Pool 中添加元素 x
  17. func (p *Pool) Put(x interface{})

sync.Pool 的基本使用

  1. type UserInfo struct {
  2. Name string
  3. Age int64
  4. }
  5. var userInfoBuf, _ = json.Marshal(UserInfo{Name: "Hello", Age: 18})
  6. func main() {
  7. // 声明对象池
  8. pool := sync.Pool{New: func() any {
  9. return new(UserInfo)
  10. }}
  11. // 从对象池中获取对象,返回 interface{},避免了频繁创建对象
  12. userInfo := pool.Get().(*UserInfo)
  13. // 将 userInfoBuf 反序列化到 userInfo
  14. err := json.Unmarshal(userInfoBuf, userInfo)
  15. if err != nil {
  16. return
  17. }
  18. fmt.Printf("%v\n", *userInfo)
  19. // 使用完的对象,重新返回对象池
  20. pool.Put(userInfo)
  21. }

上面代码声明了一个对象池,如果对象池不存在,将会使用 New 函数创建。
通过 Get 方法从对象池获取对象,再将反序列化的内容赋值到对象中,一旦对象使用完毕,通过 Put 方法将对象返回对象池中。
使用 sync.Pool 有两个知识点:

  • sync.Pool 是线程安全的,多个 goroutine 可以并发地调用存取对象;
  • sync.Pool 不允许复制使用。

    实现

    ```go package main

import ( “bytes” “io” “os” “sync” “time” )

var bufPool = sync.Pool { New: func() interface{} { return new(bytes.Buffer) }, }

func PoolTest(w io.Writer, key, val string) { b, _ := bufPool.Get().(*bytes.Buffer) b.Reset() b.WriteString(time.Now().UTC().Format(“2006-01-02 15:04:05”)) b.WriteByte(‘|’) b.WriteString(key) b.WriteByte(‘=’) b.WriteString(val) w.Write(b.Bytes()) w.Write([]byte(“\n”)) bufPool.Put(b) }

func main() { PoolTest(os.Stdout, “dablelv”, “monkey”) // 2023-04-17 15:23:56|dablelv=monkey }

  1. ```go
  2. package main
  3. import (
  4. "errors"
  5. "io"
  6. "log"
  7. "math/rand"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. //"flysnow.org/hello/common"
  12. )
  13. //一个安全的资源池,被管理的资源必须都实现io.Close接口
  14. type Pool struct {
  15. m sync.Mutex
  16. res chan io.Closer
  17. factory func() (io.Closer, error)
  18. closed bool
  19. }
  20. var ErrPoolClosed = errors.New("资源池已经被关闭。")
  21. //创建一个资源池
  22. func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
  23. if size <= 0 {
  24. return nil, errors.New("size的值太小了。")
  25. }
  26. return &Pool{
  27. factory: fn,
  28. res: make(chan io.Closer, size),
  29. }, nil
  30. }
  31. //从资源池里获取一个资源
  32. func (p *Pool) Acquire() (io.Closer, error) {
  33. select {
  34. case r, ok := <-p.res:
  35. log.Println("Acquire:共享资源")
  36. if !ok {
  37. return nil, ErrPoolClosed
  38. }
  39. return r, nil
  40. default:
  41. log.Println("Acquire:新生成资源")
  42. return p.factory()
  43. }
  44. }
  45. //关闭资源池,释放资源
  46. func (p *Pool) Close() {
  47. p.m.Lock()
  48. defer p.m.Unlock()
  49. if p.closed {
  50. return
  51. }
  52. p.closed = true
  53. //关闭通道,不让写入了
  54. close(p.res)
  55. //关闭通道里的资源
  56. for r := range p.res {
  57. r.Close()
  58. }
  59. }
  60. func (p *Pool) Release(r io.Closer) {
  61. //保证该操作和Close方法的操作是安全的
  62. p.m.Lock()
  63. defer p.m.Unlock()
  64. //资源池都关闭了,就省这一个没有释放的资源了,释放即可
  65. if p.closed {
  66. r.Close()
  67. return
  68. }
  69. select {
  70. case p.res <- r:
  71. log.Println("资源释放到池子里了")
  72. default:
  73. log.Println("资源池满了,释放这个资源吧")
  74. r.Close()
  75. }
  76. }
  77. const (
  78. //模拟的最大goroutine
  79. maxGoroutine = 5
  80. //资源池的大小
  81. poolRes = 2
  82. )
  83. func main() {
  84. //等待任务完成
  85. var wg sync.WaitGroup
  86. wg.Add(maxGoroutine)
  87. p, err := New(createConnection, poolRes)
  88. if err != nil {
  89. log.Println(err)
  90. return
  91. }
  92. // 模拟好几个goroutine同时使用资源池查询数据
  93. for query := 0; query < maxGoroutine; query++ {
  94. go func(q int) {
  95. dbQuery(q, p)
  96. wg.Done()
  97. }(query)
  98. }
  99. wg.Wait()
  100. log.Println("开始关闭资源池")
  101. p.Close()
  102. }
  103. //模拟数据库查询
  104. func dbQuery(query int, pool *Pool) {
  105. conn, err := pool.Acquire()
  106. if err != nil {
  107. log.Println(err)
  108. return
  109. }
  110. // 防止忘记释放资源,
  111. defer pool.Release(conn)
  112. //模拟查询
  113. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
  114. log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)
  115. }
  116. //数据库连接
  117. type dbConnection struct {
  118. ID int32//连接的标志
  119. }
  120. // 实现io.Closer接口
  121. func (db *dbConnection) Close() error {
  122. log.Println("关闭连接", db.ID)
  123. return nil
  124. }
  125. var idCounter int32
  126. // 生成数据库连接的方法,以供资源池使用,这个函数符合Pool中的factory的类型,
  127. func createConnection() (io.Closer, error) {
  128. //并发安全,给数据库连接生成唯一标志
  129. id := atomic.AddInt32(&idCounter, 1)
  130. return &dbConnection{id}, nil
  131. }

参考