一、RPC 基本介绍
- RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图 RPC标准流程)
过程:
- 调用者(Caller),调用远程API(Remote API)
- 调用远程API会通过一个RPC代理(RpcProxy)
- RPC代理再去调用RpcInvoker(这个是PRC的调用者)
- RpcInvoker通过RPC连接器(RpcConnector)
- RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码
- 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
- PRC接收器通过PRC协议进行解码拿到数据
- 然后将数据传给RpcProcessor
- RpcProcessor再传给RpcInvoker
- RpcInvoker调用Remote API
- 最后推给 被调用者(Callee)
- 常见的 RPC 框架有:比较知名的如阿里的 Dubbo、Google 的 gRPC、Go 语言的 rpcx、Apache 的 thrift,Spring 旗下的 SpringCloud。
二、RPC调用流程落地常用实现
一次完整的RPC调用流程(同步)如下:
1、消费方调用 以本地调用 的方式调用服务。(本地调用的方式例如dubbo调用的本地jar 包)
2、client stub 接收到调用方后,负责将方法、参数等组装成能够进行网络传输的消息体(dubbo 就是序列化)
3、client stub 找到服务器地址,并将消息发送到服务器端;
4、server stub 收到消息后进行解码;
5、server stub 根据解码结果调用本地服务;
6、本地服务执行并将结果返回给server stub;
7、server stub 将返回结果打包成消息并发送给消费方
8、client stub接收到消息,并进行解码
9、服务消费方得到最终结果。
RPC框架的目标就是实现2~8这些步骤,将其封装起来,这些细节对用户来说是透明的,不可见的。
stub 可以理解为是一个代理对象
三、自己实现 dubbo RPC(基于Netty)
3.1 需求说明
- Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.25.Final
3.2 设计说明
创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
- 开发的分析图
3.3 代码实现
3.3.1 封装的RPC部分
NettyClient
public class NettyClient {
/**
* 创建线程池
*/
private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler clientHandler;
// 编写方法使用代理模式,获取一个代理对象
public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
// 获取方法的执行结果
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 每调用一次hello,就会进入到该代码
if (clientHandler == null) {
initClient();
}
// 设置要发给服务器端的信息 providerName就是协议头(HelloService#hello#),arg[0]就是hello方法的参数
clientHandler.setParam(providerName + args[0]);
// 返回一个代理对象
return executor.submit(clientHandler).get();
}
});
}
// 初始化客户端
private static void initClient() {
clientHandler = new NettyClientHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();
// 不能关闭
// channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context; // 上下文
private String result; // 返回的结果
private String param; // 客户端调用方法时,传入参数
/**
* (1)
* 与服务器的链接创建后,就会被调用,这个方法被第一个调用
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("NettyClientHandler channelActive 被调用");
context = ctx; // 在其他方法会使用到 ctx
}
/**
* (4)
* 收到服务器的数据后,调用方法
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("NettyClientHandler channelRead 被调用");
result = msg.toString();
notify(); // 唤醒等待的线程
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("NettyClientHandler exceptionCaught 被调用");
cause.printStackTrace();
ctx.close();
}
/**
* (3)--- 被唤醒后 --> (5)
* 被 代理对象调用,发送数据给服务器,--> wait --> 等待被唤醒(channelRead) --> 返回结果
*
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
System.out.println("NettyClientHandler call(1) 被调用");
context.writeAndFlush(param);
// 等待 channelRead 方法获取到服务器等结果后,唤醒
wait();
// 服务器返回的结果
System.out.println("NettyClientHandler call(2) 被调用");
return result;
}
/**
* (2)
*
* @param param
*/
void setParam(String param) {
System.out.println("NettyClientHandler setParam 被调用");
this.param = param;
}
}
NettyServer
public class NettyServer {
public static void startServer(String hostName, int port) {
startServer0(hostName, port);
}
// 编写一个方法,完成对 NettyServer 的初始化和启动
private static void startServer0(String hostname, int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 业务处理器
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服务提供方开始 提供服务........");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取客户端发送的消息,并调用服务
System.out.println("msg" + msg);
// 客户端在调用服务器的 api 时,我们需要定义一个协议
// 比如我们要求 每次发消息都必须以某个字符串开头 "HelloService#hello#"
if(msg.toString().startsWith("HelloService#hello#")){
// 服务端调用 服务端的方法
String response = (new HelloServiceImpl()).hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(response);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
3.3.2 接口
HelloService
/**
* @description: 这个接口是服务提供方和服务消费方都需要的
* @Author: wangchao
* @Date: 2022/1/6
*/
public interface HelloService {
String hello(String msg);
}
3.3.3 提供端(provider)
HelloServiceImpl
public class HelloServiceImpl implements HelloService {
// 验证每次调用的是不是用一个 service
int count = 0;
// 当有消费方调用该方法时,就返回一个结果
@Override
public String hello(String msg) {
System.out.println("收到客户端消息=" + msg);
// 根据 msg 返回不同的结果
if (msg != null) {
return "你好客户端,我已经收到你的消息[" + msg + "] 第" + (++count) + "次";
} else {
return "你好客户端,我已经收到你的消息 ";
}
}
}
ServerBootstrap
/**
* @description: 会启用一个服务的提供者,就是NettyServer
* @Author: wangchao
* @Date: 2022/1/6
*/
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1",8989);
}
}
3.3.4 消费端(customer)
ClientBootstrap
public class ClientBootstrap {
public static final String providerName = "HelloService#hello#";
public static void main(String[] args) throws InterruptedException {
// 创建一个消费者
NettyClient customer = new NettyClient();
// 创建代理对象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
for (; ; ) {
// 验证每次调用的是不是用一个 service
// 通过代理对象调用服务提供者的方法
String hello = service.hello("你好 dubbo~");
System.out.println("调用结果 res=" + hello);
Thread.sleep(10 * 1000);
}
}
}
3.3.5 效果
调用方
提供方