异步队列

异步队列区别于 RabbitMQ Kafka 等消息队列,它只提供一种 异步处理异步延时处理 的能力,并 不能 严格地保证消息的持久化和 不支持 完备的 ACK 应答机制。

安装

  1. composer require hyperf/async-queue

配置

配置文件位于 config/autoload/async_queue.php,如文件不存在可自行创建。

暂时只支持 Redis Driver 驱动。

配置 类型 默认值 备注
driver string Hyperf\AsyncQueue\Driver\RedisDriver::class
channel string queue 队列前缀
redis.pool string default redis 连接池
timeout int 2 pop 消息的超时时间
retry_seconds int,array 5 失败后重新尝试间隔
handle_timeout int 10 消息处理超时时间
processes int 1 消费进程数
concurrent.limit int 1 同时处理消息数
max_messages int 0 进程重启所需最大处理的消息数 默认不重启
  1. <?php
  2. return [
  3. 'default' => [
  4. 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
  5. 'redis' => [
  6. 'pool' => 'default'
  7. ],
  8. 'channel' => 'queue',
  9. 'timeout' => 2,
  10. 'retry_seconds' => 5,
  11. 'handle_timeout' => 10,
  12. 'processes' => 1,
  13. 'concurrent' => [
  14. 'limit' => 5,
  15. ],
  16. ],
  17. ];

retry_seconds 也可以传入数组,根据重试次数相应修改重试时间,例如

  1. <?php
  2. return [
  3. 'default' => [
  4. 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
  5. 'channel' => 'queue',
  6. 'retry_seconds' => [1, 5, 10, 20],
  7. 'processes' => 1,
  8. ],
  9. ];

工作原理

ConsumerProcess 是异步消费进程,会根据用户创建的 Job 或者使用 @AsyncQueueMessage 的代码块,执行消费逻辑。 Job@AsyncQueueMessage 都是需要投递和执行的任务,即数据、消费逻辑都会在任务中定义。

  • Job 类中成员变量即为待消费的数据,handle() 方法则为消费逻辑。
  • @AsyncQueueMessage 注解的方法,构造函数传入的数据即为待消费的数据,方法体则为消费逻辑。
  1. graph LR;
  2. A[服务启动]-->B[异步消费进程启动]
  3. B-->C[监听队列]
  4. D[投递任务]-->C
  5. C-->F[消费任务]

使用

配置异步消费进程

组件已经提供了默认 异步消费进程,只需要将它配置到 config/autoload/processes.php 中即可。

  1. <?php
  2. return [
  3. Hyperf\AsyncQueue\Process\ConsumerProcess::class,
  4. ];

当然,您也可以将以下 Process 添加到自己的项目中。

配置方式和注解方式,二选一即可。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Process;
  4. use Hyperf\AsyncQueue\Process\ConsumerProcess;
  5. use Hyperf\Process\Annotation\Process;
  6. #[Process(name: "async-queue")]
  7. class AsyncQueueConsumer extends ConsumerProcess
  8. {
  9. }

生产消息

传统方式

这种模式会把对象直接序列化然后存到 Redis 等队列中,所以为了保证序列化后的体积,尽量不要将 ContainerConfig 等设置为成员变量。

比如以下 Job 的定义,是 不可取 的,同理 @Inject 也是如此。

因为 Job 会被序列化,所以成员变量不要包含 匿名函数 等 无法被序列化 的内容,如果不清楚哪些内容无法被序列化,尽量使用注解方式。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Job;
  4. use Hyperf\AsyncQueue\Job;
  5. use Psr\Container\ContainerInterface;
  6. class ExampleJob extends Job
  7. {
  8. public $container;
  9. public $params;
  10. public function __construct(ContainerInterface $container, $params)
  11. {
  12. $this->container = $container;
  13. $this->params = $params;
  14. }
  15. public function handle()
  16. {
  17. // 根据参数处理具体逻辑
  18. var_dump($this->params);
  19. }
  20. }
  21. $job = make(ExampleJob::class);

