利用 goroutine 和 channel 进行 go 的并发模式,实现一个资源池实例,资源池可以存储一定数量的资源,用户程序从资源池获取资源进行使用,使用完成将资源释放回资源池。

资源池结构

m: 互斥锁,这主要是用来保证在多个 goroutine 访问资源时,池内的值是安全的; res:一个有缓冲的通道,用来保存共享的资源,这个通道的大小,在初始化 Pool 的时候就指定的;

  • 通道的类型是 io.Closer 接口,所以实现了这个 io.Closer 接口的类型都可以作为资源交给我们的资源池管理

factory:一个函数类型,它的作用就是当需要一个新的资源时,可以通过这个函数创建;

  • 只负责生成新资源的,至于如何生成、生成什么资源由使用者决定

closed:表示资源池是否被关闭; timeout:资源池获取资源时的超时时间;

  1. type Pool struct {
  2. m sync.Mutex
  3. res chan io.Closer
  4. factory func() (io.Closer, error)
  5. closed bool
  6. timeout <-chan time.Time
  7. }

资源池方法

  1. var (
  2. ErrPoolClosed = errors.New("资源池已经关闭") // 资源池关闭标志
  3. ErrTimeout = errors.New("获取资源超时") // 超时标志
  4. )
  5. //新建资源池
  6. func New(fn func() (io.Closer, error), size int) (*Pool, error) {
  7. if size <= 0 {
  8. return nil, errors.New("新建资源池大小太小")
  9. }
  10. //新建资源池
  11. p := Pool{
  12. factory: fn,
  13. res: make(chan io.Closer, size),
  14. }
  15. //向资源池循环添加资源,直到池满
  16. for count := 1; count <= cap(p.res); count++ {
  17. r, err := fn()
  18. if err != nil {
  19. fmt.Println("添加资源失败,创建资源方法返回nil")
  20. break
  21. }
  22. fmt.Println("资源加入资源池")
  23. p.res <- r
  24. }
  25. fmt.Println("资源池已满,返回资源池")
  26. return &p, nil
  27. }
  28. //获取资源
  29. func (p *Pool) Acquire(d time.Duration) (io.Closer, error) {
  30. //设置超时时间
  31. p.timeout = time.After(d)
  32. select {
  33. case r, ok := <-p.res:
  34. fmt.Println("获取", "共享资源")
  35. if !ok {
  36. return nil, ErrPoolClosed
  37. }
  38. return r, nil
  39. case <-p.timeout:
  40. return nil, ErrTimeout
  41. }
  42. }
  43. //放回资源池
  44. func (p *Pool) Release(r io.Closer) {
  45. //上互斥锁,和Close方法对应,不同时操作
  46. p.m.Lock()
  47. defer p.m.Unlock()
  48. if p.closed {
  49. r.Close()
  50. return
  51. }
  52. //资源放回队列
  53. select {
  54. case p.res <- r:
  55. fmt.Println("资源放回队列")
  56. default:
  57. fmt.Println("资源队列已满,释放资源")
  58. r.Close()
  59. }
  60. }
  61. //关闭资源池
  62. func (p *Pool) Close() {
  63. //互斥锁,保证同步,和Release方法相关,用同一把锁
  64. p.m.Lock()
  65. defer p.m.Unlock()
  66. if p.closed {
  67. return
  68. }
  69. p.closed = true
  70. //清空通道资源之前,将通道关闭,否则引起死锁
  71. close(p.res)
  72. for r := range p.res {
  73. r.Close()
  74. }
  75. }

测试用例

  1. package pool
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. )
  11. const (
  12. maxGoroutines = 25
  13. pooledResources = 2
  14. )
  15. var idCounter int32 //给每个连接资源给id
  16. //实现接口类型 资源类型
  17. type dbConnection struct {
  18. ID int32
  19. }
  20. //实现接口方法
  21. func (conn *dbConnection) Close() error {
  22. fmt.Printf("资源关闭,ID:%d\n", conn.ID)
  23. return nil
  24. }
  25. //创建新资源
  26. func createConnection() (io.Closer, error) {
  27. id := atomic.AddInt32(&idCounter, 1)
  28. fmt.Printf("创建新资源,id:%d\n", id)
  29. return &dbConnection{ID: id}, nil
  30. }
  31. //测试资源池
  32. func performQueries(query int, p *Pool) {
  33. conn, err := p.Acquire(10 * time.Second)
  34. if err != nil {
  35. fmt.Println("获取资源超时")
  36. fmt.Println(err)
  37. return
  38. }
  39. //方法结束后将资源放进资源池
  40. defer p.Release(conn)
  41. //模拟使用资源
  42. time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
  43. fmt.Printf("查询goroutine id:%d,资源ID:%d\n", query, conn.(*dbConnection).ID)
  44. }
  45. func TestPool(t *testing.T) {
  46. var wg sync.WaitGroup
  47. wg.Add(maxGoroutines)
  48. p, err := New(createConnection, pooledResources)
  49. if err != nil {
  50. fmt.Println(err)
  51. }
  52. //每个goroutine一个查询,每个查询从资源池中获取资源
  53. for query := 0; query < maxGoroutines; query++ {
  54. go func(q int) {
  55. performQueries(q, p)
  56. wg.Done()
  57. }(query)
  58. }
  59. //主线程等待
  60. wg.Wait()
  61. fmt.Println("程序结束")
  62. //释放资源
  63. p.Close()
  64. }

输出结果

image.png