介绍

Tunny 是生成和管理 goroutine 池的工具,使用同步 API 限制来自任意数量的 goroutine 的工作,它有如下特点:

  • 一个 worker 拥有一个 goroutine。
  • 任意时刻都可以安全地改变 goroutine 池的大小,即使有 goroutine 正在运行。
  • 支持限时处理请求,若在指定时间内未处理完毕则返回错误。
  • 允许通过实现 tunny.Worker 接口来自定义 worker ,精确控制每个 goroutine 的状态。
  • 积压的作业不保证会被按顺序处理,虽然目前的实现确实会让积压的作业按 FIFO 的顺序处理,但是这种行为并不是标准,不应该被依赖。

源码分析

Tunny 的结构

先看看定义的结构体和接口,从宏观上有个基本了解。

Pool

  1. // Pool is a struct that manages a collection of workers, each with their own
  2. // goroutine. The Pool can initialize, expand, compress and close the workers,
  3. // as well as processing jobs with the workers synchronously.
  4. type Pool struct {
  5. queuedJobs int64 // 排队的作业
  6. ctor func() Worker // Worker 的构造函数
  7. workers []*workerWrapper // 一组 workerWrapper
  8. reqChan chan workRequest // 传输请求的管道
  9. workerMut sync.Mutex // 一把互斥锁
  10. }

Pool 结构管理一组 workers,每个 worker 都有一个自己的 goroutine,Pool 支持对 workers 数量进行初始化、扩大、缩小、关闭,并且使用 workers 同步处理作业,Pool 里使用了其他类型 Worker, workerWrapper, workRequest,分别看看它们是什么。

Worker

  1. // Worker is an interface representing a Tunny working agent. It will be used to
  2. // block a calling goroutine until ready to process a job, process that job
  3. // synchronously, interrupt its own process call when jobs are abandoned, and
  4. // clean up its resources when being removed from the pool.
  5. //
  6. // Each of these duties are implemented as a single method and can be averted
  7. // when not needed by simply implementing an empty func.
  8. type Worker interface {
  9. // Process will synchronously perform a job and return the result.
  10. // Process 同步地执行一个作业并返回结果
  11. Process(interface{}) interface{}
  12. // BlockUntilReady is called before each job is processed and must block the
  13. // calling goroutine until the Worker is ready to process the next job.
  14. // BlockUntilReady 会在每个作业被执行之前调用,并且会阻塞执行该作业
  15. BlockUntilReady()
  16. // Interrupt is called when a job is cancelled. The worker is responsible
  17. // for unblocking the Process implementation.
  18. // Interrupt 会在一个作业被取消时调用。Worker 负责解除阻塞的 Process
  19. Interrupt()
  20. // Terminate is called when a Worker is removed from the processing pool
  21. // and is responsible for cleaning up any held resources.
  22. // Terminate 在一个 Worker 被移出协程池时被调用,并且负责回收资源
  23. Terminate()
  24. }

Worker 是接口类型,定义了一组方法,因此某个类型想要实现 Worker 接口,就必须实现它定义的所有方法,若某个类型实现了 Worker 的所有方法,则该类型就拥有了 Worker 类型。
小贴士:约定 Worker 为接口,worker 为 Worker 的一个实例,workers 是一组实例。

workerWrapper

  1. // workerWrapper takes a Worker implementation and wraps it within a goroutine
  2. // and channel arrangement. The workerWrapper is responsible for managing the
  3. // lifetime of both the Worker and the goroutine.
  4. // workerWrapper 封装了一个 Worker 实例,一个 goroutine 和一组管道
  5. // 并且负责管理它们的生命周期
  6. workerWrapper struct {
  7. worker Worker // Worker 实例
  8. interruptChan chan struct{}
  9. // reqChan is NOT owned by this type, it is used to send requests for work.
  10. // reqChan 不属于 workerWrapper(属于 Pool),用来传递请求
  11. reqChan chan<- workRequest
  12. // closeChan can be closed in order to cleanly shutdown this worker.
  13. // closeChan 可以被关闭,目的是关闭这个 worker
  14. closeChan chan struct{}
  15. // closedChan is closed by the run() goroutine when it exits.
  16. // closedChan 会在 run() goroutine 结束时被其关闭
  17. closedChan chan struct{}
  18. }

