一、什么是RPC(Remote Procedure Call)

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
image.png
image.png

1.1 RPC 框架组件

  • User 客户端
  • User-stub 客户端存根
  • RPCRuntime RPC 通信组件
  • Server-stub 服务端存根
  • Server 服务端

1.2 RPC 工作原理

image.png

  1. Client像调用本地服务似的调用远程服务;
  2. Client stub接收到调用后,将方法、参数序列化
  3. 客户端通过sockets将消息发送到服务端
  4. Server stub 收到消息后进行解码(将消息对象反序列化)
  5. Server stub 根据解码结果调用本地的服
  6. 本地服务执行(对于服务端来说是本地执行)并将结果返回给Server stub
  7. Server stub将返回结果打包成消息(将结果消息对象序列化)
  8. 服务端通过sockets将消息发送到客户端
  9. Client stub接收到结果消息,并进行解码(将结果消息反序列化)
  10. 客户端得到最终结果。

注意: RPC 调用分以下两种:

  • 同步调用:客户方等待调用执行完成并返回结果。
  • 异步调用:客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。

1.3 rpc 可用的序列化协议方案

  • jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)
  • json 可读性强,但是序列化速度慢,体积大。
  • protobuf
  • kyro
  • Hessian

1.4 rpc 动态代理方案

  • jdk 动态代理
  • cglib 动态代理
  • javassist 动态代理
  • ASM 字节码
  • javassist 字节码

二、有那些RPC 框架的实现

  • Spring Cloud
  • Dubbo
  • Thrift
  • Rabbitmq RPC

三、手动实现一个RPC调用

这里采用简单的json 序列化方式,使用socket 寻址通信

3.1 对解码编码的包装

  1. //编码 包装需要调用服务端的方法参数
  2. @Data
  3. public class RpcRequest implements Serializable {
  4. private static final long serialVersionUID = 1L;
  5. // 需要请求的类名
  6. private String className;
  7. // 需求请求的方法名
  8. private String methodName;
  9. // 请求方法的参数类型
  10. private Class<?>[] paramTypes;
  11. // 请求的参数值
  12. private Object[] params;
  13. }
  14. //解码 服务端响应结果包装
  15. @Data
  16. public class RpcResponse implements Serializable {
  17. private static final long serialVersionUID = 1L;
  18. // 可能抛出的异常
  19. private Throwable error;
  20. // 响应的内容或结果
  21. private Object result;
  22. }

3.2 客户端stub 代理调用服务端(JDK代理)

  1. public class RpcServiceHandler implements InvocationHandler {
  2. private String host; // 服务端地址
  3. private int port; // 服务端口号
  4. public RpcServiceHandler(String host, int port) {
  5. this.host = host;
  6. this.port = port;
  7. }
  8. /**
  9. * 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
  10. */
  11. @Override
  12. public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
  13. // 调用前
  14. System.out.println("执行远程方法前,可以做些事情");
  15. // 封装参数,类似于序列化的过程
  16. RpcRequest request = new RpcRequest();
  17. request.setClassName(method.getDeclaringClass().getName());
  18. request.setMethodName(method.getName());
  19. request.setParamTypes(method.getParameterTypes());
  20. request.setParams(params);
  21. // 连接服务器调用服务
  22. Object rst = execute(request, method);
  23. // 调用后
  24. System.out.println("执行远程方法后,也可以做些事情");
  25. return rst;
  26. }
  27. private Object execute(RpcRequest request, Method method) throws Throwable {
  28. // 打开远端服务连接
  29. Socket server = new Socket(host, port);
  30. OutputStream out = null;
  31. InputStream in = null;
  32. try {
  33. // 1. 服务端输出流,写入请求数据,发送请求数据
  34. out = server.getOutputStream();
  35. IoUtil.writeUtf8(out, false, JSONUtil.toJsonStr(request));
  36. IoUtil.flush(out);
  37. //告知服务端已写入完成
  38. server.shutdownOutput();
  39. // 2. 服务端输入流,获取返回数据,转换参数类型
  40. // 类似于反序列化的过程
  41. in = server.getInputStream();
  42. String json = IoUtil.read(in, "utf-8");
  43. RpcResponse response = JSONUtil.toBean(json, new TypeReference<RpcResponse>() {
  44. }, true);
  45. // 3. 返回服务端响应结果
  46. if (response.getError() != null) { // 服务器产生异常
  47. throw response.getError();
  48. }
  49. return JSONUtil.toBean((JSON) response.getResult(), new TypeReference<Object>() {
  50. @Override
  51. public Type getType() {
  52. return TypeUtil.getReturnType(method);
  53. }
  54. }, true);
  55. } finally {
  56. IoUtil.close(in);
  57. IoUtil.close(out);
  58. IoUtil.close(server);
  59. }
  60. }
  61. }

