队列

[[toc]]

简介

在构建 Web 应用程序时,你可能需要执行一些比较耗时的任务(例如解析和存储上传的 CSV 文件),Goravel 可以让你轻松地创建可在后台排队处理的任务。通过将耗时的任务移到队列中,你的应用程序可以以超快的速度响应 Web 请求,并为客户提供更好的用户体验。我们使用 facades.Queue 实现这些功能。

队列配置文件存储在 config/queue.go 中。目前框架支持两种队列驱动: redissync

连接 Vs 队列

在开始使用 Goravel 队列之前,理解「连接」和「队列」之间的区别非常重要。在 config/queue.go 配置文件中,有一个 connections 配置选项。此选项定义到后端服务(如 Redis)的特定连接。然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。

请注意,config/queue.go 文件中的每个连接配置示例都包含一个 queue 属性。 这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果你没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在 queue 定义的队列上:

  1. // 这个任务将被推送到默认队列
  2. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{
  3. {Type: "int", Value: 1},
  4. }).Dispatch()
  5. // 这个任务将被推送到 "emails" 队列
  6. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{
  7. {Type: "int", Value: 1}
  8. }).OnQueue("emails").Dispatch()

创建任务

生成任务类

默认情况下,应用程序的所有的任务都被存储在了 app/jobs 目录中。如果 app/jobs 目录不存在,当你运行 make:job Artisan 命令时,将会自动创建该目录:

  1. go run . artisan make:job ProcessPodcast

类结构

任务类非常简单,包含 Signature, Handle 方法,Signature 是任务类的唯一标识,Handle 在队列处理任务时将会被调用,在调用任务时传入的 []queue.Arg{} 将会被传入 Handle 中:

  1. package jobs
  2. type ProcessPodcast struct {
  3. }
  4. //Signature The name and signature of the job.
  5. func (receiver *ProcessPodcast) Signature() string {
  6. return "process_podcast"
  7. }
  8. //Handle Execute the job.
  9. func (receiver *ProcessPodcast) Handle(args ...interface{}) error {
  10. return nil
  11. }

注册任务

当任务创建好后,需要注册到 app/provides/queue_service_provider.go,以便能够正确调用。

  1. func (receiver *QueueServiceProvider) Jobs() []queue.Job {
  2. return []queue.Job{
  3. &jobs.Test{},
  4. }
  5. }

启动队列服务器

在根目录下 main.go 中启动队列服务器。

  1. package main
  2. import (
  3. "github.com/goravel/framework/facades"
  4. "goravel/bootstrap"
  5. )
  6. func main() {
  7. // This bootstraps the framework and gets it ready for use.
  8. bootstrap.Boot()
  9. // Start queue server by facades.Queue.
  10. go func() {
  11. if err := facades.Queue.Worker(nil).Run(); err != nil {
  12. facades.Log.Errorf("Queue run error: %v", err)
  13. }
  14. }()
  15. select {}
  16. }

facades.Queue.Worker 方法中可以传入不同的参数,通过启动多个 facades.Queue.Worker,可以达到监听多个队列的目的。

  1. // 不传参数,默认监听 `config/queue.go` 中的配置,并发数为 1
  2. go func() {
  3. if err := facades.Queue.Worker(nil).Run(); err != nil {
  4. facades.Log.Errorf("Queue run error: %v", err)
  5. }
  6. }()
  7. // 监听 redis 链接的 processing 队列,并发数 10
  8. go func() {
  9. if err := facades.Queue.Worker(&queue.Args{
  10. Connection: "redis",
  11. Queue: "processing",
  12. Concurrent: 10,
  13. }).Run(); err != nil {
  14. facades.Log.Errorf("Queue run error: %v", err)
  15. }
  16. }()

调度任务

一旦写好了任务类,你可以使用任务本身的 dispatch 方法来调度它:

  1. package controllers
  2. import (
  3. "github.com/goravel/framework/contracts/queue"
  4. "github.com/goravel/framework/contracts/http"
  5. "github.com/goravel/framework/facades"
  6. "goravel/app/jobs"
  7. )
  8. type UserController struct {
  9. }
  10. func (r *UserController) Show(ctx http.Context) {
  11. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).Dispatch()
  12. if err != nil {
  13. // do something
  14. }
  15. }

同步调度

如果你想立即(同步)调度任务,你可以使用 dispatchSync 方法。使用此方法时,任务不会排队,会在当前进程内立即执行:

  1. package controllers
  2. import (
  3. "github.com/goravel/framework/contracts/queue"
  4. "github.com/goravel/framework/contracts/http"
  5. "github.com/goravel/framework/facades"
  6. "goravel/app/jobs"
  7. )
  8. type UserController struct {
  9. }
  10. func (r *serController) Show(ctx http.Context) {
  11. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).DispatchSync()
  12. if err != nil {
  13. // do something
  14. }
  15. }

任务链

任务链允许你指定一组按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,你可以使用 facades.Queue 提供的 chain 方法:

  1. err := facades.Queue.Chain([]queue.Jobs{
  2. {
  3. Job: &jobs.Test{},
  4. Args: []queue.Arg{
  5. {Type: "int", Value: 1},
  6. },
  7. },
  8. {
  9. Job: &jobs.Test1{},
  10. Args: []queue.Arg{
  11. {Type: "int", Value: 2},
  12. },
  13. },
  14. }).Dispatch()

自定义队列 & 连接

分派到特定队列

通过将任务推送到不同的队列,你可以对排队的任务进行「分类」,甚至可以优先考虑分配给各个队列的 worker 数量。

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnQueue("processing").Dispatch()

调度到特定连接

如果你的应用程序与多个队列连接交互,你可以使用 onConnection 方法指定将任务推送到哪个连接:

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnConnection("sync").Dispatch()

你可以将 onConnection 和 onQueue 方法链接在一起,以指定任务的连接和队列:

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnConnection("sync").OnQueue("processing").Dispatch()

queue.Arg.Type 支持的类型

  1. bool
  2. int
  3. int8
  4. int16
  5. int32
  6. int64
  7. uint
  8. uint8
  9. uint16
  10. uint32
  11. uint64
  12. float32
  13. float64
  14. string
  15. []bool
  16. []int
  17. []int8
  18. []int16
  19. []int32
  20. []int64
  21. []uint
  22. []uint8
  23. []uint16
  24. []uint32
  25. []uint64
  26. []float32
  27. []float64
  28. []string