main.go
package mainimport ( "fmt" "runtime" "time")func main() { // 创建一个容量为20W的worker资源池 p := NewWorkerPool(100 * 100 * 20) // 为资源池填充worker,并且这些worker都在等待接收任务 p.Invoke() // 开启协程,模拟100W次请求 go func() { for i := 1; i <= 100*100*100; i++ { sc := &Score{Num: i} p.JobQ <- sc } }() // 输出当前协程数 for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) }}
job.go
package main// 任务,worker和workerPool只会处理Job接口type Job interface { Do()}
score.go
package mainimport ( "fmt" "time")type Score struct { Num int}func (s *Score) Do() { fmt.Println("num:", s.Num) time.Sleep(time.Second)}
worker.go
package main// 任务执行者type Worker struct { JobQ chan Job // 无缓冲的任务队列,同时只能处理一个任务,其它任务等待}// 创建新的任务执行者func NewWorker() *Worker { return &Worker{JobQ: make(chan Job)}}// 订阅worker资源池func (w *Worker) Invoke(pool *WorkerPool) { // 死循环,保证worker执行完任务后,继续等待下一个任务 for { pool.WorkerQ <- w // 将自身压入资源池的worker队列,因为worker资源池在分配任务时,会将worker从队列中取出 job := <-w.JobQ // 从队列中读取待执行任务 job.Do() // 执行任务 }}
workerPool.go
package main// worker资源池type WorkerPool struct { cap int // worker资源池容量 JobQ chan Job // 无缓冲的job队列,目的是同一时间只处理一个任务,其它任务等待 WorkerQ chan *Worker // worker队列}// 创建worker资源池func NewWorkerPool(cap int) *WorkerPool { return &WorkerPool{ cap: cap, JobQ: make(chan Job), WorkerQ: make(chan *Worker, cap), }}// 创建worker并等待jobfunc (wp *WorkerPool) Invoke() { for i := 0; i < wp.cap; i++ { worker := NewWorker() go worker.Invoke(wp) // worker异步订阅资源池信息,等待处理资源池推送的job } // 开启协程,处理任务 go func() { for { job := <-wp.JobQ // 从队列中读取一个job worker := <-wp.WorkerQ //从队列中取一个worker来处理job worker.JobQ <- job } }()}