3.3 服务端stub

  1. public class RpcProvider {
  2. /**
  3. * 线程池
  4. */
  5. private static final ExecutorService executorService = new ThreadPoolExecutor(
  6. 10,
  7. 10,
  8. 30,
  9. TimeUnit.SECONDS,
  10. new LinkedBlockingQueue<Runnable>()
  11. );
  12. /**
  13. * rpc 提供端,暴露服务
  14. */
  15. public static <T> void provider(final T service, int port) throws IOException {
  16. //创建服务端的套接字,绑定端口port
  17. ServerSocket serverSocket = new ServerSocket(port);
  18. while (true) {
  19. //开始接收客户端的消息,并以此创建套接字
  20. final Socket socket = serverSocket.accept();
  21. executorService.execute(new Handler(socket, service));
  22. }
  23. }
  24. /**
  25. * 响应调用端
  26. *
  27. * @param oout
  28. * @param result
  29. */
  30. private static void response(OutputStream oout, Object result, Throwable throwable) {
  31. RpcResponse response = new RpcResponse();
  32. response.setResult(result);
  33. response.setError(throwable);
  34. IoUtil.writeUtf8(oout, false, JSONUtil.toJsonStr(response));
  35. IoUtil.flush(oout);
  36. System.out.println(Thread.currentThread().getName() + "=====> 响应结果" + response);
  37. }
  38. public static class Handler<T> implements Runnable {
  39. private final Socket socket;
  40. private T service;
  41. public Handler(Socket socket, T service) {
  42. this.socket = socket;
  43. this.service = service;
  44. }
  45. @Override
  46. public void run() {
  47. final Socket client = socket;
  48. //创建呢一个对内传输的对象流,并绑定套接字
  49. RpcRequest request = null;
  50. InputStream in;
  51. try {
  52. // 1. 获取流以待操作
  53. in = client.getInputStream();
  54. String json = IoUtil.read(in, "utf-8");
  55. System.out.println(Thread.currentThread().getName() + "<===== 接受rpc 调用端请求" + json);
  56. //2读取参数
  57. request = JSONUtil.toBean(json, RpcRequest.class);
  58. // 3. 执行服务方法, 返回结果
  59. Class<?> clazz = service.getClass();
  60. Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
  61. Object result = method.invoke(service, request.getParams());
  62. System.out.println(Thread.currentThread().getName() + "<===== 处理结果" + result);
  63. // 4. 返回RPC响应,序列化RpcResponse
  64. response(client.getOutputStream(), result, null);
  65. } catch (Exception e) {
  66. try { //异常处理
  67. if (client.getOutputStream() != null) {
  68. response(client.getOutputStream(), null, e);
  69. }
  70. } catch (Exception e1) {
  71. e1.printStackTrace();
  72. }
  73. } finally {
  74. IoUtil.close(client);
  75. }
  76. }
  77. }
  78. }

3.4 测试

  • 服务端启动 ```java public class RpcProviderApp {

    public static void main(String[] args) throws IOException {

    1. //实例化服务
    2. StudentService studentService = new StudentServiceImpl();
    3. //暴露服务
    4. RpcProvider.provider(studentService, 9999);

    }

}

  1. - 客户端启动
  2. ```java
  3. //客户端启动
  4. public class RpcConsumerApp {
  5. public static void main(String[] args) {
  6. StudentService studentService = RpcServiceFactory.create(StudentService.class, "127.0.0.1", 9999);
  7. System.out.println(studentService.getInfo());
  8. }
  9. }

项目地址

https://github.com/h-dj/Spring-Learning

参考