常用的RPC框架包括 Dubbo、SpringCloud、Rpcx等,RPC框架中服务端也称为提供者
RPC远程调用过程
调用流程说明:
1、服务消费方(client)以本地调用方式调用服务
2、client stub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
3、client stub将消息进行编码并发送到服务端
4、server stub收到消息后进行解码
5、server stub根据解码结果调用本地的服务
6、本地服务执行并将结果返回给server stub
7、server stub将返回导入结果进行编码并发送至消费方
8、client stub接收到消息并进行解码
9、服务消费方(client)得到结果
RPC的目标就是将2~8这些步骤都封装起来,用户无需关心其细节,只需要像调用本地方法一样即可完成远程服务调用
自定义Dubbo RPC:
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据
![@H3(XLE]_HRE%1$25OV2I9.png
自定义RPC框架设计说明
设计说明:
1、创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部使用Netty请求,提供者返回数据
代码实现:
公共接口:
package dubborpc.publicinterface;
/* 服务提供方和消费方都需要的接口 */
public interface HelloService {
String hello(String msg);
}
package dubborpc.provider;
import dubborpc.publicinterface.HelloService;
public class HelloServiceImpl implements HelloService {
//当有消费方调用该方法时返回结果
@Override
public String hello(String msg) {
System.out.println("收到客户端消息="+msg);
//根据message返回不同的结果
if(msg!=null){
return "客户端已收到你的消息["+msg+"]";
}else{
return "客户端已收到你的空消息";
}
}
}
服务器端:
package dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
public static void startServer(String hostname,int port){
startServer0(hostname, port);
}
//编写一个方法,完成对NettyServer的初始化与启动工作
private static void startServer0(String hostname,int port){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new NettyServerHandler()); //自定义业务处理器
}
});
ChannelFuture channelFuture = bootstrap.bind(hostname, port).sync();
System.out.println("服务提供方开始提供服务...");
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
package dubborpc.netty;
import dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/* 服务器端Handler */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取客户端发送的消息并调用服务
System.out.println("msg="+msg);
//客户端在调用服务器API时走自定义协议,例如要求每次发送的数据必须以 "HELLORPC#"开头
if(msg.toString().startsWith("HELLORPC#")){
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
package dubborpc.provider;
import dubborpc.netty.NettyServer;
/* ServerBootstrap会启动一个服务提供者,就是NettyServer */
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1",7000);
}
}
客户端:
package dubborpc.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,10,
200, TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<>(10));
private static NettyClientHandler client;
/*
* 编写方法使用代理模式,获取一个代理对象
*/
public Object getBean(final Class<?> serviceClass,final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass},(proxy, method, args) -> {
//{} 部分的代码,客户端每调用一次hello就会进入到该代码
if(client ==null){
initclient(); //进行初始化
}
/*设置要发给服务器端的信息
* @param providerName: 协议头
* @param args: 客户端低啊用api hello(xxx)方法的参数
*/
client.setPara(providerName+args[0]);
return threadPool.submit(client).get(); //返回服务器返回的结果
});
}
//初始化客户端
private static void initclient(){
client = new NettyClientHandler();
//创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true) //TCP不延时
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
});
try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package dubborpc.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context; //上下文
private String result; //返回结果
private String para; //客户端调用方法时传入的参数
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx; //在其他方法中会使用到上下文
}
/* 处于读取状态时将被调用 */
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = (String) msg;
notify(); //接收到服务器返回结果,唤醒等待的call线程
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//被代理对象调用,发送数据给服务器后等待被唤醒
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(para);
wait();//等待服务器返回结果后被唤醒
return result;
}
//设置参数
void setPara(String para){
this.para = para;
}
}
package dubborpc.customer;
import dubborpc.netty.NettyClient;
import dubborpc.publicinterface.HelloService;
public class ClientBootstrap {
public static final String providerName = "HELLORPC#";//定义协议头
public static void main(String[] args) {
//创建消费者
NettyClient customer = new NettyClient();
//创建代理对象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
//通过代理对象调用服务提供者的方法
String res = service.hello("你好,自定义Dubbo~");
System.out.println("调用的结果="+res);
}
}
测试:
启动两个Bootstrap服务,自定义RPC框架成功实现通信