主体模式使用交换机 topic 类型,
根据路由键,定义规则,跟发布订阅类似,符合规则的分发一份数据到队列
代码其实没什么该变,只是交换机类型变了
规则:
“ * “:代表的是一个字符串,
“ # “:代表的是一个或者多个字符串,
生产者 send.php
<?php
//引入类库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
try {
//创建一个连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
//建立通道
$channel = $connection->channel();
//申请交换机,主题类型
$channel->exchange_declare('exchange_topic', 'topic', false, true, false);
//命令行获取参数
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
//指定要发送的消息内容
for ($i = 0; $i < 10; $i++) {
$arrar['id'] = $i;
$arrar['name'] = '我是消息主体吧';
$json = json_encode($arrar);
//消息持久
$msg = new AMQPMessage($json);
$channel->basic_publish($msg, 'exchange_topic', $severity);
}
// 关闭通道
$channel->close();
// 关闭连接
$connection->close();
echo '发送成功';
} catch (Exception $e) {
die($e->getMessage());
}
消费者 receive.php
<?php
//引入类库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//创建一个连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/'); // 建立连接到RabbitMQ服务器
//建立通道
$channel = $connection->channel();
//申请交换机 主体类型
$channel->exchange_declare('exchange_topic', 'topic', false, true, false);
//队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
//交换机,队列绑定
$channel->queue_bind($queue_name, 'exchange_topic', $severity);
/**
* 执行处理
* @param $msg
*/
$callback = function ($msg) {
echo ' 队列的信息:', $msg->body, "\n";
sleep(1);
};
//消费消息
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//等待消息
while ($channel->is_open()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();