main.go

  1. package main
  2. import (
  3. "fmt"
  4. "runtime"
  5. "time"
  6. )
  7. func main() {
  8. // 创建一个容量为20W的worker资源池
  9. p := NewWorkerPool(100 * 100 * 20)
  10. // 为资源池填充worker,并且这些worker都在等待接收任务
  11. p.Invoke()
  12. // 开启协程,模拟100W次请求
  13. go func() {
  14. for i := 1; i <= 100*100*100; i++ {
  15. sc := &Score{Num: i}
  16. p.JobQ <- sc
  17. }
  18. }()
  19. // 输出当前协程数
  20. for {
  21. fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
  22. time.Sleep(2 * time.Second)
  23. }
  24. }

job.go

  1. package main
  2. // 任务,worker和workerPool只会处理Job接口
  3. type Job interface {
  4. Do()
  5. }

score.go

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. type Score struct {
  7. Num int
  8. }
  9. func (s *Score) Do() {
  10. fmt.Println("num:", s.Num)
  11. time.Sleep(time.Second)
  12. }

worker.go

  1. package main
  2. // 任务执行者
  3. type Worker struct {
  4. JobQ chan Job // 无缓冲的任务队列,同时只能处理一个任务,其它任务等待
  5. }
  6. // 创建新的任务执行者
  7. func NewWorker() *Worker {
  8. return &Worker{JobQ: make(chan Job)}
  9. }
  10. // 订阅worker资源池
  11. func (w *Worker) Invoke(pool *WorkerPool) {
  12. // 死循环,保证worker执行完任务后,继续等待下一个任务
  13. for {
  14. pool.WorkerQ <- w // 将自身压入资源池的worker队列,因为worker资源池在分配任务时,会将worker从队列中取出
  15. job := <-w.JobQ // 从队列中读取待执行任务
  16. job.Do() // 执行任务
  17. }
  18. }

workerPool.go

  1. package main
  2. // worker资源池
  3. type WorkerPool struct {
  4. cap int // worker资源池容量
  5. JobQ chan Job // 无缓冲的job队列,目的是同一时间只处理一个任务,其它任务等待
  6. WorkerQ chan *Worker // worker队列
  7. }
  8. // 创建worker资源池
  9. func NewWorkerPool(cap int) *WorkerPool {
  10. return &WorkerPool{
  11. cap: cap,
  12. JobQ: make(chan Job),
  13. WorkerQ: make(chan *Worker, cap),
  14. }
  15. }
  16. // 创建worker并等待job
  17. func (wp *WorkerPool) Invoke() {
  18. for i := 0; i < wp.cap; i++ {
  19. worker := NewWorker()
  20. go worker.Invoke(wp) // worker异步订阅资源池信息,等待处理资源池推送的job
  21. }
  22. // 开启协程,处理任务
  23. go func() {
  24. for {
  25. job := <-wp.JobQ // 从队列中读取一个job
  26. worker := <-wp.WorkerQ //从队列中取一个worker来处理job
  27. worker.JobQ <- job
  28. }
  29. }()
  30. }