NATS

NATS 是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的 Publish / Subscribe 模型,使用 Golang 语言开发。NATS 的开发哲学认为高质量的 QoS 应该在客户端构建,故只建立了 Request-Reply,不提供 1. 持久化 2. 事务处理 3. 增强的交付模式 4. 企业级队列。

安装

  1. composer require hyperf/nats

使用

创建消费者

  1. php bin/hyperf.php gen:nats-consumer DemoConsumer

如果设置了 queue,则相同的 subject 只会被一个 queue 消费。若不设置 queue,则每个消费者都会受到消息。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Nats\Consumer;
  4. use Hyperf\Nats\AbstractConsumer;
  5. use Hyperf\Nats\Annotation\Consumer;
  6. use Hyperf\Nats\Message;
  7. /**
  8. * @Consumer(subject="hyperf.demo", queue="hyperf.demo", name="DemoConsumer", nums=1)
  9. */
  10. class DemoConsumer extends AbstractConsumer
  11. {
  12. public function consume(Message $payload)
  13. {
  14. // Do something...
  15. }
  16. }

投递消息

使用 publish 投递消息。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Di\Annotation\Inject;
  5. use Hyperf\HttpServer\Annotation\AutoController;
  6. use Hyperf\Nats\Driver\DriverInterface;
  7. /**
  8. * @AutoController(prefix="nats")
  9. */
  10. class NatsController extends AbstractController
  11. {
  12. /**
  13. * @Inject
  14. * @var DriverInterface
  15. */
  16. protected $nats;
  17. public function publish()
  18. {
  19. $res = $this->nats->publish('hyperf.demo', [
  20. 'id' => 'Hyperf',
  21. ]);
  22. return $this->response->success($res);
  23. }
  24. }

使用 request 投递消息。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Di\Annotation\Inject;
  5. use Hyperf\HttpServer\Annotation\AutoController;
  6. use Hyperf\Nats\Driver\DriverInterface;
  7. use Hyperf\Nats\Message;
  8. /**
  9. * @AutoController(prefix="nats")
  10. */
  11. class NatsController extends AbstractController
  12. {
  13. /**
  14. * @Inject
  15. * @var DriverInterface
  16. */
  17. protected $nats;
  18. public function request()
  19. {
  20. $res = $this->nats->request('hyperf.reply', [
  21. 'id' => 'limx',
  22. ], function (Message $payload) {
  23. var_dump($payload->getBody());
  24. });
  25. return $this->response->success($res);
  26. }
  27. }

使用 requestSync 投递消息。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Di\Annotation\Inject;
  5. use Hyperf\HttpServer\Annotation\AutoController;
  6. use Hyperf\Nats\Driver\DriverInterface;
  7. use Hyperf\Nats\Message;
  8. /**
  9. * @AutoController(prefix="nats")
  10. */
  11. class NatsController extends AbstractController
  12. {
  13. /**
  14. * @Inject
  15. * @var DriverInterface
  16. */
  17. protected $nats;
  18. public function sync()
  19. {
  20. /** @var Message $message */
  21. $message = $this->nats->requestSync('hyperf.reply', [
  22. 'id' => 'limx',
  23. ]);
  24. return $this->response->success($message->getBody());
  25. }
  26. }