常用的RPC框架包括 Dubbo、SpringCloud、Rpcx等,RPC框架中服务端也称为提供者
RPC远程调用过程
调用流程说明:
1、服务消费方(client)以本地调用方式调用服务
2、client stub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
3、client stub将消息进行编码并发送到服务端
4、server stub收到消息后进行解码
5、server stub根据解码结果调用本地的服务
6、本地服务执行并将结果返回给server stub
7、server stub将返回导入结果进行编码并发送至消费方
8、client stub接收到消息并进行解码
9、服务消费方(client)得到结果
RPC的目标就是将2~8这些步骤都封装起来,用户无需关心其细节,只需要像调用本地方法一样即可完成远程服务调用
自定义Dubbo RPC:
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据
![@H3(XLE]_HRE%1$25OV2I9.png
自定义RPC框架设计说明
设计说明:
1、创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部使用Netty请求,提供者返回数据
代码实现:
公共接口:
package dubborpc.publicinterface;/* 服务提供方和消费方都需要的接口 */public interface HelloService {String hello(String msg);}
package dubborpc.provider;import dubborpc.publicinterface.HelloService;public class HelloServiceImpl implements HelloService {//当有消费方调用该方法时返回结果@Overridepublic String hello(String msg) {System.out.println("收到客户端消息="+msg);//根据message返回不同的结果if(msg!=null){return "客户端已收到你的消息["+msg+"]";}else{return "客户端已收到你的空消息";}}}
服务器端:
package dubborpc.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class NettyServer {public static void startServer(String hostname,int port){startServer0(hostname, port);}//编写一个方法,完成对NettyServer的初始化与启动工作private static void startServer0(String hostname,int port){EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringEncoder());pipeline.addLast(new StringDecoder());pipeline.addLast(new NettyServerHandler()); //自定义业务处理器}});ChannelFuture channelFuture = bootstrap.bind(hostname, port).sync();System.out.println("服务提供方开始提供服务...");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}
package dubborpc.netty;import dubborpc.provider.HelloServiceImpl;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/* 服务器端Handler */public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 获取客户端发送的消息并调用服务System.out.println("msg="+msg);//客户端在调用服务器API时走自定义协议,例如要求每次发送的数据必须以 "HELLORPC#"开头if(msg.toString().startsWith("HELLORPC#")){String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));ctx.writeAndFlush(result);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
package dubborpc.provider;import dubborpc.netty.NettyServer;/* ServerBootstrap会启动一个服务提供者,就是NettyServer */public class ServerBootstrap {public static void main(String[] args) {NettyServer.startServer("127.0.0.1",7000);}}
客户端:
package dubborpc.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.lang.reflect.Proxy;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class NettyClient {private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,10,200, TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));private static NettyClientHandler client;/** 编写方法使用代理模式,获取一个代理对象*/public Object getBean(final Class<?> serviceClass,final String providerName){return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy, method, args) -> {//{} 部分的代码,客户端每调用一次hello就会进入到该代码if(client ==null){initclient(); //进行初始化}/*设置要发给服务器端的信息* @param providerName: 协议头* @param args: 客户端低啊用api hello(xxx)方法的参数*/client.setPara(providerName+args[0]);return threadPool.submit(client).get(); //返回服务器返回的结果});}//初始化客户端private static void initclient(){client = new NettyClientHandler();//创建EventLoopGroupEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true) //TCP不延时.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(client);}});try {bootstrap.connect("127.0.0.1", 7000).sync();} catch (InterruptedException e) {e.printStackTrace();}}}
package dubborpc.netty;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Callable;public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context; //上下文private String result; //返回结果private String para; //客户端调用方法时传入的参数@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {context = ctx; //在其他方法中会使用到上下文}/* 处于读取状态时将被调用 */@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {result = (String) msg;notify(); //接收到服务器返回结果,唤醒等待的call线程}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}//被代理对象调用,发送数据给服务器后等待被唤醒@Overridepublic synchronized Object call() throws Exception {context.writeAndFlush(para);wait();//等待服务器返回结果后被唤醒return result;}//设置参数void setPara(String para){this.para = para;}}
package dubborpc.customer;import dubborpc.netty.NettyClient;import dubborpc.publicinterface.HelloService;public class ClientBootstrap {public static final String providerName = "HELLORPC#";//定义协议头public static void main(String[] args) {//创建消费者NettyClient customer = new NettyClient();//创建代理对象HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);//通过代理对象调用服务提供者的方法String res = service.hello("你好,自定义Dubbo~");System.out.println("调用的结果="+res);}}
测试:
启动两个Bootstrap服务,自定义RPC框架成功实现通信
