RPC 定义

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。

接下来我们通过手写一个简易的 RPC 框架来分析它的原理。

手写 RPC 框架

原理分析

在手写 RPC 框架之前,我们需要知道为什么要设计 RPC 框架,以及 RPC 框架可能存在的组成部分。

在分布式服务中,最核心的两个点:第一个是我们的模块、功能分布在不同的计算机结点上,第二点是各个模块之间需要基于某种通信的方式去实现数据的交换。

常见的通信方式有两种:HTTP 和 TCP,在 Spring Cloud 里面用的比较多的是 HTTP,在 Dubbo、Thirft、gRPC 中使用的比较多的是 TCP。

我们来思考一个问题,【用户模块】在调用【交易模块】功能的时候,需要做哪些事情?

image.png

【用户模块】每次在调用【交易模块】的时候,都会执行上面的步骤,如果没有一个框架来提供这样的功能,那么我们需要为每一个模块都开发这样的逻辑,所以需要定义一套协议(RPC 协议)来规范整个远程调用,进而在这个协议之上设计一个 RPC 框架。

在设计 RPC 框架的时候,每一层需要哪些技术?

【用户模块】在请求【交易模块】的时候,代理层会把调用方法、参数按照一定的格式进行封装,序列化把代理层封装的数据转换为可传输的数据,然后再通过底层的 Socket 通信传输给对方。

【交易模块】在收到【用户模块】请求的时候,反序列化会把传输过来的数据转换为格式化的数据,代理层根据数据内容进行分发,就是基于反射的方式调用【交易模块】的某一个类的某一个方法,获得执行结果,然后把结果返回给【用户模块】。

手写 RPC V1 版本

代码地址:https://gitee.com/yin_jw/demo/tree/master/rpc-demo

Server 端

基于上面的分析,我们来手写一个简易的 RPC 框架。先设计 Server 端,因为 Server 端把接口暴露给 Client 端远程调用的时候,Client 端需要知道接口的定义,所以把接口和实现拆分成两个包,接口的 Jar 包通过 Maven 仓库提供给 Client 端依赖使用。

image.png

rpc-server-provider 依赖 rpc-server-api,在 rpc-server-api 中定义 IHelloService 接口,在 rpc-server-provider 中实现该接口。

rpc-server 如果想对外提供 IHelloService 服务,则需要把该服务发布出去。设计 RPCProxyServer 代理类提供服务发布功能。

RpcProxyServer:服务代理类。

  1. public class RpcProxyServer {
  2. private static final Map<String, Object> services = new ConcurrentHashMap<>();
  3. private static final Executor threadPool = Executors.newCachedThreadPool();
  4. public void addServices(String serviceName, Object service) {
  5. services.put(serviceName, service);
  6. }
  7. public void publish(int port) {
  8. ServerSocket serverSocket = null;
  9. try {
  10. serverSocket = new ServerSocket(port);
  11. while (true) {
  12. Socket socket = serverSocket.accept();
  13. threadPool.execute(new ProcessHandler(socket, services));
  14. }
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. } finally {
  18. IoUtils.close(serverSocket);
  19. }
  20. }
  21. }

ProcessHandler:请求处理类。

  1. public class ProcessHandler implements Runnable {
  2. private Socket socket;
  3. private Map<String, Object> services;
  4. public ProcessHandler(Socket socket, Map<String, Object> services) {
  5. this.socket = socket;
  6. this.services = services;
  7. }
  8. @Override
  9. public void run() {
  10. ObjectInputStream ois = null;
  11. ObjectOutputStream oos = null;
  12. try {
  13. ois = new ObjectInputStream(socket.getInputStream());
  14. // 处理请求参数,反射调用服务方法
  15. RpcRequest rpcRequest = (RpcRequest) ois.readObject();
  16. Object result = invoke(rpcRequest);
  17. oos = new ObjectOutputStream(socket.getOutputStream());
  18. oos.writeObject(result);
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. } finally {
  22. IoUtils.close(oos, ois);
  23. }
  24. }
  25. /**
  26. * 反射调用服务方法,返回执行结果
  27. *
  28. * @param rpcRequest
  29. */
  30. private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException,
  31. InvocationTargetException, IllegalAccessException {
  32. String className = rpcRequest.getClassName();
  33. Object service = services.get(className);
  34. if (service == null) {
  35. throw new RuntimeException("service is null");
  36. }
  37. Class clzss = service.getClass();
  38. Object[] parameters = rpcRequest.getParameters();
  39. Class<?>[] parameterTypes = new Class[parameters.length];
  40. for (int i = 0; i < parameters.length; i++) {
  41. parameterTypes[i] = parameters[i].getClass();
  42. }
  43. Method method = clzss.getMethod(rpcRequest.getMethodName(), parameterTypes);
  44. Object result = method.invoke(service, parameters);
  45. return result;
  46. }
  47. }

RpcRequest:定义传输内容的实体类。

  1. public class RpcRequest {
  2. private String className;
  3. private String methodName;
  4. private Object[] parameters;
  5. // get set 方法
  6. }

启动 Server 服务。

  1. public class App {
  2. public static void main(String[] args) {
  3. IHelloService helloService = new HelloServiceImpl();
  4. RpcProxyServer rpcProxyServer = new RpcProxyServer();
  5. rpcProxyServer.addServices(IHelloService.class.getName(), helloService);
  6. rpcProxyServer.publish(9090);
  7. }
  8. }

Client 端

Client 端只依赖了 Server 端的接口包,没有实现类,也是通过代理的方式获得 Server 端的服务数据。

