队列

[[toc]]

简介

在构建 Web 应用程序时,你可能需要执行一些任务(例如解析和存储上传的 CSV 文件),但这些任务在典型的 Web 请求中花费的时间太长。幸运的是,Goravel 允许你轻松地创建可在后台排队处理的任务作业。通过将耗时的任务移到队列中,你的应用程序可以以超快的速度响应 Web 请求,并为客户提供更好的用户体验。

队列配置文件存储在 config/queue.go 中。 在这个文件中,你可以找到框架中包含的队列驱动程序的连接配置,其中包括 redissync 驱动。

连接 Vs 队列

在开始使用 Goravel 队列之前,理解「连接」和「队列」之间的区别非常重要。在 config/queue.go 配置文件中,有一个 connections 配置选项。此选项定义到后端服务(如 Redis)的特定连接。然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。

请注意,config/queue.go 文件中的每个连接配置示例都包含一个 queue 属性。 这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果你没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在连接配置的 queue 属性中定义的队列上:

  1. // 这个任务将被推送到默认队列
  2. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{
  3. {Type: "int", Value: 1}
  4. }).Dispatch()
  5. // 这个任务将被推送到 "emails" 队列
  6. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{
  7. {Type: "int", Value: 1}
  8. }).OnQueue("emails").Dispatch()

创建任务

生成任务类

默认情况下,应用程序的所有的可排队任务都被存储在了 app/jobs 目录中。如果 app/jobs 目录不存在,当你运行 make:job Artisan 命令时,将会自动创建该目录:

  1. go run . artisan make:job ProcessPodcast

类结构

任务类非常简单,包含 Signature, Handle 方法,Signature 是任务类的唯一标识,Handle 在队列处理任务时将会被调用,在调用任务时传入的 []queue.Arg{} 将会被传入 Handle 中:

  1. package jobs
  2. type ProcessPodcast struct {
  3. }
  4. //Signature The name and signature of the job.
  5. func (receiver *ProcessPodcast) Signature() string {
  6. return "process_podcast"
  7. }
  8. //Handle Execute the job.
  9. func (receiver *ProcessPodcast) Handle(args ...interface{}) error {
  10. return nil
  11. }

注册任务

当任务创建好后,需要注册到 app/provides/queue_service_provider.go,以便能够正确调用。

  1. func (receiver *QueueServiceProvider) Jobs() []queue.Job {
  2. return []queue.Job{
  3. &jobs.Test{},
  4. }
  5. }

启动队列服务器

在根目录下 main.go 中启动队列服务器

  1. package main
  2. import (
  3. "github.com/goravel/framework/support/facades"
  4. "goravel/bootstrap"
  5. )
  6. func main() {
  7. // This bootstraps the framework and gets it ready for use.
  8. bootstrap.Boot()
  9. // Start http server by facades.Route.
  10. go func() {
  11. if err := facades.Route.Run(facades.Config.GetString("app.host")); err != nil {
  12. facades.Log.Errorf("Route run error: %v", err)
  13. }
  14. }()
  15. // Start queue server by facades.Queue.
  16. go func() {
  17. if err := facades.Queue.Worker(queue.Args{}).Run(); err != nil {
  18. facades.Log.Errorf("Queue run error: %v", err)
  19. }
  20. }()
  21. select {}
  22. }

facades.Queue.Worker 方法中可以传入不同的参数,通过启动多个 facades.Queue.Worker,可以达到监听多个队列的目的。

  1. // 不传参数,默认监听 `config/queue.go` 中的配置,并发数为 1
  2. go func() {
  3. if err := facades.Queue.Worker(queue.Args{}).Run(); err != nil {
  4. facades.Log.Errorf("Queue run error: %v", err)
  5. }
  6. }()
  7. // 监听 redis 链接的 processing 队列,并发数 10
  8. go func() {
  9. if err := facades.Queue.Worker(queue.Args{
  10. Connection: "redis",
  11. Queue: "processing",
  12. Concurrent: 10,
  13. }).Run(); err != nil {
  14. facades.Log.Errorf("Queue run error: %v", err)
  15. }
  16. }()

调度任务

一旦你写好了你的任务类,你可以使用任务本身的 dispatch 方法来调度它:

  1. package controllers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/goravel/framework/contracts/queue"
  5. "github.com/goravel/framework/support/facades"
  6. "goravel/app/jobs"
  7. )
  8. type UserController struct {
  9. }
  10. func (r UserController) Show(ctx *gin.Context) {
  11. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).Dispatch()
  12. if err != nil {
  13. // do something
  14. }
  15. }

同步调度

如果你想立即(同步)调度任务,你可以使用 dispatchSync 方法。使用此方法时,任务不会排队,会在当前进程内立即执行:

  1. package controllers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/goravel/framework/contracts/queue"
  5. "github.com/goravel/framework/support/facades"
  6. "goravel/app/jobs"
  7. )
  8. type UserController struct {
  9. }
  10. func (r UserController) Show(ctx *gin.Context) {
  11. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).DispatchSync()
  12. if err != nil {
  13. // do something
  14. }
  15. }

任务链

任务链允许你指定一组应在主任务成功执行后按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队的任务链,你可以使用 facades.Queue 提供的 chain 方法:

  1. err := facades.Queue.Chain([]queue.Jobs{
  2. {
  3. Job: &jobs.Test{},
  4. Args: []queue.Arg{
  5. {Type: "int", Value: 1},
  6. },
  7. },
  8. {
  9. Job: &jobs.Test1{},
  10. Args: []queue.Arg{
  11. {Type: "int", Value: 2},
  12. },
  13. },
  14. }).Dispatch()

自定义队列 & 连接

分派到特定队列

通过将任务推送到不同的队列,你可以对排队的任务进行「分类」,甚至可以优先考虑分配给各个队列的 worker 数量。请记住,这不会将任务推送到队列配置文件定义的不同队列「连接」,而只会推送到单个连接中的特定队列。要指定队列,请在调度任务时使用 onQueue 方法:

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnQueue("processing").Dispatch()

调度到特定连接

如果你的应用程序与多个队列连接交互,你可以使用 onConnection 方法指定将任务推送到哪个连接:

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnConnection("sync").Dispatch()

你可以将 onConnection 和 onQueue 方法链接在一起,以指定任务的连接和队列:

  1. err := facades.Queue.Job(&jobs.Test{}, []queue.Arg{}).OnConnection("sync").OnQueue("processing").Dispatch()

queue.Arg.Type 支持的类型

  1. bool
  2. int
  3. int8
  4. int16
  5. int32
  6. int64
  7. uint
  8. uint8
  9. uint16
  10. uint32
  11. uint64
  12. float32
  13. float64
  14. string
  15. []bool
  16. []int
  17. []int8
  18. []int16
  19. []int32
  20. []int64
  21. []uint
  22. []uint8
  23. []uint16
  24. []uint32
  25. []uint64
  26. []float32
  27. []float64
  28. []string