RPC 定义
远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。
接下来我们通过手写一个简易的 RPC 框架来分析它的原理。
手写 RPC 框架
原理分析
在手写 RPC 框架之前,我们需要知道为什么要设计 RPC 框架,以及 RPC 框架可能存在的组成部分。
在分布式服务中,最核心的两个点:第一个是我们的模块、功能分布在不同的计算机结点上,第二点是各个模块之间需要基于某种通信的方式去实现数据的交换。
常见的通信方式有两种:HTTP 和 TCP,在 Spring Cloud 里面用的比较多的是 HTTP,在 Dubbo、Thirft、gRPC 中使用的比较多的是 TCP。
我们来思考一个问题,【用户模块】在调用【交易模块】功能的时候,需要做哪些事情?

【用户模块】每次在调用【交易模块】的时候,都会执行上面的步骤,如果没有一个框架来提供这样的功能,那么我们需要为每一个模块都开发这样的逻辑,所以需要定义一套协议(RPC 协议)来规范整个远程调用,进而在这个协议之上设计一个 RPC 框架。
在设计 RPC 框架的时候,每一层需要哪些技术?
【用户模块】在请求【交易模块】的时候,代理层会把调用方法、参数按照一定的格式进行封装,序列化把代理层封装的数据转换为可传输的数据,然后再通过底层的 Socket 通信传输给对方。
【交易模块】在收到【用户模块】请求的时候,反序列化会把传输过来的数据转换为格式化的数据,代理层根据数据内容进行分发,就是基于反射的方式调用【交易模块】的某一个类的某一个方法,获得执行结果,然后把结果返回给【用户模块】。
手写 RPC V1 版本
代码地址:https://gitee.com/yin_jw/demo/tree/master/rpc-demo
Server 端
基于上面的分析,我们来手写一个简易的 RPC 框架。先设计 Server 端,因为 Server 端把接口暴露给 Client 端远程调用的时候,Client 端需要知道接口的定义,所以把接口和实现拆分成两个包,接口的 Jar 包通过 Maven 仓库提供给 Client 端依赖使用。

