NATS
NATS 是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的 Publish / Subscribe 模型,使用 Golang 语言开发。NATS 的开发哲学认为高质量的 QoS 应该在客户端构建,故只建立了 Request-Reply,不提供 1. 持久化 2. 事务处理 3. 增强的交付模式 4. 企业级队列。
安装
composer require hyperf/nats
使用
创建消费者
php bin/hyperf.php gen:nats-consumer DemoConsumer
如果设置了 queue,则相同的 subject 只会被一个 queue 消费。若不设置 queue,则每个消费者都会受到消息。
<?phpdeclare(strict_types=1);namespace App\Nats\Consumer;use Hyperf\Nats\AbstractConsumer;use Hyperf\Nats\Annotation\Consumer;use Hyperf\Nats\Message;/*** @Consumer(subject="hyperf.demo", queue="hyperf.demo", name="DemoConsumer", nums=1)*/class DemoConsumer extends AbstractConsumer{public function consume(Message $payload){// Do something...}}
投递消息
使用 publish 投递消息。
<?phpdeclare(strict_types=1);namespace App\Controller;use Hyperf\Di\Annotation\Inject;use Hyperf\HttpServer\Annotation\AutoController;use Hyperf\Nats\Driver\DriverInterface;/*** @AutoController(prefix="nats")*/class NatsController extends AbstractController{/*** @Inject* @var DriverInterface*/protected $nats;public function publish(){$res = $this->nats->publish('hyperf.demo', ['id' => 'Hyperf',]);return $this->response->success($res);}}
使用 request 投递消息。
<?phpdeclare(strict_types=1);namespace App\Controller;use Hyperf\Di\Annotation\Inject;use Hyperf\HttpServer\Annotation\AutoController;use Hyperf\Nats\Driver\DriverInterface;use Hyperf\Nats\Message;/*** @AutoController(prefix="nats")*/class NatsController extends AbstractController{/*** @Inject* @var DriverInterface*/protected $nats;public function request(){$res = $this->nats->request('hyperf.reply', ['id' => 'limx',], function (Message $payload) {var_dump($payload->getBody());});return $this->response->success($res);}}
使用 requestSync 投递消息。
<?phpdeclare(strict_types=1);namespace App\Controller;use Hyperf\Di\Annotation\Inject;use Hyperf\HttpServer\Annotation\AutoController;use Hyperf\Nats\Driver\DriverInterface;use Hyperf\Nats\Message;/*** @AutoController(prefix="nats")*/class NatsController extends AbstractController{/*** @Inject* @var DriverInterface*/protected $nats;public function sync(){/** @var Message $message */$message = $this->nats->requestSync('hyperf.reply', ['id' => 'limx',]);return $this->response->success($message->getBody());}}
