package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
)

// Values stored in Pool.State.
const (
	stateClosed  int32 = 0
	stateRunning int32 = 1
)

var (
	// ErrPoolClosed is returned by Add once the pool has been closed.
	ErrPoolClosed = errors.New("pool stack is not start")
	// ErrInvalidTask is returned by Add for a nil task or a task without a handler.
	ErrInvalidTask = errors.New("pool task or task handler is nil")
	// ErrNoCapacity is returned by Add when the pool capacity is zero, because
	// no worker could ever pick the task up.
	ErrNoCapacity = errors.New("pool capacity is zero")
)

// Task is a unit of work executed by a Pool worker.
type Task struct {
	// Handle is the function to run; it is called as Handle(Params...).
	Handle func(...interface{})
	// Params are the arguments spread into Handle.
	Params []interface{}
	// wg, when non-nil, is marked Done once the task has finished, whether
	// Handle returned normally or panicked.
	wg *sync.WaitGroup
}

// Pool is a goroutine pool that starts workers lazily, up to Capacity.
//
// Capacity, WorksNums and State are accessed atomically by the pool's
// methods. PanicHandler is read by workers without synchronization, so it
// should be set before the first call to Add.
type Pool struct {
	// Capacity is the maximum number of concurrently running workers.
	Capacity uint64
	// WorksNums is the number of workers currently running.
	WorksNums uint64
	// State is 1 while the pool accepts tasks and 0 once it is closed.
	State int32
	// TaskChan hands tasks from Add to the workers. It is unbuffered.
	TaskChan chan *Task
	// FinishChan is closed by Close to tell every worker to stop.
	FinishChan chan bool
	// PanicHandler, when non-nil, receives the value recovered from a
	// panicking task. When nil the panic is logged instead.
	PanicHandler func(interface{})
}

// NewPool returns a running pool that will start at most c workers.
func NewPool(c uint64) *Pool {
	return &Pool{
		Capacity:   c,
		State:      stateRunning,
		TaskChan:   make(chan *Task),
		FinishChan: make(chan bool),
	}
}

// run unconditionally starts one more worker goroutine and counts it.
func (p *Pool) run() {
	p.incWorkNum()
	p.startWorker()
}

// startWorker launches a worker goroutine whose slot in WorksNums has already
// been accounted for. The worker runs tasks until FinishChan or TaskChan is
// closed and releases its slot when it exits.
func (p *Pool) startWorker() {
	go func() {
		defer p.desWorkNum()
		for {
			select {
			case task, ok := <-p.TaskChan:
				if !ok {
					return
				}
				p.execute(task)
			case <-p.FinishChan:
				return
			}
		}
	}()
}

// execute runs a single task. A panic raised by the handler is recovered and
// reported so the worker survives, and the task's WaitGroup is always
// released so that waiters cannot hang on a failed task.
func (p *Pool) execute(task *Task) {
	if task == nil {
		return
	}
	if task.wg != nil {
		// Registered first so it runs last, after any panic has been reported.
		defer task.wg.Done()
	}
	defer func() {
		if r := recover(); r != nil {
			p.reportPanic(r)
		}
	}()
	task.Handle(task.Params...)
}

// reportPanic forwards a recovered panic value to PanicHandler, or logs it
// when no handler is installed.
func (p *Pool) reportPanic(r interface{}) {
	if p.PanicHandler != nil {
		p.PanicHandler(r)
		return
	}
	log.Printf("work panic: %v", r)
}

// reserveWorker atomically claims a worker slot and reports whether one was
// available. The compare-and-swap loop keeps concurrent callers from pushing
// WorksNums past Capacity.
func (p *Pool) reserveWorker() bool {
	for {
		n := p.getWorkNum()
		if n >= p.getCap() {
			return false
		}
		if atomic.CompareAndSwapUint64(&p.WorksNums, n, n+1) {
			return true
		}
	}
}

// Add submits task to the pool, starting a new worker if the pool is below
// capacity. It blocks until a worker accepts the task or the pool is closed.
//
// It returns ErrInvalidTask for a nil task or nil handler, ErrNoCapacity when
// the pool capacity is zero, and ErrPoolClosed when the pool is (or becomes)
// closed before a worker accepts the task. On error the task's WaitGroup is
// not touched, so the caller must balance its own wg.Add.
func (p *Pool) Add(task *Task) error {
	if task == nil || task.Handle == nil {
		return ErrInvalidTask
	}
	if atomic.LoadInt32(&p.State) == stateClosed {
		return ErrPoolClosed
	}
	if p.getCap() == 0 {
		return ErrNoCapacity
	}
	if p.reserveWorker() {
		p.startWorker()
	}
	select {
	case p.TaskChan <- task:
		return nil
	case <-p.FinishChan:
		// Close ran while we were waiting for a free worker.
		return ErrPoolClosed
	}
}

// Close stops the pool. Idle workers exit immediately and busy workers exit
// after finishing their current task; Close does not wait for them. It is
// safe to call Close more than once and concurrently with Add.
func (p *Pool) Close() {
	// Swap makes the transition atomic so only one caller closes FinishChan.
	if atomic.SwapInt32(&p.State, stateClosed) == stateClosed {
		return
	}
	// Closing (rather than sending on) FinishChan wakes every worker and every
	// blocked Add, and cannot block when no worker is listening. TaskChan is
	// deliberately left open so a racing Add never sends on a closed channel.
	close(p.FinishChan)
}

// incWorkNum atomically increments the running-worker count.
func (p *Pool) incWorkNum() {
	atomic.AddUint64(&p.WorksNums, 1)
}

// desWorkNum atomically decrements the running-worker count.
func (p *Pool) desWorkNum() {
	atomic.AddUint64(&p.WorksNums, ^uint64(0))
}

// getWorkNum atomically reads the running-worker count.
func (p *Pool) getWorkNum() uint64 {
	return atomic.LoadUint64(&p.WorksNums)
}

// getCap atomically reads the pool capacity.
func (p *Pool) getCap() uint64 {
	return atomic.LoadUint64(&p.Capacity)
}

func main() {
	pool := NewPool(10)
	defer pool.Close()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		err := pool.Add(&Task{
			Handle: func(args ...interface{}) {
				fmt.Println(args[0])
			},
			wg:     &wg,
			Params: []interface{}{i},
		})
		if err != nil {
			// The task will never run, so release its slot in the WaitGroup.
			wg.Done()
			log.Printf("add task %d: %v", i, err)
		}
	}
	wg.Wait()
}
参考
https://segmentfault.com/a/1190000018193161
https://blog.csdn.net/KingEasternSun/article/details/78964267
