NSQ

NSQ 是一个由 Go 语言编写的开源、轻量级、高性能的实时分布式消息中间件。

安装

  1. composer require hyperf/nsq

使用

配置

NSQ 组件的配置文件默认位于 config/autoload/nsq.php 内,如该文件不存在,可通过 php bin/hyperf.php vendor:publish hyperf/nsq 命令来将发布对应的配置文件。

默认配置文件如下:

  1. <?php
  2. return [
  3. 'default' => [
  4. 'host' => '127.0.0.1',
  5. 'port' => 4150,
  6. 'pool' => [
  7. 'min_connections' => 1,
  8. 'max_connections' => 10,
  9. 'connect_timeout' => 10.0,
  10. 'wait_timeout' => 3.0,
  11. 'heartbeat' => -1,
  12. // 因为 Nsq 服务默认的闲置时间是 60s,故框架维护的最大闲置时间应小于 60s
  13. 'max_idle_time' => 30.0,
  14. ],
  15. ],
  16. ];

创建消费者

通过 gen:nsq-consumer 命令可以快速的生成一个 消费者(Consumer) 对消息进行消费。

  1. php bin/hyperf.php gen:nsq-consumer DemoConsumer

您也可以通过使用 Hyperf\Nsq\Annotation\Consumer 注解来对一个 Hyperf/Nsq/AbstractConsumer 抽象类的子类进行声明,来完成一个 消费者(Consumer) 的定义,其中Hyperf\Nsq\Annotation\Consumer 注解和抽象类均包含以下属性:

配置 类型 注解或抽象类默认值 备注
topic string ‘’ 要监听的 topic
channel string ‘’ 要监听的 channel
name string NsqConsumer 消费者的名称
nums int 1 消费者的进程数
pool string default 消费者对应的连接,对应配置文件的 key

这些注解属性是可选的,因为 Hyperf/Nsq/AbstractConsumer 抽象类中也分别定义了对应的成员属性以及 getter 和 setter,当不对注解属性进行定义时,会使用抽象类的属性默认值。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Nsq\Consumer;
  4. use Hyperf\Nsq\AbstractConsumer;
  5. use Hyperf\Nsq\Annotation\Consumer;
  6. use Hyperf\Nsq\Message;
  7. use Hyperf\Nsq\Result;
  8. /**
  9. * @Consumer(
  10. * topic="hyperf",
  11. * channel="hyperf",
  12. * name ="DemoNsqConsumer",
  13. * nums=1
  14. * )
  15. */
  16. class DemoNsqConsumer extends AbstractConsumer
  17. {
  18. public function consume(Message $payload): string
  19. {
  20. var_dump($payload->getBody());
  21. return Result::ACK;
  22. }
  23. }

禁止消费进程自启

默认情况下,使用了 @Consumer 注解定义后,框架会在启动时自动创建子进程来启动消费者,并且会在子进程异常退出后,自动重新拉起。但如果在处于开发阶段进行某些调试工作时,可能会因为消费者的自动消费导致调试的不便。

在这种情况下,您可通过全局关闭和局部关闭两种形式来控制消费进程的自启。

全局关闭

您可以在默认配置文件 config/autoload/nsq.php 中,将对应连接的 enable 选项设置为 false,即代表该连接下的所有消费者进程都关闭自启功能。

局部关闭

当您只需要关闭个别消费进程的自启功能,只需要在对应的消费者中重写父类方法 isEnable() 并返回 false 即可关闭此消费者的自启功能;

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Nsq\Consumer;
  4. use Hyperf\Nsq\AbstractConsumer;
  5. use Hyperf\Nsq\Annotation\Consumer;
  6. use Hyperf\Nsq\Message;
  7. use Hyperf\Nsq\Result;
  8. use Psr\Container\ContainerInterface;
  9. /**
  10. * @Consumer(
  11. * topic="demo_topic",
  12. * channel="demo_channel",
  13. * name ="DemoConsumer",
  14. * nums=1
  15. * )
  16. */
  17. class DemoConsumer extends AbstractConsumer
  18. {
  19. public function __construct(ContainerInterface $container)
  20. {
  21. parent::__construct($container);
  22. }
  23. public function isEnable(): bool
  24. {
  25. return false;
  26. }
  27. public function consume(Message $payload): string
  28. {
  29. $body = json_decode($payload->getBody(), true);
  30. var_dump($body);
  31. return Result::ACK;
  32. }
  33. }

投递消息

您可以通过调用 Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0) 方法来向 NSQ 投递消息, 下面是在 Command 进行消息投递的一个示例:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Command;
  4. use Hyperf\Command\Command as HyperfCommand;
  5. use Hyperf\Command\Annotation\Command;
  6. use Hyperf\Nsq\Nsq;
  7. /**
  8. * @Command
  9. */
  10. class NsqCommand extends HyperfCommand
  11. {
  12. protected $name = 'nsq:pub';
  13. public function handle()
  14. {
  15. /** @var Nsq $nsq */
  16. $nsq = make(Nsq::class);
  17. $topic = 'hyperf';
  18. $message = 'This is message at ' . time();
  19. $nsq->publish($topic, $message);
  20. $this->line('success', 'info');
  21. }
  22. }

