1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "sync/atomic"
  8. )
  // Task is a unit of work handed to a Pool worker.
  type Task struct {
  	// Handle is the function a worker calls to perform the task.
  	Handle func(...interface{})
  	// Params holds the arguments the worker supplies to Handle.
  	Params []interface{}
  	// wg is the WaitGroup a worker calls Done on after running Handle, so
  	// the submitter can wait for the task to finish.
  	wg *sync.WaitGroup
  }
  // Pool runs submitted tasks on worker goroutines that are started on
  // demand by Add.
  type Pool struct {
  	// Capacity is the worker limit: Add starts another worker only while
  	// WorksNums is below it. Read with atomic operations.
  	Capacity uint64
  	// WorksNums counts the worker goroutines currently alive. It is
  	// updated and read with atomic operations only.
  	WorksNums uint64
  	// State is 1 after NewPool and 0 once Close has run; Add refuses tasks
  	// while it is 0.
  	State int32
  	// TaskChan carries tasks from Add to the workers. A worker returns
  	// when it observes the channel closed.
  	TaskChan chan *Task
  	// FinishChan is a stop signal: a worker that receives a value from it
  	// returns.
  	FinishChan chan bool
  	// PanicHandler, when non-nil, receives the value recovered from a
  	// panic in a worker; when nil the value is written to the log instead.
  	PanicHandler func(interface{})
  }
  22. func NewPool(c uint64)*Pool{
  23. return &Pool{
  24. Capacity: c,
  25. WorksNums: 0,
  26. State: 1,
  27. TaskChan: make(chan *Task),
  28. FinishChan: make(chan bool),
  29. }
  30. }
  31. func(p *Pool)run(){
  32. p.incWorkNum()
  33. go func() {
  34. defer func() {
  35. p.desWorkNum()
  36. if r :=recover();r!=nil{
  37. if p.PanicHandler!=nil{
  38. p.PanicHandler(r)
  39. }else {
  40. log.Printf("work panic%s\n",r)
  41. }
  42. }
  43. }()
  44. for {
  45. select {
  46. case task,ok:=<-p.TaskChan:
  47. if !ok{
  48. return
  49. }
  50. task.Handle(task.Params)
  51. task.wg.Done()
  52. case <-p.FinishChan:
  53. return
  54. }
  55. }
  56. }()
  57. }
  58. func(p *Pool)Add(task *Task)error{
  59. if p.State==0{
  60. return errors.New("pool stack is not start")
  61. }
  62. if p.getWorkNum()<p.getCap(){
  63. p.run()
  64. }
  65. p.TaskChan<-task
  66. return nil
  67. }
  68. func (p *Pool)Close() {
  69. if atomic.LoadInt32(&p.State)==0 {
  70. return
  71. }
  72. atomic.StoreInt32(&p.State, 0)
  73. for len(p.TaskChan) > 0 {
  74. }
  75. p.FinishChan <- true
  76. close(p.TaskChan)
  77. }
  78. func(p *Pool)incWorkNum(){
  79. atomic.AddUint64(&p.WorksNums,1)
  80. }
  81. func (p *Pool)desWorkNum(){
  82. atomic.AddUint64(&p.WorksNums,^uint64(0))
  83. }
  84. func (p *Pool)getWorkNum()uint64{
  85. return atomic.LoadUint64(&p.WorksNums)
  86. }
  87. func (p *Pool)getCap()uint64{
  88. return atomic.LoadUint64(&p.Capacity)
  89. }
  90. func main() {
  91. pool := NewPool(10)
  92. var wg sync.WaitGroup
  93. for i:=0;i<5;i++{
  94. wg.Add(1)
  95. pool.Add(&Task{
  96. Handle: func(i ...interface{}) {
  97. fmt.Println(i[0])
  98. },
  99. wg:&wg,
  100. Params: []interface{}{i},
  101. })
  102. }
  103. wg.Wait()
  104. }

参考

https://segmentfault.com/a/1190000018193161
https://blog.csdn.net/KingEasternSun/article/details/78964267