因为原本的 Worker 只是个接口,没有与 Pool 进行通信的能力,因此 workerWrapper 才会封装一个 worker,一个 goroutine(在 workerWrapper 的方法中)和一组管道(用于与 Pool 通信),然后利用管道接收来自 Pool 的请求,调用 Worker 实例里的处理函数处理请求。

workRequest

  1. // workRequest is a struct containing context representing a workers intention
  2. // to receive a work payload.
  3. // workRequest 是一个包含上下文的结构,上下文表示
  4. type workRequest struct {
  5. // jobChan is used to send the payload to this worker.
  6. // jobChan 用于 Pool 把 payload 发给该 worker(payload 是真正要执行的作业)
  7. jobChan chan<- interface{}
  8. // retChan is used to read the result from this worker.
  9. // retChan 用于把该 worker 的处理结果传给 Pool
  10. retChan <-chan interface{}
  11. // interruptFunc can be called to cancel a running job. When called it is no
  12. // longer necessary to read from retChan.
  13. // interruptFunc 用于取消一个执行中的作业,调用后不再需要从 retChan 中读取处理结果
  14. interruptFunc func()
  15. }

workRequest 是请求的上下文,包含了两个管道,一个用于接收来自 Pool 的 payload(真正要执行的作业),一个用于把处理结果传回给 Pool,Pool 通过请求上下文来传递作业和接收作业处理结果。

作业处理流程

通过以上了解,不难知道 Tunny 的工作原理是由 Pool 管理了一组 Worker,通过 reqChan 向它们传递请求并取得处理结果。
image.png
图1:大致的处理流程

worker 的类型

通过分析 Tunny 的结构得知,真正处理作业的是 Worker 里的 Process 方法,我们可以自定义一个类型去实现 Worker 接口,而 Tunny 已经为我们实现了两种 worker 类型:closureWorker 和 callbackWorker

closureWorker

  1. // closureWorker is a minimal Worker implementation that simply wraps a
  2. // func(interface{}) interface{}
  3. type closureWorker struct {
  4. processor func(interface{}) interface{}
  5. }
  6. func (w *closureWorker) Process(payload interface{}) interface{} {
  7. // payload 就是要处理的作业
  8. // processor 是真正用于处理作业的方法
  9. return w.processor(payload)
  10. }
  11. func (w *closureWorker) BlockUntilReady() {}
  12. func (w *closureWorker) Interrupt() {}
  13. func (w *closureWorker) Terminate() {}

closureWorker 要求真正处理作业的方法 processor 的类型是 func(interface{}) interface{},对作业 Payload 的类型没有限制(payload 是 interface{} 类型,是任何类型的“祖先”)

callbackWorker

  1. // callbackWorker is a minimal Worker implementation that attempts to cast
  2. // each job into func() and either calls it if successful or returns
  3. // ErrJobNotFunc.
  4. type callbackWorker struct{}
  5. func (w *callbackWorker) Process(payload interface{}) interface{} {
  6. f, ok := payload.(func())
  7. if !ok {
  8. return ErrJobNotFunc
  9. }
  10. f() // 执行 payload 代表的函数
  11. return nil
  12. }
  13. func (w *callbackWorker) BlockUntilReady() {}
  14. func (w *callbackWorker) Interrupt() {}
  15. func (w *callbackWorker) Terminate() {}

callbackWorker 要求作业 payload 的类型必须是 func()即没有参数也没有返回值的一个函数。
因此 callbackWorker 的 Process 实际上就是负责执行 payload 代表的函数。

Pool 的构造函数

Pool 有 3 种构造函数,每种对应一种 Worker 类型,对应关系如下:

  • New:自定义 Worker 类型
  • NewFunc:closureWorker
  • NewCallback:callbackWorker

NewFunc 和 NewCallback 最终都是通过调用 New 实现的。

New

  1. // New creates a new Pool of workers that starts with n workers. You must
  2. // provide a constructor function that creates new Worker types and when you
  3. // change the size of the pool the constructor will be called to create each new
  4. // Worker.
  5. func New(n int, ctor func() Worker) *Pool {
  6. p := &Pool{
  7. ctor: ctor,
  8. reqChan: make(chan workRequest),
  9. }
  10. p.SetSize(n) // 生成具有与 Pool 通信能力的 worker
  11. return p
  12. }

New 方法是最通用的 Pool 构造函数,它要求传入两个参数:

  • n 表示要创建 worker 的个数。
  • ctor 表示 worker 的构造函数。

使用 New 需要自己定义一个类型然后实现 Worker 接口,像 closureWorker 和 callbackWorker 那样。
生成具有与 Pool 通信能力的 worker 的方法是 SetSize。

