RPC 实现
Remote Procedure Call 简称 RPC,即远程调用。这个请自行百度。
这里 RPC 实现和传统的 RPC 还不太一样。
- 客户端发送一个请求,并设置了一个回复的队列
- 服务端,消费发送来的请求,并像这个回复的队列,响应了一个消息
就这个流程,通过消息队列来实现,
final AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(corrid).replyTo(replyQueue).build();channel.basicPublish("", requestQueue, properties, "message".getBytes());Copied!
1
2
3
4
5
6
可以通过发送消息时指定 replyTo 和 correlationId 属性:
- replyTo :通常用来设置一个回调队列
- correlationId :用来关联请求(request)为每个 RPC 请求创建一个回调队列,效率很低,可以使用这个通用的解决方案:为每个客户端创建一个单一的回调队列。多个 RPC 公用一个回调队列,就存在哪一个请求对应的响应是什么?那么这个 correlationId 就是解决这个问题的。
核心思路就是利用这两个属性,来模拟 RPC 的实现。下面是一个例子,尽管这个例子模仿了 RPC 的调用,但是存在一个问题,不能在多线程中调用。所以该例子是一个半成品。
RpcServer 服务端
package cn.mrcode.rabbitmq.rpc;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RpcServer {private static String rpcQueue = "rpc_queue";public static void main(String[] args) throws IOException, TimeoutException {final String IP_ADDRESS = "192.168.4.250";final int PORT = 5672;final ConnectionFactory factory = new ConnectionFactory();factory.setHost(IP_ADDRESS);factory.setPort(PORT);factory.setUsername("admin");factory.setPassword("root");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(rpcQueue, false, false, false, null);channel.basicQos(1);System.out.println(" [x] Awaiting RPC requests");channel.basicConsume(rpcQueue, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 正常的消费消息final String message = new String(body, "UTF-8");System.out.println(" [.] message: " + message);// 然后再发出去一条消息final AMQP.BasicProperties replyProps = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", properties.getReplyTo(), replyProps, (message + " reply").getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}});}}Copied!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
RpcClient 客户端
package cn.mrcode.rabbitmq.rpc;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Address;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;import java.util.UUID;import java.util.concurrent.TimeoutException;public class RpcClient {final String IP_ADDRESS = "192.168.4.250";final int PORT = 5672;final Address[] addresses = {new Address(IP_ADDRESS, PORT)};private Connection connection;private Channel channel;// 请求服务端的队列名private String requestQueue = "rpc_queue";// 服务器处理完成后,响应的队列名称private String replyQueue;// 等待回调private QueueingConsumer queueingConsumer;public RpcClient() throws IOException, TimeoutException {final ConnectionFactory factory = new ConnectionFactory();factory.setUsername("admin");factory.setPassword("root");connection = factory.newConnection(addresses);channel = connection.createChannel();// 生命的响应队列:是一个临时的队列replyQueue = channel.queueDeclare().getQueue();queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(replyQueue, true, queueingConsumer);}public String call(String message) throws IOException, InterruptedException {final String corrid = UUID.randomUUID().toString();final AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(corrid).replyTo(replyQueue).build();channel.basicPublish("", requestQueue, properties, "message".getBytes());// 想服务端发送后,轮询,知道回去到服务端的响应为止while (true) {final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();if (delivery.getProperties().getCorrelationId().equals(corrid)) {return new String(delivery.getBody());}}}private void close() throws IOException {connection.close();}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 客户端调用final RpcClient rpcClient = new RpcClient();System.out.println(" [x] Requesting call(30)");final String response = rpcClient.call("30");System.out.println(" [.] Got '" + response + "'");rpcClient.close();}}Copied!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
运行这个程序,服务端和客户端的输出如下
# 服务端[x] Awaiting RPC requests[.] message: message# 客户端[x] Requesting call(30)[.] Got 'message reply'Copied!
1
2
3
4
5
6
7
从客户端调用来看,非常像我们在调用一个 service。 这就是用队列模拟了 RPC 调用。
客户端和服务端发送都没有定义交换器,是空串,这个应该是 RabbitMQ 默认的交换器?
这里笔者需要强调一点的是:以上类,使用了临时队列之类的声明,不要被这个迷惑了。他的核心思路就是:
- 客户端发送消息到一个 队列 A 中发送消息时,通过参数传递 replyTo 和 correlationId
- 服务端消费 队列 A 中的消息并处理这个消息,然后从参数中拿到 correlationId 作为参数,把处理结果发送给从参数中获得的回调队列 replyTo
- 客户端接受响应消息只是在语法上将异步调用模拟成了同步调用
