路由模式中,交换器的类型是 direct ,在交换器跟队列绑定的时候,加上一个路由键,进行路由绑定,这样的话,数据发送给交换器的时候,会通过路由将数据发送到指定的队列中去,实现了1234 12 1 的推送模式
    路由模式 - 图1
    生产者 send.php

    1. <?php
    2. //引入类库
    3. require_once __DIR__ . '/vendor/autoload.php';
    4. use PhpAmqpLib\Connection\AMQPStreamConnection;
    5. use PhpAmqpLib\Message\AMQPMessage;
    6. try {
    7. //创建一个连接
    8. $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
    9. //建立通道
    10. $channel = $connection->channel();
    11. //申请交换机 direct类型
    12. $channel->exchange_declare('exchange_1', 'direct', false, true, false);
    13. //命令行获取参数 就是路由键
    14. $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
    15. //指定要发送的消息内容
    16. for ($i = 0; $i < 10; $i++) {
    17. $arrar['id'] = $i;
    18. $arrar['name'] = '我是消息主体吧';
    19. $json = json_encode($arrar);
    20. //消息持久
    21. $msg = new AMQPMessage($json);
    22. //增加路由键参数
    23. $channel->basic_publish($msg, 'exchange_1', $severity);
    24. }
    25. // 关闭通道
    26. $channel->close();
    27. // 关闭连接
    28. $connection->close();
    29. echo '发送成功';
    30. } catch (Exception $e) {
    31. die($e->getMessage());
    32. }

    消费者 receive.php

    <?php
    //引入类库
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    
    //创建一个连接
    $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/'); // 建立连接到RabbitMQ服务器
    //建立通道
    $channel = $connection->channel();
    
    //申请交换机
    $channel->exchange_declare('exchange_1', 'direct', false, true, false);
    
    //队列-随机名称,断开删除
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    //$channel->queue_declare($queue_name, false, false, false, false);
    
    //接收参数
    $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
    
    //交换机,队列绑定,一个队列可以绑定多个路由键
    $channel->queue_bind($queue_name, 'exchange_1', $severity);
    $channel->queue_bind($queue_name, 'exchange_1', 'count');
    $channel->queue_bind($queue_name, 'exchange_1', 'count2');
    
    /**
     * 执行处理
     * @param $msg
     */
    $callback = function ($msg) {
        echo ' 队列的信息:', $msg->body, "\n";
        sleep(1);
    };
    //消费消息
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    //等待消息
    while ($channel->is_open()) {
        $channel->wait();
    }
    
    // 关闭通道和连接
    $channel->close();
    $connection->close();