AMQP 组件

hyperf/amqp 是实现 AMQP 标准的组件,主要适用于对 RabbitMQ 的使用。

安装

  1. composer require hyperf/amqp

默认配置

配置 类型 默认值 备注
host string localhost Host
port int 5672 端口号
user string guest 用户名
password string guest 密码
vhost string / vhost
concurrent.limit int 0 同时消费的数量
pool object 连接池配置
pool.connections int 1 进程内保持的连接数
params object 基本配置
  1. <?php
  2. return [
  3. 'default' => [
  4. 'host' => 'localhost',
  5. 'port' => 5672,
  6. 'user' => 'guest',
  7. 'password' => 'guest',
  8. 'vhost' => '/',
  9. 'concurrent' => [
  10. 'limit' => 1,
  11. ],
  12. 'pool' => [
  13. 'connections' => 1,
  14. ],
  15. 'params' => [
  16. 'insist' => false,
  17. 'login_method' => 'AMQPLAIN',
  18. 'login_response' => null,
  19. 'locale' => 'en_US',
  20. 'connection_timeout' => 3.0,
  21. 'read_write_timeout' => 6.0,
  22. 'context' => null,
  23. 'keepalive' => false,
  24. 'heartbeat' => 3,
  25. 'close_on_destruct' => false,
  26. ],
  27. ],
  28. 'pool2' => [
  29. ...
  30. ]
  31. ];

可在 producer 或者 consumer__construct 函数中,设置不同 pool,例如上述的 defaultpool2

投递消息

使用 gen:producer 命令创建一个 producer

  1. php bin/hyperf.php gen:amqp-producer DemoProducer

在 DemoProducer 文件中,我们可以修改 @Producer 注解对应的字段来替换对应的 exchangeroutingKey。 其中 payload 就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。 示例如下。

使用 @Producer 注解时需 use Hyperf\Amqp\Annotation\Producer; 命名空间;

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Producers;
  4. use Hyperf\Amqp\Annotation\Producer;
  5. use Hyperf\Amqp\Message\ProducerMessage;
  6. use App\Models\User;
  7. #[Producer(exchange: "hyperf", routingKey: "hyperf")]
  8. class DemoProducer extends ProducerMessage
  9. {
  10. public function __construct($id)
  11. {
  12. // 设置不同 pool
  13. $this->poolName = 'pool2';
  14. $user = User::where('id', $id)->first();
  15. $this->payload = [
  16. 'id' => $id,
  17. 'data' => $user->toArray()
  18. ];
  19. }
  20. }

通过 DI Container 获取 Hyperf\Amqp\Producer 实例,即可投递消息。以下实例直接使用 ApplicationContext 获取 Hyperf\Amqp\Producer 其实并不合理,DI Container 具体使用请到 依赖注入 章节中查看。

  1. <?php
  2. use Hyperf\Amqp\Producer;
  3. use App\Amqp\Producers\DemoProducer;
  4. use Hyperf\Utils\ApplicationContext;
  5. $message = new DemoProducer(1);
  6. $producer = ApplicationContext::getContainer()->get(Producer::class);
  7. $result = $producer->produce($message);

消费消息

使用 gen:amqp-consumer 命令创建一个 consumer

  1. php bin/hyperf.php gen:amqp-consumer DemoConsumer

在 DemoConsumer 文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 exchangeroutingKeyqueue。 其中 $data 就是解析后的消息数据。 示例如下。

使用 @Consumer 注解时需 use Hyperf\Amqp\Annotation\Consumer; 命名空间;

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumers;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. use PhpAmqpLib\Message\AMQPMessage;
  8. #[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1)]
  9. class DemoConsumer extends ConsumerMessage
  10. {
  11. public function consumeMessage($data, AMQPMessage $message): string
  12. {
  13. print_r($data);
  14. return Result::ACK;
  15. }
  16. }

禁止消费进程自启

默认情况下,使用了 @Consumer 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。 如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便。

这种情况,只需要在 @Consumer 注解中配置 enable=false (默认为 true 跟随服务启动)或者在对应的消费者中重写类方法 isEnable() 返回 false 即可

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumers;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. use PhpAmqpLib\Message\AMQPMessage;
  8. #[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1, enable: false)]
  9. class DemoConsumer extends ConsumerMessage
  10. {
  11. public function consumeMessage($data, AMQPMessage $message): string
  12. {
  13. print_r($data);
  14. return Result::ACK;
  15. }
  16. public function isEnable(): bool
  17. {
  18. return parent::isEnable();
  19. }
  20. }

