本文主要是对 Asynq 文档部分内容的翻译,原文档地址:https://github.com/hibiken/asynq/wiki
Asynq 库 Github 地址:https://github.com/hibiken/asynq

注意:该库目前还处于 v0.x.x 版本,还在进行大量开发,API 更改频繁。在 v1.0.0 发布之前,API 可能会在没有重大版本更新的情况下更改。
翻译此文档时,Asynq 版本为 0.23.0

Introduction

Asynq 是一个用 Go 开发的简单、可靠、高效的分布式任务队列。可以用来调度异步任务和定时任务。
其定位类似 Python 中的 Celery。

Asynq 用 redis 作为 broker,暂不支持 RabbitMQ 等消息队列。

三句话描述 Asynq 的工作原理:

  • 客户端将任务放入队列。
  • 服务端从队列中拉出任务并为每个任务启动一个 worker(一个 goroutine) 去执行它。
  • 多个 worker 同时处理任务。

示例用例:
image.png

Asynq 的特性:

Getting Started

欢迎来到 Asynq 之旅!

image.png

在本教程中,我们将编写两个程序,clientworkers

  • client.go将创建和安排由 workers 异步处理的任务。
  • workers.go将启动多个并发 workers 来处理客户端创建的任务。

本指南假定你在**localhost:6379**运行了 Redis。

让我们从创建两个主要文件开始:

  1. $ mkdir quickstart && cd quickstart
  2. $ go mod init asynq-quickstart
  3. $ mkdir client workers
  4. $ touch client/client.go workers/workers.go

安装asynq包:

  1. $ go get -u github.com/hibiken/asynq

核心类型

在我们开始编写代码之前,先介绍一下我们将在两个程序中使用的一些核心类型。

Redis 连接选项

Asynq 使用 Redis 作为消息代理。
两个文件client.goworkers.go都需要连接到 Redis 才能写入和读取任务信息。

我们将用RedisClientOpt指定与本地运行的 Redis 服务器的连接:

  1. redisConnOpt := asynq.RedisClientOpt{
  2. Addr: "localhost:6379",
  3. Password: "mypassword", // 如果不需要密码,省略此选项
  4. DB: 0,
  5. }

任务

asynq中,一个工作单元被封装在一个名为Task的类型中,它在概念上具有两个字段:TypePayload

  1. // Type 是表示任务类型的字符串值
  2. func (t *Task) Type() string
  3. // Payload 是任务执行所需的数据
  4. func (t *Task) Payload() []byte

客户端程序

client.go中,我们将创建一些任务并使用asynq.Client将它们放入任务队列。

可以使用NewTask方法创建任务,参数是 type 和 payload。

使用[Enqueue](https://godoc.org/github.com/hibiken/asynq#Client.Enqueue)方法可以将任务放入队列,其参数是一个任务对象和任意数量的选项参数。
使用[ProcessIn](https://godoc.org/github.com/hibiken/asynq#ProcessIn)[ProcessAt](https://godoc.org/github.com/hibiken/asynq#ProcessAt)选项可以安排什么时候处理该任务。

  1. // Task payload for any email related tasks.
  2. type EmailTaskPayload struct {
  3. // ID for the email recipient.
  4. UserID int
  5. }
  6. // client.go
  7. func main() {
  8. client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
  9. // Create a task with typename and payload.
  10. payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
  11. if err != nil {
  12. log.Fatal(err)
  13. }
  14. t1 := asynq.NewTask("email:welcome", payload)
  15. t2 := asynq.NewTask("email:reminder", payload)
  16. // Process the task immediately.
  17. info, err := client.Enqueue(t1)
  18. if err != nil {
  19. log.Fatal(err)
  20. }
  21. log.Printf(" [*] Successfully enqueued task: %+v", info)
  22. // Process the task 24 hours later.
  23. info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. log.Printf(" [*] Successfully enqueued task: %+v", info)
  28. }

这就是客户端程序所需的全部内容。

服务端程序

workers.go中,我们将创建一个asynq.Server实例来启动 workers。

NewServer方法的参数是RedisConnOptConfig

Config用于设置服务端的任务处理行为。
可以查看文档[Config](https://pkg.go.dev/github.com/hibiken/asynq#Config)以查看所有可用的配置选项。

为简单起见,我们将仅在此示例中指定并发性。

  1. // workers.go
  2. func main() {
  3. srv := asynq.NewServer(
  4. asynq.RedisClientOpt{Addr: "localhost:6379"},
  5. asynq.Config{Concurrency: 10},
  6. )
  7. // 注意: 这里的 `handler` 是什么下文中会介绍到
  8. if err := srv.Run(handler); err != nil {
  9. log.Fatal(err)
  10. }
  11. }

(*Server).Run的参数是一个接口asynq.Handler,该接口有一个方法ProcessTask

  1. type Handler interface {
  2. // 如果任务执行成功,ProcessTask 应该返回 nil
  3. // 如果 ProcessTask 返回了一个非 nil 的 error,或抛异常,代表任务执行失败了,任务会稍后重新执行
  4. ProcessTask(context.Context, *Task) error
  5. }

实现 handler 的最简单方法是定义一个具有相同签名的函数,并用asynq.HandlerFunc包装后传递给Run

  1. func handler(ctx context.Context, t *asynq.Task) error {
  2. switch t.Type() {
  3. case "email:welcome":
  4. var p EmailTaskPayload
  5. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  6. return err
  7. }
  8. log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
  9. case "email:reminder":
  10. var p EmailTaskPayload
  11. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  12. return err
  13. }
  14. log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
  15. }
  16. return fmt.Errorf("unexpected task type: %s", t.Type())
  17. }
  18. func main() {
  19. srv := asynq.NewServer(
  20. asynq.RedisClientOpt{Addr: "localhost:6379"},
  21. asynq.Config{Concurrency: 10},
  22. )
  23. // Use asynq.HandlerFunc adapter for a handler function
  24. if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
  25. log.Fatal(err)
  26. }
  27. }

