基本原理 :要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络 通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输 协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输 协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现.
什么是RPC :RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样去调用远程服 务,是一种进程间的通信方式. 比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用 B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来 表达调用的语义和传达调用的数据。需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调 用过程
RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。
1、客户端(Client),服务的调用方。
2、客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后 通过网络远程发送给服务方。
3、服务端(Server),真正的服务提供者。
4、服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。
1. 客户端(client)以本地调用方式(即以接口的方式)调用服务;
2. 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体 (将消息体对象序列化为二进制);
3. 客户端通过socket将消息发送到服务端;
4. 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
5. 服务端存根( server stub)根据解码结果调用本地的服务;
6. 服务处理
7. 本地服务执行并将结果返回给服务端存根( server stub);
8. 服务端存根( server stub)将返回结果打包成消息(是要编码的)(将结果消息对象序列化);
9. 服务端(server)通过socket将消息发送到客户端;
10. 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
11. 客户端(client)得到最终结果。
RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11
注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要 将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。 在java中RPC框架比较多,常见的有Hessian、gRPC、Dubbo 等,其实对 于RPC框架而言,核心模块 就是通讯和序列化
RMI
Java RMI,即远程方法调用(Remote Method Invocation),一种用于实现远程过程调用(RPCRemote procedure call)的Java API, 能直接传输序列化后的Java对象。它的实现依赖于Java虚拟机,因此它仅支持从一个JVM到另一个JVM的调用。 (必须是java的语言)
1.客户端从远程服务器的注册表中查询并获取远程对象引用。
2.桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时,实际上是由相应的桩 对象代理完成的。
3.远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传递给传输层 (Transport),由传输层通过TCP协议发送调用;
4.在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上 层的远程引用层;
5)服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用 后,再将请求传递给骨架(Skeleton); 6)骨架读取参数,又将请求传递给服务器,最后由服务 器进行实际的方法调用。
5.如果远程方法调用后有返回值,则服务器将这些结果又沿着“骨架->远程引用层->传输层”向下传 递;
6.客户端的传输层接收到返回值后,又沿着“传输层->远程引用层->桩”向上传递,然后由桩来反序 列化这些返回值,并将最终的结果传递给客户端程序。
需求分析:
1. 服务端提供根据ID查询用户的方法 (要统一的接口进行远程通讯)
2. 客户端调用服务端方法, 并返回用户对象
3. 要求使用RMI进行远程通信 (要RMI的服务端和客户端)
创建用户对象
/*因为需要在网络中心进行传输,所以需要实现序列化接口*/
public class User implements Serializable {
private int id;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
创建其对应的接口及实现类
/*必须是Remote类型的*/
public interface IUserService extends Remote {
User getByUserId(int id) throws RemoteException;
}
/*实现类也必须是要实现UnicastRemoteObject */
public class UserServiceImpl extends UnicastRemoteObject implements IUserService {
Map<Object, User> userMap = new HashMap();
public UserServiceImpl() throws RemoteException {
super();
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
@Override
public User getByUserId(int id) throws RemoteException {
return userMap.get(id);
}
}
创建服务端
public class RMIServer {
public static void main(String[] args) {
try {
//1.注册Registry实例. 绑定端口
Registry registry = LocateRegistry.createRegistry(9998);
//2.创建远程对象
IUserService userService = new UserServiceImpl();
//3.将远程对象注册到RMI服务器上即(服务端注册表上)
registry.rebind("userService", userService);
System.out.println("---RMI服务端启动成功----");
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
创建客户端
public class RMIClient {
public static void main(String[] args) throws RemoteException, NotBoundException {
//1.获取Registry实例(输入服务端的ip及端口号)
Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998);
//2.通过Registry实例查找远程对象
IUserService userService = (IUserService) registry.lookup("userService");
User user = userService.getByUserId(2);
System.out.println(user.getId() + "----" + user.getName());
}
}
基于Netty实现RPC框架
需求介绍 : dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架,消费者和提 供者约定接口和协议,消费者远程调用提供者的服务,
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定,
2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
3. 创建一个消费者,该类需要透明的调用自己不存在的方法(就是远程调用),内部需要使用 Netty 进行数据通信
4. 提供者与消费者数据传输使用json字符串数据格式
5. 提供者使用netty集成spring boot 环境实现
案例: 客户端远程调用服务端提供根据ID查询user对象的方法.
服务接口代码实现
1、引入依赖
<!--netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!--json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<!--lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
2、创建远程服务接口
/**
* 用户服务
*/
public interface IUserService {
/**
* 根据ID查询用户
*
* @param id
* @return
*/
User getById(int id);
}
3、封装请求和响应对象
因为要进行远程调用,是经过网络传输,那么将传输的信息转为对象,以字节码进行传输
/**
* 封装的请求对象
*/
@Data
public class RpcRequest {
/**
* 请求对象的ID
*/
private String requestId;
/** 请求服务的那个类
* 类名
*/
private String className;
/**请求服务的那个方法
* 方法名
*/
private String methodName;
/**
* 参数类型
*/
private Class<?>[] parameterTypes;
/**
* 入参
*/
private Object[] parameters;
}
/////////////////////////////////////////////////////////////////////
/**
* 封装的响应对象
*/
@Data
public class RpcResponse {
/**
* 响应ID
*/
private String requestId;
/**
* 错误信息
*/
private String error;
/**
* 返回的结果
*/
private Object result;
}
4、传递的实体对象
/*传递的对象*/
@Data
public class User {
private int id;
private String name;
}
服务端代码实现
1、添加依赖
<!--引入接口服务-->
<dependency>
<groupId>com.lagou</groupId>
<artifactId>lg-rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--spring相关依赖 -->
<!--Spring的上下文-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<!--自动配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
2、自定义暴露注解
/**编写这个注解,然后如果添加上这个注解,
* 无论是类还是接口,都是可以对外暴露,实现对外提供服务的了
* 对外暴露服务接口
*/
@Target(ElementType.TYPE) // 用于接口和类上
@Retention(RetentionPolicy.RUNTIME)// 在运行时可以获取到
public @interface RpcService {
}
3、编写提供服务实现类
@RpcService /*进行对外暴露*/
@Service /*将实现类加入到ioc容器当中*/
public class UserServiceImpl implements IUserService {
Map<Object, User> userMap = new HashMap();
@Override
public User getById(int id) {
if (userMap.size() == 0) {
User user1 = new User();
user1.setId(1);
user1.setName("张三");
User user2 = new User();
user2.setId(2);
user2.setName("李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
return userMap.get(id);
}
}
4、编写远程服务启动类
处理服务器的启动,线程管理等
/**
* 启动类,
* 实现DisposableBean接口,主要是为了当ioc容器销毁的时候
* 执行的它一个destroy方法,在里面可以关闭线程组
*
* 服务端写好之后,可以在需要调用的地方进行服务注入
*/
@Service /*也暴露ioc中*/
public class RpcServer implements DisposableBean {
/*需要两个线程组,在销毁的时候要进行关闭*/
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@Autowired
RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) {
try {
//1. 创建线程组
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
//2. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3. 设置参数
serverBootstrap.group(bossGroup, workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class)// 设置通道的实现
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception { //通道初始化对象
// 通过channel得到pipeline
ChannelPipeline pipeline = channel.pipeline();
//添加String的编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//业务处理类
pipeline.addLast(rpcServerHandler);
}
});
//4.绑定端口(异步改同步)
ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
System.out.println("==========服务端启动成功==========");
// 监听通道关闭的状态
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
@Override
public void destroy() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
5、编写服务业务处理类
处理真正的业务逻辑
/**
* 服务端业务处理类
* 1.将标有@RpcService注解的bean缓存(缓存目的:当客户端再次去调用给的时候可以缓存当中拿到对外暴露的处理类)
* 2.接收客户端请求
* 3.根据传递过来的beanName从缓存中查找到对应的bean
* 4.解析请求中的方法名称. 参数类型 参数信息
* 5.反射调用bean的方法
* 6.给客户端进行响应
* 客户端&服务端是以String传递,所以泛型就是String
*
*通过SimpleChannelInboundHandler实现消息入站
*/
@Component /*要被Spring管理*/
@ChannelHandler.Sharable /*通道实现共享,否则只能第一个上来可以*/
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
/*定义一个缓存集合进行缓存bean对象*/
private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();
/**
* 1.将标有@RpcService注解的bean缓存
* 那就需要从容器找到这样的注解哦,那么首先就得从容器中找到这样的对象
* 要想找到:就必须实现ApplicationContextAware接口,实现setApplicationContext方法,在
* 方法里面找到标有自定义注解的对象了
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
/*通过容器对象,查找添加有RpcService注解的bean对象*/
Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size() > 0) {
/*如果进来说明找到了标RpcService的注解*/
Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
/*先进行遍历*/
for (Map.Entry<String, Object> item : entries) {
/*这个serviceBean就是具体的bean对象(UserServiceImpl)*/
Object serviceBean = item.getValue();
/*再进一步判断是否实现接口,如果没有实现接口,则长度是0的*/
if (serviceBean.getClass().getInterfaces().length == 0) {
throw new RuntimeException("服务必须实现接口");
}
/*如果实现了接口,(有可能实现多个接口哦)默认取第一个进行对外暴露*/
//默认取第一个接口作为缓存bean的名称
String name = serviceBean.getClass().getInterfaces()[0].getName();
/*放入缓存信息,把bean的名称放入即可,value是真实的名称是serviceBean*/
SERVICE_INSTANCE_MAP.put(name, serviceBean);
}
}
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
//1.接收客户端请求- 将msg(json的字符串)转化RpcRequest对象
RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());// 请求id是一样的
try {
//业务处理
rpcResponse.setResult(handler(rpcRequest));
} catch (Exception exception) {
exception.printStackTrace();
rpcResponse.setError(exception.getMessage());
}
//6.给客户端进行响应(需要将json对象转换为字符串)
channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* 业务处理逻辑
* 就是根据传过来的参数信息,进行实质的调用
* @return
*/
public Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
// 3.根据传递过来的beanName从缓存中查找到对应的bean(都是以接口名作为bean的名字)
Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
if (serviceBean == null) {
throw new RuntimeException("根据beanName找不到服务,beanName:" + rpcRequest.getClassName());
}
//4.解析请求中的方法名称. 参数类型 参数信息
Class<?> serviceBeanClass = serviceBean.getClass();//通过bean获取class的类型
String methodName = rpcRequest.getMethodName();//获取请求方法的名称
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();//获取请求参数类型
Object[] parameters = rpcRequest.getParameters();//获取具体的参数
//5.反射调用bean的方法- CGLIB反射调用
FastClass fastClass = FastClass.create(serviceBeanClass);//得到实例对象
FastMethod method = fastClass.getMethod(methodName, parameterTypes);// 传入方法名称和参
/*这个就是去执行实质的调用了*/
return method.invoke(serviceBean, parameters); //invoke要返回具体的值(bean的名称,参数)
}
}
6、如何启动Spring容器及服务
/*同时服务端的启动是,先启动Springboot,在启动rpc服务,
* 因为rpcServer是一个线程,所以可以通过线程启动的方式来启动,那么就需要继承CommandLineRunner接口
* 实现run方法,然后再run方法里面进行启动,
* 那么run方法里面如何启动呢,先将RpcServer注入,
* 然后再run方法里面启动一个线程(创建一个线程),new Runnable(),然后实现里面的run方法。
* 将run进行启动*/
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
// 在方法里面可以传入端口和ip
rpcServer.startServer("127.0.01", 8899);
}
}).start();
}
}
客户端代码实现
1、把接口依赖加入
<dependency>
<groupId>com.lagou</groupId>
<artifactId>lg-rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2、客户端初始化
/**
* 客户端
* 1.连接Netty服务端
* 2.提供给调用者主动关闭资源的方法(什么时候资源调用完了就可以关闭了)
* 3.给提供者,提供消息发送的方法(通过这个方法才能将消息发送到服务端)
*/
public class RpcClient {
private EventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
/*定义一个线程池*/
private ExecutorService executorService = Executors.newCachedThreadPool();
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
initClient();
}
/**
* 初始化方法-连接Netty服务端
*/
public void initClient() {
try {
//1.创建线程组
group = new NioEventLoopGroup();
//2.创建启动助手
Bootstrap bootstrap = new Bootstrap();
//3.设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//String类型编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加客户端处理类
pipeline.addLast(rpcClientHandler);
}
});
//4.连接Netty服务端
channel = bootstrap.connect(ip, port).sync().channel();
} catch (Exception exception) {
exception.printStackTrace();
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
/**
* 提供给调用者主动关闭资源的方法
*/
public void close() {
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 提供消息发送的方法
* 因为rpcClientHandler是一个线程,所以定义一个executorService,
* 将这个线程放到线程池中即可
*/
public Object send(String msg) throws ExecutionException, InterruptedException {
rpcClientHandler.setRequestMsg(msg);//具体要传递的信息通过set方法进行赋值
Future submit = executorService.submit(rpcClientHandler); // 返回Future对象
return submit.get();// 通过这个对象,可以得到返回的具体的值responseMsg
}
}
3、客户端业务处理类
/**
* 客户端处理类
* 1.发送消息(发送完消息后将线程进行等待)
* 2.接收消息(接收完消息后将线程进行唤醒)
* 发送完了什么是时候接收呢,可以通过线程的方式
* 这样就能变成同步的模式 了
*
* 因为是基于线程的方式,所以要实现一个线程的接口Callable,并实现里面的方法call()
* 在call方法里面进行消息的发送,发送出去的消息通过requestMsg提供的set方法进行获取发送的信息
* 发送完后就处于等待状态,那么如何唤醒呢,就在读取里面进行唤醒
*call方法有一个返回值,这个返回值就是服务端发送过来的具体消息,所以就得定义一个responseMsg服务端的消息
*
* 注意:下面的等待和唤醒的时候,需要加上synchronized
*/
public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
ChannelHandlerContext context;
//发送的消息
String requestMsg;
//服务端的消息
String responseMsg;
public void setRequestMsg(String requestMsg) {
this.requestMsg = requestMsg;
}
/**
* 通道连接就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
/**
* 通道读取就绪事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
responseMsg = msg;
//唤醒等待的线程
notify();
}
/**
* 发送消息到服务端
*
* @return call方法有一个返回值,这个返回值就是服务端发送过来的具体消息
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
//消息发送
context.writeAndFlush(requestMsg);
//线程等待
wait();
return responseMsg;
}
}
4、创建客户端代理类
/**
* 客户端代理类-创建代理对象
* 1.封装request请求对象
* 2.创建RpcClient对象
* 3.发送消息
* 4.返回结果
*/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass) {
//通过他们当前线程获取类加载器
// Class数组
// InvocationHandler
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1.封装request请求对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);// 具体的参数
//2.创建RpcClient对象
RpcClient rpcClient = new RpcClient("127.0.0.1", 8899);
try {
//3.发送消息
Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRequest));
// 将返回来的信息转换为对象
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null) {
throw new RuntimeException(rpcResponse.getError());
}
//4.返回结果
Object result = rpcResponse.getResult();
/*因为请求的是User对象,那么返回的也应该是User对象
* 通过method.getReturnType()来获取返回结果的类型
* 将参数信息,和需要转换的类型,就可以将其转换成对应的对象类型了*/
return JSON.parseObject(result.toString(), method.getReturnType());
} catch (Exception e) {
throw e;
} finally {
rpcClient.close();
}
}
});
}
}
5、客户端测试类
public class ClientBootStrap {
public static void main(String[] args) {
IUserService userService = (IUserService) RpcClientProxy.createProxy(IUserService.class);
// 通过代理类得到的代理对象来获取信息,返回的也是User对象
User user = userService.getById(1);
System.out.println(user);
}
}