App:Client 端调用,不能直接创建 IHelloService 的实现类,只能获得代理类。

  1. public class App {
  2. public static void main(String[] args) {
  3. RpcProxyClient rpcProxyClient = new RpcProxyClient();
  4. IHelloService helloService = rpcProxyClient.clientProxy(IHelloService.class, "localhost", 9090);
  5. String result = helloService.sayHello("james");
  6. System.out.println(result);
  7. }
  8. }

RpcProxyClient:代理类,用来生成 IHelloService 的代理对象。

  1. public class RpcProxyClient {
  2. public <T> T clientProxy(Class interfaceCls, String host, int port) {
  3. return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
  4. new Class[]{interfaceCls},
  5. new RemoteInvocationHandler(host, port));
  6. }
  7. }

RemoteInvocationHandler:Java 动态代理 InvocationHandler 实现类,用来封装请求参数,调底层 Socket 通信实现类。

  1. public class RemoteInvocationHandler implements InvocationHandler {
  2. private String host;
  3. private int port;
  4. public RemoteInvocationHandler(String host, int port) {
  5. this.host = host;
  6. this.port = port;
  7. }
  8. @Override
  9. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  10. // 封装请求参数
  11. RpcRequest rpcRequest = new RpcRequest();
  12. rpcRequest.setClassName(method.getDeclaringClass().getName());
  13. rpcRequest.setMethodName(method.getName());
  14. rpcRequest.setParameters(args);
  15. // 请求Server服务获得数据
  16. RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
  17. Object result = rpcNetTransport.send(rpcRequest);
  18. return result;
  19. }
  20. }

RpcNetTransport:负责 Socket 通信,把封装好的请求参数传输给 Server 端,同时获得返回数据。

  1. public class RpcNetTransport {
  2. private String host;
  3. private int port;
  4. public RpcNetTransport(String host, int port) {
  5. this.host = host;
  6. this.port = port;
  7. }
  8. public Object send(RpcRequest rpcRequest) {
  9. Socket socket = null;
  10. ObjectOutputStream oos = null;
  11. ObjectInputStream ois = null;
  12. Object result = null;
  13. try {
  14. socket = new Socket(host, port);
  15. oos = new ObjectOutputStream(socket.getOutputStream());
  16. oos.writeObject(rpcRequest);
  17. oos.flush();
  18. ois = new ObjectInputStream(socket.getInputStream());
  19. result = ois.readObject();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. } finally {
  23. IoUtils.close(ois, oos, socket);
  24. }
  25. return result;
  26. }
  27. }

手写 RPC V1 版本到这里就完成了,主要涉及到的知识点有:Java 动态代理、序列化与反序列、Java BIO Socket 编程。

手写 RPC V2 版本

在 V1 版本的基础上,我们想进一步优化我们的 RPC 框架,引入注解功能,通过注解的方式发布服务。

先引入 Spring 的 Jar 包,通过 Spring 来启动 Server 端。

  1. <dependency>
  2. <groupId>org.springframework</groupId>
  3. <artifactId>spring-context</artifactId>
  4. <version>4.3.16.RELEASE</version>
  5. </dependency>

RpcService:定义注解,该注解需要被 Spring 管理,所以需要加上 @Component 注解。

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Component
  4. public @interface RpcService {
  5. // 服务接口名
  6. Class<?> value();
  7. }

RpcProxyServer:重新定义服务代理类,引入 Spring 功能。

  1. @Component
  2. public class RpcProxyServer implements InitializingBean, ApplicationContextAware {
  3. private static final Map<String, Object> services = new ConcurrentHashMap<>();
  4. private static final Executor threadPool = Executors.newCachedThreadPool();
  5. private int port = 9090;
  6. /**
  7. * 启动Socket服务
  8. *
  9. * @throws Exception
  10. */
  11. @Override
  12. public void afterPropertiesSet() throws Exception {
  13. ServerSocket serverSocket = null;
  14. try {
  15. serverSocket = new ServerSocket(port);
  16. while (true) {
  17. Socket socket = serverSocket.accept();
  18. threadPool.execute(new ProcessHandler(socket, services));
  19. }
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. } finally {
  23. IoUtils.close(serverSocket);
  24. }
  25. }
  26. /**
  27. * 通过Spring,获得定义了RpcService注解的service
  28. *
  29. * @param applicationContext
  30. * @throws BeansException
  31. */
  32. @Override
  33. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  34. Map<String, Object> servicesMap = applicationContext.getBeansWithAnnotation(RpcService.class);
  35. if (CollectionUtils.isEmpty(servicesMap)) {
  36. return;
  37. }
  38. for (Object serviceBean : servicesMap.values()) {
  39. RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
  40. String serviceName = rpcService.value().getName();
  41. services.put(serviceName, serviceBean);
  42. }
  43. }
  44. }

HelloServiceImpl:实现类不要忘了加上自定义的注解 @RpcService(IHelloService.class)

  1. @RpcService(IHelloService.class)
  2. public class HelloServiceImpl implements IHelloService {
  3. @Override
  4. public String sayHello(String content) {
  5. System.out.println("request content: " + content);
  6. return "say hello";
  7. }
  8. }

App:启动类,使用 Spring 启动。

  1. public class App {
  2. public static void main(String[] args) {
  3. AnnotationConfigApplicationContext context =
  4. new AnnotationConfigApplicationContext("com.yjw.rpc.v2");
  5. context.start();
  6. }
  7. }

新增加了 Spring 的一些知识点。

作者:殷建卫 链接:https://www.yuque.com/yinjianwei/vyrvkf/yctc8t 来源:殷建卫 - 架构笔记 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。