我们可以继续向这个处理函数添加 switch case,但在实际应用中,在单独的函数中定义每个 case 的逻辑是很方便的。

为了重构我们的代码,让我们用ServeMux来创建我们的处理程序。
ServeMux的使用就像"net/http"包里的一样,可以通过调用Handle或者HandleFunc来注册一个 handler。
ServeMux实现了Handler接口,以便可以将其传递给(*Server).Run

  1. // workers.go
  2. func main() {
  3. srv := asynq.NewServer(
  4. asynq.RedisClientOpt{Addr: "localhost:6379"},
  5. asynq.Config{Concurrency: 10},
  6. )
  7. mux := asynq.NewServeMux()
  8. mux.HandleFunc("email:welcome", sendWelcomeEmail)
  9. mux.HandleFunc("email:reminder", sendReminderEmail)
  10. if err := srv.Run(mux); err != nil {
  11. log.Fatal(err)
  12. }
  13. }
  14. func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
  15. var p EmailTaskPayload
  16. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  17. return err
  18. }
  19. log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
  20. return nil
  21. }
  22. func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
  23. var p EmailTaskPayload
  24. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  25. return err
  26. }
  27. log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
  28. return nil
  29. }

现在我们已经提取了处理每种任务类型的函数,代码看起来更有条理。

但是,代码有点太隐晦了,我们有任务类型和 payload 这些字符串值,它们应该被封装在一个内聚的包中。
让我们通过编写一个封装任务创建和处理的包来重构我们的代码。

下面我们创建一个名为task的包:

  1. $ mkdir task && touch task/task.go
  1. package task
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/hibiken/asynq"
  6. )
  7. // A list of task types.
  8. const (
  9. TypeWelcomeEmail = "email:welcome"
  10. TypeReminderEmail = "email:reminder"
  11. )
  12. // Task payload for any email related tasks.
  13. type emailTaskPayload struct {
  14. // ID for the email recipient.
  15. UserID int
  16. }
  17. func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
  18. payload, err := json.Marshal(emailTaskPayload{UserID: id})
  19. if err != nil {
  20. return nil, err
  21. }
  22. return asynq.NewTask(TypeWelcomeEmail, payload), nil
  23. }
  24. func NewReminderEmailTask(id int) (*asynq.Task, error) {
  25. payload, err := json.Marshal(emailTaskPayload{UserID: id})
  26. if err != nil {
  27. return nil, err
  28. }
  29. return asynq.NewTask(TypeReminderEmail, payload), nil
  30. }
  31. func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
  32. var p emailTaskPayload
  33. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  34. return err
  35. }
  36. log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
  37. return nil
  38. }
  39. func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
  40. var p emailTaskPayload
  41. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  42. return err
  43. }
  44. log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
  45. return nil
  46. }