设置最大消费数

可以修改 @Consumer 注解中的 maxConsumption 属性,设置此消费者最大处理的消息数,达到指定消费数后,消费者进程会重启。

消费结果

框架会根据 Consumer 内的 consume 方法所返回的结果来决定该消息的响应行为,共有 4 中响应结果,分别为 \Hyperf\Amqp\Result::ACK\Hyperf\Amqp\Result::NACK\Hyperf\Amqp\Result::REQUEUE\Hyperf\Amqp\Result::DROP,每个返回值分别代表如下行为:

返回值 行为
\Hyperf\Amqp\Result::ACK 确认消息正确被消费掉了
\Hyperf\Amqp\Result::NACK 消息没有被正确消费掉,以 basic_nack 方法来响应
\Hyperf\Amqp\Result::REQUEUE 消息没有被正确消费掉,以 basic_reject 方法来响应,并使消息重新入列
\Hyperf\Amqp\Result::DROP 消息没有被正确消费掉,以 basic_reject 方法来响应

延时队列

AMQP 的延时队列,并不会根据延时时间进行排序,所以,一旦你投递了一个延时 10s 的任务,又往这个队列中投递了一个延时 5s 的任务,那么也一定会在第一个 10s 任务完成后,才会消费第二个 5s 的任务。 所以,需要根据时间设置不同的队列,如果想要更加灵活的延时队列,可以尝试 异步队列(async-queue) 和 AMQP 配合使用。

另外,AMQP 需要下载 延时插件,并激活才能正常使用

  1. wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
  2. cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
  3. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

生产者

使用 gen:amqp-producer 命令创建一个 producer。这里举例 direct 类型,其他类型如 fanouttopic,改生产者和消费者中的 type 即可。

  1. php bin/hyperf.php gen:amqp-producer DelayDirectProducer

在 DelayDirectProducer 文件中,加入use ProducerDelayedMessageTrait;,示例如下:

  1. <?php
  2. namespace App\Amqp\Producer;
  3. use Hyperf\Amqp\Annotation\Producer;
  4. use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
  5. use Hyperf\Amqp\Message\ProducerMessage;
  6. use Hyperf\Amqp\Message\Type;
  7. #[Producer]
  8. class DelayDirectProducer extends ProducerMessage
  9. {
  10. use ProducerDelayedMessageTrait;
  11. protected $exchange = 'ext.hyperf.delay';
  12. protected $type = Type::DIRECT;
  13. protected $routingKey = '';
  14. public function __construct($data)
  15. {
  16. $this->payload = $data;
  17. }
  18. }

消费者

使用 gen:amqp-consumer 命令创建一个 consumer

  1. php bin/hyperf.php gen:amqp-consumer DelayDirectConsumer

DelayDirectConsumer 文件中,增加引入use ProducerDelayedMessageTrait, ConsumerDelayedMessageTrait;,示例如下:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumer;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
  8. use Hyperf\Amqp\Message\Type;
  9. use Hyperf\Amqp\Result;
  10. use PhpAmqpLib\Message\AMQPMessage;
  11. #[Consumer(nums: 1)]
  12. class DelayDirectConsumer extends ConsumerMessage
  13. {
  14. use ProducerDelayedMessageTrait;
  15. use ConsumerDelayedMessageTrait;
  16. protected $exchange = 'ext.hyperf.delay';
  17. protected $queue = 'queue.hyperf.delay';
  18. protected $type = Type::DIRECT; //Type::FANOUT;
  19. protected $routingKey = '';
  20. public function consumeMessage($data, AMQPMessage $message): string
  21. {
  22. var_dump($data, 'delay+direct consumeTime:' . (microtime(true)));
  23. return Result::ACK;
  24. }
  25. }

生产延时消息

以下是在 Command 中演示如何使用,具体用法请以实际为准

