本文主要是对 Asynq 文档部分内容的翻译,原文档地址:https://github.com/hibiken/asynq/wiki
Asynq 库 Github 地址:https://github.com/hibiken/asynq
注意:该库目前还处于 v0.x.x 版本,还在进行大量开发,API 更改频繁。在 v1.0.0 发布之前,API 可能会在没有重大版本更新的情况下更改。
翻译此文档时,Asynq 版本为 0.23.0
Introduction
Asynq 是一个用 Go 开发的简单、可靠、高效的分布式任务队列。可以用来调度异步任务和定时任务。
其定位类似 Python 中的 Celery。
Asynq 用 redis 作为 broker,暂不支持 RabbitMQ 等消息队列。
三句话描述 Asynq 的工作原理:
- 客户端将任务放入队列。
- 服务端从队列中拉出任务并为每个任务启动一个 worker(一个 goroutine) 去执行它。
- 多个 worker 同时处理任务。
示例用例:
Asynq 的特性:
- 保证任务至少执行一次
- 任务调度
- 重试失败的任务
- 在 worker 崩溃的情况下自动恢复任务
- 加权优先级队列
- 严格的优先级队列
- 由于 Redis 中的写入速度很快,因此添加任务的延迟很低
- 使用唯一选项对任务进行重复数据删除
- 允许每个任务设置超时时间或截止时间
- 允许组合一组任务以批处理多个连续操作
- 灵活的 handler 接口(支持中间件)
- 能够暂停队列以停止处理队列中的任务
- 周期任务
- 支持 Redis 集群实现自动分片和高可用性
- 支持 Redis 哨兵以实现高可用性
- 与 Prometheus 集成以收集和可视化队列指标
- 用于审计和远程控制队列和任务的 Web UI
- 用于审计和远程控制队列和任务的 CLI
Getting Started
欢迎来到 Asynq 之旅!
在本教程中,我们将编写两个程序,client
和workers
。
client.go
将创建和安排由 workers 异步处理的任务。workers.go
将启动多个并发 workers 来处理客户端创建的任务。
本指南假定你在**localhost:6379**
运行了 Redis。
让我们从创建两个主要文件开始:
$ mkdir quickstart && cd quickstart
$ go mod init asynq-quickstart
$ mkdir client workers
$ touch client/client.go workers/workers.go
安装asynq
包:
$ go get -u github.com/hibiken/asynq
核心类型
在我们开始编写代码之前,先介绍一下我们将在两个程序中使用的一些核心类型。
Redis 连接选项
Asynq 使用 Redis 作为消息代理。
两个文件client.go
和workers.go
都需要连接到 Redis 才能写入和读取任务信息。
我们将用RedisClientOpt
指定与本地运行的 Redis 服务器的连接:
redisConnOpt := asynq.RedisClientOpt{
Addr: "localhost:6379",
Password: "mypassword", // 如果不需要密码,省略此选项
DB: 0,
}
任务
在asynq
中,一个工作单元被封装在一个名为Task
的类型中,它在概念上具有两个字段:Type
和Payload
:
// Type 是表示任务类型的字符串值
func (t *Task) Type() string
// Payload 是任务执行所需的数据
func (t *Task) Payload() []byte
客户端程序
在client.go
中,我们将创建一些任务并使用asynq.Client
将它们放入任务队列。
可以使用NewTask
方法创建任务,参数是 type 和 payload。
使用[Enqueue](https://godoc.org/github.com/hibiken/asynq#Client.Enqueue)
方法可以将任务放入队列,其参数是一个任务对象和任意数量的选项参数。
使用[ProcessIn](https://godoc.org/github.com/hibiken/asynq#ProcessIn)
或[ProcessAt](https://godoc.org/github.com/hibiken/asynq#ProcessAt)
选项可以安排什么时候处理该任务。
// Task payload for any email related tasks.
type EmailTaskPayload struct {
// ID for the email recipient.
UserID int
}
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
// Create a task with typename and payload.
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("email:welcome", payload)
t2 := asynq.NewTask("email:reminder", payload)
// Process the task immediately.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
// Process the task 24 hours later.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
}
这就是客户端程序所需的全部内容。
服务端程序
在workers.go
中,我们将创建一个asynq.Server
实例来启动 workers。
NewServer
方法的参数是RedisConnOpt
和Config
。
Config
用于设置服务端的任务处理行为。
可以查看文档[Config](https://pkg.go.dev/github.com/hibiken/asynq#Config)
以查看所有可用的配置选项。
为简单起见,我们将仅在此示例中指定并发性。
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// 注意: 这里的 `handler` 是什么下文中会介绍到
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
(*Server).Run
的参数是一个接口asynq.Handler
,该接口有一个方法ProcessTask
:
type Handler interface {
// 如果任务执行成功,ProcessTask 应该返回 nil
// 如果 ProcessTask 返回了一个非 nil 的 error,或抛异常,代表任务执行失败了,任务会稍后重新执行
ProcessTask(context.Context, *Task) error
}
实现 handler 的最简单方法是定义一个具有相同签名的函数,并用asynq.HandlerFunc
包装后传递给Run
:
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
}
return fmt.Errorf("unexpected task type: %s", t.Type())
}
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
我们可以继续向这个处理函数添加 switch case,但在实际应用中,在单独的函数中定义每个 case 的逻辑是很方便的。
为了重构我们的代码,让我们用ServeMux
来创建我们的处理程序。ServeMux
的使用就像"net/http"
包里的一样,可以通过调用Handle
或者HandleFunc
来注册一个 handler。ServeMux
实现了Handler
接口,以便可以将其传递给(*Server).Run
。
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}
现在我们已经提取了处理每种任务类型的函数,代码看起来更有条理。
但是,代码有点太隐晦了,我们有任务类型和 payload 这些字符串值,它们应该被封装在一个内聚的包中。
让我们通过编写一个封装任务创建和处理的包来重构我们的代码。
下面我们创建一个名为task
的包:
$ mkdir task && touch task/task.go
package task
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// A list of task types.
const (
TypeWelcomeEmail = "email:welcome"
TypeReminderEmail = "email:reminder"
)
// Task payload for any email related tasks.
type emailTaskPayload struct {
// ID for the email recipient.
UserID int
}
func NewWelcomeEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeWelcomeEmail, payload), nil
}
func NewReminderEmailTask(id int) (*asynq.Task, error) {
payload, err := json.Marshal(emailTaskPayload{UserID: id})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeReminderEmail, payload), nil
}
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
func HandleReminderEmailTask(ctx context.Context, t *asynq.Task) error {
var p emailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}
现在我们在client.go
和workers.go
中都引入这个包。
// client.go
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
t1, err := task.NewWelcomeEmailTask(42)
if err != nil {
log.Fatal(err)
}
t2, err := task.NewReminderEmailTask(42)
if err != nil {
log.Fatal(err)
}
// Process the task immediately.
info, err := client.Enqueue(t1)
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
// Process the task 24 hours later.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
}
// workers.go
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc(task.TypeWelcomeEmail, task.HandleWelcomeEmailTask)
mux.HandleFunc(task.TypeReminderEmail, task.HandleReminderEmailTask)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
现在代码看起来好多了!
运行程序
现在我们有了client
和workers
,我们可以运行这两个程序。
让我们运行client
程序来创建和安排任务:
$ go run client/client.go
这将创建两项任务,一项立即处理,另一项在 24 小时后处理。
让我们使用asynq
CLI 检查任务:
$ asynq dash
应该能够看到有一个任务处于 Enqueued 状态,另一个处于 Scheduled 状态。
注意:要详细了解每个状态的含义,可以查看任务生命周期。
最后,让我们启动workers
程序来处理任务:
$ go run workers/workers.go
注意:workers
是个守护程序,在发送终止程序的信号之前它不会退出。有关如何安全终止 workers 的最佳实践,请参阅 Signal Wiki 页面。
您应该能够在终端中看到一些文本,表明任务已成功处理。
您可以再次运行client
程序,以查看 workers 如何接手并处理它们。
任务重试
默认情况下,失败的任务将使用指数退避算法重试多达 25 次。
让我们更新我们的处理程序,让它返回错误以模拟不成功的场景:
// tasks.go
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Attempting to Send Welcome Email to User %d...", p.UserID)
return fmt.Errorf("could not send email to the user") // <-- Return error
}
让我们重新启动 workers 并将任务排入队列:
$ go run workers/workers.go
$ go run client/client.go
如果您正在运行asynq dash
,您应该能够看到有一个任务处于重试状态(通过导航到队列详细信息视图并突出显示“重试”选项卡)。
要检查哪些任务处于重试状态,您还可以运行:
$ asynq task ls --queue=default --state=retry
这将列出将来要重试的所有任务。输出内容包含任务下一次执行的 ETA。
一旦任务用完其重试计数,该任务将转换为归档状态并且不会再被重试(您仍然可以使用 CLI 或 WebUI 工具手动运行归档任务)。
在结束本教程之前,让我们修复我们的处理程序:
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
现在我们修复了处理程序,任务将在下一次尝试中成功处理 :)
周期任务
基本原理
Asynq 运行周期任务的方式和 Celery 相似。
即启动一个调度程序 scheduler(Celery 中这个调度程序叫做 beat),它是一个后台服务,负责定期将任务放入队列,然后就什么也不管了,workers 去执行任务,这样就实现了周期任务。
所以这个 scheduler 主要就是包含定时的逻辑。事实上 asynq 的 scheduler 是依赖于 cron 实现的。
示例
scheduler := asynq.NewScheduler(redisConnOpt, nil) // 创建 scheduler
task := asynq.NewTask("example_task", nil) // 创建任务
// You can use cron spec string to specify the schedule.
entryID, err := scheduler.Register("* * * * *", task) // 注册任务到 scheduler 中
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// You can use "@every <duration>" to specify the interval.
entryID, err = scheduler.Register("@every 30s", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
// You can also pass options.
entryID, err = scheduler.Register("@every 24h", task, asynq.Queue("myqueue"))
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
if err := scheduler.Run(); err != nil { // 运行 scheduler
log.Fatal(err)
}
调用scheduler.Run()
后该程序会夯在这里,直到接收到 TERM 或 INT 信号。
时区
周期任务默认使用 UTC 时间,可以使用SchedulerOpts
设置时区:
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
错误处理
如果调度程序无法将任务排入队列,您可以提供一个函数来处理错误:
function handleEnqueueError(task *asynq.Task, opts []Option, err error) {
// your error handling logic
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
EnqueueErrorHandler: handleEnqueueError,
},
)
通过 CLI 检查
CLI 有一个子命令cron
来检查调度程序条目。
要查看当前正在运行的调度程序中的所有条目,您可以运行:
$ asynq cron ls
此命令将输出条目列表,每个条目及其 ID、调度规范、下一个入队时间、上一个入队时间。
您还可以通过运行以下命令查看每个条目的历史记录:
$ asynq cron history <entryID>
动态添加、删除任务
如果想动态添加和删除周期任务(即不重新启动 scheduler),需要使用PeriodicTaskManager
。
PeriodicTaskManager 使用PeriodicTaskConfigProvider
定期获取周期任务配置,并将调度程序的条目与之同步。
例如,可以将周期任务的配置存储在数据库或本地文件中,要动态添加或删除任务时就更新此配置源。
下面的示例展示了如何使用本地文件来实现这一点,当然也可以很容易地修改示例以使用数据库或其他配置源。
在此示例中,我们将周期任务的配置存储在 YAML 文件中:
configs:
- cronspec: "* * * * *"
task_type: foo
- cronspec: "* * * * *"
task_type: bar
然后实现我们的PeriodicTaskConfigProvider
,它用来读取 yaml 文件并返回一个PeriodicTaskConfig
列表:
func main() {
provider := &FileBasedConfigProvider{filename: "./periodic_task_config.yml"}
mgr, err := asynq.NewPeriodicTaskManager(
asynq.PeriodicTaskManagerOpts{
RedisConnOpt: asynq.RedisClientOpt{Addr: "localhost:6379"},
PeriodicTaskConfigProvider: provider, // this provider object is the interface to your config source
SyncInterval: 10 * time.Second, // this field specifies how often sync should happen
})
if err != nil {
log.Fatal(err)
}
if err := mgr.Run(); err != nil {
log.Fatal(err)
}
}
// FileBasedConfigProvider implements asynq.PeriodicTaskConfigProvider interface.
type FileBasedConfigProvider struct {
filename string
}
// Parses the yaml file and return a list of PeriodicTaskConfigs.
func (p *FileBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
data, err := os.ReadFile(p.filename)
if err != nil {
return nil, err
}
var c PeriodicTaskConfigContainer
if err := yaml.Unmarshal(data, &c); err != nil {
return nil, err
}
var configs []*asynq.PeriodicTaskConfig
for _, cfg := range c.Configs {
configs = append(configs, &asynq.PeriodicTaskConfig{Cronspec: cfg.Cronspec, Task: asynq.NewTask(cfg.TaskType, nil)})
}
return configs, nil
}
type PeriodicTaskConfigContainer struct {
Configs []*Config `yaml:"configs"`
}
type Config struct {
Cronspec string `yaml:"cronspec"`
TaskType string `yaml:"task_type"`
}
Web UI
Asynqmon 是一个基于 Web 的工具,用来监控和管理 Asynq 队列和任务。
下面是几张截图。
Queues View:
Tasks View:
Metrics View:
关于该工具的详细使用方法,参见该工具的 README。
命令行工具
Asynq 附带了一个命令行工具来检查队列和任务的状态。
要安装该命令行工具,需运行:
$ go install github.com/hibiken/asynq/tools/asynq
下面是运行asynq dash
命令的一个示例:
关于该工具的详细使用方法,参见该工具的 README。