简介
之前写过一篇文章介绍了ants这个 goroutine 池实现。当时在网上查看相关资料的时候,发现了另外一个实现tunny。趁着时间相近,正好研究一番。也好比较一下这两个库。那就让我们开始吧。
快速开始
本文代码使用 Go Modules。
创建目录并初始化:
$ mkdir tunny && cd tunny$ go mod init github.com/go-quiz/go-daily-lib/tunny
使用go get从 GitHub 获取tunny库:
$ go get -u github.com/Jeffail/tunny
为了方便地和ants做一个对比,我们将ants中的示例重新用tunny实现一遍:还是那个分段求和的例子:
const (DataSize = 10000DataPerTask = 100)func main() {numCPUs := runtime.NumCPU()p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {var sum intfor _, n := range payload.([]int) {sum += n}return sum})defer p.Close()// ...}
使用也非常简单,首先创建一个Pool,这里使用tunny.NewFunc()。
第一个参数为池子大小,即同时有多少个 worker (也即 goroutine)在工作,这里设置成逻辑 CPU 个数,对于 CPU 密集型任务,这个值设置太大无意义,反而有可能导致 goroutine 切换频繁而降低性能。
第二个参数传入一个func(interface{})interface{}的参数作为任务处理函数。后续传入数据就会调用这个函数处理。
池子使用完需要关闭,这里使用defer p.Close()在程序退出前关闭。
然后,生成测试数据,还是 10000 个随机数,分成 100 组:
nums := make([]int, DataSize)for i := range nums {nums[i] = rand.Intn(1000)}
处理每组数据:
var wg sync.WaitGroupwg.Add(DataSize / DataPerTask)partialSums := make([]int, DataSize/DataPerTask)for i := 0; i < DataSize/DataPerTask; i++ {go func(i int) {partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)wg.Done()}(i)}wg.Wait()
调用p.Process()方法,传入任务数据,池子中会选择空闲的 goroutine 来处理这个数据。由于我们上面设置了处理函数,goroutine 会直接调用该函数,将这个切片作为参数传入。
tunny与ants不同的是,tunny的任务处理是同步的,即调用p.Process()方法之后,当前 goroutine 会挂起,直到任务处理完成之后才会被唤醒。由于是同步的,所以p.Process()方法可以直接返回处理结果。这也是上面程序在分发任务的时候,启动多个 goroutine 的原因。如果不是每个任务都启动一个 goroutine,p.Process()方法会一直等待任务完成,那么后面的任务要等到前面的任务全部执行完之后才能执行。这样就发挥不了并发的优势了。
这里注意一个小细节,我将for循环变量作为参数传给 goroutine 函数了。如果不这样做,所有 goroutine 都共用外层的i,而且 goroutine 开始运行时,for循环大概率已经结束了,这时i = DataSize/DataPerTask,索引nums[i*DataPerTask : (i+1)*DataPerTask]会越界触发 panic。
最后统计数据,验证结果:
var sum intfor _, s := range partialSums {sum += s}var expect intfor _, num := range nums {expect += num}fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
运行:
$ go run main.gofinish all tasks, result is 5010172 expect:5010172
超时
默认情况下,p.Process()会一直阻塞直到任务完成,即使当前没有空闲 worker 也会阻塞。我们也可以使用带超时的Process()方法:ProcessTimed()。传入一个超时时间间隔,如果超过这个时间还没有空闲 worker,或者任务还没有处理完成,就会终止,并返回一个错误。
超时有 2 种情况:
- 等不到空闲的 worker:所有 worker 一直处理繁忙状态,正在处理的任务比较耗时,无法短时间内完成;
- 任务本身比较耗时。
下面我们编写一个计算斐波那契的函数,使用递归这种低效的实现方法:
func fib(n int) int {if n <= 1 {return 1}return fib(n-1) + fib(n-2)}
我们先看任务比较耗时的情况,创建Pool对象。为了观察更明显,在处理函数中添加了time.Sleep()语句:
p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {n := payload.(int)result := fib(n)time.Sleep(5 * time.Second)return result})defer p.Close()
生成与池容量相等的任务数,调用p.ProcessTimed()方法,设置超时为 1s:
var wg sync.WaitGroupwg.Add(numCPUs)for i := 0; i < numCPUs; i++ {go func(i int) {n := rand.Intn(30)result, err := p.ProcessTimed(n, time.Second)nowStr := time.Now().Format("2006-01-02 15:04:05")if err != nil {fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)} else {fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)}wg.Done()}(i)}wg.Wait()
因为处理函数中 sleep 5s,所以任务在执行过程中就超时了。运行:
$ go run main.go[2021-06-10 16:36:26]task(7) failed:job request timed out[2021-06-10 16:36:26]task(4) failed:job request timed out[2021-06-10 16:36:26]task(1) failed:job request timed out[2021-06-10 16:36:26]task(6) failed:job request timed out[2021-06-10 16:36:26]task(5) failed:job request timed out[2021-06-10 16:36:26]task(0) failed:job request timed out[2021-06-10 16:36:26]task(3) failed:job request timed out[2021-06-10 16:36:26]task(2) failed:job request timed out
都在同一秒中超时。
我们将任务数量翻倍,再将处理函数中的 sleep 改为 990ms,保证前一批任务能顺利完成,后续任务或者由于等不到空闲 worker,或者由于执行时间过长而超时返回。运行:
$ go run main.go[2021-06-10 16:42:46]fib(11) = 144[2021-06-10 16:42:46]fib(25) = 121393[2021-06-10 16:42:46]fib(27) = 317811[2021-06-10 16:42:46]fib(1) = 1[2021-06-10 16:42:46]fib(18) = 4181[2021-06-10 16:42:46]fib(29) = 832040[2021-06-10 16:42:46]fib(17) = 2584[2021-06-10 16:42:46]fib(20) = 10946[2021-06-10 16:42:46]task(5) failed:job request timed out[2021-06-10 16:42:46]task(14) failed:job request timed out[2021-06-10 16:42:46]task(8) failed:job request timed out[2021-06-10 16:42:46]task(7) failed:job request timed out[2021-06-10 16:42:46]task(13) failed:job request timed out[2021-06-10 16:42:46]task(12) failed:job request timed out[2021-06-10 16:42:46]task(11) failed:job request timed out[2021-06-10 16:42:46]task(6) failed:job request timed out
context
context 是协调 goroutine 的工具。tunny支持带context.Context参数的方法:ProcessCtx()。当前 context 状态变为Done之后,任务也会停止执行。context 会由于超时、取消等原因切换为Done状态。还是拿上面的例子:
go func(i int) {n := rand.Intn(30)ctx, cancel := context.WithCancel(context.Background())if i%2 == 0 {go func() {time.Sleep(500 * time.Millisecond)cancel()}()}result, err := p.ProcessCtx(ctx, n)if err != nil {fmt.Printf("task(%d) failed:%v\n", i, err)} else {fmt.Printf("fib(%d) = %d\n", n, result)}wg.Done()}(i)
其他代码都一样,我们调用p.ProcessCtx()方法来执行任务。参数是一个可取消的Context。对于序号为偶数的任务,我们启动一个 goroutine 在 500ms 之后cancel()掉这个Context。代码运行结果如下:
$ go run main.gotask(4) failed:context canceledtask(6) failed:context canceledtask(0) failed:context canceledtask(2) failed:context canceledfib(27) = 317811fib(25) = 121393fib(1) = 1fib(18) = 4181
我们看到偶数序号的任务都被取消了。
源码
tunny的源码更少,除去测试代码和注释,连 500 行都不到。那么就一起来看一下吧。Pool结构如下:
// src/github.com/Jeffail/tunny.gotype Pool struct {queuedJobs int64ctor func() Workerworkers []*workerWrapperreqChan chan workRequestworkerMut sync.Mutex}
Pool结构中有一个ctor字段,这是一个函数对象,用于返回一个实现Worker接口的值:
type Worker interface {Process(interface{}) interface{}BlockUntilReady()Interrupt()Terminate()}
这个接口不同的方法在任务执行的不同阶段调用。最重要的当属Process(interface{}) interface{}方法了。这个就是执行任务的函数。tunny提供New()方法创建Pool对象,这个方法需要我们自己构造ctor函数对象,使用多有不便。tunny提供了另外两个默认实现closureWorker和callbackWorker:
type closureWorker struct {processor func(interface{}) interface{}}func (w *closureWorker) Process(payload interface{}) interface{} {return w.processor(payload)}func (w *closureWorker) BlockUntilReady() {}func (w *closureWorker) Interrupt() {}func (w *closureWorker) Terminate() {}type callbackWorker struct{}func (w *callbackWorker) Process(payload interface{}) interface{} {f, ok := payload.(func())if !ok {return ErrJobNotFunc}f()return nil}func (w *callbackWorker) BlockUntilReady() {}func (w *callbackWorker) Interrupt() {}func (w *callbackWorker) Terminate() {}
tunny.NewFunc()方法使用的就是closureWorker:
func NewFunc(n int, f func(interface{}) interface{}) *Pool {return New(n, func() Worker {return &closureWorker{processor: f,}})}
创建的closureWorker直接将参数f作为任务处理函数。
tunny.NewCallback()方法使用callbackWorker:
func NewCallback(n int) *Pool {return New(n, func() Worker {return &callbackWorker{}})}
callbackWorker结构中没有处理函数,只能给它发送无参无返回值的函数对象作为任务,它的Process()方法就是执行这个函数。
创建Pool对象后,都是调用它的SetSize()方法,设置 worker 数量。在这个方法中会启动相应数量的 goroutine:
func (p *Pool) SetSize(n int) {p.workerMut.Lock()defer p.workerMut.Unlock()lWorkers := len(p.workers)if lWorkers == n {return}for i := lWorkers; i < n; i++ {p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))}// 停止过多的 workerfor i := n; i < lWorkers; i++ {p.workers[i].stop()}// 等待 worker 停止for i := n; i < lWorkers; i++ {p.workers[i].join()// --------------