使用 gen:command DelayCommand 命令创建一个 DelayCommand。如下:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Command;
  4. use App\Amqp\Producer\DelayDirectProducer;
  5. //use App\Amqp\Producer\DelayFanoutProducer;
  6. //use App\Amqp\Producer\DelayTopicProducer;
  7. use Hyperf\Amqp\Producer;
  8. use Hyperf\Command\Annotation\Command;
  9. use Hyperf\Command\Command as HyperfCommand;
  10. use Hyperf\Utils\ApplicationContext;
  11. use Psr\Container\ContainerInterface;
  12. #[Command]
  13. class DelayCommand extends HyperfCommand
  14. {
  15. protected ContainerInterface $container;
  16. public function __construct(ContainerInterface $container)
  17. {
  18. $this->container = $container;
  19. parent::__construct('demo:command');
  20. }
  21. public function configure()
  22. {
  23. parent::configure();
  24. $this->setDescription('Hyperf Demo Command');
  25. }
  26. public function handle()
  27. {
  28. //1.delayed + direct
  29. $message = new DelayDirectProducer('delay+direct produceTime:'.(microtime(true)));
  30. //2.delayed + fanout
  31. //$message = new DelayFanoutProducer('delay+fanout produceTime:'.(microtime(true)));
  32. //3.delayed + topic
  33. //$message = new DelayTopicProducer('delay+topic produceTime:' . (microtime(true)));
  34. $message->setDelayMs(5000);
  35. $producer = ApplicationContext::getContainer()->get(Producer::class);
  36. $producer->produce($message);
  37. }
  38. }

执行命令行生产消息

  1. php bin/hyperf.php demo:command

RPC 远程过程调用

除了典型的消息队列场景,我们还可以通过 AMQP 来实现 RPC 远程过程调用,本组件也为这个实现提供了对应的支持。

创建消费者

RPC 使用的消费者,与典型消息队列场景的消费者实现基本无差,唯一的区别是需要通过调用 reply 方法返回数据给生产者。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Amqp\Consumer;
  4. use Hyperf\Amqp\Annotation\Consumer;
  5. use Hyperf\Amqp\Message\ConsumerMessage;
  6. use Hyperf\Amqp\Result;
  7. use PhpAmqpLib\Message\AMQPMessage;
  8. #[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "rpc.reply", name: "ReplyConsumer", nums: 1, enable: true)]
  9. class ReplyConsumer extends ConsumerMessage
  10. {
  11. public function consumeMessage($data, AMQPMessage $message): string
  12. {
  13. $data['message'] .= 'Reply:' . $data['message'];
  14. $this->reply($data, $message);
  15. return Result::ACK;
  16. }
  17. }

发起 RPC 调用

作为生成者发起一次 RPC 远程过程调用也非常的简单,只需通过依赖注入容器获得 Hyperf\Amqp\RpcClient 对象并调用其中的 call 方法即可,返回的结果是消费者 reply 的数据,如下所示:

  1. <?php
  2. use Hyperf\Amqp\Message\DynamicRpcMessage;
  3. use Hyperf\Amqp\RpcClient;
  4. use Hyperf\Utils\ApplicationContext;
  5. $rpcClient = ApplicationContext::getContainer()->get(RpcClient::class);
  6. // 在 DynamicRpcMessage 上设置与 Consumer 一致的 Exchange 和 RoutingKey
  7. $result = $rpcClient->call(new DynamicRpcMessage('hyperf', 'hyperf', ['message' => 'Hello Hyperf']));
  8. // $result:
  9. // array(1) {
  10. // ["message"]=>
  11. // string(18) "Reply:Hello Hyperf"
  12. // }

抽象 RpcMessage

上面的 RPC 调用过程是直接通过 Hyperf\Amqp\Message\DynamicRpcMessage 类来完成 Exchange 和 RoutingKey 的定义,并传递消息数据,在生产项目的设计上,我们可以对 RpcMessage 进行一层抽象,以统一 Exchange 和 RoutingKey 的定义。

我们可以创建对应的 RpcMessage 类如 App\Amqp\FooRpcMessage 如下:

  1. <?php
  2. use Hyperf\Amqp\Message\RpcMessage;
  3. class FooRpcMessage extends RpcMessage
  4. {
  5. protected $exchange = 'hyperf';
  6. protected $routingKey = 'hyperf';
  7. public function __construct($data)
  8. {
  9. // 要传递数据
  10. $this->payload = $data;
  11. }
  12. }

这样我们进行 RPC 调用时,只需直接传递 FooRpcMessage 实例到 call 方法即可,无需每次调用时都去定义 Exchange 和 RoutingKey。