一、什么是RPC(Remote Procedure Call)
RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
1.1 RPC 框架组件
- User 客户端
- User-stub 客户端存根
- RPCRuntime RPC 通信组件
- Server-stub 服务端存根
- Server 服务端
1.2 RPC 工作原理
- Client像调用本地服务似的调用远程服务;
- Client stub接收到调用后,将方法、参数序列化
- 客户端通过sockets将消息发送到服务端
- Server stub 收到消息后进行解码(将消息对象反序列化)
- Server stub 根据解码结果调用本地的服
- 本地服务执行(对于服务端来说是本地执行)并将结果返回给Server stub
- Server stub将返回结果打包成消息(将结果消息对象序列化)
- 服务端通过sockets将消息发送到客户端
- Client stub接收到结果消息,并进行解码(将结果消息反序列化)
- 客户端得到最终结果。
注意: RPC 调用分以下两种:
- 同步调用:客户方等待调用执行完成并返回结果。
- 异步调用:客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果。若客户方不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。
1.3 rpc 可用的序列化协议方案
- jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)
- json 可读性强,但是序列化速度慢,体积大。
- protobuf
- kyro
- Hessian
1.4 rpc 动态代理方案
- jdk 动态代理
- cglib 动态代理
- javassist 动态代理
- ASM 字节码
- javassist 字节码
二、有那些RPC 框架的实现
- Spring Cloud
- Dubbo
- Thrift
- Rabbitmq RPC
三、手动实现一个RPC调用
这里采用简单的json 序列化方式,使用socket 寻址通信
3.1 对解码编码的包装
//编码 包装需要调用服务端的方法参数
@Data
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1L;
// 需要请求的类名
private String className;
// 需求请求的方法名
private String methodName;
// 请求方法的参数类型
private Class<?>[] paramTypes;
// 请求的参数值
private Object[] params;
}
//解码 服务端响应结果包装
@Data
public class RpcResponse implements Serializable {
private static final long serialVersionUID = 1L;
// 可能抛出的异常
private Throwable error;
// 响应的内容或结果
private Object result;
}
3.2 客户端stub 代理调用服务端(JDK代理)
public class RpcServiceHandler implements InvocationHandler {
private String host; // 服务端地址
private int port; // 服务端口号
public RpcServiceHandler(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 动态代理做的事情,接口的实现不在本地,在网络中的其他进程中,我们通过实现了Rpc客户端的对象来发起远程服务的调用。
*/
@Override
public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
// 调用前
System.out.println("执行远程方法前,可以做些事情");
// 封装参数,类似于序列化的过程
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(params);
// 连接服务器调用服务
Object rst = execute(request, method);
// 调用后
System.out.println("执行远程方法后,也可以做些事情");
return rst;
}
private Object execute(RpcRequest request, Method method) throws Throwable {
// 打开远端服务连接
Socket server = new Socket(host, port);
OutputStream out = null;
InputStream in = null;
try {
// 1. 服务端输出流,写入请求数据,发送请求数据
out = server.getOutputStream();
IoUtil.writeUtf8(out, false, JSONUtil.toJsonStr(request));
IoUtil.flush(out);
//告知服务端已写入完成
server.shutdownOutput();
// 2. 服务端输入流,获取返回数据,转换参数类型
// 类似于反序列化的过程
in = server.getInputStream();
String json = IoUtil.read(in, "utf-8");
RpcResponse response = JSONUtil.toBean(json, new TypeReference<RpcResponse>() {
}, true);
// 3. 返回服务端响应结果
if (response.getError() != null) { // 服务器产生异常
throw response.getError();
}
return JSONUtil.toBean((JSON) response.getResult(), new TypeReference<Object>() {
@Override
public Type getType() {
return TypeUtil.getReturnType(method);
}
}, true);
} finally {
IoUtil.close(in);
IoUtil.close(out);
IoUtil.close(server);
}
}
}
3.3 服务端stub
public class RpcProvider {
/**
* 线程池
*/
private static final ExecutorService executorService = new ThreadPoolExecutor(
10,
10,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
/**
* rpc 提供端,暴露服务
*/
public static <T> void provider(final T service, int port) throws IOException {
//创建服务端的套接字,绑定端口port
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
//开始接收客户端的消息,并以此创建套接字
final Socket socket = serverSocket.accept();
executorService.execute(new Handler(socket, service));
}
}
/**
* 响应调用端
*
* @param oout
* @param result
*/
private static void response(OutputStream oout, Object result, Throwable throwable) {
RpcResponse response = new RpcResponse();
response.setResult(result);
response.setError(throwable);
IoUtil.writeUtf8(oout, false, JSONUtil.toJsonStr(response));
IoUtil.flush(oout);
System.out.println(Thread.currentThread().getName() + "=====> 响应结果" + response);
}
public static class Handler<T> implements Runnable {
private final Socket socket;
private T service;
public Handler(Socket socket, T service) {
this.socket = socket;
this.service = service;
}
@Override
public void run() {
final Socket client = socket;
//创建呢一个对内传输的对象流,并绑定套接字
RpcRequest request = null;
InputStream in;
try {
// 1. 获取流以待操作
in = client.getInputStream();
String json = IoUtil.read(in, "utf-8");
System.out.println(Thread.currentThread().getName() + "<===== 接受rpc 调用端请求" + json);
//2读取参数
request = JSONUtil.toBean(json, RpcRequest.class);
// 3. 执行服务方法, 返回结果
Class<?> clazz = service.getClass();
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service, request.getParams());
System.out.println(Thread.currentThread().getName() + "<===== 处理结果" + result);
// 4. 返回RPC响应,序列化RpcResponse
response(client.getOutputStream(), result, null);
} catch (Exception e) {
try { //异常处理
if (client.getOutputStream() != null) {
response(client.getOutputStream(), null, e);
}
} catch (Exception e1) {
e1.printStackTrace();
}
} finally {
IoUtil.close(client);
}
}
}
}
3.4 测试
服务端启动 ```java public class RpcProviderApp {
public static void main(String[] args) throws IOException {
//实例化服务
StudentService studentService = new StudentServiceImpl();
//暴露服务
RpcProvider.provider(studentService, 9999);
}
}
- 客户端启动
```java
//客户端启动
public class RpcConsumerApp {
public static void main(String[] args) {
StudentService studentService = RpcServiceFactory.create(StudentService.class, "127.0.0.1", 9999);
System.out.println(studentService.getInfo());
}
}
项目地址
https://github.com/h-dj/Spring-Learning