RPC 实现

Remote Procedure Call 简称 RPC,即远程调用。这个请自行百度。
这里 RPC 实现和传统的 RPC 还不太一样。
image.png

  1. 客户端发送一个请求,并设置了一个回复的队列
  2. 服务端,消费发送来的请求,并像这个回复的队列,响应了一个消息

就这个流程,通过消息队列来实现,

  1. final AMQP.BasicProperties properties = new AMQP.BasicProperties()
  2. .builder()
  3. .correlationId(corrid)
  4. .replyTo(replyQueue)
  5. .build();
  6. channel.basicPublish("", requestQueue, properties, "message".getBytes());
  7. Copied!

1
2
3
4
5
6

可以通过发送消息时指定 replyTo 和 correlationId 属性:

  • replyTo :通常用来设置一个回调队列
  • correlationId :用来关联请求(request)为每个 RPC 请求创建一个回调队列,效率很低,可以使用这个通用的解决方案:为每个客户端创建一个单一的回调队列。多个 RPC 公用一个回调队列,就存在哪一个请求对应的响应是什么?那么这个 correlationId 就是解决这个问题的。

核心思路就是利用这两个属性,来模拟 RPC 的实现。下面是一个例子,尽管这个例子模仿了 RPC 的调用,但是存在一个问题,不能在多线程中调用。所以该例子是一个半成品。
RpcServer 服务端

  1. package cn.mrcode.rabbitmq.rpc;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.DefaultConsumer;
  7. import com.rabbitmq.client.Envelope;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeoutException;
  10. public class RpcServer {
  11. private static String rpcQueue = "rpc_queue";
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. final String IP_ADDRESS = "192.168.4.250";
  14. final int PORT = 5672;
  15. final ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost(IP_ADDRESS);
  17. factory.setPort(PORT);
  18. factory.setUsername("admin");
  19. factory.setPassword("root");
  20. final Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.queueDeclare(rpcQueue, false, false, false, null);
  23. channel.basicQos(1);
  24. System.out.println(" [x] Awaiting RPC requests");
  25. channel.basicConsume(rpcQueue, false, new DefaultConsumer(channel) {
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. // 正常的消费消息
  29. final String message = new String(body, "UTF-8");
  30. System.out.println(" [.] message: " + message);
  31. // 然后再发出去一条消息
  32. final AMQP.BasicProperties replyProps = new AMQP.BasicProperties().builder()
  33. .correlationId(properties.getCorrelationId())
  34. .build();
  35. channel.basicPublish("", properties.getReplyTo(), replyProps, (message + " reply").getBytes());
  36. channel.basicAck(envelope.getDeliveryTag(), false);
  37. }
  38. });
  39. }
  40. }
  41. Copied!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

RpcClient 客户端

  1. package cn.mrcode.rabbitmq.rpc;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Address;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import com.rabbitmq.client.ConnectionFactory;
  7. import com.rabbitmq.client.QueueingConsumer;
  8. import java.io.IOException;
  9. import java.util.UUID;
  10. import java.util.concurrent.TimeoutException;
  11. public class RpcClient {
  12. final String IP_ADDRESS = "192.168.4.250";
  13. final int PORT = 5672;
  14. final Address[] addresses = {
  15. new Address(IP_ADDRESS, PORT)
  16. };
  17. private Connection connection;
  18. private Channel channel;
  19. // 请求服务端的队列名
  20. private String requestQueue = "rpc_queue";
  21. // 服务器处理完成后,响应的队列名称
  22. private String replyQueue;
  23. // 等待回调
  24. private QueueingConsumer queueingConsumer;
  25. public RpcClient() throws IOException, TimeoutException {
  26. final ConnectionFactory factory = new ConnectionFactory();
  27. factory.setUsername("admin");
  28. factory.setPassword("root");
  29. connection = factory.newConnection(addresses);
  30. channel = connection.createChannel();
  31. // 生命的响应队列:是一个临时的队列
  32. replyQueue = channel.queueDeclare().getQueue();
  33. queueingConsumer = new QueueingConsumer(channel);
  34. channel.basicConsume(replyQueue, true, queueingConsumer);
  35. }
  36. public String call(String message) throws IOException, InterruptedException {
  37. final String corrid = UUID.randomUUID().toString();
  38. final AMQP.BasicProperties properties = new AMQP.BasicProperties()
  39. .builder()
  40. .correlationId(corrid)
  41. .replyTo(replyQueue)
  42. .build();
  43. channel.basicPublish("", requestQueue, properties, "message".getBytes());
  44. // 想服务端发送后,轮询,知道回去到服务端的响应为止
  45. while (true) {
  46. final QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  47. if (delivery.getProperties().getCorrelationId().equals(corrid)) {
  48. return new String(delivery.getBody());
  49. }
  50. }
  51. }
  52. private void close() throws IOException {
  53. connection.close();
  54. }
  55. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  56. // 客户端调用
  57. final RpcClient rpcClient = new RpcClient();
  58. System.out.println(" [x] Requesting call(30)");
  59. final String response = rpcClient.call("30");
  60. System.out.println(" [.] Got '" + response + "'");
  61. rpcClient.close();
  62. }
  63. }
  64. Copied!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

运行这个程序,服务端和客户端的输出如下

  1. # 服务端
  2. [x] Awaiting RPC requests
  3. [.] message: message
  4. # 客户端
  5. [x] Requesting call(30)
  6. [.] Got 'message reply'
  7. Copied!

1
2
3
4
5
6
7

从客户端调用来看,非常像我们在调用一个 service。 这就是用队列模拟了 RPC 调用。
客户端和服务端发送都没有定义交换器,是空串,这个应该是 RabbitMQ 默认的交换器?
这里笔者需要强调一点的是:以上类,使用了临时队列之类的声明,不要被这个迷惑了。他的核心思路就是:

  1. 客户端发送消息到一个 队列 A 中发送消息时,通过参数传递 replyTo 和 correlationId
  2. 服务端消费 队列 A 中的消息并处理这个消息,然后从参数中拿到 correlationId 作为参数,把处理结果发送给从参数中获得的回调队列 replyTo
  3. 客户端接受响应消息只是在语法上将异步调用模拟成了同步调用