package main
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
)
type Task struct {
Handle func(...interface{})
Params []interface{}
wg *sync.WaitGroup
}
type Pool struct {
Capacity uint64
WorksNums uint64
State int32
TaskChan chan *Task
FinishChan chan bool
PanicHandler func(interface{})
}
func NewPool(c uint64)*Pool{
return &Pool{
Capacity: c,
WorksNums: 0,
State: 1,
TaskChan: make(chan *Task),
FinishChan: make(chan bool),
}
}
func(p *Pool)run(){
p.incWorkNum()
go func() {
defer func() {
p.desWorkNum()
if r :=recover();r!=nil{
if p.PanicHandler!=nil{
p.PanicHandler(r)
}else {
log.Printf("work panic%s\n",r)
}
}
}()
for {
select {
case task,ok:=<-p.TaskChan:
if !ok{
return
}
task.Handle(task.Params)
task.wg.Done()
case <-p.FinishChan:
return
}
}
}()
}
func(p *Pool)Add(task *Task)error{
if p.State==0{
return errors.New("pool stack is not start")
}
if p.getWorkNum()<p.getCap(){
p.run()
}
p.TaskChan<-task
return nil
}
func (p *Pool)Close() {
if atomic.LoadInt32(&p.State)==0 {
return
}
atomic.StoreInt32(&p.State, 0)
for len(p.TaskChan) > 0 {
}
p.FinishChan <- true
close(p.TaskChan)
}
func(p *Pool)incWorkNum(){
atomic.AddUint64(&p.WorksNums,1)
}
func (p *Pool)desWorkNum(){
atomic.AddUint64(&p.WorksNums,^uint64(0))
}
func (p *Pool)getWorkNum()uint64{
return atomic.LoadUint64(&p.WorksNums)
}
func (p *Pool)getCap()uint64{
return atomic.LoadUint64(&p.Capacity)
}
func main() {
pool := NewPool(10)
var wg sync.WaitGroup
for i:=0;i<5;i++{
wg.Add(1)
pool.Add(&Task{
Handle: func(i ...interface{}) {
fmt.Println(i[0])
},
wg:&wg,
Params: []interface{}{i},
})
}
wg.Wait()
}
参考
https://segmentfault.com/a/1190000018193161
https://blog.csdn.net/KingEasternSun/article/details/78964267