一、RPC 基本介绍

  1. RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  2. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图 RPC标准流程)

image.png
过程:

  1. 调用者(Caller),调用远程API(Remote API)
  2. 调用远程API会通过一个RPC代理(RpcProxy)
  3. RPC代理再去调用RpcInvoker(这个是PRC的调用者)
  4. RpcInvoker通过RPC连接器(RpcConnector)
  5. RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码
  6. 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
  7. PRC接收器通过PRC协议进行解码拿到数据
  8. 然后将数据传给RpcProcessor
  9. RpcProcessor再传给RpcInvoker
  10. RpcInvoker调用Remote API
  11. 最后推给 被调用者(Callee)
  12. 常见的 RPC 框架有:比较知名的如阿里的 Dubbo、Google 的 gRPC、Go 语言的 rpcx、Apache 的 thrift,Spring 旗下的 SpringCloud。

image.png

二、RPC调用流程落地常用实现

image.png
image.png
一次完整的RPC调用流程(同步)如下:
1、消费方调用 以本地调用 的方式调用服务。(本地调用的方式例如dubbo调用的本地jar 包)
2、client stub 接收到调用方后,负责将方法、参数等组装成能够进行网络传输的消息体(dubbo 就是序列化)
3、client stub 找到服务器地址,并将消息发送到服务器端;
4、server stub 收到消息后进行解码;
5、server stub 根据解码结果调用本地服务;
6、本地服务执行并将结果返回给server stub;
7、server stub 将返回结果打包成消息并发送给消费方
8、client stub接收到消息,并进行解码
9、服务消费方得到最终结果。
RPC框架的目标就是实现2~8这些步骤,将其封装起来,这些细节对用户来说是透明的,不可见的。

stub 可以理解为是一个代理对象

三、自己实现 dubbo RPC(基于Netty)

3.1 需求说明

  1. Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  2. 模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.25.Final

    3.2 设计说明

  3. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

  4. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  5. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
  6. 开发的分析图

image.png

3.3 代码实现

3.3.1 封装的RPC部分

NettyClient

  1. public class NettyClient {
  2. /**
  3. * 创建线程池
  4. */
  5. private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  6. private static NettyClientHandler clientHandler;
  7. // 编写方法使用代理模式,获取一个代理对象
  8. public Object getBean(final Class<?> serviceClass, final String providerName) {
  9. return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
  10. // 获取方法的执行结果
  11. @Override
  12. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  13. // 每调用一次hello,就会进入到该代码
  14. if (clientHandler == null) {
  15. initClient();
  16. }
  17. // 设置要发给服务器端的信息 providerName就是协议头(HelloService#hello#),arg[0]就是hello方法的参数
  18. clientHandler.setParam(providerName + args[0]);
  19. // 返回一个代理对象
  20. return executor.submit(clientHandler).get();
  21. }
  22. });
  23. }
  24. // 初始化客户端
  25. private static void initClient() {
  26. clientHandler = new NettyClientHandler();
  27. NioEventLoopGroup group = new NioEventLoopGroup();
  28. Bootstrap bootstrap = new Bootstrap();
  29. bootstrap.group(group)
  30. .channel(NioSocketChannel.class)
  31. .option(ChannelOption.TCP_NODELAY, true)
  32. .handler(new ChannelInitializer<SocketChannel>() {
  33. @Override
  34. protected void initChannel(SocketChannel ch) throws Exception {
  35. ChannelPipeline pipeline = ch.pipeline();
  36. pipeline.addLast(new StringDecoder());
  37. pipeline.addLast(new StringEncoder());
  38. pipeline.addLast(clientHandler);
  39. }
  40. });
  41. try {
  42. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();
  43. // 不能关闭
  44. // channelFuture.channel().closeFuture().sync();
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }

NettyClientHandler

  1. public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
  2. private ChannelHandlerContext context; // 上下文
  3. private String result; // 返回的结果
  4. private String param; // 客户端调用方法时,传入参数
  5. /**
  6. * (1)
  7. * 与服务器的链接创建后,就会被调用,这个方法被第一个调用
  8. *
  9. * @param ctx
  10. * @throws Exception
  11. */
  12. @Override
  13. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  14. System.out.println("NettyClientHandler channelActive 被调用");
  15. context = ctx; // 在其他方法会使用到 ctx
  16. }
  17. /**
  18. * (4)
  19. * 收到服务器的数据后,调用方法
  20. *
  21. * @param ctx
  22. * @param msg
  23. * @throws Exception
  24. */
  25. @Override
  26. public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  27. System.out.println("NettyClientHandler channelRead 被调用");
  28. result = msg.toString();
  29. notify(); // 唤醒等待的线程
  30. }
  31. @Override
  32. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  33. System.out.println("NettyClientHandler exceptionCaught 被调用");
  34. cause.printStackTrace();
  35. ctx.close();
  36. }
  37. /**
  38. * (3)--- 被唤醒后 --> (5)
  39. * 被 代理对象调用,发送数据给服务器,--> wait --> 等待被唤醒(channelRead) --> 返回结果
  40. *
  41. * @return
  42. * @throws Exception
  43. */
  44. @Override
  45. public synchronized Object call() throws Exception {
  46. System.out.println("NettyClientHandler call(1) 被调用");
  47. context.writeAndFlush(param);
  48. // 等待 channelRead 方法获取到服务器等结果后,唤醒
  49. wait();
  50. // 服务器返回的结果
  51. System.out.println("NettyClientHandler call(2) 被调用");
  52. return result;
  53. }
  54. /**
  55. * (2)
  56. *
  57. * @param param
  58. */
  59. void setParam(String param) {
  60. System.out.println("NettyClientHandler setParam 被调用");
  61. this.param = param;
  62. }
  63. }