正确的 Job 应该是只有需要处理的数据,其他相关数据,可以在 handle 方法中重新获取,如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Job;
  4. use Hyperf\AsyncQueue\Job;
  5. class ExampleJob extends Job
  6. {
  7. public $params;
  8. /**
  9. * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
  10. */
  11. protected int $maxAttempts = 2;
  12. public function __construct($params)
  13. {
  14. // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
  15. $this->params = $params;
  16. }
  17. public function handle()
  18. {
  19. // 根据参数处理具体逻辑
  20. // 通过具体参数获取模型等
  21. // 这里的逻辑会在 ConsumerProcess 进程中执行
  22. var_dump($this->params);
  23. }
  24. }

正确定义完 Job 后,我们需要写一个专门投递消息的 Service,代码如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Service;
  4. use App\Job\ExampleJob;
  5. use Hyperf\AsyncQueue\Driver\DriverFactory;
  6. use Hyperf\AsyncQueue\Driver\DriverInterface;
  7. class QueueService
  8. {
  9. protected DriverInterface $driver;
  10. public function __construct(DriverFactory $driverFactory)
  11. {
  12. $this->driver = $driverFactory->get('default');
  13. }
  14. /**
  15. * 生产消息.
  16. * @param $params 数据
  17. * @param int $delay 延时时间 单位秒
  18. */
  19. public function push($params, int $delay = 0): bool
  20. {
  21. // 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
  22. // 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
  23. // 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
  24. return $this->driver->push(new ExampleJob($params), $delay);
  25. }
  26. }

投递消息

接下来,调用我们的 QueueService 投递消息即可。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Service\QueueService;
  5. use Hyperf\Di\Annotation\Inject;
  6. use Hyperf\HttpServer\Annotation\AutoController;
  7. #[AutoController]
  8. class QueueController extends AbstractController
  9. {
  10. #[Inject]
  11. protected QueueService $service;
  12. /**
  13. * 传统模式投递消息
  14. */
  15. public function index()
  16. {
  17. $this->service->push([
  18. 'group@hyperf.io',
  19. 'https://doc.hyperf.io',
  20. 'https://www.hyperf.io',
  21. ]);
  22. return 'success';
  23. }
  24. }

注解方式

框架除了传统方式投递消息,还提供了注解方式。

注解方式会在非消费环境下自动投递消息到队列,故,如果我们在队列中使用注解方式时,则不会再次投递到队列当中,而是直接在本消费进程中执行。 如果仍然需要在队列中投递消息,则可以在队列中使用传统模式投递。

让我们重写上述 QueueService,直接将 ExampleJob 的逻辑搬到 example 方法中,并加上对应注解 AsyncQueueMessage,具体代码如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Service;
  4. use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;
  5. class QueueService
  6. {
  7. #[AsyncQueueMessage]
  8. public function example($params)
  9. {
  10. // 需要异步执行的代码逻辑
  11. // 这里的逻辑会在 ConsumerProcess 进程中执行
  12. var_dump($params);
  13. }
  14. }

投递消息

注解模式投递消息就跟平常调用方法一致,代码如下。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Service\QueueService;
  5. use Hyperf\Di\Annotation\Inject;
  6. use Hyperf\HttpServer\Annotation\AutoController;
  7. #[AutoController]
  8. class QueueController extends AbstractController
  9. {
  10. /**
  11. * @var QueueService
  12. */
  13. #[Inject]
  14. protected $service;
  15. /**
  16. * 注解模式投递消息
  17. */
  18. public function example()
  19. {
  20. $this->service->example([
  21. 'group@hyperf.io',
  22. 'https://doc.hyperf.io',
  23. 'https://www.hyperf.io',
  24. ]);
  25. return 'success';
  26. }
  27. }

事件

事件名称 触发时机 备注
BeforeHandle 处理消息前触发
AfterHandle 处理消息后触发
FailedHandle 处理消息失败后触发
RetryHandle 重试处理消息前触发
QueueLength 每处理 500 个消息后触发 用户可以监听此事件,判断失败或超时队列是否有消息积压

QueueLengthListener