现在我们在client.goworkers.go中都引入这个包。

  1. // client.go
  2. func main() {
  3. client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
  4. t1, err := task.NewWelcomeEmailTask(42)
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. t2, err := task.NewReminderEmailTask(42)
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. // Process the task immediately.
  13. info, err := client.Enqueue(t1)
  14. if err != nil {
  15. log.Fatal(err)
  16. }
  17. log.Printf(" [*] Successfully enqueued task: %+v", info)
  18. // Process the task 24 hours later.
  19. info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. log.Printf(" [*] Successfully enqueued task: %+v", info)
  24. }
  1. // workers.go
  2. func main() {
  3. srv := asynq.NewServer(
  4. asynq.RedisClientOpt{Addr: "localhost:6379"},
  5. asynq.Config{Concurrency: 10},
  6. )
  7. mux := asynq.NewServeMux()
  8. mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
  9. mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
  10. if err := srv.Run(mux); err != nil {
  11. log.Fatal(err)
  12. }
  13. }

现在代码看起来好多了!

运行程序

现在我们有了clientworkers,我们可以运行这两个程序。

让我们运行client程序来创建和安排任务:

  1. $ go run client/client.go

这将创建两项任务,一项立即处理,另一项在 24 小时后处理。

让我们使用asynqCLI 检查任务:

  1. $ asynq dash

应该能够看到有一个任务处于 Enqueued 状态,另一个处于 Scheduled 状态。

注意:要详细了解每个状态的含义,可以查看任务生命周期

最后,让我们启动workers程序来处理任务:

  1. $ go run workers/workers.go

注意workers是个守护程序,在发送终止程序的信号之前它不会退出。有关如何安全终止 workers 的最佳实践,请参阅 Signal Wiki 页面。

您应该能够在终端中看到一些文本,表明任务已成功处理。
您可以再次运行client程序,以查看 workers 如何接手并处理它们。

任务重试

默认情况下,失败的任务将使用指数退避算法重试多达 25 次。

让我们更新我们的处理程序,让它返回错误以模拟不成功的场景:

  1. // tasks.go
  2. func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
  3. var p EmailTaskPayload
  4. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  5. return err
  6. }
  7. log.Printf(" [*] Attempting to Send Welcome Email to User %d...", p.UserID)
  8. return fmt.Errorf("could not send email to the user") // <-- Return error
  9. }

让我们重新启动 workers 并将任务排入队列:

  1. $ go run workers/workers.go
  2. $ go run client/client.go

如果您正在运行asynq dash,您应该能够看到有一个任务处于重试状态(通过导航到队列详细信息视图并突出显示“重试”选项卡)。

要检查哪些任务处于重试状态,您还可以运行:

  1. $ asynq task ls --queue=default --state=retry

这将列出将来要重试的所有任务。输出内容包含任务下一次执行的 ETA。

一旦任务用完其重试计数,该任务将转换为归档状态并且不会再被重试(您仍然可以使用 CLI 或 WebUI 工具手动运行归档任务)。

在结束本教程之前,让我们修复我们的处理程序:

  1. func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
  2. var p EmailTaskPayload
  3. if err := json.Unmarshal(t.Payload(), &p); err != nil {
  4. return err
  5. }
  6. log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
  7. return nil
  8. }

现在我们修复了处理程序,任务将在下一次尝试中成功处理 :)

周期任务

基本原理

Asynq 运行周期任务的方式和 Celery 相似。

即启动一个调度程序 scheduler(Celery 中这个调度程序叫做 beat),它是一个后台服务,负责定期将任务放入队列,然后就什么也不管了,workers 去执行任务,这样就实现了周期任务。

所以这个 scheduler 主要就是包含定时的逻辑。事实上 asynq 的 scheduler 是依赖于 cron 实现的。

示例

  1. scheduler := asynq.NewScheduler(redisConnOpt, nil) // 创建 scheduler
  2. task := asynq.NewTask("example_task", nil) // 创建任务
  3. // You can use cron spec string to specify the schedule.
  4. entryID, err := scheduler.Register("* * * * *", task) // 注册任务到 scheduler 中
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. log.Printf("registered an entry: %q\n", entryID)
  9. // You can use "@every <duration>" to specify the interval.
  10. entryID, err = scheduler.Register("@every 30s", task)
  11. if err != nil {
  12. log.Fatal(err)
  13. }
  14. log.Printf("registered an entry: %q\n", entryID)
  15. // You can also pass options.
  16. entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
  17. if err != nil {
  18. log.Fatal(err)
  19. }
  20. log.Printf("registered an entry: %q\n", entryID)
  21. if err := scheduler.Run(); err != nil { // 运行 scheduler
  22. log.Fatal(err)
  23. }

调用scheduler.Run()后该程序会夯在这里,直到接收到 TERM 或 INT 信号。

时区

