FC: function call (寻址调用)
RPC: remote process call (socket)
SC: system call 系统调用(软中断)
IPC: 管道。信号量,socket
https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=10131175016633389&vid=5285890806541162145
观看至: 00:48:00
1. 基于netty的RPC框架的基本实现通讯
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
public class MyRPCTestV1 {
public void startServer() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventExecutors,eventExecutors);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ServerRequestHandler());
}
});
ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
bind.sync().channel().closeFuture().sync();
}
/**
* 模拟consumer端
*/
@Test
public void testClient() throws IOException {
new Thread(()->{
try {
startServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("服务端已启动监听...");
//创建20个纤程
Thread[] threads = new Thread[20];
//初始化这20个线程
for (int i = 0; i < threads.length; i++) {
int finalI = i;
threads[i] = new Thread(()->{
ProductService product = proxyGet(ProductService.class);
product.say("hello"+ finalI);
});
}
//启动这20个线程
for (Thread thread : threads) {
thread.start();
}
System.in.read();
}
/**
* 代理请求,使用java原生的代理Proxy类
* @param interfaceInfo
* @param <T>
* @return
*/
private static <T> T proxyGet(Class<T> interfaceInfo) {
ClassLoader classLoader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/**
* 1、收集调用目标的服务。方法。参数。 封装成一个message。
*/
String className = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
/**
* 2. 生成requestId再加上message.发送请求 本地缓存requestId
* 自定义协议:
* msg-header:
* msg-content:
*/
ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
objectOut.writeObject(msgBody);
byte[] msgBodyByteArray = byteArrayOut.toByteArray();
MsgHeader msgHeader = this.createHeaderByMsgBody(msgBodyByteArray);
byteArrayOut.reset();
objectOut = new ObjectOutputStream(byteArrayOut);
objectOut.writeObject(msgHeader);
byte[] msgHeaderByteArray = byteArrayOut.toByteArray();
System.out.printf("客户端:“发送至服务端的消息头大小为:%d”\n", msgHeaderByteArray.length);
System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader.toString());
System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody.toString());
/**
* 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。
*/
ClientFactory clientFactory = ClientFactory.getInstance();
NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
/**
* 4. 真正发送数据, 走I/O
*/
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
CountDownLatch countDownLatch = new CountDownLatch(1);
//注册响应事件
ResponseSyncHandler.addCallback(msgHeader.getRequestId(), new Runnable() {
@Override
public void run() {
countDownLatch.countDown();
}
});
byteBuf.writeBytes(msgHeaderByteArray);
byteBuf.writeBytes(msgBodyByteArray);
ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
future.sync();
//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据
//需要程序就此卡主,一直等待服务端有响应返回。
countDownLatch.await();
/**
* 5. 获得服务端响应,这里需要考虑2个问题
* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!
* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId
*/
return null;
}
/**
* 通过消息体封装消息头
* @param msgBodyByteArray
* @return
*/
private MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
MsgHeader msgHeader = new MsgHeader();
msgHeader.setFlag(0x14141414);
msgHeader.setDataLen(msgBodyByteArray.length);
msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
return msgHeader;
}
});
}
/**
* 服务端的处理
*/
private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
ByteBuf copyByteBuf = byteBuf.copy();
//获取消息头
//125怎么来的?是消息头转换为字节数组的大小,看控制台打印。
if (byteBuf.readableBytes() >= 125){
byte[] bytes = new byte[125];
byteBuf.readBytes(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
MsgHeader msgHeader = (MsgHeader) oin.readObject();
System.out.printf("服务端:“收到客户端消息头:%s”\n",msgHeader.toString());
//看看数据流内还有其他的数据,可能时消息体,获取消息体
if (byteBuf.readableBytes() >= msgHeader.getDataLen()){
byte[] data = new byte[(int)msgHeader.getDataLen()];
byteBuf.readBytes(data);
ByteArrayInputStream din = new ByteArrayInputStream(data);
ObjectInputStream doin = new ObjectInputStream(din);
MsgBody body = (MsgBody) doin.readObject();
System.out.printf("服务端:“收到客户端消息体:%s”\n",body.toString());
}
System.out.println("服务端:“正在处理....”");
// Thread.sleep(1000);
System.out.println("服务端:“处理完毕!”");
//原封不动的把消息响应给客户端
ChannelFuture future = ctx.writeAndFlush(copyByteBuf);
System.out.printf("服务端:“发送给客户端消息头:%s”\n", msgHeader.toString());
ctx.close();
future.sync();
}
}
}
@Data
private static class ResponseSyncHandler{
//使用一个集合存储requestId与对应的任务。
private static ConcurrentHashMap<Long,Runnable> mapping = new ConcurrentHashMap<>();
//添加requestId与任务的映射
public static void addCallback(long requestId, Runnable callback){
mapping.putIfAbsent(requestId,callback);
}
//找出requestId与任务的映射,并执行任务,执行完毕后删除
public static void runCallBack(long requestId) {
mapping.get(requestId).run();
removeCallBack(requestId);
}
//删除requestId与任务的映射
private static void removeCallBack(long requestId) {
mapping.remove(requestId);
}
}
/**
* 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
*/
private static class ClientFactory{
private static final ClientFactory instance = new ClientFactory();
private ClientFactory(){ }
public static ClientFactory getInstance(){
return instance;
}
ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
/**
* 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
* @param address
* @return
*/
public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
ClientPool clientPool = outboxs.get(address);
//没有则初始化一个
if (clientPool == null) {
outboxs.putIfAbsent(address,new ClientPool(10));
clientPool = outboxs.get(address);
}
//从连接池中随机取出一个连接。
int i = new Random().nextInt(10);
if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
return clientPool.getClients()[i];
}
synchronized (clientPool.getLocks()[i]){
return clientPool.getClients()[i] = create(address);
}
}
/**
* 通过地址去创建一个netty的客户端
* @param address
* @return
*/
private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientWorker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture connect = bootstrap.connect(address);
NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
return client;
}
}
/**
* 客戶端响应服务端的处理器
*/
private static class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
//获取消息头
//125怎么来的?是消息头转换为字节数组的大小,看控制台打印。
if (byteBuf.readableBytes() >= 125){
byte[] bytes = new byte[125];
byteBuf.readBytes(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
MsgHeader msgHeader = (MsgHeader) oin.readObject();
System.out.printf("客户端:“收到服务端消息头:%s”\n",msgHeader.toString());
//countDownLatch继续执行
ResponseSyncHandler.runCallBack(msgHeader.getRequestId());
//看看数据流内还有其他的数据,可能时消息体,获取消息体
if (byteBuf.readableBytes() >= msgHeader.getDataLen()){
byte[] data = new byte[(int)msgHeader.getDataLen()];
byteBuf.readBytes(data);
ByteArrayInputStream din = new ByteArrayInputStream(data);
ObjectInputStream doin = new ObjectInputStream(din);
MsgBody body = (MsgBody) doin.readObject();
System.out.printf("客户端:“收到服务端消息体:%s”\n",body);
}
}
}
}
/**
* 客户端连接池:
*/
@Data
private static class ClientPool{
//客户端连接数组
private NioSocketChannel[] clients;
//伴生锁
private Object[] locks;
ClientPool(int poolSize){
clients = new NioSocketChannel[poolSize];
locks = new Object[poolSize];
for (int i = 0; i < poolSize; i++) {
locks[i] = new Object();
}
}
}
/**
* 消息头:客户端与服务端交互的消息头
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgHeader implements Serializable{
int flag;
long dataLen;
long requestId;
}
/**
* 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgBody implements Serializable {
private String className; //目标类名
private String methodName; //目标方法名
private Class[] parameterTypes; //目标方法参数类型列表
private Object[] args; //目标方法参数列表
}
/**
* 调用的远端方法。
*/
interface ProductService{
void say(String msg);
}
}
控制台输出打印:
服务端已启动监听… 客户端:“发送至服务端的消息头大小为:113” 客户端:“发送至服务端的消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“发送至服务端的消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])” 服务端:“收到一个客户端连接,端口号:54623” 服务端:“收到客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 服务端:“收到客户端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])” 服务端:“正在处理….” 服务端:“处理完毕!” 服务端:“发送给客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“收到服务端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“收到服务端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])”
Process finished with exit code 0
上面的示例虽然基本实现了RPC的基本思路,但是有一些致命的缺陷,比如在多线程并发的情况下会出现,例如我们把**testClient**
方法的代码改成这样。
/**
* 模拟consumer端
*/
@Test
public void testClient() throws IOException {
new Thread(()->{
try {
startServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("服务端已启动监听...");
//创建20个线程
Thread[] threads = new Thread[100];
//初始化这20个线程
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
ProductService product = proxyGet(ProductService.class);
product.say("hell o");
});
}
//启动这20个线程
for (Thread thread : threads) {
thread.start();
}
System.in.read();
}
总结一下,4.1的代码有这些问题:
- 服务端的ByteBuf可能包含多个header+body,但是我们只取了一个,所以造成了有一些数据丢了。
- 服务端的ByteBuf包含的数据不是完整的header+body对,可能包含的是header+body+header,也可能只是个header,也可能是个body。总结来说:netty不能保证每次的数据流的完整性。
通过以上的问题发现,netty还具有拆包粘包的问题。
解决方式是:我们可以维护一个足够长的ByteBuf当作缓存,把最后的不完整的header+body
放入准备好的缓存中,下次再有ByteBuf进来和缓存中的一部分header+body
拼接,就是一个完整的header+body
。这也是netty的解决方式,所幸的是,netty已经有内置方式解决此问题,不需要我们实现。
2. MyRPCTestV2 - Decode解决拆包粘包
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class MyRPCTestV2 {
//维护一个全局变量 消息头的大小,这样不用写死125
public static int MSG_HEADER_LENGTH;
static {
try {
MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 启动server端
* @throws InterruptedException
*/
public void startServer() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventExecutors,eventExecutors);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyRPCTestV2.MsgDecode());
pipeline.addLast(new MyRPCTestV2.ServerRequestHandler());
}
});
ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
bind.sync().channel().closeFuture().sync();
}
/**
* 模拟consumer端
*/
@Test
public void testClient() throws IOException {
new Thread(()->{
try {
startServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("服务端已启动监听...");
//多线程内自增计数器。为了控制台打印
AtomicInteger atomicInteger = new AtomicInteger(0);
//创建20个纤程
Thread[] threads = new Thread[100];
//初始化这20个线程
for (int i = 0; i < threads.length; i++) {
int finalI = i;
threads[i] = new Thread(()->{
ProductService product = proxyGet(ProductService.class);
String param = "hello" + atomicInteger.incrementAndGet();
String result = product.say(param);
System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
});
}
//启动这20个线程
for (Thread thread : threads) {
thread.start();
}
System.in.read();
}
/**
* 代理请求,使用java原生的代理Proxy类
* @param interfaceInfo
* @param <T>
* @return
*/
private static <T> T proxyGet(Class<T> interfaceInfo) {
ClassLoader classLoader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/**
* 1、收集调用目标的服务。方法。参数。 封装成一个message。
*/
String className = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
/**
* 2. 生成requestId再加上message.发送请求 本地缓存requestId
* 自定义协议:
* msg-header:
* msg-content:
*/
byte[] msgBodyByteArray = MyRPCTestV2.objToBytes(msgBody);
MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);
byte[] msgHeaderByteArray = MyRPCTestV2.objToBytes(msgHeader);
System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);
System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);
/**
* 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。
*/
ClientFactory clientFactory = ClientFactory.getInstance();
NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
/**
* 4. 真正发送数据, 走I/O
*/
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
//使用带返回值的线程,Callable, 处理线程的结果
CompletableFuture<String> res = new CompletableFuture<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
//注册响应事件
ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);
byteBuf.writeBytes(msgHeaderByteArray);
byteBuf.writeBytes(msgBodyByteArray);
ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
future.sync();
//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据
/**
* 5. 获得服务端响应,这里需要考虑2个问题
* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!
* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId
*/
return res.get(); //会阻塞
}
});
}
/**
* 通过消息体封装消息头
* @param msgBodyByteArray
* @return
*/
private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
MsgHeader msgHeader = new MsgHeader();
msgHeader.setFlag(0x14141414);
msgHeader.setDataLen(msgBodyByteArray.length);
msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
return msgHeader;
}
/**
* 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题
* @param obj
* @return
* @throws IOException
*/
private synchronized static byte[] objToBytes(Object obj) throws IOException {
ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
objectOut.writeObject(obj);
byte[] objByteArray = byteArrayOut.toByteArray();
return objByteArray;
}
/**
* 服务端的处理
*/
private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收客户端数据
MyRPCTestV2.PackageMsg clientPackageMsg = (MyRPCTestV2.PackageMsg) msg;
System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
String iothreadName = Thread.currentThread().getName();
/**
*业务处理,此处有2种方式:
* 1.直接在当前线程内处理业务并返回给客户端结果数据
* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。
* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法
*/
// ctx.executor().execute(()->{
ctx.executor().parent().execute(()->{
try {
//业务处理
System.out.println("服务端: 业务处理中...");
Thread.sleep(500L);
System.out.println("服务端: 业务处理完毕!");
//响应客户端
//消息体
String execThreadName = Thread.currentThread().getName();
String res = "服务端IO线程是:"+iothreadName+", 服务端业务线程是:"+execThreadName+", 业务处理结果是: "+clientPackageMsg.getBody().getArgs()[0];
System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
byte[] serverMsgBodyByteArray = MyRPCTestV2.objToBytes(serverMsgBody);
//消息头
MsgHeader serverMsgHeader = new MsgHeader();
serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
serverMsgHeader.setFlag(0x14141415);
byte[] serverMsgHeaderByteArray = MyRPCTestV2.objToBytes(serverMsgHeader);
//封装成整体消息
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
byteBuf.writeBytes(serverMsgHeaderByteArray);
byteBuf.writeBytes(serverMsgBodyByteArray);
//写出客户端并刷新缓冲区
ctx.writeAndFlush(byteBuf);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
/**
* 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,
* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象
*
* 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,
* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,
* 不然就留着让netty缓存起来加入下一次读取。
*/
private static class MsgDecode extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
while(inByteBuf.readableBytes() >= MyRPCTestV2.MSG_HEADER_LENGTH) {
byte[] bytes = new byte[MyRPCTestV2.MSG_HEADER_LENGTH];
inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
MsgHeader header = (MsgHeader) oin.readObject();
//DECODE在2个方向都使用
//通信的协议
if(inByteBuf.readableBytes() >= header.getDataLen()+MyRPCTestV2.MSG_HEADER_LENGTH){
//处理指针
inByteBuf.readBytes(MyRPCTestV2.MSG_HEADER_LENGTH); //移动指针到body开始的位置
byte[] data = new byte[(int)header.getDataLen()];
inByteBuf.readBytes(data);
ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
//0x14141414 表示客户端向服务端发送
//0x14141415 表示服务端向客户端发送
if(header.getFlag() == 0x14141414){
MsgBody body = (MsgBody) doin.readObject();
out.add(new MyRPCTestV2.PackageMsg(header,body));
}else if(header.getFlag() == 0x14141415){
MsgBody body = (MsgBody) doin.readObject();
out.add(new MyRPCTestV2.PackageMsg(header,body));
}
}else{
break;
}
}
}
}
@Data
private static class ServerResponseMappingCallback{
//使用一个集合存储requestId与对应的任务。
private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();
//添加requestId与任务的映射
public static void addCallback(long requestId, CompletableFuture<String> callback){
mapping.putIfAbsent(requestId,callback);
}
//找出requestId与任务的映射,并执行任务,执行完毕后删除
public static void runCallBack(MyRPCTestV2.PackageMsg packageMsg) {
mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());
removeCallBack(packageMsg.getHeader().getRequestId());
}
//删除requestId与任务的映射
private static void removeCallBack(long requestId) {
mapping.remove(requestId);
}
}
/**
* 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
*/
private static class ClientFactory{
private static final ClientFactory instance = new ClientFactory();
private ClientFactory(){ }
public static ClientFactory getInstance(){
return instance;
}
ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
/**
* 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
* @param address
* @return
*/
public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
ClientPool clientPool = outboxs.get(address);
//没有则初始化一个
if (clientPool == null) {
outboxs.putIfAbsent(address,new ClientPool(10));
clientPool = outboxs.get(address);
}
//从连接池中随机取出一个连接。
int i = new Random().nextInt(10);
if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
return clientPool.getClients()[i];
}
synchronized (clientPool.getLocks()[i]){
return clientPool.getClients()[i] = create(address);
}
}
/**
* 通过地址去创建一个netty的客户端
* @param address
* @return
*/
private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientWorker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MsgDecode());
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture connect = bootstrap.connect(address);
NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
return client;
}
}
/**
* 客戶端响应服务端的处理器
*/
private static class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyRPCTestV2.PackageMsg packageMsg = (MyRPCTestV2.PackageMsg) msg;
//解锁门栓,让客户端程序继续执行
ServerResponseMappingCallback.runCallBack(packageMsg);
}
}
/**
* 客户端连接池:
*/
@Data
private static class ClientPool{
//客户端连接数组
private NioSocketChannel[] clients;
//伴生锁
private Object[] locks;
ClientPool(int poolSize){
clients = new NioSocketChannel[poolSize];
locks = new Object[poolSize];
for (int i = 0; i < poolSize; i++) {
locks[i] = new Object();
}
}
}
/**
* 封装成一个包,不要两个包了
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
private static class PackageMsg implements Serializable{
private MsgHeader header;
private MsgBody body;
}
/**
* 消息头:客户端与服务端交互的消息头
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgHeader implements Serializable{
int flag;
long dataLen;
long requestId;
}
/**
* 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgBody implements Serializable {
private String className; //目标类名
private String methodName; //目标方法名
private Class[] parameterTypes; //目标方法参数类型列表
private Object[] args; //目标方法参数列表
private Object res; //方法的返回值
}
/**
* 调用的远端方法。
*/
interface ProductService{
String say(String msg);
}
}
这次的优化还是很大的,具体优化如下:
- 使用一个解码器解
**MsgDecode**
决了拆包粘包的问题,保证服务端与客户端每次从ByteBuf读取到的数据是完整的一对header+body
,这是本次最大的改动。 - 使用一个全局静态变量
MSG_HEADER_LENGTH
来维护一个全局变量, 这样就不用每次都写死125了。 - 新增了一个静态方法
objToBytes()
,把对象转为字节数组,不需要重复造轮子,前提是需要实现Serializable
- 新增了一个类
PackageMsg
来包装MsgHeader
和MsgBody
类 - 把
CountDownLatch
换成了CompletableFuture
,并在客户端新增了返回数据的读取+打印。
3. MyRPCTestV3 - 进一步优化
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class MyRPCTestV3 {
//维护一个全局变量 消息头的大小,这样不用写死125
public static int MSG_HEADER_LENGTH;
static {
try {
MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 启动server端
* @throws InterruptedException
*/
public void startServer() throws InterruptedException {
Man man = new Man();
Dispatcher dispatcher = new Dispatcher().register(MyRPCTestV3.Person.class.getName(),man);
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventExecutors,eventExecutors);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyRPCTestV3.MsgDecode());
pipeline.addLast(new MyRPCTestV3.ServerRequestHandler(dispatcher));
}
});
ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
bind.sync().channel().closeFuture().sync();
}
/**
* 模拟consumer端
*/
@Test
public void testClient() throws IOException {
new Thread(()->{
try {
startServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("服务端已启动监听...");
//多线程内自增计数器。为了控制台打印
AtomicInteger atomicInteger = new AtomicInteger(0);
//创建20个纤程
Thread[] threads = new Thread[100];
//初始化这20个线程
for (int i = 0; i < threads.length; i++) {
int finalI = i;
threads[i] = new Thread(()->{
Person man = proxyGet(Person.class);
String param = "hello" + atomicInteger.incrementAndGet();
String result = man.say(param);
System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
});
}
//启动这20个线程
for (Thread thread : threads) {
thread.start();
}
System.in.read();
}
/**
* 代理请求,使用java原生的代理Proxy类
* @param interfaceInfo
* @param <T>
* @return
*/
private static <T> T proxyGet(Class<T> interfaceInfo) {
ClassLoader classLoader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/**
* 1、收集调用目标的服务。方法。参数。 封装成一个message。
*/
String className = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
/**
* 2. 生成requestId再加上message.发送请求 本地缓存requestId
* 自定义协议:
* msg-header:
* msg-content:
*/
byte[] msgBodyByteArray = MyRPCTestV3.objToBytes(msgBody);
MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);
byte[] msgHeaderByteArray = MyRPCTestV3.objToBytes(msgHeader);
System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);
System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);
/**
* 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。
*/
ClientFactory clientFactory = ClientFactory.getInstance();
NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
/**
* 4. 真正发送数据, 走I/O
*/
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
//使用带返回值的线程,Callable, 处理线程的结果
CompletableFuture<String> res = new CompletableFuture<>();
CountDownLatch countDownLatch = new CountDownLatch(1);
//注册响应事件
ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);
byteBuf.writeBytes(msgHeaderByteArray);
byteBuf.writeBytes(msgBodyByteArray);
ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
future.sync();
//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据
/**
* 5. 获得服务端响应,这里需要考虑2个问题
* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!
* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId
*/
return res.get(); //会阻塞
}
});
}
/**
* 通过消息体封装消息头
* @param msgBodyByteArray
* @return
*/
private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
MsgHeader msgHeader = new MsgHeader();
msgHeader.setFlag(0x14141414);
msgHeader.setDataLen(msgBodyByteArray.length);
msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
return msgHeader;
}
/**
* 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题
* @param obj
* @return
* @throws IOException
*/
private synchronized static byte[] objToBytes(Object obj) throws IOException {
ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
objectOut.writeObject(obj);
byte[] objByteArray = byteArrayOut.toByteArray();
return objByteArray;
}
/**
* 服务端的处理
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
private MyRPCTestV3.Dispatcher dispatcher;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收客户端数据
MyRPCTestV3.PackageMsg clientPackageMsg = (MyRPCTestV3.PackageMsg) msg;
System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
String iothreadName = Thread.currentThread().getName();
/**
*业务处理,此处有2种方式:
* 1.直接在当前线程内处理业务并返回给客户端结果数据
* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。
* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法
*/
// ctx.executor().execute(()->{
ctx.executor().parent().execute(()->{
try {
//业务处理 使用反射调用目标的方法
System.out.println("服务端: 业务处理中...");
String className = clientPackageMsg.getBody().getClassName();
String methodName = clientPackageMsg.getBody().getMethodName();
Object o = dispatcher.get(className);
Method method = o.getClass().getMethod(methodName, clientPackageMsg.getBody().getParameterTypes());
Object res = method.invoke(o, clientPackageMsg.getBody().getArgs());
System.out.println("服务端: 业务处理完毕!");
//响应客户端
//消息体
System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
byte[] serverMsgBodyByteArray = MyRPCTestV3.objToBytes(serverMsgBody);
//消息头
MsgHeader serverMsgHeader = new MsgHeader();
serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
serverMsgHeader.setFlag(0x14141415);
byte[] serverMsgHeaderByteArray = MyRPCTestV3.objToBytes(serverMsgHeader);
//消息体+消息体写入ByteBuf
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
byteBuf.writeBytes(serverMsgHeaderByteArray);
byteBuf.writeBytes(serverMsgBodyByteArray);
//ByteBuf写出客户端并刷新缓冲区
ctx.writeAndFlush(byteBuf);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
/**
* 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,
* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象
*
* 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,
* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,
* 不然就留着让netty缓存起来加入下一次读取。
*/
private static class MsgDecode extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
while(inByteBuf.readableBytes() >= MyRPCTestV3.MSG_HEADER_LENGTH) {
byte[] bytes = new byte[MyRPCTestV3.MSG_HEADER_LENGTH];
inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
MsgHeader header = (MsgHeader) oin.readObject();
//DECODE在2个方向都使用
//通信的协议
if(inByteBuf.readableBytes() >= header.getDataLen()+ MyRPCTestV3.MSG_HEADER_LENGTH){
//处理指针
inByteBuf.readBytes(MyRPCTestV3.MSG_HEADER_LENGTH); //移动指针到body开始的位置
byte[] data = new byte[(int)header.getDataLen()];
inByteBuf.readBytes(data);
ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
//0x14141414 表示客户端向服务端发送
//0x14141415 表示服务端向客户端发送
if(header.getFlag() == 0x14141414){
MsgBody body = (MsgBody) doin.readObject();
out.add(new MyRPCTestV3.PackageMsg(header,body));
}else if(header.getFlag() == 0x14141415){
MsgBody body = (MsgBody) doin.readObject();
out.add(new MyRPCTestV3.PackageMsg(header,body));
}
}else{
break;
}
}
}
}
@Data
private static class ServerResponseMappingCallback{
//使用一个集合存储requestId与对应的任务。
private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();
//添加requestId与任务的映射
public static void addCallback(long requestId, CompletableFuture<String> callback){
mapping.putIfAbsent(requestId,callback);
}
//找出requestId与任务的映射,并执行任务,执行完毕后删除
public static void runCallBack(MyRPCTestV3.PackageMsg packageMsg) {
mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());
removeCallBack(packageMsg.getHeader().getRequestId());
}
//删除requestId与任务的映射
private static void removeCallBack(long requestId) {
mapping.remove(requestId);
}
}
/**
* 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
*/
private static class ClientFactory{
private static final ClientFactory instance = new ClientFactory();
private ClientFactory(){ }
public static ClientFactory getInstance(){
return instance;
}
ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
/**
* 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
* @param address
* @return
*/
public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
ClientPool clientPool = outboxs.get(address);
//没有则初始化一个
if (clientPool == null) {
outboxs.putIfAbsent(address,new ClientPool(10));
clientPool = outboxs.get(address);
}
//从连接池中随机取出一个连接。
int i = new Random().nextInt(10);
if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
return clientPool.getClients()[i];
}
synchronized (clientPool.getLocks()[i]){
return clientPool.getClients()[i] = create(address);
}
}
/**
* 通过地址去创建一个netty的客户端
* @param address
* @return
*/
private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientWorker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MsgDecode());
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture connect = bootstrap.connect(address);
NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
return client;
}
}
/**
* 客戶端响应服务端的处理器
*/
private static class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyRPCTestV3.PackageMsg packageMsg = (MyRPCTestV3.PackageMsg) msg;
//解锁门栓,让客户端程序继续执行
ServerResponseMappingCallback.runCallBack(packageMsg);
}
}
/**
* 客户端连接池:
*/
@Data
private static class ClientPool{
//客户端连接数组
private NioSocketChannel[] clients;
//伴生锁
private Object[] locks;
ClientPool(int poolSize){
clients = new NioSocketChannel[poolSize];
locks = new Object[poolSize];
for (int i = 0; i < poolSize; i++) {
locks[i] = new Object();
}
}
}
/**
* 封装成一个包,不要两个包了
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
private static class PackageMsg implements Serializable{
private MsgHeader header;
private MsgBody body;
}
/**
* 消息头:客户端与服务端交互的消息头
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgHeader implements Serializable{
int flag;
long dataLen;
long requestId;
}
/**
* 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
*/
@Data
@Accessors(chain = true)
@ToString
private static class MsgBody implements Serializable {
private String className; //服务端的类名
private String methodName; //服务端的方法名
private Class[] parameterTypes; //服务端的方法参数类型列表
private Object[] args; //服务端的方法参数列表
private Object res; //服务端方法的返回值
}
private static interface Person{
String say(String msg);
}
private static class Man implements MyRPCTestV3.Person{
@Override
public String say(String msg) {
return "I'm a man";
}
}
/**
* 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map。
* 这个map里面放入了一些以类名为key,以实体对象为value的键值对。
* 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。
*/
private static class Dispatcher{
private static ConcurrentHashMap<String,Object> invokeMap = new ConcurrentHashMap<>();
public MyRPCTestV3.Dispatcher register(String key, Object obj){
invokeMap.put(key,obj);
return this;
}
public Object get(String key){
return invokeMap.get(key);
}
}
}
https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=9982697997210669&vid=5285890806209632021
观看至:01:34:07
4. 逻辑清晰版分层分类拆分
4.1 client 客户端
4.1.1 ClientFactory.java
package myrpc.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import myrpc.protocol.MsgBody;
import myrpc.protocol.MsgHeader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
* 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
*/
public class ClientFactory {
private int POOL_SIZE = 5;
private NioEventLoopGroup clientWorker;
private ClientFactory(){}
private static final ClientFactory factory = new ClientFactory();
//一个consumer 可以连接很多的provider,每一个provider都有自己的pool K,V
private ConcurrentHashMap<InetSocketAddress, ClientPool> outboxs = new ConcurrentHashMap<>();
public static ClientFactory getFactory(){
return factory;
}
/**
* 通过参数MsgBody生成MsgHeader,并将header+body写入socket中
* @param body
* @return
* @throws IOException
*/
public static CompletableFuture<Object> transport(MsgBody body) throws IOException {
//生成header
byte[] bodyBytesArray = body.toByte();
MsgHeader header = body.createHeader();
byte[] headerBytesArray = header.toByte();
//将header+body写入ByteBuf
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerBytesArray.length + bodyBytesArray.length);
byteBuf.writeBytes(headerBytesArray);
byteBuf.writeBytes(bodyBytesArray);
//使用Callable+CompletableFuture
CompletableFuture<Object> future = new CompletableFuture<>();
ServerResponseMappingCallback.addCallback(header.getRequestId(), future);
//将ByteBuf写入Socket
NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress("localhost", 9090));
ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
return future;
}
/**
* 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
* @param address
* @return
*/
public synchronized NioSocketChannel getClient(InetSocketAddress address){
//TODO 在并发情况下一定要谨慎
ClientPool clientPool = outboxs.get(address);
//没有则初始化一个
if(clientPool == null){
synchronized(outboxs){
if(clientPool == null){
outboxs.putIfAbsent(address,new ClientPool(POOL_SIZE));
clientPool = outboxs.get(address);
}
}
}
//从连接池中随机取出一个连接。
int i = new Random().nextInt(POOL_SIZE);
if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
return clientPool.getClients()[i];
}
synchronized (clientPool.getLocks()[i]){
return clientPool.getClients()[i] = create(address);
}
}
/**
* 创建一个Socket
* @param address
* @return
*/
private NioSocketChannel create(InetSocketAddress address){
//基于 netty 的客户端创建方式
clientWorker = new NioEventLoopGroup(1);
Bootstrap bs = new Bootstrap();
ChannelFuture connect = bs.group(clientWorker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ClientMsgDecode());
p.addLast(new ClientHandler());
}
}).connect(address);
try {
NioSocketChannel client = (NioSocketChannel)connect.sync().channel();
return client;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
4.1.2 ClientHandler.java
package myrpc.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import myrpc.protocol.PackageMsg;
/**
* 客戶端响应服务端的处理器
*/
public class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PackageMsg packageMsg = (PackageMsg) msg;
//让客户端程序继续执行
ServerResponseMappingCallback.runCallBack(packageMsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("发生了一个异常,异常原因:"+cause.getMessage());
}
}
4.1.3 ClientMsgDecode.java
package myrpc.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import myrpc.protocol.MsgBody;
import myrpc.protocol.MsgHeader;
import myrpc.protocol.PackageMsg;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;
/**
* 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,
* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象
*
* 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,
* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,
* 不然就留着让netty缓存起来加入下一次读取。
*/
public class ClientMsgDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {
byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];
inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
MsgHeader header = (MsgHeader) oin.readObject();
//DECODE在2个方向都使用
//通信的协议
if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){
//处理指针
inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置
byte[] data = new byte[(int)header.getDataLen()];
inByteBuf.readBytes(data);
ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
//0x14141414 表示客户端向服务端发送
//0x14141415 表示服务端向客户端发送
if(header.getFlag() == 0x14141414){
MsgBody body = (MsgBody) doin.readObject();
out.add(new PackageMsg(header,body));
}else if(header.getFlag() == 0x14141415){
MsgBody body = (MsgBody) doin.readObject();
out.add(new PackageMsg(header,body));
}
}else{
break;
}
}
}
}
4.1.4 ClientPool.java
package myrpc.client;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Data;
/**
* 客户端连接池:
*/
@Data
public class ClientPool{
private NioSocketChannel[] clients; //客户端连接数组
private Object[] locks; //伴生锁
public ClientPool(int poolSize){
clients = new NioSocketChannel[poolSize];
locks = new Object[poolSize];
for (int i = 0; i < poolSize; i++) {
locks[i] = new Object();
}
}
}
4.1.5 ServerResponseMappingCallback.java
package myrpc.client;
import myrpc.protocol.PackageMsg;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public class ServerResponseMappingCallback {
//使用一个集合存储requestId与对应的任务。
private static ConcurrentHashMap<Long,CompletableFuture<Object>> mapping = new ConcurrentHashMap<>();
//添加requestId与任务的映射
public static void addCallback(long requestId, CompletableFuture<Object> callback){
mapping.putIfAbsent(requestId,callback);
}
//找出requestId与任务的映射,并执行任务,执行完毕后删除
public static void runCallBack(PackageMsg packageMsg) {
CompletableFuture<Object> future = mapping.get(packageMsg.getHeader().getRequestId());
future.complete(packageMsg.getBody().getRes().toString());
removeCallBack(packageMsg.getHeader().getRequestId());
}
//删除requestId与任务的映射
private static void removeCallBack(long requestId) {
mapping.remove(requestId);
}
}
4.1.6 StartClient.java
package myrpc.client;
import myrpc.proxy.MyProxy;
import myrpc.service.Person;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 模拟consumer端
*/
public class StartClient {
//模拟comsumer端 && provider
@Test
public void get() throws IOException {
//多线程内自增计数器。为了控制台打印
AtomicInteger index = new AtomicInteger(0);
//创建20个纤程
Thread[] threads = new Thread[100];
//初始化这20个线程
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
Person person = MyProxy.proxyGet(Person.class);
Integer param = index.incrementAndGet();
String result = person.getById(param);
System.out.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
});
}
//启动这20个线程
for (Thread thread : threads) {
thread.start();
}
System.in.read();
}
@Test
public void testRPC() {
// Car car = MyProxy.proxyGet(Car.class);
// Persion zhangsan = car.oxox("zhangsan", 16);
// System.out.println(zhangsan);
}
@Test
public void testRpcLocal() {
System.out.println("server started......");
// Car car = MyProxy.proxyGet(Car.class);
// Persion zhangsan = car.oxox("zhangsan", 16);
// System.out.println(zhangsan);
}
}
4.2 server 服务端
4.2.1 Dispatcher.java
package myrpc.server;
import java.util.concurrent.ConcurrentHashMap;
/**
* 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map(invokeMap)。
* 这个map里面放入了一些以类名为key,以实体对象为value的键值对。
* 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。
* 单例模式
*/
public class Dispatcher {
private static Dispatcher dis = new Dispatcher();
public static ConcurrentHashMap<String,Object> invokeMap = new ConcurrentHashMap<>();
//单例模式下的获取实例的静态方法
public static Dispatcher getDis(){
return dis;
}
//单例模式
private Dispatcher(){
}
/**
* 注册
* @param k
* @param obj
*/
public Dispatcher register(String k,Object obj){
invokeMap.put(k,obj);
return this;
}
/**
* 获取
* @param k
* @return
*/
public Object get(String k){
return invokeMap.get(k);
}
}
4.2.2 ServerMsgDecode.java
package myrpc.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import myrpc.protocol.MsgBody;
import myrpc.protocol.MsgHeader;
import myrpc.protocol.PackageMsg;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;
/**
* 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,
* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象
*
* 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,
* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,
* 不然就留着让netty缓存起来加入下一次读取。
*/
public class ServerMsgDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {
byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];
inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
MsgHeader header = (MsgHeader) oin.readObject();
//通信的协议
if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){
//处理指针
inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置
byte[] data = new byte[(int)header.getDataLen()];
inByteBuf.readBytes(data);
ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
MsgBody body = (MsgBody) doin.readObject();
out.add(new PackageMsg(header,body));
}else{
break;
}
}
}
}
4.2.3 ServerRequestHandler.java
package myrpc.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.*;
import myrpc.protocol.*;
import java.lang.reflect.Method;
/**
* 服务端的处理
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServerRequestHandler extends ChannelInboundHandlerAdapter {
private Dispatcher dispatcher;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收客户端数据
PackageMsg clientPackageMsg = (PackageMsg) msg;
System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
/**
*业务处理,此处有2种方式:
* 1.直接在当前线程内处理业务并返回给客户端结果数据
* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。
* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法
*/
// ctx.executor().execute(()->{
ctx.executor().parent().execute(()->{
try {
//业务处理 使用反射调用目标的方法
System.out.println("服务端: 业务处理中...");
String className = clientPackageMsg.getBody().getClassName();
String methodName = clientPackageMsg.getBody().getMethodName();
Class[] parameterTypes = clientPackageMsg.getBody().getParameterTypes();
Object[] args = clientPackageMsg.getBody().getArgs();
Object object = dispatcher.get(className);
Method method = object.getClass().getMethod(methodName, parameterTypes);
Object res = method.invoke(object, args);
System.out.println("服务端: 业务处理完毕!");
//响应客户端
//消息体
System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
byte[] serverMsgBodyByteArray = serverMsgBody.toByte();
//消息头
MsgHeader serverMsgHeader = new MsgHeader();
serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
serverMsgHeader.setFlag(0x14141415);
byte[] serverMsgHeaderByteArray = serverMsgHeader.toByte();
//消息体+消息体写入ByteBuf
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
byteBuf.writeBytes(serverMsgHeaderByteArray);
byteBuf.writeBytes(serverMsgBodyByteArray);
//ByteBuf写出客户端并刷新缓冲区
ctx.writeAndFlush(byteBuf);
} catch (Exception e) {
e.printStackTrace();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("发生了一个异常,异常原因:"+cause.getMessage());
}
}
4.2.4 StartServer.java
package myrpc.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import myrpc.service.*;
import org.junit.Test;
import java.net.InetSocketAddress;
/**
*服务端
*/
public class StartServer {
@Test
public void startServer() {
//将服务端持有的对象和方法注入到调度器中,调度器会持有服务端的对象方法列表
Man man = new Man();
Dispatcher dis = Dispatcher.getDis().register(Person.class.getName(), man);
NioEventLoopGroup boss = new NioEventLoopGroup(20);
NioEventLoopGroup worker = boss;
ServerBootstrap sbs = new ServerBootstrap();
ChannelFuture bind = sbs.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ServerMsgDecode());
pipeline.addLast(new ServerRequestHandler(dis));
}
}).bind(new InetSocketAddress("localhost", 9090));
try {
System.out.println("服务端已启动!!");
bind.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.3 protocol 协议
4.3.1 MsgBody.java
package myrpc.protocol;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;
/**
* @author: 马士兵教育
* @create: 2020-08-16 20:35
*/
@Data
@Accessors(chain = true)
@ToString
public class MsgBody implements Serializable {
private String className; //被调用方的类名
private String methodName; //被调用方的方法名
private Class[] parameterTypes; //被调用方的方法参数类型列表
private Object[] args; //被调用方的方法参数列表
private Object res; //被调用方的方法的返回值
/**
* 通过消息体封装消息头
* @param msgBodyByteArray
* @return
*/
public MsgHeader createHeader() throws IOException {
byte[] msgBodyByteArray = this.toByte();
MsgHeader msgHeader = new MsgHeader();
msgHeader.setFlag(0x14141414);
msgHeader.setDataLen(msgBodyByteArray.length);
msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
return msgHeader;
}
/**
* 将当前对象转换为字节数组
* @return
* @throws IOException
*/
public byte[] toByte() throws IOException{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(out);
oout.writeObject(this);
byte[] bytes = out.toByteArray();
return bytes;
}
}
4.3.2 MsgHeader.java
package myrpc.protocol;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* 消息头:客户端与服务端交互的消息头
*/
@Data
@Accessors(chain = true)
@ToString
public class MsgHeader implements Serializable{
int flag; //标记:0x14141414表示为远程调用, 0x14141415表示为响应
long dataLen; //消息体的大小
long requestId; //每次调用的唯一标识
public static int MSG_HEADER_LENGTH;
//初始化MsgHeader的长度
static {
try {
MSG_HEADER_LENGTH = new MsgHeader().toByte().length;
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 将当前对象转换为字节数组
* @return
* @throws IOException
*/
public byte[] toByte() throws IOException{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(out);
oout.writeObject(this);
byte[] bytes = out.toByteArray();
return bytes;
}
}
4.3.3 PackageMsg.java
package myrpc.protocol;
import lombok.*;
import java.io.Serializable;
/**
* 封装成一个包,不要两个包了
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class PackageMsg implements Serializable {
private MsgHeader header;
private MsgBody body;
}
4.4 proxy代理
4.4.1 MyProxy.java
package myrpc.proxy;
import myrpc.client.ClientFactory;
import myrpc.protocol.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
/**
* 代理请求,使用java原生的代理Proxy类
*/
public class MyProxy {
public static <T>T proxyGet(Class<T> interfaceInfo){
ClassLoader loader = interfaceInfo.getClassLoader();
Class<?>[] methodInfo = {interfaceInfo};
return (T) Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1、收集调用目标的服务。方法。参数。 封装成一个message。
String className = interfaceInfo.getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
//使用带返回值的线程 Callable+CompletableFuture
CompletableFuture future = ClientFactory.transport(msgBody);
return future.get();//阻塞的
}
});
}
}
4.5 service模拟服务
4.5.1 Man.java
package myrpc.service;
public class Man implements Person{
@Override
public String getById(Integer id) {
return "I'm a man id="+id;
}
}
4.5.2 Person.java
package myrpc.service;
public interface Person {
String getById(Integer id);
}