NewFunc

  1. // NewFunc creates a new Pool of workers where each worker will process using
  2. // the provided func.
  3. func NewFunc(n int, f func(interface{}) interface{}) *Pool {
  4. return New(n, func() Worker {
  5. return &closureWorker{
  6. processor: f,
  7. }
  8. })
  9. }

NewFunc 是创建拥有 n 个 closureWorker 的 Pool 构造函数,它要求两个参数:

  • n 表示要创建 closureWorker 的个数。
  • f 表示作业的处理函数。

NewCallback

  1. // NewCallback creates a new Pool of workers where workers cast the job payload
  2. // into a func() and runs it, or returns ErrNotFunc if the cast failed.
  3. func NewCallback(n int) *Pool {
  4. return New(n, func() Worker {
  5. return &callbackWorker{}
  6. })
  7. }

NewCallback 用于创建拥有 n 个 callbackWorker 的 Pool ,它要求一个参数:

  • n 表示要创建 closureWorker 的个数。

小贴士:为什么叫回调呢?用一个生活例子理解回调“我要求你帮我做一件事,你不确定什么时候能做完,但是我有其他事要做,没空等你做完,所以我给你留了电话号码,等你做完了再打电话通知我”。类比到协程池,外部告诉 Pool 这里有个 payload 要做,然后外部就去处理其他事了,留下 payload 代表的函数用于等你做完后告诉外部。

Tunny 提供的其他管理方法

介绍里提到过 Tunny 是一个用于生成和管理 goroutine 池的工具,上面介绍了如何生成 Pool 实例,接下来介绍它提供的其他管理 goroutine 的方法。

SetSize:控制 worker 的数量

  1. // SetSize changes the total number of workers in the Pool. This can be called
  2. // by any goroutine at any time unless the Pool has been stopped, in which case
  3. // a panic will occur.
  4. func (p *Pool) SetSize(n int) {
  5. // 改变 Pool 大小时必须上锁
  6. // 避免多个协程并发改变 Pool 大小的竞争问题
  7. p.workerMut.Lock()
  8. defer p.workerMut.Unlock()
  9. lWorkers := len(p.workers) // 当前已有的 worker 数量
  10. // 相同则不需要改变
  11. if lWorkers == n {
  12. return
  13. }
  14. // 要增加
  15. // Add extra workers if N > len(workers)
  16. for i := lWorkers; i < n; i++ {
  17. p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
  18. }
  19. // 要减少
  20. // Asynchronously stop all workers > N
  21. for i := n; i < lWorkers; i++ {
  22. p.workers[i].stop()
  23. }
  24. // Synchronously wait for all workers > N to stop
  25. for i := n; i < lWorkers; i++ {
  26. p.workers[i].join()
  27. }
  28. // Remove stopped workers from slice
  29. p.workers = p.workers[:n]
  30. }

函数逻辑:

  • 当要设置的 worker 数量恰好等于当前 worker 的数量,则函数结束;
  • 当要设置的 worker 数量大于当前 worker 的数量,则通过第 1 个 for 循环增加 worker 的数量,不会执行下面两个 for 循环;
  • 当要设置的 worker 数量小于当前 worker 的数量,则跳过第 1 个 for 循环,通过第 2, 3 个 for 循环减少worker 的数量。

第 19 行 ctor 方法已经生成了 Worker 实例,但不具备与 Pool 通信能力,深入探索 newWorkerWrapper 如何生成具有与 Pool 通信能力的 Worker 实例。

增加逻辑 newWorkerWrapper

  1. func newWorkerWrapper(
  2. reqChan chan<- workRequest,
  3. worker Worker,
  4. ) *workerWrapper {
  5. w := workerWrapper{
  6. worker: worker,
  7. interruptChan: make(chan struct{}),
  8. reqChan: reqChan,
  9. closeChan: make(chan struct{}),
  10. closedChan: make(chan struct{}),
  11. }
  12. go w.run()
  13. return &w
  14. }

代码分析:

  • Pool 把发送请求的管道 reqChan 和 Worker 实例传递给 newWorkerWrapper,workerWrapper 通过封装 worker 和复用 Pool 的 reqChan 使其具备了和 Pool 的通信能力。
  • 第 13 行是最重要的一行代码,它验证了前面说到的一句话“每个 Worker 都拥有自己的 goroutine”

核心 run

