本文主要是对 Asynq 文档部分内容的翻译,原文档地址:https://github.com/hibiken/asynq/wiki
Asynq 库 Github 地址:https://github.com/hibiken/asynq
注意:该库目前还处于 v0.x.x 版本,还在进行大量开发,API 更改频繁。在 v1.0.0 发布之前,API 可能会在没有重大版本更新的情况下更改。
翻译此文档时,Asynq 版本为 0.23.0
Introduction
Asynq 是一个用 Go 开发的简单、可靠、高效的分布式任务队列。可以用来调度异步任务和定时任务。
其定位类似 Python 中的 Celery。
Asynq 用 redis 作为 broker,暂不支持 RabbitMQ 等消息队列。
三句话描述 Asynq 的工作原理:
- 客户端将任务放入队列。
 - 服务端从队列中拉出任务并为每个任务启动一个 worker(一个 goroutine) 去执行它。
 - 多个 worker 同时处理任务。
 
示例用例:
Asynq 的特性:
- 保证任务至少执行一次
 - 任务调度
 - 重试失败的任务
 - 在 worker 崩溃的情况下自动恢复任务
 - 加权优先级队列
 - 严格的优先级队列
 - 由于 Redis 中的写入速度很快,因此添加任务的延迟很低
 - 使用唯一选项对任务进行重复数据删除
 - 允许每个任务设置超时时间或截止时间
 - 允许组合一组任务以批处理多个连续操作
 - 灵活的 handler 接口(支持中间件)
 - 能够暂停队列以停止处理队列中的任务
 - 周期任务
 - 支持 Redis 集群实现自动分片和高可用性
 - 支持 Redis 哨兵以实现高可用性
 - 与 Prometheus 集成以收集和可视化队列指标
 - 用于审计和远程控制队列和任务的 Web UI
 - 用于审计和远程控制队列和任务的 CLI
 
Getting Started
欢迎来到 Asynq 之旅!

