什么是Asynq

Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiq和celery,他具有以下特点:

  • 保证至少执行一次任务
  • 持久化
  • 失败重试
  • worker崩溃自动恢复
  • 优先队列
  • 暂停队列
  • 支持中间件
  • 允许唯一任务
  • 周期性任务
  • 支持Redis Cluster实现自动分片
  • 支持Redis Sentinels实现高可用
  • 支持Prometheus metrics
  • 提供web ui管理
  • 提供cli管理

安装

  1. # 代码库:
  2. go get -u github.com/hibiken/asynq
  3. # 命令行工具:
  4. go get -u github.com/hibiken/asynq/tools/asynq

使用

1. 准备

先准备一个redis,单点或者集群都ok。

2. worker端(消费端)

main.go:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "time"
  9. "github.com/hibiken/asynq"
  10. "golang.org/x/sys/unix"
  11. )
  12. func main() {
  13. // asynq server
  14. srv := asynq.NewServer(
  15. asynq.RedisClientOpt{
  16. Addr: ":6379",
  17. Password: "Your password",
  18. DB: 0,
  19. },
  20. asynq.Config{Concurrency: 20},
  21. )
  22. mux := asynq.NewServeMux()
  23. // some middlewares
  24. mux.Use(func(next asynq.Handler) asynq.Handler {
  25. return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
  26. // just record a log
  27. fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))
  28. return next.ProcessTask(ctx, t)
  29. })
  30. })
  31. // some workers
  32. mux.HandleFunc("msg", HandleMsg)
  33. // start server
  34. if err := srv.Start(mux); err != nil {
  35. log.Fatalf("could not start server: %v", err)
  36. }
  37. // Wait for termination signal.
  38. c := make(chan os.Signal, 1)
  39. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM,
  40. syscall.SIGQUIT,
  41. //syscall.SIGUSR1, syscall.SIGUSR2,
  42. )
  43. for {
  44. s := <- c
  45. switch s {
  46. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
  47. fmt.Println("Program Exit...", s)
  48. srv.Shutdown()
  49. srv.Stop()
  50. return
  51. //case syscall.SIGUSR1:
  52. // fmt.Println("usr1 signal", s)
  53. //case syscall.SIGUSR2:
  54. // fmt.Println("usr2 signal", s)
  55. default:
  56. fmt.Println("other signal", s)
  57. }
  58. }
  59. }

worker.go:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/hibiken/asynq"
  6. )
  7. // HandleMsg 处理msg
  8. func HandleMsg(ctx context.Context, t *asynq.Task) error {
  9. //fmt.Println("------HandleMsg start------")
  10. log.Printf("type: %v, payload: %s", t.Type(), string(t.Payload()))
  11. return nil
  12. }

创建任务enqueue_test.go:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "testing"
  6. "time"
  7. "github.com/hibiken/asynq"
  8. )
  9. var c *asynq.Client
  10. func TestMain(m *testing.M) {
  11. r := asynq.RedisClientOpt{
  12. Addr: ":6379",
  13. Password: "Your password",
  14. DB: 0,
  15. }
  16. c = asynq.NewClient(r)
  17. ret := m.Run()
  18. c.Close()
  19. os.Exit(ret)
  20. }
  21. // 即时消费
  22. func Test_Enqueue(t *testing.T) {
  23. payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
  24. task := asynq.NewTask("msg", payload)
  25. res, err := c.Enqueue(task)
  26. if err != nil {
  27. t.Errorf("could not enqueue task: %v", err)
  28. t.FailNow()
  29. }
  30. fmt.Printf("Enqueued Result: %+v\n", res)
  31. }
  32. // 延时消费
  33. func Test_EnqueueDelay(t *testing.T) {
  34. payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
  35. task := asynq.NewTask("msg", payload)
  36. res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
  37. // res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
  38. if err != nil {
  39. t.Errorf("could not enqueue task: %v", err)
  40. t.FailNow()
  41. }
  42. fmt.Printf("Enqueued Result: %+v\n", res)
  43. }
  44. // 超时、重试、过期
  45. func Test_EnqueueOther(t *testing.T) {
  46. payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
  47. task := asynq.NewTask("msg", payload)
  48. // 10秒超时,最多重试3次,20秒后过期
  49. res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
  50. if err != nil {
  51. t.Errorf("could not enqueue task: %v", err)
  52. t.FailNow()
  53. }
  54. fmt.Printf("Enqueued Result: %+v\n", res)
  55. }

测试:
先启动worker:

  1. $ go run main.go worker.go

创建任务:

  1. $ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1
  2. === RUN Test_Enqueue
  3. Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
  4. --- PASS: Test_Enqueue (0.00s)
  5. PASS
  6. ok asynq_test 0.009s

监控与管理

命令行工具asynq

https://github.com/hibiken/asynq/tree/master/tools/asynq

  1. $ asynq -p Yourpassword stats
  2. Task Count by State
  3. active pending scheduled retry archived
  4. ---------- -------- --------- ----- ----
  5. 0 0 0 0 0
  6. Task Count by Queue
  7. default
  8. -------
  9. 0
  10. Daily Stats 2021-06-11 UTC
  11. processed failed error rate
  12. --------- ------ ----------
  13. 4 0 0.00%
  14. Redis Info
  15. version uptime connections memory usage peak memory usage
  16. ------- ------ ----------- ------------ -----------------
  17. 6.2.0 0 days 5 16.04MB 16.14MB

Web UI

https://github.com/hibiken/asynqmon
启动:

  1. ./asynqmon --port=3000 --redis-addr=localhost:6380

每日一库之112:asynq - 图1