NettyServer

  1. public class NettyServer {
  2. public static void startServer(String hostName, int port) {
  3. startServer0(hostName, port);
  4. }
  5. // 编写一个方法,完成对 NettyServer 的初始化和启动
  6. private static void startServer0(String hostname, int port) {
  7. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  8. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  9. try {
  10. ServerBootstrap serverBootstrap = new ServerBootstrap();
  11. serverBootstrap.group(bossGroup, workerGroup)
  12. .channel(NioServerSocketChannel.class)
  13. .childHandler(new ChannelInitializer<SocketChannel>() {
  14. @Override
  15. protected void initChannel(SocketChannel ch) throws Exception {
  16. ChannelPipeline pipeline = ch.pipeline();
  17. pipeline.addLast(new StringDecoder());
  18. pipeline.addLast(new StringEncoder());
  19. // 业务处理器
  20. pipeline.addLast(new NettyServerHandler());
  21. }
  22. });
  23. ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
  24. System.out.println("服务提供方开始 提供服务........");
  25. channelFuture.channel().closeFuture().sync();
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. } finally {
  29. bossGroup.shutdownGracefully();
  30. workerGroup.shutdownGracefully();
  31. }
  32. }
  33. }

NettyServerHandler

  1. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. // 获取客户端发送的消息,并调用服务
  5. System.out.println("msg" + msg);
  6. // 客户端在调用服务器的 api 时,我们需要定义一个协议
  7. // 比如我们要求 每次发消息都必须以某个字符串开头 "HelloService#hello#"
  8. if(msg.toString().startsWith("HelloService#hello#")){
  9. // 服务端调用 服务端的方法
  10. String response = (new HelloServiceImpl()).hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
  11. ctx.writeAndFlush(response);
  12. }
  13. }
  14. @Override
  15. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  16. super.exceptionCaught(ctx, cause);
  17. }
  18. }

3.3.2 接口

HelloService

  1. /**
  2. * @description: 这个接口是服务提供方和服务消费方都需要的
  3. * @Author: wangchao
  4. * @Date: 2022/1/6
  5. */
  6. public interface HelloService {
  7. String hello(String msg);
  8. }

3.3.3 提供端(provider)

HelloServiceImpl

  1. public class HelloServiceImpl implements HelloService {
  2. // 验证每次调用的是不是用一个 service
  3. int count = 0;
  4. // 当有消费方调用该方法时,就返回一个结果
  5. @Override
  6. public String hello(String msg) {
  7. System.out.println("收到客户端消息=" + msg);
  8. // 根据 msg 返回不同的结果
  9. if (msg != null) {
  10. return "你好客户端,我已经收到你的消息[" + msg + "] 第" + (++count) + "次";
  11. } else {
  12. return "你好客户端,我已经收到你的消息 ";
  13. }
  14. }
  15. }

ServerBootstrap

  1. /**
  2. * @description: 会启用一个服务的提供者,就是NettyServer
  3. * @Author: wangchao
  4. * @Date: 2022/1/6
  5. */
  6. public class ServerBootstrap {
  7. public static void main(String[] args) {
  8. NettyServer.startServer("127.0.0.1",8989);
  9. }
  10. }

3.3.4 消费端(customer)

ClientBootstrap

  1. public class ClientBootstrap {
  2. public static final String providerName = "HelloService#hello#";
  3. public static void main(String[] args) throws InterruptedException {
  4. // 创建一个消费者
  5. NettyClient customer = new NettyClient();
  6. // 创建代理对象
  7. HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
  8. for (; ; ) {
  9. // 验证每次调用的是不是用一个 service
  10. // 通过代理对象调用服务提供者的方法
  11. String hello = service.hello("你好 dubbo~");
  12. System.out.println("调用结果 res=" + hello);
  13. Thread.sleep(10 * 1000);
  14. }
  15. }
  16. }

3.3.5 效果

调用方
image.png
提供方
image.png