一次性投递多条消息

Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0) 方法的第二个参数除了可以传递一个字符串外,还可以传递一个字符串数组,来实现一次性向一个 Topic 投递多条消息的功能,示例如下:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Command;
  4. use Hyperf\Command\Command as HyperfCommand;
  5. use Hyperf\Command\Annotation\Command;
  6. use Hyperf\Nsq\Nsq;
  7. /**
  8. * @Command
  9. */
  10. class NsqCommand extends HyperfCommand
  11. {
  12. protected $name = 'nsq:pub';
  13. public function handle()
  14. {
  15. /** @var Nsq $nsq */
  16. $nsq = make(Nsq::class);
  17. $topic = 'hyperf';
  18. $messages = [
  19. 'This is message 1 at ' . time(),
  20. 'This is message 2 at ' . time(),
  21. 'This is message 3 at ' . time(),
  22. ];
  23. $nsq->publish($topic, $messages);
  24. $this->line('success', 'info');
  25. }
  26. }

投递延迟消息

当您希望您投递的消息在特定的时间后再去消费,也可通过对 Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0) 方法的第三个参数传递对应的延迟时长,单位为秒,示例如下:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Command;
  4. use Hyperf\Command\Command as HyperfCommand;
  5. use Hyperf\Command\Annotation\Command;
  6. use Hyperf\Nsq\Nsq;
  7. /**
  8. * @Command
  9. */
  10. class NsqCommand extends HyperfCommand
  11. {
  12. protected $name = 'nsq:pub';
  13. public function handle()
  14. {
  15. /** @var Nsq $nsq */
  16. $nsq = make(Nsq::class);
  17. $topic = 'hyperf';
  18. $message = 'This is message at ' . time();
  19. $deferTime = 5.0;
  20. $nsq->publish($topic, $message, $deferTime);
  21. $this->line('success', 'info');
  22. }
  23. }

NSQD HTTP API

NSQD HTTP API Refer: https://nsq.io/components/nsqd.html

组件对 NSQD HTTP API 进行了封装,您可以很方便的实现对 NSQD HTTP API 的调用。

比如,当您需要删除某个 Topic 时,可以执行以下代码:

  1. <?php
  2. use Hyperf\Utils\ApplicationContext;
  3. use Hyperf\Nsq\Nsqd\Topic;
  4. $container = ApplicationContext::getContainer();
  5. $client = $container->get(Topic::class);
  6. $client->delete('hyperf.test');
  • Hyperf\Nsq\Api\Topic 类对应 topic 相关的 API;
  • Hyperf\Nsq\Api\Channle 类对应 channel 相关的 API;
  • Hyperf\Nsq\Api\Api 类对应 pingstatsconfigdebug 等相关的 API;

NSQ 协议

https://nsq.io/clients/tcp_protocol_spec.html

  • Socket 基础
  1. @startuml
  2. autonumber
  3. hide footbox
  4. title **Socket 基础**
  5. participant "客户端" as client
  6. participant "服务器" as server #orange
  7. activate client
  8. activate server
  9. note right of server: 建立连接
  10. client -> server: socket->connect(ip, port)
  11. ...
  12. note right of server: 多次通信 send/recv
  13. client -> server: socket->send()
  14. server-> client: socket->recv()
  15. ...
  16. note right of server: 关闭连接
  17. client->server: socket->close()
  18. deactivate client
  19. deactivate server
  20. @enduml
  • NSQ 协议流程
  1. @startuml
  2. autonumber
  3. hide footbox
  4. title **NSQ 协议**
  5. participant "客户端" as client
  6. participant "服务器" as server #orange
  7. activate client
  8. activate server
  9. == connect ==
  10. note left of client: connect 后都为 socket->send/recv
  11. client -> server: socket->connect(ip, host)
  12. note left of client: protocol version
  13. client->server: magic: V2
  14. == auth ==
  15. note left of client: client metadata
  16. client->server: IDENTIFY
  17. note right of server: 如果需要 auth
  18. server->client: auth_required=true
  19. client->server: AUTH
  20. ...
  21. == pub ==
  22. note left of client: 发送一条消息
  23. client -> server: PUB~~~~ <topic_name>
  24. note left of client: 发送多条消息
  25. client -> server: MPUB
  26. note left of client: 发送一条延时消息
  27. client -> server: DPUB
  28. ...
  29. == sub ==
  30. note left of client: client 使用 channel 订阅 topic
  31. note right of server: SUB 成功后, client 处于 RDY 0 阶段
  32. client -> server: SUB <topic_name> <channel_name>
  33. note left of client: 使用 RDY 告诉 server 准备好消费 <count> 条消息
  34. client -> server: RDY <count>
  35. note right of server: server 返回 client <count> 条消息
  36. server -> client: <count> msg
  37. note left of client: 标记消息完成消费(消费成功)
  38. client -> server: FIN <message_id>
  39. note left of client: 消息重新入队(消费失败, 重新入队)
  40. client -> server: REQ <message_id> <timeout>
  41. note left of client: 重置消息超时时间
  42. client -> server: TOUCH <message_id>
  43. ...
  44. == heartbeat ==
  45. server -> client: _heartbeat_
  46. note right of server: client 2 次没有应答 NOP, server 将断开连接
  47. client -> server: NOP
  48. ...
  49. == close ==
  50. note left of client: clean close connection, 表示没有消息了, 关闭连接
  51. client -> server: CLS
  52. note right of server: server 端成功应答
  53. server -> client: CLOSE_WAIT
  54. deactivate client
  55. deactivate server
  56. @enduml