这是处理请求的核心部分,需要结合 Pool 的 Process 方法分析

  1. func (w *workerWrapper) run() {
  2. jobChan, retChan := make(chan interface{}), make(chan interface{})
  3. // 处理完要回收资源
  4. defer func() {
  5. w.worker.Terminate()
  6. close(retChan)
  7. close(w.closedChan)
  8. }()
  9. for {
  10. // NOTE: Blocking here will prevent the worker from closing down.
  11. w.worker.BlockUntilReady()
  12. select {
  13. case w.reqChan <- workRequest{ // 尝试往 reqChan 里传入请求
  14. jobChan: jobChan,
  15. retChan: retChan,
  16. interruptFunc: w.interrupt,
  17. }:
  18. select {
  19. case payload := <-jobChan:
  20. result := w.worker.Process(payload)
  21. select {
  22. case retChan <- result:
  23. case <-w.interruptChan:
  24. w.interruptChan = make(chan struct{})
  25. }
  26. case _, _ = <-w.interruptChan:
  27. w.interruptChan = make(chan struct{})
  28. }
  29. case <-w.closeChan:
  30. return
  31. }
  32. }
  33. }
  1. // Process will use the Pool to process a payload and synchronously return the
  2. // result. Process can be called safely by any goroutines, but will panic if the
  3. // Pool has been stopped.
  4. func (p *Pool) Process(payload interface{}) interface{} {
  5. atomic.AddInt64(&p.queuedJobs, 1)
  6. request, open := <-p.reqChan
  7. if !open {
  8. panic(ErrPoolNotRunning)
  9. }
  10. request.jobChan <- payload
  11. payload, open = <-request.retChan
  12. if !open {
  13. panic(ErrWorkerClosed)
  14. }
  15. atomic.AddInt64(&p.queuedJobs, -1)
  16. return payload
  17. }

代码分析:

  • 前面提到 Pool 与所有的 worker 通过共用 reqChan 来传递请求和处理结果。
  • run 的核心部分是 for 循环及其内部 3 重 select 嵌套。
    • 第一重 select 中所有 worker 尝试往 reqChan 中传入 workRequest,对应 Process 的第 7 行从 reqChan 中接收 workRequest。
    • 第二重 select 中某个 worker(被 Pool 拿走 workRequest 对应的那个 worker,前面提到每个 worker 都有自己的 workRequest)尝试从 jobChan 中接收 payload 并执行,对应 Process 的第 12 行 Pool 尝试往 jobChan 中发送 payload。
    • 第三重 select 中该 worker 尝试往 retChan 发送处理结果,对应 Process 的第 14 行。
  • 注意这里并没有分析 BlockUntilReady 带来的影响,因为 Tunny 实现的 closureWorker 和 callbackWorker 都把它实现为空函数。

通过分析发现当 Pool 有作业要处理时,会随机选中一个 worker,同时处理多个作业并不能保证处理顺序。

GetSize:获取 worker 的数量

  1. // GetSize returns the current size of the pool.
  2. func (p *Pool) GetSize() int {
  3. // 上锁
  4. p.workerMut.Lock()
  5. defer p.workerMut.Unlock()
  6. return len(p.workers)
  7. }

代码分析:在分析 Pool 结构时,Pool 是通过一个切片来管理一组 worker 的 workers []*workerWrapper,因此对 workers 切片求长度即可 len(p.workers)
注意:求切片长度时要上锁,避免别的协程进行 SetSize 操作。

Close:关闭 Pool

  1. // Close will terminate all workers and close the job channel of this Pool.
  2. func (p *Pool) Close() {
  3. p.SetSize(0)
  4. close(p.reqChan)
  5. }

Close 通过 SetSize 来减少 worker 数量,然后关闭请求管道 reqChan。

减少逻辑 stop & join

  1. func (w *workerWrapper) stop() {
  2. close(w.closeChan)
  3. }
  4. func (w *workerWrapper) join() {
  5. <-w.closedChan
  6. }

stop 关闭了 worker 的 closeChan,在 run 中被接收到(第一重 selecet 的第二个 case),通过 return 终止了 worker 开启的 goroutine。
join 操作让每个 worker 从 closedChan 里接收一个数据,由于 closedChan 是同步管道,在源码只有在 run 函数结束时在 defer 里面的有一个 close(w.closedChan) 会触发一次广播式通知(nil + false),因此逻辑上是这样的:stop 会执行 defer 里 close(w.closedChan) 进行广播通知,然后每个 worker 从里面接收到管道已关闭消息。

