路由模式中,交换器的类型是 direct ,在交换器跟队列绑定的时候,加上一个路由键,进行路由绑定,这样的话,数据发送给交换器的时候,会通过路由将数据发送到指定的队列中去,实现了1234 12 1 的推送模式
生产者 send.php
<?php//引入类库require_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;try {//创建一个连接$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');//建立通道$channel = $connection->channel();//申请交换机 direct类型$channel->exchange_declare('exchange_1', 'direct', false, true, false);//命令行获取参数 就是路由键$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';//指定要发送的消息内容for ($i = 0; $i < 10; $i++) {$arrar['id'] = $i;$arrar['name'] = '我是消息主体吧';$json = json_encode($arrar);//消息持久$msg = new AMQPMessage($json);//增加路由键参数$channel->basic_publish($msg, 'exchange_1', $severity);}// 关闭通道$channel->close();// 关闭连接$connection->close();echo '发送成功';} catch (Exception $e) {die($e->getMessage());}
消费者 receive.php
<?php
//引入类库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//创建一个连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/'); // 建立连接到RabbitMQ服务器
//建立通道
$channel = $connection->channel();
//申请交换机
$channel->exchange_declare('exchange_1', 'direct', false, true, false);
//队列-随机名称,断开删除
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//$channel->queue_declare($queue_name, false, false, false, false);
//接收参数
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
//交换机,队列绑定,一个队列可以绑定多个路由键
$channel->queue_bind($queue_name, 'exchange_1', $severity);
$channel->queue_bind($queue_name, 'exchange_1', 'count');
$channel->queue_bind($queue_name, 'exchange_1', 'count2');
/**
* 执行处理
* @param $msg
*/
$callback = function ($msg) {
echo ' 队列的信息:', $msg->body, "\n";
sleep(1);
};
//消费消息
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//等待消息
while ($channel->is_open()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
