项目地址:https://github.com/panjf2000/ants

ants是一个高性能的协程池,实现了对大规模goroutine的调度管理、goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。

功能:

  • 实现了自动调度并发的goroutine,复用goroutine
  • 定时清理过期的goroutine,进一步节省资源
  • 提供了友好的接口:任务提交、获取运行中的协程数量、动态调整协程池大小
  • 优雅处理panic,防止程序崩溃
  • 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生goroutine并发具有更高的性能

常量

  1. const (
  2. // DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool.
  3. DEFAULT_ANTS_POOL_SIZE = math.MaxInt32
  4. // DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines.
  5. DEFAULT_CLEAN_INTERVAL_TIME = 1
  6. // CLOSED represents that the pool is closed.
  7. CLOSED = 1
  8. )

方法

func Cap() int:返回默认池池的容量
func Free() int:返回默认池可用的goroutine
func Release():关闭默认池
func Running() int:返回当前运行的goroutine的数量
func Submit(task func()) error:向池提交一个任务

type Pool

  1. type Pool struct {
  2. //panic处理器
  3. PanicHandler func(interface{})
  4. }

接受来自客户机的任务,它通过回收goroutine将goroutine的总数限制为给定的数量

func NewPool(size int) (Pool, error):生成一个ants池的实例,池的数量为默认值
func NewPoolPreMalloc(size int) (
Pool, error):生成一个指定量ants池的实例
func (p Pool) Cap() int
func (p
Pool) Free() int
func (p Pool) Release() error
func (p
Pool) Running() int
func (p Pool) Submit(task func()) error
func (p
Pool) Tune(size int):更改池容量

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/panjf2000/ants"
  7. )
  8. func demoFunc() {
  9. time.Sleep(10 * time.Millisecond)
  10. fmt.Println("Hello World!")
  11. }
  12. func main() {
  13. defer ants.Release()
  14. runTimes := 1000
  15. // Use the common pool.
  16. var wg sync.WaitGroup
  17. syncCalculateSum := func() {
  18. demoFunc()
  19. wg.Done()
  20. }
  21. for i := 0; i < runTimes; i++ {
  22. wg.Add(1)
  23. ants.Submit(syncCalculateSum)
  24. }
  25. wg.Wait()
  26. fmt.Printf("running goroutines: %d\n", ants.Running())
  27. fmt.Printf("finish all tasks.\n")
  28. }

type PoolWithFunc

  1. type PoolWithFunc struct {
  2. PanicHandler func(interface{})
  3. }

func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error)

  • 成一个带有特定函数的ants池实例,并指定池容量

func (p PoolWithFunc) Cap() int
func (p
PoolWithFunc) Free() int
func (p PoolWithFunc) Invoke(args interface{}) error:提交一个任务
func (p
PoolWithFunc) Release() error
func (p PoolWithFunc) Running() int
func (p
PoolWithFunc) Tune(size int)

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/panjf2000/ants"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. var sum int32
  9. func myFunc(i interface{}) {
  10. n := i.(int32)
  11. atomic.AddInt32(&sum, n)
  12. fmt.Printf("run with %d\n", n)
  13. }
  14. func main() {
  15. defer ants.Release()
  16. var wg sync.WaitGroup
  17. // Use the pool with a function,
  18. p, _ := ants.NewPoolWithFunc(100, func(i interface{}) {
  19. myFunc(i)
  20. wg.Done()
  21. })
  22. defer p.Release()
  23. // Submit tasks one by one.
  24. for i := 0; i < 1000; i++ {
  25. wg.Add(1)
  26. p.Invoke(int32(i))
  27. }
  28. wg.Wait()
  29. fmt.Printf("running goroutines: %d\n", p.Running())
  30. fmt.Printf("finish all tasks, result is %d\n", sum)
  31. }