main.go
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
// 创建一个容量为20W的worker资源池
p := NewWorkerPool(100 * 100 * 20)
// 为资源池填充worker,并且这些worker都在等待接收任务
p.Invoke()
// 开启协程,模拟100W次请求
go func() {
for i := 1; i <= 100*100*100; i++ {
sc := &Score{Num: i}
p.JobQ <- sc
}
}()
// 输出当前协程数
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}
job.go
package main
// 任务,worker和workerPool只会处理Job接口
type Job interface {
Do()
}
score.go
package main
import (
"fmt"
"time"
)
type Score struct {
Num int
}
func (s *Score) Do() {
fmt.Println("num:", s.Num)
time.Sleep(time.Second)
}
worker.go
package main
// 任务执行者
type Worker struct {
JobQ chan Job // 无缓冲的任务队列,同时只能处理一个任务,其它任务等待
}
// 创建新的任务执行者
func NewWorker() *Worker {
return &Worker{JobQ: make(chan Job)}
}
// 订阅worker资源池
func (w *Worker) Invoke(pool *WorkerPool) {
// 死循环,保证worker执行完任务后,继续等待下一个任务
for {
pool.WorkerQ <- w // 将自身压入资源池的worker队列,因为worker资源池在分配任务时,会将worker从队列中取出
job := <-w.JobQ // 从队列中读取待执行任务
job.Do() // 执行任务
}
}
workerPool.go
package main
// worker资源池
type WorkerPool struct {
cap int // worker资源池容量
JobQ chan Job // 无缓冲的job队列,目的是同一时间只处理一个任务,其它任务等待
WorkerQ chan *Worker // worker队列
}
// 创建worker资源池
func NewWorkerPool(cap int) *WorkerPool {
return &WorkerPool{
cap: cap,
JobQ: make(chan Job),
WorkerQ: make(chan *Worker, cap),
}
}
// 创建worker并等待job
func (wp *WorkerPool) Invoke() {
for i := 0; i < wp.cap; i++ {
worker := NewWorker()
go worker.Invoke(wp) // worker异步订阅资源池信息,等待处理资源池推送的job
}
// 开启协程,处理任务
go func() {
for {
job := <-wp.JobQ // 从队列中读取一个job
worker := <-wp.WorkerQ //从队列中取一个worker来处理job
worker.JobQ <- job
}
}()
}