什么是Asynq
Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiq和celery,他具有以下特点:
- 保证至少执行一次任务
- 持久化
- 失败重试
- worker崩溃自动恢复
- 优先队列
- 暂停队列
- 支持中间件
- 允许唯一任务
- 周期性任务
- 支持Redis Cluster实现自动分片
- 支持Redis Sentinels实现高可用
- 支持Prometheus metrics
- 提供web ui管理
- 提供cli管理
安装
# 代码库:go get -u github.com/hibiken/asynq# 命令行工具:go get -u github.com/hibiken/asynq/tools/asynq
使用
1. 准备
2. worker端(消费端)
main.go:
package mainimport ("context""fmt""log""os""os/signal""time""github.com/hibiken/asynq""golang.org/x/sys/unix")func main() {// asynq serversrv := asynq.NewServer(asynq.RedisClientOpt{Addr: ":6379",Password: "Your password",DB: 0,},asynq.Config{Concurrency: 20},)mux := asynq.NewServeMux()// some middlewaresmux.Use(func(next asynq.Handler) asynq.Handler {return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {// just record a logfmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))return next.ProcessTask(ctx, t)})})// some workersmux.HandleFunc("msg", HandleMsg)// start serverif err := srv.Start(mux); err != nil {log.Fatalf("could not start server: %v", err)}// Wait for termination signal.c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,syscall.SIGQUIT,//syscall.SIGUSR1, syscall.SIGUSR2,)for {s := <- cswitch s {case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:fmt.Println("Program Exit...", s)srv.Shutdown()srv.Stop()return//case syscall.SIGUSR1:// fmt.Println("usr1 signal", s)//case syscall.SIGUSR2:// fmt.Println("usr2 signal", s)default:fmt.Println("other signal", s)}}}
worker.go:
package mainimport ("context""fmt""github.com/hibiken/asynq")// HandleMsg 处理msgfunc HandleMsg(ctx context.Context, t *asynq.Task) error {//fmt.Println("------HandleMsg start------")log.Printf("type: %v, payload: %s", t.Type(), string(t.Payload()))return nil}
创建任务enqueue_test.go:
package mainimport ("fmt""os""testing""time""github.com/hibiken/asynq")var c *asynq.Clientfunc TestMain(m *testing.M) {r := asynq.RedisClientOpt{Addr: ":6379",Password: "Your password",DB: 0,}c = asynq.NewClient(r)ret := m.Run()c.Close()os.Exit(ret)}// 即时消费func Test_Enqueue(t *testing.T) {payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}task := asynq.NewTask("msg", payload)res, err := c.Enqueue(task)if err != nil {t.Errorf("could not enqueue task: %v", err)t.FailNow()}fmt.Printf("Enqueued Result: %+v\n", res)}// 延时消费func Test_EnqueueDelay(t *testing.T) {payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}task := asynq.NewTask("msg", payload)res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))if err != nil {t.Errorf("could not enqueue task: %v", err)t.FailNow()}fmt.Printf("Enqueued Result: %+v\n", res)}// 超时、重试、过期func Test_EnqueueOther(t *testing.T) {payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}task := asynq.NewTask("msg", payload)// 10秒超时,最多重试3次,20秒后过期res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))if err != nil {t.Errorf("could not enqueue task: %v", err)t.FailNow()}fmt.Printf("Enqueued Result: %+v\n", res)}
测试:
先启动worker:
$ go run main.go worker.go
创建任务:
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1=== RUN Test_EnqueueEnqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}--- PASS: Test_Enqueue (0.00s)PASSok asynq_test 0.009s
监控与管理
命令行工具asynq
https://github.com/hibiken/asynq/tree/master/tools/asynq
$ asynq -p Yourpassword statsTask Count by Stateactive pending scheduled retry archived---------- -------- --------- ----- ----0 0 0 0 0Task Count by Queuedefault-------0Daily Stats 2021-06-11 UTCprocessed failed error rate--------- ------ ----------4 0 0.00%Redis Infoversion uptime connections memory usage peak memory usage------- ------ ----------- ------------ -----------------6.2.0 0 days 5 16.04MB 16.14MB
Web UI
https://github.com/hibiken/asynqmon
启动:
./asynqmon --port=3000 --redis-addr=localhost:6380

