RPC 基本介绍
- RPC(Remote Procedure Call)— 远程过程调用, 是一个计算机通信协议, 该协议允许运行于一台计算机的程序调用另一台计算机的子程序, 而程序员无需额外的为这个交互过程编程
 - 两个或多个应用程序都分布在不同的服务器上,他们之间的调用都像是本地方法调用一样(如图)
 

- 常见的RPC框架有: 比较知名的阿里的Dubbo,Nacos,Google的gRpc, Go语言的rpcx, Apache的thrift, Spring的SpringCloud(Eureka)
 
RPC调用流程图

术语说明: 
在RPC中, Client端叫做服务消费者, Server端叫做服务提供者
RPC调用流程说明
- 服务消费方(Client) 以本地调用方式调用服务
 - ClientStub接收到调用后负责将该方法,参数等封装成能够进行网络传输的消息体
 - ClientStub将消息进行编码并发送到服务器端
 - ServerStub收到消息后进行解码
 - ServerStub根据解码结果调用本地的服务
 - 本地服务执行并将结果返回给ServerStub
 - ServerStub将返回结果进行编码并发送至消费方
 - ClientSub接收到消息并进行解码
 - 服务消费方(client)得到结果
 
小结: RPC的目标就是将2-8这些步骤都封装起来, 用户无需关心这些细节, 可以像调用本地方法一样即可完成远程服务的服务调用
基于Netty 实现Dubbo RPC
需求说明
- Dubbo底层使用了Netty作为网络通讯框架, 要求用Netty实现一个简单的RPC框架
 模仿Dubbo, 消费者和提供者约定接口和协议, 消费者远程调用提供者服务, 提供者返回一个字符串, 消费者打印提供者返回的数据, 底层通讯使用Netty 4.1.20
设计说明
创建一个接口, 定义抽象方法, 用于消费者和提供者之间的约定
- 创建一个提供者, 该类需要监听消费者的请求, 并按照约定返回数据
 - 创建一个消费者, 该类需要透明的调用自己不存在的方法, 内部使用Netty请求提供者返回数据
 - 开发分析图
 
代码实现
定义接口
package com.dance.netty.netty.dubbo.common.api;/*** 对外提供服务的Service接口*/public interface HelloService {String printHello(String msg);}
新建接口实现类(作为提供者)
package com.dance.netty.netty.dubbo.provider.service.impl;import com.dance.netty.netty.dubbo.common.api.HelloService;public class HelloServiceImpl implements HelloService {private static final HelloService helloService = new HelloServiceImpl();@Overridepublic String printHello(String msg) {System.out.println("接收到参数: " + msg);return "hello msg!";}private HelloServiceImpl(){}public static HelloService getInstance(){return helloService;}}
新建NettyServer
package com.dance.netty.netty.dubbo.common.netty;
import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
public class NettyServer {
    public static void startServer0() {
        startServer0("127.0.0.1", 7000);
    }
    public static void startServer0(String hostname, int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 使用字符串编解码器
                            ch.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture sync = serverBootstrap.bind(hostname, port).sync();
            System.out.println("netty server is starting, ip: " + hostname + ", port: " + port);
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
新建NettyServerHandler
package com.dance.netty.netty.dubbo.common.netty;
import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("有人注册");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String msgStr = msg.toString();
        // 获取客户端发送的数据
        System.out.println("msg is " + msgStr);
        // 客户端在调用服务器的API的时候, 我们可以自定义一个协议
        // 比如我们要求 每次发送消息都必须以固定格式开头 比如
        // 服务名称#方法名称#参数
        // 例如: HelloService#printHello#this is msg
        if(msgStr.startsWith("HelloService#printHello#")){
            // 传入参数的时候 去除协议头
            ctx.writeAndFlush(HelloServiceImpl.getInstance().printHello(msgStr.substring(msgStr.lastIndexOf('#') + 1)));
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
新建ServerBootstrap
package com.dance.netty.netty.dubbo.provider.server;
import com.dance.netty.netty.dubbo.common.netty.NettyServer;
/**
 * 服务启动类
 */
public class ServerBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer0();
    }
}
新建NettyClientHandler
package com.dance.netty.netty.dubbo.common.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> {
    /**
     * 上下文对象
     */
    private  ChannelHandlerContext context;
    /**
     * 远程调用的返回结果
     */
    private String result;
    public void setParams(String params) {
        this.params = params;
    }
    /**
     * 客户端调用方法时传入的参数
     */
    private String params;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 在其他方法会使用
        context = ctx;
    }
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        // 唤醒在这个方法上等待的线程
        notify();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    /**
     * 被代理对象调用, 发送数据给服务器 => wait => 等待被读取唤醒(也就是服务器有数据回送时) => 返回结果
     *  其实就是将异步通过 wait 和 notify 变成了同步等待
     * @return 返回结果
     * @throws Exception 线程异常
     */
    @Override
    public synchronized String call() throws Exception {
        // 发送参数到 服务器
        context.writeAndFlush(params);
        // 等待read读取到数据
        wait();
        // 返回响应的数据
        return result;
    }
}
新建NettyClient
package com.dance.netty.netty.dubbo.common.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
    /**
     * 创建线程池
     */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    /**
     * 用于执行远程调用的处理器
     */
    private static NettyClientHandler nettyClientHandler;
    // 编写方法使用代理模式, 获取一个代理对象
    public Object getBean(final Class<?> serviceClass, final String protocol){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
            if (nettyClientHandler == null) {
                initClient();
            }
            // 设置要发给服务器端的信息
            // 采用 协议头 + 参数[0] 格式
            nettyClientHandler.setParams(protocol + args[0]);
            // 线程池提交一个任务
            return executorService.submit(nettyClientHandler).get();
        });
    }
    private static void initClient(){
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("stringDecoder", new StringDecoder())
                                    .addLast("stringEncoder", new StringEncoder())
                                    .addLast("nettyClientHandler", nettyClientHandler);
                        }
                    });
            bootstrap.connect("127.0.0.1", 7000).sync();
            // 注意 Client端不能阻塞
//            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 重点 千万不要关闭
//            eventExecutors.shutdownGracefully();
        }
    }
}
新建ClientBootstrap
package com.dance.netty.netty.dubbo.consumer.client;
import com.dance.netty.netty.dubbo.common.api.HelloService;
import com.dance.netty.netty.dubbo.common.netty.NettyClient;
public class ClientBootstrap {
    private static final String PROTOCOL = "HelloService#printHello#";
    public static void main(String[] args) {
        // 创建一个消费者
        NettyClient nettyClient = new NettyClient();
        // 创建代理对象
        HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, PROTOCOL);
        // 通过代理对象调用服务提供者的方法(服务)
        String result = helloService.printHello("hi dubbo rpc");
        System.out.println("调用方法执行返回结果: " + result);
    }
}
测试
Server端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
netty server is starting, ip: 127.0.0.1, port: 7000
msg is HelloService#printHello#hi dubbo rpc
接收到参数: hi dubbo rpc
Client端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用方法执行返回结果: hello msg!
我们只通过接口就调用到了提供者的提供的接口,实现了通过Netty完成了RPC的远程调用