在本教程中,我们将编写两个程序,client和workers。
client.go将创建和安排由 workers 异步处理的任务。workers.go将启动多个并发 workers 来处理客户端创建的任务。
本指南假定你在**localhost:6379**运行了 Redis。
让我们从创建两个主要文件开始:
$ mkdir quickstart && cd quickstart$ go mod init asynq-quickstart$ mkdir client workers$ touch client/client.go workers/workers.go
安装asynq包:
$ go get -u github.com/hibiken/asynq
核心类型
在我们开始编写代码之前,先介绍一下我们将在两个程序中使用的一些核心类型。
Redis 连接选项
Asynq 使用 Redis 作为消息代理。
两个文件client.go和workers.go都需要连接到 Redis 才能写入和读取任务信息。
我们将用RedisClientOpt指定与本地运行的 Redis 服务器的连接:
redisConnOpt := asynq.RedisClientOpt{Addr: "localhost:6379",Password: "mypassword", // 如果不需要密码,省略此选项DB: 0,}
任务
在asynq中,一个工作单元被封装在一个名为Task的类型中,它在概念上具有两个字段:Type和Payload:
// Type 是表示任务类型的字符串值func (t *Task) Type() string// Payload 是任务执行所需的数据func (t *Task) Payload() []byte
客户端程序
在client.go中,我们将创建一些任务并使用asynq.Client将它们放入任务队列。
可以使用NewTask方法创建任务,参数是 type 和 payload。
使用[Enqueue](https://godoc.org/github.com/hibiken/asynq#Client.Enqueue)方法可以将任务放入队列,其参数是一个任务对象和任意数量的选项参数。
使用[ProcessIn](https://godoc.org/github.com/hibiken/asynq#ProcessIn)或[ProcessAt](https://godoc.org/github.com/hibiken/asynq#ProcessAt)选项可以安排什么时候处理该任务。
// Task payload for any email related tasks.type EmailTaskPayload struct {// ID for the email recipient.UserID int}// client.gofunc main() {client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})// Create a task with typename and payload.payload, err := json.Marshal(EmailTaskPayload{UserID: 42})if err != nil {log.Fatal(err)}t1 := asynq.NewTask("email:welcome", payload)t2 := asynq.NewTask("email:reminder", payload)// Process the task immediately.info, err := client.Enqueue(t1)if err != nil {log.Fatal(err)}log.Printf(" [*] Successfully enqueued task: %+v", info)// Process the task 24 hours later.info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))if err != nil {log.Fatal(err)}log.Printf(" [*] Successfully enqueued task: %+v", info)}
这就是客户端程序所需的全部内容。
服务端程序
在workers.go中,我们将创建一个asynq.Server实例来启动 workers。
NewServer方法的参数是RedisConnOpt和Config。
Config用于设置服务端的任务处理行为。
可以查看文档[Config](https://pkg.go.dev/github.com/hibiken/asynq#Config)以查看所有可用的配置选项。
为简单起见,我们将仅在此示例中指定并发性。
// workers.gofunc main() {srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"},asynq.Config{Concurrency: 10},)// 注意: 这里的 `handler` 是什么下文中会介绍到if err := srv.Run(handler); err != nil {log.Fatal(err)}}
(*Server).Run的参数是一个接口asynq.Handler,该接口有一个方法ProcessTask:
type Handler interface {// 如果任务执行成功,ProcessTask 应该返回 nil// 如果 ProcessTask 返回了一个非 nil 的 error,或抛异常,代表任务执行失败了,任务会稍后重新执行ProcessTask(context.Context, *Task) error}
实现 handler 的最简单方法是定义一个具有相同签名的函数,并用asynq.HandlerFunc包装后传递给Run:
func handler(ctx context.Context, t *asynq.Task) error {switch t.Type() {case "email:welcome":var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Welcome Email to User %d", p.UserID)case "email:reminder":var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Reminder Email to User %d", p.UserID)}return fmt.Errorf("unexpected task type: %s", t.Type())}func main() {srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"},asynq.Config{Concurrency: 10},)// Use asynq.HandlerFunc adapter for a handler functionif err := srv.Run(asynq.HandlerFunc(handler)); err != nil {log.Fatal(err)}}
我们可以继续向这个处理函数添加 switch case,但在实际应用中,在单独的函数中定义每个 case 的逻辑是很方便的。
为了重构我们的代码,让我们用ServeMux来创建我们的处理程序。ServeMux的使用就像"net/http"包里的一样,可以通过调用Handle或者HandleFunc来注册一个 handler。ServeMux实现了Handler接口,以便可以将其传递给(*Server).Run。
// workers.gofunc main() {srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"},asynq.Config{Concurrency: 10},)mux := asynq.NewServeMux()mux.HandleFunc("email:welcome", sendWelcomeEmail)mux.HandleFunc("email:reminder", sendReminderEmail)if err := srv.Run(mux); err != nil {log.Fatal(err)}}func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Welcome Email to User %d", p.UserID)return nil}func sendReminderEmail(ctx context.Context, t *asynq.Task) error {var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Reminder Email to User %d", p.UserID)return nil}
现在我们已经提取了处理每种任务类型的函数,代码看起来更有条理。
但是,代码有点太隐晦了,我们有任务类型和 payload 这些字符串值,它们应该被封装在一个内聚的包中。
让我们通过编写一个封装任务创建和处理的包来重构我们的代码。
下面我们创建一个名为task的包:
$ mkdir task && touch task/task.go
package taskimport ("context""fmt""github.com/hibiken/asynq")// A list of task types.const (TypeWelcomeEmail = "email:welcome"TypeReminderEmail = "email:reminder")// Task payload for any email related tasks.type emailTaskPayload struct {// ID for the email recipient.UserID int}func NewWelcomeEmailTask(id int) (*asynq.Task, error) {payload, err := json.Marshal(emailTaskPayload{UserID: id})if err != nil {return nil, err}return asynq.NewTask(TypeWelcomeEmail, payload), nil}func NewReminderEmailTask(id int) (*asynq.Task, error) {payload, err := json.Marshal(emailTaskPayload{UserID: id})if err != nil {return nil, err}return asynq.NewTask(TypeReminderEmail, payload), nil}func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {var p emailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Welcome Email to User %d", p.UserID)return nil}func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {var p emailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Reminder Email to User %d", p.UserID)return nil}
现在我们在client.go和workers.go中都引入这个包。
// client.gofunc main() {client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})t1, err := task.NewWelcomeEmailTask(42)if err != nil {log.Fatal(err)}t2, err := task.NewReminderEmailTask(42)if err != nil {log.Fatal(err)}// Process the task immediately.info, err := client.Enqueue(t1)if err != nil {log.Fatal(err)}log.Printf(" [*] Successfully enqueued task: %+v", info)// Process the task 24 hours later.info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))if err != nil {log.Fatal(err)}log.Printf(" [*] Successfully enqueued task: %+v", info)}
// workers.gofunc main() {srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"},asynq.Config{Concurrency: 10},)mux := asynq.NewServeMux()mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)if err := srv.Run(mux); err != nil {log.Fatal(err)}}
现在代码看起来好多了!
运行程序
现在我们有了client和workers,我们可以运行这两个程序。
让我们运行client程序来创建和安排任务:
$ go run client/client.go
这将创建两项任务,一项立即处理,另一项在 24 小时后处理。
让我们使用asynqCLI 检查任务:
$ asynq dash
应该能够看到有一个任务处于 Enqueued 状态,另一个处于 Scheduled 状态。
注意:要详细了解每个状态的含义,可以查看任务生命周期。
最后,让我们启动workers程序来处理任务:
$ go run workers/workers.go
注意:workers是个守护程序,在发送终止程序的信号之前它不会退出。有关如何安全终止 workers 的最佳实践,请参阅 Signal Wiki 页面。
您应该能够在终端中看到一些文本,表明任务已成功处理。
您可以再次运行client程序,以查看 workers 如何接手并处理它们。
任务重试
默认情况下,失败的任务将使用指数退避算法重试多达 25 次。
让我们更新我们的处理程序,让它返回错误以模拟不成功的场景:
// tasks.gofunc HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Attempting to Send Welcome Email to User %d...", p.UserID)return fmt.Errorf("could not send email to the user") // <-- Return error}
让我们重新启动 workers 并将任务排入队列:
$ go run workers/workers.go$ go run client/client.go
如果您正在运行asynq dash,您应该能够看到有一个任务处于重试状态(通过导航到队列详细信息视图并突出显示“重试”选项卡)。
要检查哪些任务处于重试状态,您还可以运行:
$ asynq task ls --queue=default --state=retry
这将列出将来要重试的所有任务。输出内容包含任务下一次执行的 ETA。
一旦任务用完其重试计数,该任务将转换为归档状态并且不会再被重试(您仍然可以使用 CLI 或 WebUI 工具手动运行归档任务)。
在结束本教程之前,让我们修复我们的处理程序:
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {var p EmailTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return err}log.Printf(" [*] Send Welcome Email to User %d", p.UserID)return nil}
现在我们修复了处理程序,任务将在下一次尝试中成功处理 :)
周期任务
基本原理
Asynq 运行周期任务的方式和 Celery 相似。
即启动一个调度程序 scheduler(Celery 中这个调度程序叫做 beat),它是一个后台服务,负责定期将任务放入队列,然后就什么也不管了,workers 去执行任务,这样就实现了周期任务。
所以这个 scheduler 主要就是包含定时的逻辑。事实上 asynq 的 scheduler 是依赖于 cron 实现的。
示例
scheduler := asynq.NewScheduler(redisConnOpt, nil) // 创建 schedulertask := asynq.NewTask("example_task", nil) // 创建任务// You can use cron spec string to specify the schedule.entryID, err := scheduler.Register("* * * * *", task) // 注册任务到 scheduler 中if err != nil {log.Fatal(err)}log.Printf("registered an entry: %q\n", entryID)// You can use "@every <duration>" to specify the interval.entryID, err = scheduler.Register("@every 30s", task)if err != nil {log.Fatal(err)}log.Printf("registered an entry: %q\n", entryID)// You can also pass options.entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))if err != nil {log.Fatal(err)}log.Printf("registered an entry: %q\n", entryID)if err := scheduler.Run(); err != nil { // 运行 schedulerlog.Fatal(err)}
调用scheduler.Run()后该程序会夯在这里,直到接收到 TERM 或 INT 信号。
时区
周期任务默认使用 UTC 时间,可以使用SchedulerOpts设置时区:
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.loc, err := time.LoadLocation("America/Los_Angeles")if err != nil {panic(err)}scheduler := asynq.NewScheduler(redisConnOpt,&asynq.SchedulerOpts{Location: loc,},)
错误处理
如果调度程序无法将任务排入队列,您可以提供一个函数来处理错误:
function handleEnqueueError(task *asynq.Task, opts []Option, err error) {// your error handling logic}scheduler := asynq.NewScheduler(redisConnOpt,&asynq.SchedulerOpts{EnqueueErrorHandler: handleEnqueueError,},)
通过 CLI 检查
CLI 有一个子命令cron来检查调度程序条目。
要查看当前正在运行的调度程序中的所有条目,您可以运行:
$ asynq cron ls
此命令将输出条目列表,每个条目及其 ID、调度规范、下一个入队时间、上一个入队时间。
您还可以通过运行以下命令查看每个条目的历史记录:
$ asynq cron history <entryID>
动态添加、删除任务
如果想动态添加和删除周期任务(即不重新启动 scheduler),需要使用PeriodicTaskManager。
PeriodicTaskManager 使用PeriodicTaskConfigProvider定期获取周期任务配置,并将调度程序的条目与之同步。
例如,可以将周期任务的配置存储在数据库或本地文件中,要动态添加或删除任务时就更新此配置源。
下面的示例展示了如何使用本地文件来实现这一点,当然也可以很容易地修改示例以使用数据库或其他配置源。
在此示例中,我们将周期任务的配置存储在 YAML 文件中:
configs:- cronspec: "* * * * *"task_type: foo- cronspec: "* * * * *"task_type: bar
然后实现我们的PeriodicTaskConfigProvider,它用来读取 yaml 文件并返回一个PeriodicTaskConfig列表:
func main() {provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"}mgr, err := asynq.NewPeriodicTaskManager(asynq.PeriodicTaskManagerOpts{RedisConnOpt: asynq.RedisClientOpt{Addr: "localhost:6379"},PeriodicTaskConfigProvider: provider, // this provider object is the interface to your config sourceSyncInterval: 10 * time.Second, // this field specifies how often sync should happen})if err != nil {log.Fatal(err)}if err := mgr.Run(); err != nil {log.Fatal(err)}}// FileBasedConfigProvider implements asynq.PeriodicTaskConfigProvider interface.type FileBasedConfigProvider struct {filename string}// Parses the yaml file and return a list of PeriodicTaskConfigs.func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {data, err := os.ReadFile(p.filename)if err != nil {return nil, err}var c PeriodicTaskConfigContainerif err := yaml.Unmarshal(data, &c); err != nil {return nil, err}var configs []*asynq.PeriodicTaskConfigfor _, cfg := range c.Configs {configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)})}return configs, nil}type PeriodicTaskConfigContainer struct {Configs []*Config `yaml:"configs"`}type Config struct {Cronspec string `yaml:"cronspec"`TaskType string `yaml:"task_type"`}
Web UI
Asynqmon 是一个基于 Web 的工具,用来监控和管理 Asynq 队列和任务。
下面是几张截图。
Queues View:
Tasks View:
Metrics View:
关于该工具的详细使用方法,参见该工具的 README。
命令行工具
Asynq 附带了一个命令行工具来检查队列和任务的状态。
要安装该命令行工具,需运行:
$ go install github.com/hibiken/asynq/tools/asynq
下面是运行asynq dash命令的一个示例:
关于该工具的详细使用方法,参见该工具的 README。
