RPC是远程过程调用,执行远程服务器的方法,通过通信协议
    这里rabittMQ实现了这一远程过程调用,有内置的扩展参数
    首先在客户端向队列中推送需要的数据,并标明标识跟回传的队列,执行回调队列消费
    RPC服务器这边获取队列数据使用方法处理,并根据回调队列地址,将结果放入回调队列中,附带上标识
    一旦RPC服务器处理完数据,推送到回调队列,客户端执行对调队列消费者,取出结果,完成RPC调用
    消息属性,子啊new消息主体时,第二个参数,数组属性
    delivery_mode,消息是否持久化,
    content_type :消息的编码
    reply_to:回调的队列
    correlation_id:标识,用于将RPC详情与请求关联起来
    RPC服务端 receive1.php

    1. <?php
    2. //引入类库
    3. require_once __DIR__ . '/vendor/autoload.php';
    4. use PhpAmqpLib\Connection\AMQPStreamConnection;
    5. use PhpAmqpLib\Message\AMQPMessage;
    6. //创建一个连接
    7. $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
    8. //建立通道
    9. $channel = $connection->channel();
    10. //服务端方法
    11. function fib($n)
    12. {
    13. if ($n == 0) {
    14. return 0;
    15. }
    16. if ($n == 1) {
    17. return 1;
    18. }
    19. return fib($n-1) + fib($n-2);
    20. }
    21. //队列
    22. $channel->queue_declare("rpc_queue", false, false, false, false);
    23. /**
    24. * 执行处理
    25. * @param $req
    26. */
    27. $callback = function ($req) {
    28. $n = intval($req->body);
    29. //记录一下,收到了RPC请求
    30. echo ' [.] fib(', $n, ")\n";
    31. //执行方法,数据推到回传队列
    32. $msg = new AMQPMessage(
    33. (string) fib($n),
    34. array('correlation_id' => $req->get('correlation_id'))//标识回传回去
    35. );
    36. $req->delivery_info['channel']->basic_publish(
    37. $msg,
    38. '',
    39. $req->get('reply_to')//回传队列
    40. );
    41. $req->ack();
    42. };
    43. $channel->basic_qos( null , 1 , null );
    44. //消费消息
    45. $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
    46. //等待消息
    47. while ($channel->is_open()) {
    48. $channel->wait();
    49. }
    50. // 关闭通道和连接
    51. $channel->close();
    52. $connection->close();

    客户端 send.php

    <?php
    //引入类库
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    class FibonacciRpcClient
    {
        private $connection;//连接
        private $channel;//通道
        private $callback_queue;//队列名
        private $response;//返回数据
        private $corr_id;//标识id
    
        /**
         * 构造方法
         * 构建消费队列(执行完RPC的回传队列),不执行,知识构建
         */
        public function __construct()
        {
            //创建一个连接
            $this->connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
            //建立通道
            $this->channel = $this->connection->channel();
            //临时队列
            list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
            //消费消息
            $this->channel->basic_consume($this->callback_queue, '', false, true, false, false,
                array(
                    $this,
                    'onResponse'//本地的方法名称
                )
            );
        }
    
        /**
         * 消费队列执行的方法,判断标识是不是一致
         * @param $rep
         */
        public function onResponse($rep)
        {
            if ($rep->get('correlation_id') == $this->corr_id) {
                $this->response = $rep->body;
            }
        }
    
        /**
         * 执行RPC操作
         * @param $n
         * @return int
         * @throws ErrorException
         */
        public function call($n)
        {
            //定义为空
            $this->response = null;
            //唯一标识,微妙时间戳随机
            $this->corr_id = uniqid();
    
            $msg = new AMQPMessage(
                (string) $n,
                array(
                    'correlation_id' => $this->corr_id,//标识
                    'reply_to' => $this->callback_queue//回调接口
                )
            );
            //推送队列
            $this->channel->basic_publish($msg, '', 'rpc_queue');
            //执行回调队列的消费者循环等待,返回结果
            while (!$this->response) {
                $this->channel->wait();
            }
            //返回消费者的结果
            return intval($this->response);
        }
    }
    $fibonacci_rpc = new FibonacciRpcClient();
    $response = $fibonacci_rpc->call(33);
    echo ' [.] Got ', $response, "\n";