RPC是远程过程调用,执行远程服务器的方法,通过通信协议
这里rabittMQ实现了这一远程过程调用,有内置的扩展参数
首先在客户端向队列中推送需要的数据,并标明标识跟回传的队列,执行回调队列消费
RPC服务器这边获取队列数据使用方法处理,并根据回调队列地址,将结果放入回调队列中,附带上标识
一旦RPC服务器处理完数据,推送到回调队列,客户端执行对调队列消费者,取出结果,完成RPC调用
消息属性,子啊new消息主体时,第二个参数,数组属性
delivery_mode,消息是否持久化,
content_type :消息的编码
reply_to:回调的队列
correlation_id:标识,用于将RPC详情与请求关联起来
RPC服务端 receive1.php
<?php
//引入类库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//创建一个连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
//建立通道
$channel = $connection->channel();
//服务端方法
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
//队列
$channel->queue_declare("rpc_queue", false, false, false, false);
/**
* 执行处理
* @param $req
*/
$callback = function ($req) {
$n = intval($req->body);
//记录一下,收到了RPC请求
echo ' [.] fib(', $n, ")\n";
//执行方法,数据推到回传队列
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))//标识回传回去
);
$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')//回传队列
);
$req->ack();
};
$channel->basic_qos( null , 1 , null );
//消费消息
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
//等待消息
while ($channel->is_open()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
客户端 send.php
<?php
//引入类库
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FibonacciRpcClient
{
private $connection;//连接
private $channel;//通道
private $callback_queue;//队列名
private $response;//返回数据
private $corr_id;//标识id
/**
* 构造方法
* 构建消费队列(执行完RPC的回传队列),不执行,知识构建
*/
public function __construct()
{
//创建一个连接
$this->connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
//建立通道
$this->channel = $this->connection->channel();
//临时队列
list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
//消费消息
$this->channel->basic_consume($this->callback_queue, '', false, true, false, false,
array(
$this,
'onResponse'//本地的方法名称
)
);
}
/**
* 消费队列执行的方法,判断标识是不是一致
* @param $rep
*/
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
/**
* 执行RPC操作
* @param $n
* @return int
* @throws ErrorException
*/
public function call($n)
{
//定义为空
$this->response = null;
//唯一标识,微妙时间戳随机
$this->corr_id = uniqid();
$msg = new AMQPMessage(
(string) $n,
array(
'correlation_id' => $this->corr_id,//标识
'reply_to' => $this->callback_queue//回调接口
)
);
//推送队列
$this->channel->basic_publish($msg, '', 'rpc_queue');
//执行回调队列的消费者循环等待,返回结果
while (!$this->response) {
$this->channel->wait();
}
//返回消费者的结果
return intval($this->response);
}
}
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(33);
echo ' [.] Got ', $response, "\n";