QueueLength:获取等待作业队列长度

  1. // QueueLength returns the current count of pending queued jobs.
  2. func (p *Pool) QueueLength() int64 {
  3. return atomic.LoadInt64(&p.queuedJobs)
  4. }

Tunny 并没有显示维护等待作业队列,而是通过维护一个 queuedJobs 变量来表示当前积压的作业数。
当调用 Process 处理一个作业时,若 Pool 里已经没有空闲的 worker 了,那么就会阻塞在 Process 的 request, open := <-p.reqChan语句。

Process

  1. // Process will use the Pool to process a payload and synchronously return the
  2. // result. Process can be called safely by any goroutines, but will panic if the
  3. // Pool has been stopped.
  4. func (p *Pool) Process(payload interface{}) interface{} {
  5. atomic.AddInt64(&p.queuedJobs, 1)
  6. request, open := <-p.reqChan
  7. if !open {
  8. panic(ErrPoolNotRunning) // Pool 已关闭
  9. }
  10. request.jobChan <- payload
  11. payload, open = <-request.retChan
  12. if !open {
  13. panic(ErrWorkerClosed) // worker 已关闭
  14. }
  15. atomic.AddInt64(&p.queuedJobs, -1)
  16. return payload
  17. }

Process 在上文已经结合了 run 来解析正常的请求处理流程,这里需要注意的点:

  • 处理前后通过原子加维护 queuedJobs 变量。
  • 两次从管道取数据都有额外的错误判断。

ProcessTimed

  1. // ProcessTimed will use the Pool to process a payload and synchronously return
  2. // the result. If the timeout occurs before the job has finished the worker will
  3. // be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
  4. // called safely by any goroutines.
  5. func (p *Pool) ProcessTimed(
  6. payload interface{},
  7. timeout time.Duration,
  8. ) (interface{}, error) {
  9. atomic.AddInt64(&p.queuedJobs, 1)
  10. defer atomic.AddInt64(&p.queuedJobs, -1)
  11. tout := time.NewTimer(timeout) // 打开计时器
  12. var request workRequest
  13. var open bool
  14. select {
  15. case request, open = <-p.reqChan:
  16. if !open {
  17. return nil, ErrPoolNotRunning
  18. }
  19. case <-tout.C:
  20. return nil, ErrJobTimedOut
  21. }
  22. select {
  23. case request.jobChan <- payload:
  24. case <-tout.C:
  25. request.interruptFunc()
  26. return nil, ErrJobTimedOut
  27. }
  28. select {
  29. case payload, open = <-request.retChan:
  30. if !open {
  31. return nil, ErrWorkerClosed
  32. }
  33. case <-tout.C:
  34. request.interruptFunc()
  35. return nil, ErrJobTimedOut
  36. }
  37. tout.Stop()
  38. return payload, nil
  39. }

ProcessTimed 在 Process 的基础上增加了计时器,每次都通过 select 判断是否已经超时。
满足特定场景:设置请求处理所需的最大时间,即这么多时间内必须要处理完,否则报错。

ProcessCtx

  1. // ProcessCtx will use the Pool to process a payload and synchronously return
  2. // the result. If the context cancels before the job has finished the worker will
  3. // be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
  4. // called safely by any goroutines.
  5. func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) {
  6. atomic.AddInt64(&p.queuedJobs, 1)
  7. defer atomic.AddInt64(&p.queuedJobs, -1)
  8. var request workRequest
  9. var open bool
  10. select {
  11. case request, open = <-p.reqChan:
  12. if !open {
  13. return nil, ErrPoolNotRunning
  14. }
  15. case <-ctx.Done():
  16. return nil, ctx.Err()
  17. }
  18. select {
  19. case request.jobChan <- payload:
  20. case <-ctx.Done():
  21. request.interruptFunc()
  22. return nil, ctx.Err()
  23. }
  24. select {
  25. case payload, open = <-request.retChan:
  26. if !open {
  27. return nil, ErrWorkerClosed
  28. }
  29. case <-ctx.Done():
  30. request.interruptFunc()
  31. return nil, ctx.Err()
  32. }
  33. return payload, nil
  34. }

ProcessCtx 在 Process 的基础上增加了 context 是请求的上下文是否被取消的判定。
若请求的上下文被取消了,那么该请求也就不用处理了,通过 request.interruptFunc() 停止。

Tunny 应用实战