rpc-server-provider 依赖 rpc-server-api,在 rpc-server-api 中定义 IHelloService 接口,在 rpc-server-provider 中实现该接口。
rpc-server 如果想对外提供 IHelloService 服务,则需要把该服务发布出去。设计 RPCProxyServer 代理类提供服务发布功能。
RpcProxyServer:服务代理类。
public class RpcProxyServer {private static final Map<String, Object> services = new ConcurrentHashMap<>();private static final Executor threadPool = Executors.newCachedThreadPool();public void addServices(String serviceName, Object service) {services.put(serviceName, service);}public void publish(int port) {ServerSocket serverSocket = null;try {serverSocket = new ServerSocket(port);while (true) {Socket socket = serverSocket.accept();threadPool.execute(new ProcessHandler(socket, services));}} catch (IOException e) {e.printStackTrace();} finally {IoUtils.close(serverSocket);}}}
ProcessHandler:请求处理类。
public class ProcessHandler implements Runnable {private Socket socket;private Map<String, Object> services;public ProcessHandler(Socket socket, Map<String, Object> services) {this.socket = socket;this.services = services;}@Overridepublic void run() {ObjectInputStream ois = null;ObjectOutputStream oos = null;try {ois = new ObjectInputStream(socket.getInputStream());// 处理请求参数,反射调用服务方法RpcRequest rpcRequest = (RpcRequest) ois.readObject();Object result = invoke(rpcRequest);oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(result);} catch (Exception e) {e.printStackTrace();} finally {IoUtils.close(oos, ois);}}/*** 反射调用服务方法,返回执行结果** @param rpcRequest*/private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException,InvocationTargetException, IllegalAccessException {String className = rpcRequest.getClassName();Object service = services.get(className);if (service == null) {throw new RuntimeException("service is null");}Class clzss = service.getClass();Object[] parameters = rpcRequest.getParameters();Class<?>[] parameterTypes = new Class[parameters.length];for (int i = 0; i < parameters.length; i++) {parameterTypes[i] = parameters[i].getClass();}Method method = clzss.getMethod(rpcRequest.getMethodName(), parameterTypes);Object result = method.invoke(service, parameters);return result;}}
RpcRequest:定义传输内容的实体类。
public class RpcRequest {private String className;private String methodName;private Object[] parameters;// get set 方法}
启动 Server 服务。
public class App {public static void main(String[] args) {IHelloService helloService = new HelloServiceImpl();RpcProxyServer rpcProxyServer = new RpcProxyServer();rpcProxyServer.addServices(IHelloService.class.getName(), helloService);rpcProxyServer.publish(9090);}}
Client 端
Client 端只依赖了 Server 端的接口包,没有实现类,也是通过代理的方式获得 Server 端的服务数据。
App:Client 端调用,不能直接创建 IHelloService 的实现类,只能获得代理类。
public class App {public static void main(String[] args) {RpcProxyClient rpcProxyClient = new RpcProxyClient();IHelloService helloService = rpcProxyClient.clientProxy(IHelloService.class, "localhost", 9090);String result = helloService.sayHello("james");System.out.println(result);}}
RpcProxyClient:代理类,用来生成 IHelloService 的代理对象。
public class RpcProxyClient {public <T> T clientProxy(Class interfaceCls, String host, int port) {return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),new Class[]{interfaceCls},new RemoteInvocationHandler(host, port));}}
RemoteInvocationHandler:Java 动态代理 InvocationHandler 实现类,用来封装请求参数,调底层 Socket 通信实现类。
public class RemoteInvocationHandler implements InvocationHandler {private String host;private int port;public RemoteInvocationHandler(String host, int port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 封装请求参数RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParameters(args);// 请求Server服务获得数据RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);Object result = rpcNetTransport.send(rpcRequest);return result;}}
RpcNetTransport:负责 Socket 通信,把封装好的请求参数传输给 Server 端,同时获得返回数据。
public class RpcNetTransport {private String host;private int port;public RpcNetTransport(String host, int port) {this.host = host;this.port = port;}public Object send(RpcRequest rpcRequest) {Socket socket = null;ObjectOutputStream oos = null;ObjectInputStream ois = null;Object result = null;try {socket = new Socket(host, port);oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(rpcRequest);oos.flush();ois = new ObjectInputStream(socket.getInputStream());result = ois.readObject();} catch (Exception e) {e.printStackTrace();} finally {IoUtils.close(ois, oos, socket);}return result;}}
手写 RPC V1 版本到这里就完成了,主要涉及到的知识点有:Java 动态代理、序列化与反序列、Java BIO Socket 编程。
- “Java 动态代理”参考:https://www.yuque.com/yinjianwei/vyrvkf/dtdhxw
 - “序列化与反序列”参考:https://www.yuque.com/yinjianwei/vyrvkf/sin872
 - “Java BIO Socket 编程”:后续会涉及
 
手写 RPC V2 版本
在 V1 版本的基础上,我们想进一步优化我们的 RPC 框架,引入注解功能,通过注解的方式发布服务。
先引入 Spring 的 Jar 包,通过 Spring 来启动 Server 端。
<dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.3.16.RELEASE</version></dependency>
RpcService:定义注解,该注解需要被 Spring 管理,所以需要加上 @Component 注解。
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface RpcService {// 服务接口名Class<?> value();}
RpcProxyServer:重新定义服务代理类,引入 Spring 功能。
@Componentpublic class RpcProxyServer implements InitializingBean, ApplicationContextAware {private static final Map<String, Object> services = new ConcurrentHashMap<>();private static final Executor threadPool = Executors.newCachedThreadPool();private int port = 9090;/*** 启动Socket服务** @throws Exception*/@Overridepublic void afterPropertiesSet() throws Exception {ServerSocket serverSocket = null;try {serverSocket = new ServerSocket(port);while (true) {Socket socket = serverSocket.accept();threadPool.execute(new ProcessHandler(socket, services));}} catch (IOException e) {e.printStackTrace();} finally {IoUtils.close(serverSocket);}}/*** 通过Spring,获得定义了RpcService注解的service** @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, Object> servicesMap = applicationContext.getBeansWithAnnotation(RpcService.class);if (CollectionUtils.isEmpty(servicesMap)) {return;}for (Object serviceBean : servicesMap.values()) {RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);String serviceName = rpcService.value().getName();services.put(serviceName, serviceBean);}}}
HelloServiceImpl:实现类不要忘了加上自定义的注解 @RpcService(IHelloService.class)
@RpcService(IHelloService.class)public class HelloServiceImpl implements IHelloService {@Overridepublic String sayHello(String content) {System.out.println("request content: " + content);return "say hello";}}
App:启动类,使用 Spring 启动。
public class App {public static void main(String[] args) {AnnotationConfigApplicationContext context =new AnnotationConfigApplicationContext("com.yjw.rpc.v2");context.start();}}
新增加了 Spring 的一些知识点。
作者:殷建卫 链接:https://www.yuque.com/yinjianwei/vyrvkf/yctc8t 来源:殷建卫 - 架构笔记 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
