RPC 定义
远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。
接下来我们通过手写一个简易的 RPC 框架来分析它的原理。
手写 RPC 框架
原理分析
在手写 RPC 框架之前,我们需要知道为什么要设计 RPC 框架,以及 RPC 框架可能存在的组成部分。
在分布式服务中,最核心的两个点:第一个是我们的模块、功能分布在不同的计算机结点上,第二点是各个模块之间需要基于某种通信的方式去实现数据的交换。
常见的通信方式有两种:HTTP 和 TCP,在 Spring Cloud 里面用的比较多的是 HTTP,在 Dubbo、Thirft、gRPC 中使用的比较多的是 TCP。
我们来思考一个问题,【用户模块】在调用【交易模块】功能的时候,需要做哪些事情?
【用户模块】每次在调用【交易模块】的时候,都会执行上面的步骤,如果没有一个框架来提供这样的功能,那么我们需要为每一个模块都开发这样的逻辑,所以需要定义一套协议(RPC 协议)来规范整个远程调用,进而在这个协议之上设计一个 RPC 框架。
在设计 RPC 框架的时候,每一层需要哪些技术?
【用户模块】在请求【交易模块】的时候,代理层会把调用方法、参数按照一定的格式进行封装,序列化把代理层封装的数据转换为可传输的数据,然后再通过底层的 Socket 通信传输给对方。
【交易模块】在收到【用户模块】请求的时候,反序列化会把传输过来的数据转换为格式化的数据,代理层根据数据内容进行分发,就是基于反射的方式调用【交易模块】的某一个类的某一个方法,获得执行结果,然后把结果返回给【用户模块】。
手写 RPC V1 版本
代码地址:https://gitee.com/yin_jw/demo/tree/master/rpc-demo
Server 端
基于上面的分析,我们来手写一个简易的 RPC 框架。先设计 Server 端,因为 Server 端把接口暴露给 Client 端远程调用的时候,Client 端需要知道接口的定义,所以把接口和实现拆分成两个包,接口的 Jar 包通过 Maven 仓库提供给 Client 端依赖使用。
rpc-server-provider 依赖 rpc-server-api,在 rpc-server-api 中定义 IHelloService 接口,在 rpc-server-provider 中实现该接口。
rpc-server 如果想对外提供 IHelloService 服务,则需要把该服务发布出去。设计 RPCProxyServer 代理类提供服务发布功能。
RpcProxyServer:服务代理类。
public class RpcProxyServer {
private static final Map<String, Object> services = new ConcurrentHashMap<>();
private static final Executor threadPool = Executors.newCachedThreadPool();
public void addServices(String serviceName, Object service) {
services.put(serviceName, service);
}
public void publish(int port) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept();
threadPool.execute(new ProcessHandler(socket, services));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IoUtils.close(serverSocket);
}
}
}
ProcessHandler:请求处理类。
public class ProcessHandler implements Runnable {
private Socket socket;
private Map<String, Object> services;
public ProcessHandler(Socket socket, Map<String, Object> services) {
this.socket = socket;
this.services = services;
}
@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
ois = new ObjectInputStream(socket.getInputStream());
// 处理请求参数,反射调用服务方法
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
Object result = invoke(rpcRequest);
oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
IoUtils.close(oos, ois);
}
}
/**
* 反射调用服务方法,返回执行结果
*
* @param rpcRequest
*/
private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
String className = rpcRequest.getClassName();
Object service = services.get(className);
if (service == null) {
throw new RuntimeException("service is null");
}
Class clzss = service.getClass();
Object[] parameters = rpcRequest.getParameters();
Class<?>[] parameterTypes = new Class[parameters.length];
for (int i = 0; i < parameters.length; i++) {
parameterTypes[i] = parameters[i].getClass();
}
Method method = clzss.getMethod(rpcRequest.getMethodName(), parameterTypes);
Object result = method.invoke(service, parameters);
return result;
}
}
RpcRequest:定义传输内容的实体类。
public class RpcRequest {
private String className;
private String methodName;
private Object[] parameters;
// get set 方法
}
启动 Server 服务。
public class App {
public static void main(String[] args) {
IHelloService helloService = new HelloServiceImpl();
RpcProxyServer rpcProxyServer = new RpcProxyServer();
rpcProxyServer.addServices(IHelloService.class.getName(), helloService);
rpcProxyServer.publish(9090);
}
}
Client 端
Client 端只依赖了 Server 端的接口包,没有实现类,也是通过代理的方式获得 Server 端的服务数据。
App:Client 端调用,不能直接创建 IHelloService 的实现类,只能获得代理类。
public class App {
public static void main(String[] args) {
RpcProxyClient rpcProxyClient = new RpcProxyClient();
IHelloService helloService = rpcProxyClient.clientProxy(IHelloService.class, "localhost", 9090);
String result = helloService.sayHello("james");
System.out.println(result);
}
}
RpcProxyClient:代理类,用来生成 IHelloService 的代理对象。
public class RpcProxyClient {
public <T> T clientProxy(Class interfaceCls, String host, int port) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
new Class[]{interfaceCls},
new RemoteInvocationHandler(host, port));
}
}
RemoteInvocationHandler:Java 动态代理 InvocationHandler 实现类,用来封装请求参数,调底层 Socket 通信实现类。
public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 封装请求参数
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
// 请求Server服务获得数据
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
Object result = rpcNetTransport.send(rpcRequest);
return result;
}
}
RpcNetTransport:负责 Socket 通信,把封装好的请求参数传输给 Server 端,同时获得返回数据。
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Object send(RpcRequest rpcRequest) {
Socket socket = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
Object result = null;
try {
socket = new Socket(host, port);
oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(rpcRequest);
oos.flush();
ois = new ObjectInputStream(socket.getInputStream());
result = ois.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
IoUtils.close(ois, oos, socket);
}
return result;
}
}
手写 RPC V1 版本到这里就完成了,主要涉及到的知识点有:Java 动态代理、序列化与反序列、Java BIO Socket 编程。
- “Java 动态代理”参考:https://www.yuque.com/yinjianwei/vyrvkf/dtdhxw
- “序列化与反序列”参考:https://www.yuque.com/yinjianwei/vyrvkf/sin872
- “Java BIO Socket 编程”:后续会涉及
手写 RPC V2 版本
在 V1 版本的基础上,我们想进一步优化我们的 RPC 框架,引入注解功能,通过注解的方式发布服务。
先引入 Spring 的 Jar 包,通过 Spring 来启动 Server 端。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.16.RELEASE</version>
</dependency>
RpcService:定义注解,该注解需要被 Spring 管理,所以需要加上 @Component 注解。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
// 服务接口名
Class<?> value();
}
RpcProxyServer:重新定义服务代理类,引入 Spring 功能。
@Component
public class RpcProxyServer implements InitializingBean, ApplicationContextAware {
private static final Map<String, Object> services = new ConcurrentHashMap<>();
private static final Executor threadPool = Executors.newCachedThreadPool();
private int port = 9090;
/**
* 启动Socket服务
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept();
threadPool.execute(new ProcessHandler(socket, services));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IoUtils.close(serverSocket);
}
}
/**
* 通过Spring,获得定义了RpcService注解的service
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> servicesMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (CollectionUtils.isEmpty(servicesMap)) {
return;
}
for (Object serviceBean : servicesMap.values()) {
RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
String serviceName = rpcService.value().getName();
services.put(serviceName, serviceBean);
}
}
}
HelloServiceImpl:实现类不要忘了加上自定义的注解 @RpcService(IHelloService.class)
@RpcService(IHelloService.class)
public class HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String content) {
System.out.println("request content: " + content);
return "say hello";
}
}
App:启动类,使用 Spring 启动。
public class App {
public static void main(String[] args) {
AnnotationConfigApplicationContext context =
new AnnotationConfigApplicationContext("com.yjw.rpc.v2");
context.start();
}
}
新增加了 Spring 的一些知识点。
作者:殷建卫 链接:https://www.yuque.com/yinjianwei/vyrvkf/yctc8t 来源:殷建卫 - 架构笔记 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。