周期任务默认使用 UTC 时间,可以使用SchedulerOpts设置时区:

  1. // Example of using America/Los_Angeles timezone instead of the default UTC timezone.
  2. loc, err := time.LoadLocation("America/Los_Angeles")
  3. if err != nil {
  4. panic(err)
  5. }
  6. scheduler := asynq.NewScheduler(
  7. redisConnOpt,
  8. &asynq.SchedulerOpts{
  9. Location: loc,
  10. },
  11. )

错误处理

如果调度程序无法将任务排入队列,您可以提供一个函数来处理错误:

  1. function handleEnqueueError(task *asynq.Task, opts []Option, err error) {
  2. // your error handling logic
  3. }
  4. scheduler := asynq.NewScheduler(
  5. redisConnOpt,
  6. &asynq.SchedulerOpts{
  7. EnqueueErrorHandler: handleEnqueueError,
  8. },
  9. )

通过 CLI 检查

CLI 有一个子命令cron来检查调度程序条目。

要查看当前正在运行的调度程序中的所有条目,您可以运行:

  1. $ asynq cron ls

此命令将输出条目列表,每个条目及其 ID、调度规范、下一个入队时间、上一个入队时间。

您还可以通过运行以下命令查看每个条目的历史记录:

  1. $ asynq cron history <entryID>

动态添加、删除任务

如果想动态添加和删除周期任务(即不重新启动 scheduler),需要使用PeriodicTaskManager
PeriodicTaskManager 使用PeriodicTaskConfigProvider定期获取周期任务配置,并将调度程序的条目与之同步。

例如,可以将周期任务的配置存储在数据库或本地文件中,要动态添加或删除任务时就更新此配置源。

下面的示例展示了如何使用本地文件来实现这一点,当然也可以很容易地修改示例以使用数据库或其他配置源。

在此示例中,我们将周期任务的配置存储在 YAML 文件中:

  1. configs:
  2. - cronspec: "* * * * *"
  3. task_type: foo
  4. - cronspec: "* * * * *"
  5. task_type: bar

然后实现我们的PeriodicTaskConfigProvider,它用来读取 yaml 文件并返回一个PeriodicTaskConfig列表:

  1. func main() {
  2. provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"}
  3. mgr, err := asynq.NewPeriodicTaskManager(
  4. asynq.PeriodicTaskManagerOpts{
  5. RedisConnOpt: asynq.RedisClientOpt{Addr: "localhost:6379"},
  6. PeriodicTaskConfigProvider: provider, // this provider object is the interface to your config source
  7. SyncInterval: 10 * time.Second, // this field specifies how often sync should happen
  8. })
  9. if err != nil {
  10. log.Fatal(err)
  11. }
  12. if err := mgr.Run(); err != nil {
  13. log.Fatal(err)
  14. }
  15. }
  16. // FileBasedConfigProvider implements asynq.PeriodicTaskConfigProvider interface.
  17. type FileBasedConfigProvider struct {
  18. filename string
  19. }
  20. // Parses the yaml file and return a list of PeriodicTaskConfigs.
  21. func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
  22. data, err := os.ReadFile(p.filename)
  23. if err != nil {
  24. return nil, err
  25. }
  26. var c PeriodicTaskConfigContainer
  27. if err := yaml.Unmarshal(data, &c); err != nil {
  28. return nil, err
  29. }
  30. var configs []*asynq.PeriodicTaskConfig
  31. for _, cfg := range c.Configs {
  32. configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)})
  33. }
  34. return configs, nil
  35. }
  36. type PeriodicTaskConfigContainer struct {
  37. Configs []*Config `yaml:"configs"`
  38. }
  39. type Config struct {
  40. Cronspec string `yaml:"cronspec"`
  41. TaskType string `yaml:"task_type"`
  42. }

Web UI

Asynqmon 是一个基于 Web 的工具,用来监控和管理 Asynq 队列和任务。

下面是几张截图。

Queues View:
114697016-07327f00-9d26-11eb-808c-0ac841dc888e.png

Tasks View:
114697070-1f0a0300-9d26-11eb-855c-d3ec263865b7.png

Metrics View:
146777420-cae6c476-bac6-469c-acce-b2f6584e8707.png

关于该工具的详细使用方法,参见该工具的 README

命令行工具

Asynq 附带了一个命令行工具来检查队列和任务的状态。

要安装该命令行工具,需运行:

  1. $ go install github.com/hibiken/asynq/tools/asynq

下面是运行asynq dash命令的一个示例:
dash.gif

关于该工具的详细使用方法,参见该工具的 README