利用 goroutine 和 channel 进行 go 的并发模式,实现一个资源池实例,资源池可以存储一定数量的资源,用户程序从资源池获取资源进行使用,使用完成将资源释放回资源池。
资源池结构
m: 互斥锁,这主要是用来保证在多个 goroutine 访问资源时,池内的值是安全的; res:一个有缓冲的通道,用来保存共享的资源,这个通道的大小,在初始化 Pool 的时候就指定的;
- 通道的类型是 io.Closer 接口,所以实现了这个 io.Closer 接口的类型都可以作为资源交给我们的资源池管理
 factory:一个函数类型,它的作用就是当需要一个新的资源时,可以通过这个函数创建;
- 只负责生成新资源的,至于如何生成、生成什么资源由使用者决定
 closed:表示资源池是否被关闭; timeout:资源池获取资源时的超时时间;
type Pool struct {m sync.Mutexres chan io.Closerfactory func() (io.Closer, error)closed booltimeout <-chan time.Time}
资源池方法
var (ErrPoolClosed = errors.New("资源池已经关闭") // 资源池关闭标志ErrTimeout = errors.New("获取资源超时") // 超时标志)//新建资源池func New(fn func() (io.Closer, error), size int) (*Pool, error) {if size <= 0 {return nil, errors.New("新建资源池大小太小")}//新建资源池p := Pool{factory: fn,res: make(chan io.Closer, size),}//向资源池循环添加资源,直到池满for count := 1; count <= cap(p.res); count++ {r, err := fn()if err != nil {fmt.Println("添加资源失败,创建资源方法返回nil")break}fmt.Println("资源加入资源池")p.res <- r}fmt.Println("资源池已满,返回资源池")return &p, nil}//获取资源func (p *Pool) Acquire(d time.Duration) (io.Closer, error) {//设置超时时间p.timeout = time.After(d)select {case r, ok := <-p.res:fmt.Println("获取", "共享资源")if !ok {return nil, ErrPoolClosed}return r, nilcase <-p.timeout:return nil, ErrTimeout}}//放回资源池func (p *Pool) Release(r io.Closer) {//上互斥锁,和Close方法对应,不同时操作p.m.Lock()defer p.m.Unlock()if p.closed {r.Close()return}//资源放回队列select {case p.res <- r:fmt.Println("资源放回队列")default:fmt.Println("资源队列已满,释放资源")r.Close()}}//关闭资源池func (p *Pool) Close() {//互斥锁,保证同步,和Release方法相关,用同一把锁p.m.Lock()defer p.m.Unlock()if p.closed {return}p.closed = true//清空通道资源之前,将通道关闭,否则引起死锁close(p.res)for r := range p.res {r.Close()}}
测试用例
package poolimport ("fmt""io""math/rand""sync""sync/atomic""testing""time")const (maxGoroutines = 25pooledResources = 2)var idCounter int32 //给每个连接资源给id//实现接口类型 资源类型type dbConnection struct {ID int32}//实现接口方法func (conn *dbConnection) Close() error {fmt.Printf("资源关闭,ID:%d\n", conn.ID)return nil}//创建新资源func createConnection() (io.Closer, error) {id := atomic.AddInt32(&idCounter, 1)fmt.Printf("创建新资源,id:%d\n", id)return &dbConnection{ID: id}, nil}//测试资源池func performQueries(query int, p *Pool) {conn, err := p.Acquire(10 * time.Second)if err != nil {fmt.Println("获取资源超时")fmt.Println(err)return}//方法结束后将资源放进资源池defer p.Release(conn)//模拟使用资源time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)fmt.Printf("查询goroutine id:%d,资源ID:%d\n", query, conn.(*dbConnection).ID)}func TestPool(t *testing.T) {var wg sync.WaitGroupwg.Add(maxGoroutines)p, err := New(createConnection, pooledResources)if err != nil {fmt.Println(err)}//每个goroutine一个查询,每个查询从资源池中获取资源for query := 0; query < maxGoroutines; query++ {go func(q int) {performQueries(q, p)wg.Done()}(query)}//主线程等待wg.Wait()fmt.Println("程序结束")//释放资源p.Close()}
输出结果