框架自带了一个记录队列长度的监听器,默认不开启,您如果需要,可以自行添加到 listeners 配置中。

  1. <?php
  2. declare(strict_types=1);
  3. return [
  4. Hyperf\AsyncQueue\Listener\QueueLengthListener::class
  5. ];

ReloadChannelListener

当消息执行超时,或项目重启导致消息执行被中断,最终都会被移动到 timeout 队列中,只要您可以保证消息执行是幂等的(同一个消息执行一次,或执行多次,最终表现一致), 就可以开启以下监听器,框架会自动将 timeout 队列中消息移动到 waiting 队列中,等待下次消费。

监听器监听 QueueLength 事件,默认执行 500 次消息后触发一次。

  1. <?php
  2. declare(strict_types=1);
  3. return [
  4. Hyperf\AsyncQueue\Listener\ReloadChannelListener::class
  5. ];

任务执行流转流程

任务执行流转流程主要包括以下几个队列:

队列名 备注
waiting 等待消费的队列
reserved 正在消费的队列
delayed 延迟消费的队列
failed 消费失败的队列
timeout 消费超时的队列 (虽然超时,但可能执行成功)

队列流转顺序如下:

  1. graph LR;
  2. A[投递延时消息]-->C[delayed队列];
  3. B[投递消息]-->D[waiting队列];
  4. C--到期-->D;
  5. D--消费-->E[reserved队列];
  6. E--成功-->F[删除消息];
  7. E--失败-->G[failed队列];
  8. E--超时-->H[timeout队列];

配置多个异步队列

当您需要使用多个队列来区分消费高频和低频或其他种类的消息时,可以配置多个队列。

  1. 添加配置
  1. <?php
  2. return [
  3. 'default' => [
  4. 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
  5. 'channel' => '{queue}',
  6. 'timeout' => 2,
  7. 'retry_seconds' => 5,
  8. 'handle_timeout' => 10,
  9. 'processes' => 1,
  10. 'concurrent' => [
  11. 'limit' => 2,
  12. ],
  13. ],
  14. 'other' => [
  15. 'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
  16. 'channel' => '{other.queue}',
  17. 'timeout' => 2,
  18. 'retry_seconds' => 5,
  19. 'handle_timeout' => 10,
  20. 'processes' => 1,
  21. 'concurrent' => [
  22. 'limit' => 2,
  23. ],
  24. ],
  25. ];
  1. 添加消费进程
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Process;
  4. use Hyperf\AsyncQueue\Process\ConsumerProcess;
  5. use Hyperf\Process\Annotation\Process;
  6. #[Process]
  7. class OtherConsumerProcess extends ConsumerProcess
  8. {
  9. protected string $queue = 'other';
  10. }
  1. 调用
  1. use Hyperf\AsyncQueue\Driver\DriverFactory;
  2. use Hyperf\Utils\ApplicationContext;
  3. $driver = ApplicationContext::getContainer()->get(DriverFactory::class)->get('other');
  4. return $driver->push(new ExampleJob());

安全关闭

异步队列在终止时,如果正在进行消费逻辑,可能会导致出现错误。框架提供了 DriverStopHandler ,可以让异步队列进程安全关闭。

当前信号处理器并不适配于 CoroutineServer,如有需要请自行实现

安装信号处理器

  1. composer require hyperf/signal

添加配置

  1. <?php
  2. declare(strict_types=1);
  3. return [
  4. 'handlers' => [
  5. Hyperf\AsyncQueue\Signal\DriverStopHandler::class,
  6. ],
  7. 'timeout' => 5.0,
  8. ];

异步驱动之间的区别

  • Hyperf\AsyncQueue\Driver\RedisDriver::class

此异步驱动会将整个 JOB 进行序列化,当投递即时队列后,会 lpushlist 结构中,投递延时队列,会 zaddzset 结构中。 所以,如果 Job 的参数完全一致的情况,在延时队列中就会出现后投递的消息 覆盖 前面投递的消息的问题。 如果不想出现延时消息覆盖的情况,只需要在 Job 里增加一个唯一的 uniqid,或者在使用 注解 的方法上增加一个 uniqid 的入参即可。