FC: function call (a direct, in-process call to a known address)
RPC: remote procedure call (to another process or machine, over a socket)
SC: system call (enters the kernel via a soft interrupt)
IPC: inter-process communication (pipes, semaphores, sockets)
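The point that matters for everything below: an RPC framework's job is to make a remote call look like a plain local function call. Here is a toy, self-contained sketch of that idea (all names are hypothetical and no networking happens; the handler just prints what a real stub would serialize) using java.lang.reflect.Proxy, the same mechanism the code in these notes relies on:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Toy illustration only: the handler prints what a real RPC stub would
// serialize and send over a socket, then returns a canned value.
public class FcVsRpcDemo {

    interface HelloService {
        String say(String msg);
    }

    public static void main(String[] args) {
        HelloService remote = (HelloService) Proxy.newProxyInstance(
                HelloService.class.getClassLoader(),
                new Class<?>[]{HelloService.class},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] methodArgs) {
                        System.out.printf("would send over socket: %s.%s(%s)%n",
                                HelloService.class.getName(), method.getName(), methodArgs[0]);
                        return "fake response";
                    }
                });

        // FC: this looks like an ordinary function call...
        // RPC: ...but the proxy intercepts it and would push it through I/O.
        System.out.println(remote.say("hello"));
    }
}
```

The real versions below replace the printf with serialization of a MsgBody and a write to a Netty channel.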
https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=10131175016633389&vid=5285890806541162145
Watched up to: 00:48:00
1. MyRPCTestV1 - Basic RPC communication built on Netty
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.Data;import lombok.ToString;import lombok.experimental.Accessors;import org.junit.Test;import java.io.*;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.util.Random;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CountDownLatch;public class MyRPCTestV1 {public void startServer() throws InterruptedException {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(eventExecutors,eventExecutors);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ServerRequestHandler());}});ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));bind.sync().channel().closeFuture().sync();}/*** 模拟consumer端*/@Testpublic void testClient() throws IOException {new Thread(()->{try {startServer();} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println("服务端已启动监听...");//创建20个纤程Thread[] threads = new Thread[20];//初始化这20个线程for (int i = 0; i < threads.length; i++) {int finalI = i;threads[i] = new Thread(()->{ProductService product = proxyGet(ProductService.class);product.say("hello"+ finalI);});}//启动这20个线程for (Thread thread : threads) {thread.start();}System.in.read();}/*** 代理请求,使用java原生的代理Proxy类* @param interfaceInfo* @param <T>* @return*/private static <T> T proxyGet(Class<T> interfaceInfo) {ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] methodInfo = {interfaceInfo};return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 1、收集调用目标的服务。方法。参数。 封装成一个message。*/String className = interfaceInfo.getName();String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);/*** 2. 生成requestId再加上message.发送请求 本地缓存requestId* 自定义协议:* msg-header:* msg-content:*/ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);objectOut.writeObject(msgBody);byte[] msgBodyByteArray = byteArrayOut.toByteArray();MsgHeader msgHeader = this.createHeaderByMsgBody(msgBodyByteArray);byteArrayOut.reset();objectOut = new ObjectOutputStream(byteArrayOut);objectOut.writeObject(msgHeader);byte[] msgHeaderByteArray = byteArrayOut.toByteArray();System.out.printf("客户端:“发送至服务端的消息头大小为:%d”\n", msgHeaderByteArray.length);System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader.toString());System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody.toString());/*** 3. 
需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。*/ClientFactory clientFactory = ClientFactory.getInstance();NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));/*** 4. 真正发送数据, 走I/O*/ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);CountDownLatch countDownLatch = new CountDownLatch(1);//注册响应事件ResponseSyncHandler.addCallback(msgHeader.getRequestId(), new Runnable() {@Overridepublic void run() {countDownLatch.countDown();}});byteBuf.writeBytes(msgHeaderByteArray);byteBuf.writeBytes(msgBodyByteArray);ChannelFuture future = clientChannel.writeAndFlush(byteBuf);future.sync();//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据//需要程序就此卡主,一直等待服务端有响应返回。countDownLatch.await();/*** 5. 获得服务端响应,这里需要考虑2个问题* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId*/return null;}/*** 通过消息体封装消息头* @param msgBodyByteArray* @return*/private MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {MsgHeader msgHeader = new MsgHeader();msgHeader.setFlag(0x14141414);msgHeader.setDataLen(msgBodyByteArray.length);msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));return msgHeader;}});}/*** 服务端的处理*/private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;ByteBuf copyByteBuf = byteBuf.copy();//获取消息头//125怎么来的?是消息头转换为字节数组的大小,看控制台打印。if (byteBuf.readableBytes() >= 125){byte[] bytes = new byte[125];byteBuf.readBytes(bytes);ByteArrayInputStream in = new ByteArrayInputStream(bytes);ObjectInputStream oin = new ObjectInputStream(in);MsgHeader msgHeader = (MsgHeader) oin.readObject();System.out.printf("服务端:“收到客户端消息头:%s”\n",msgHeader.toString());//看看数据流内还有其他的数据,可能时消息体,获取消息体if (byteBuf.readableBytes() >= msgHeader.getDataLen()){byte[] data = new byte[(int)msgHeader.getDataLen()];byteBuf.readBytes(data);ByteArrayInputStream din = new ByteArrayInputStream(data);ObjectInputStream doin = new ObjectInputStream(din);MsgBody body = (MsgBody) doin.readObject();System.out.printf("服务端:“收到客户端消息体:%s”\n",body.toString());}System.out.println("服务端:“正在处理....”");// Thread.sleep(1000);System.out.println("服务端:“处理完毕!”");//原封不动的把消息响应给客户端ChannelFuture future = ctx.writeAndFlush(copyByteBuf);System.out.printf("服务端:“发送给客户端消息头:%s”\n", msgHeader.toString());ctx.close();future.sync();}}}@Dataprivate static class ResponseSyncHandler{//使用一个集合存储requestId与对应的任务。private static ConcurrentHashMap<Long,Runnable> mapping = new ConcurrentHashMap<>();//添加requestId与任务的映射public static void addCallback(long requestId, Runnable callback){mapping.putIfAbsent(requestId,callback);}//找出requestId与任务的映射,并执行任务,执行完毕后删除public static void runCallBack(long requestId) {mapping.get(requestId).run();removeCallBack(requestId);}//删除requestId与任务的映射private static void removeCallBack(long requestId) {mapping.remove(requestId);}}/*** 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池*/private static class ClientFactory{private static final ClientFactory instance = new ClientFactory();private ClientFactory(){ }public static ClientFactory getInstance(){return instance;}ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();/*** 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。* @param address* @return*/public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {ClientPool clientPool = 
outboxs.get(address);//没有则初始化一个if (clientPool == null) {outboxs.putIfAbsent(address,new ClientPool(10));clientPool = outboxs.get(address);}//从连接池中随机取出一个连接。int i = new Random().nextInt(10);if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){return clientPool.getClients()[i];}synchronized (clientPool.getLocks()[i]){return clientPool.getClients()[i] = create(address);}}/*** 通过地址去创建一个netty的客户端* @param address* @return*/private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorker);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ClientHandler());}});ChannelFuture connect = bootstrap.connect(address);NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;}}/*** 客戶端响应服务端的处理器*/private static class ClientHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;//获取消息头//125怎么来的?是消息头转换为字节数组的大小,看控制台打印。if (byteBuf.readableBytes() >= 125){byte[] bytes = new byte[125];byteBuf.readBytes(bytes);ByteArrayInputStream in = new ByteArrayInputStream(bytes);ObjectInputStream oin = new ObjectInputStream(in);MsgHeader msgHeader = (MsgHeader) oin.readObject();System.out.printf("客户端:“收到服务端消息头:%s”\n",msgHeader.toString());//countDownLatch继续执行ResponseSyncHandler.runCallBack(msgHeader.getRequestId());//看看数据流内还有其他的数据,可能时消息体,获取消息体if (byteBuf.readableBytes() >= msgHeader.getDataLen()){byte[] data = new byte[(int)msgHeader.getDataLen()];byteBuf.readBytes(data);ByteArrayInputStream din = new ByteArrayInputStream(data);ObjectInputStream doin = new ObjectInputStream(din);MsgBody body = (MsgBody) doin.readObject();System.out.printf("客户端:“收到服务端消息体:%s”\n",body);}}}}/*** 客户端连接池:*/@Dataprivate static class ClientPool{//客户端连接数组private NioSocketChannel[] clients;//伴生锁private Object[] locks;ClientPool(int poolSize){clients = new NioSocketChannel[poolSize];locks = new Object[poolSize];for (int i = 0; i < poolSize; i++) {locks[i] = new Object();}}}/*** 消息头:客户端与服务端交互的消息头*/@Data@Accessors(chain = true)@ToStringprivate static class MsgHeader implements Serializable{int flag;long dataLen;long requestId;}/*** 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。*/@Data@Accessors(chain = true)@ToStringprivate static class MsgBody implements Serializable {private String className; //目标类名private String methodName; //目标方法名private Class[] parameterTypes; //目标方法参数类型列表private Object[] args; //目标方法参数列表}/*** 调用的远端方法。*/interface ProductService{void say(String msg);}}
Console output:
服务端已启动监听…
客户端:“发送至服务端的消息头大小为:113”
客户端:“发送至服务端的消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)”
客户端:“发送至服务端的消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])”
服务端:“收到一个客户端连接,端口号:54623”
服务端:“收到客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)”
服务端:“收到客户端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])”
服务端:“正在处理….”
服务端:“处理完毕!”
服务端:“发送给客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)”
客户端:“收到服务端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)”
客户端:“收到服务端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])”
Process finished with exit code 0
The example above implements the basic idea of RPC, but it has some fatal flaws that only surface under concurrent, multi-threaded calls. For example, change the **testClient** method to the following:
/**
 * 模拟consumer端
 */
@Test
public void testClient() throws IOException {
    new Thread(() -> {
        try {
            startServer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("服务端已启动监听...");
    //创建100个并发调用线程
    Thread[] threads = new Thread[100];
    //初始化这100个线程
    for (int i = 0; i < threads.length; i++) {
        threads[i] = new Thread(() -> {
            ProductService product = proxyGet(ProductService.class);
            product.say("hell o");
        });
    }
    //启动这100个线程
    for (Thread thread : threads) {
        thread.start();
    }
    System.in.read();
}
To summarize, the V1 code above has the following problems:

- The server-side ByteBuf may contain several header+body pairs, but the handler only reads one of them, so some requests are silently dropped.
- The server-side ByteBuf may not contain a complete header+body pair at all: it might hold header+body+header, only a header, or only part of a body. In short, TCP is a byte stream, so Netty cannot guarantee that one channelRead delivers exactly one complete message.

In other words, we have run into the classic half-packet / sticky-packet problem: message boundaries do not survive the trip through a TCP stream.

The fix is to keep a sufficiently large ByteBuf as an accumulation buffer: stash the trailing, incomplete header+body there, and when the next ByteBuf arrives, prepend the buffered fragment so that only complete header+body pairs are processed. This is also how Netty approaches it, and fortunately Netty ships with built-in support for exactly this, so we do not have to implement the buffering ourselves.
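For reference, here is a minimal sketch of that built-in support using Netty's LengthFieldBasedFrameDecoder. It assumes a simplified, hypothetical frame layout — a 4-byte flag followed by a 4-byte body-length field and then the body — rather than the Java-serialized MsgHeader used in these notes, so the constructor arguments are illustrative only:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Sketch only: assumes each frame is [4-byte flag][4-byte bodyLen][body].
public class FramedChannelInitializer extends ChannelInitializer<NioSocketChannel> {
    @Override
    protected void initChannel(NioSocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Arguments: maxFrameLength, lengthFieldOffset, lengthFieldLength,
        //            lengthAdjustment, initialBytesToStrip.
        // The decoder buffers partial frames and only passes complete frames
        // downstream - exactly the accumulate-and-wait behaviour described above.
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1 << 20, 4, 4, 0, 0));
        // pipeline.addLast(new YourFrameHandler()); // hypothetical handler that gets one full frame
    }
}
```

The versions below take the hand-rolled route instead, with a custom ByteToMessageDecoder (MsgDecode), which keeps the accumulate-and-wait mechanics visible.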
2. MyRPCTestV2 - A decoder to solve packet splitting and sticking
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.ByteToMessageDecoder;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;import lombok.experimental.Accessors;import org.junit.Test;import java.io.*;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;public class MyRPCTestV2 {//维护一个全局变量 消息头的大小,这样不用写死125public static int MSG_HEADER_LENGTH;static {try {MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;} catch (IOException e) {e.printStackTrace();}}/*** 启动server端* @throws InterruptedException*/public void startServer() throws InterruptedException {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(eventExecutors,eventExecutors);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyRPCTestV2.MsgDecode());pipeline.addLast(new MyRPCTestV2.ServerRequestHandler());}});ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));bind.sync().channel().closeFuture().sync();}/*** 模拟consumer端*/@Testpublic void testClient() throws IOException {new Thread(()->{try {startServer();} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println("服务端已启动监听...");//多线程内自增计数器。为了控制台打印AtomicInteger atomicInteger = new AtomicInteger(0);//创建20个纤程Thread[] threads = new Thread[100];//初始化这20个线程for (int i = 0; i < threads.length; i++) {int finalI = i;threads[i] = new Thread(()->{ProductService product = proxyGet(ProductService.class);String param = "hello" + atomicInteger.incrementAndGet();String result = product.say(param);System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);});}//启动这20个线程for (Thread thread : threads) {thread.start();}System.in.read();}/*** 代理请求,使用java原生的代理Proxy类* @param interfaceInfo* @param <T>* @return*/private static <T> T proxyGet(Class<T> interfaceInfo) {ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] methodInfo = {interfaceInfo};return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 1、收集调用目标的服务。方法。参数。 封装成一个message。*/String className = interfaceInfo.getName();String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);/*** 2. 
生成requestId再加上message.发送请求 本地缓存requestId* 自定义协议:* msg-header:* msg-content:*/byte[] msgBodyByteArray = MyRPCTestV2.objToBytes(msgBody);MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);byte[] msgHeaderByteArray = MyRPCTestV2.objToBytes(msgHeader);System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);/*** 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。*/ClientFactory clientFactory = ClientFactory.getInstance();NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));/*** 4. 真正发送数据, 走I/O*/ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);//使用带返回值的线程,Callable, 处理线程的结果CompletableFuture<String> res = new CompletableFuture<>();CountDownLatch countDownLatch = new CountDownLatch(1);//注册响应事件ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);byteBuf.writeBytes(msgHeaderByteArray);byteBuf.writeBytes(msgBodyByteArray);ChannelFuture future = clientChannel.writeAndFlush(byteBuf);future.sync();//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据/*** 5. 获得服务端响应,这里需要考虑2个问题* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId*/return res.get(); //会阻塞}});}/*** 通过消息体封装消息头* @param msgBodyByteArray* @return*/private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {MsgHeader msgHeader = new MsgHeader();msgHeader.setFlag(0x14141414);msgHeader.setDataLen(msgBodyByteArray.length);msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));return msgHeader;}/*** 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题* @param obj* @return* @throws IOException*/private synchronized static byte[] objToBytes(Object obj) throws IOException {ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);objectOut.writeObject(obj);byte[] objByteArray = byteArrayOut.toByteArray();return objByteArray;}/*** 服务端的处理*/private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收客户端数据MyRPCTestV2.PackageMsg clientPackageMsg = (MyRPCTestV2.PackageMsg) msg;System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());String iothreadName = Thread.currentThread().getName();/***业务处理,此处有2种方式:* 1.直接在当前线程内处理业务并返回给客户端结果数据* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法*/// ctx.executor().execute(()->{ctx.executor().parent().execute(()->{try {//业务处理System.out.println("服务端: 业务处理中...");Thread.sleep(500L);System.out.println("服务端: 业务处理完毕!");//响应客户端//消息体String execThreadName = Thread.currentThread().getName();String res = "服务端IO线程是:"+iothreadName+", 服务端业务线程是:"+execThreadName+", 业务处理结果是: "+clientPackageMsg.getBody().getArgs()[0];System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);byte[] serverMsgBodyByteArray = MyRPCTestV2.objToBytes(serverMsgBody);//消息头MsgHeader serverMsgHeader = new MsgHeader();serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());serverMsgHeader.setFlag(0x14141415);byte[] serverMsgHeaderByteArray = MyRPCTestV2.objToBytes(serverMsgHeader);//封装成整体消息ByteBuf byteBuf = 
PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);byteBuf.writeBytes(serverMsgHeaderByteArray);byteBuf.writeBytes(serverMsgBodyByteArray);//写出客户端并刷新缓冲区ctx.writeAndFlush(byteBuf);} catch (Exception e) {e.printStackTrace();}});}}/*** 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象** 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,* 不然就留着让netty缓存起来加入下一次读取。*/private static class MsgDecode extends ByteToMessageDecoder{@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {while(inByteBuf.readableBytes() >= MyRPCTestV2.MSG_HEADER_LENGTH) {byte[] bytes = new byte[MyRPCTestV2.MSG_HEADER_LENGTH];inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));MsgHeader header = (MsgHeader) oin.readObject();//DECODE在2个方向都使用//通信的协议if(inByteBuf.readableBytes() >= header.getDataLen()+MyRPCTestV2.MSG_HEADER_LENGTH){//处理指针inByteBuf.readBytes(MyRPCTestV2.MSG_HEADER_LENGTH); //移动指针到body开始的位置byte[] data = new byte[(int)header.getDataLen()];inByteBuf.readBytes(data);ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));//0x14141414 表示客户端向服务端发送//0x14141415 表示服务端向客户端发送if(header.getFlag() == 0x14141414){MsgBody body = (MsgBody) doin.readObject();out.add(new MyRPCTestV2.PackageMsg(header,body));}else if(header.getFlag() == 0x14141415){MsgBody body = (MsgBody) doin.readObject();out.add(new MyRPCTestV2.PackageMsg(header,body));}}else{break;}}}}@Dataprivate static class ServerResponseMappingCallback{//使用一个集合存储requestId与对应的任务。private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();//添加requestId与任务的映射public static void addCallback(long requestId, CompletableFuture<String> callback){mapping.putIfAbsent(requestId,callback);}//找出requestId与任务的映射,并执行任务,执行完毕后删除public static void runCallBack(MyRPCTestV2.PackageMsg packageMsg) {mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());removeCallBack(packageMsg.getHeader().getRequestId());}//删除requestId与任务的映射private static void removeCallBack(long requestId) {mapping.remove(requestId);}}/*** 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池*/private static class ClientFactory{private static final ClientFactory instance = new ClientFactory();private ClientFactory(){ }public static ClientFactory getInstance(){return instance;}ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();/*** 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。* @param address* @return*/public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {ClientPool clientPool = outboxs.get(address);//没有则初始化一个if (clientPool == null) {outboxs.putIfAbsent(address,new ClientPool(10));clientPool = outboxs.get(address);}//从连接池中随机取出一个连接。int i = new Random().nextInt(10);if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){return clientPool.getClients()[i];}synchronized (clientPool.getLocks()[i]){return clientPool.getClients()[i] = create(address);}}/*** 通过地址去创建一个netty的客户端* @param address* 
@return*/private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorker);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MsgDecode());pipeline.addLast(new ClientHandler());}});ChannelFuture connect = bootstrap.connect(address);NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;}}/*** 客戶端响应服务端的处理器*/private static class ClientHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MyRPCTestV2.PackageMsg packageMsg = (MyRPCTestV2.PackageMsg) msg;//解锁门栓,让客户端程序继续执行ServerResponseMappingCallback.runCallBack(packageMsg);}}/*** 客户端连接池:*/@Dataprivate static class ClientPool{//客户端连接数组private NioSocketChannel[] clients;//伴生锁private Object[] locks;ClientPool(int poolSize){clients = new NioSocketChannel[poolSize];locks = new Object[poolSize];for (int i = 0; i < poolSize; i++) {locks[i] = new Object();}}}/*** 封装成一个包,不要两个包了*/@Data@AllArgsConstructor@NoArgsConstructor@ToStringprivate static class PackageMsg implements Serializable{private MsgHeader header;private MsgBody body;}/*** 消息头:客户端与服务端交互的消息头*/@Data@Accessors(chain = true)@ToStringprivate static class MsgHeader implements Serializable{int flag;long dataLen;long requestId;}/*** 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。*/@Data@Accessors(chain = true)@ToStringprivate static class MsgBody implements Serializable {private String className; //目标类名private String methodName; //目标方法名private Class[] parameterTypes; //目标方法参数类型列表private Object[] args; //目标方法参数列表private Object res; //方法的返回值}/*** 调用的远端方法。*/interface ProductService{String say(String msg);}}
This version is a substantial improvement; the changes are:

- A decoder, **MsgDecode**, solves the splitting/sticking problem: both the server and the client now always read a complete header+body pair out of the ByteBuf. This is the biggest change in this round.
- A global static field, **MSG_HEADER_LENGTH**, holds the serialized header size, so the magic number 125 is no longer hard-coded.
- A new static helper, **objToBytes()**, turns an object into a byte array (the object must implement Serializable), so that conversion is no longer written by hand each time.
- A new class, **PackageMsg**, wraps a MsgHeader and a MsgBody into a single object.
- The **CountDownLatch** is replaced by a **CompletableFuture**, and the client now reads and prints the value returned by the server; a minimal sketch of this request/response mapping follows the list.
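To isolate that last change, here is a minimal, self-contained sketch (hypothetical names, no Netty involved) of how a per-request CompletableFuture lets the calling thread block until the matching response arrives, keyed by requestId — the same handshake that ServerResponseMappingCallback performs:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

// Toy model of ServerResponseMappingCallback: requestId -> pending future.
public class ResponseMappingDemo {
    private static final ConcurrentHashMap<Long, CompletableFuture<String>> PENDING =
            new ConcurrentHashMap<>();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long requestId = 42L;

        // Caller side: register the future before writing the request to the channel.
        CompletableFuture<String> pending = new CompletableFuture<>();
        PENDING.put(requestId, pending);

        // I/O side: when a response frame with this requestId is decoded,
        // complete the future (simulated here from another thread).
        new Thread(() -> PENDING.remove(requestId).complete("response for " + requestId)).start();

        // The caller blocks here until complete() is called, just like res.get() in invoke().
        System.out.println(pending.get());
    }
}
```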
3. MyRPCTestV3 - Further optimization
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.ByteToMessageDecoder;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;import lombok.experimental.Accessors;import org.junit.Test;import java.io.*;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;public class MyRPCTestV3 {//维护一个全局变量 消息头的大小,这样不用写死125public static int MSG_HEADER_LENGTH;static {try {MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;} catch (IOException e) {e.printStackTrace();}}/*** 启动server端* @throws InterruptedException*/public void startServer() throws InterruptedException {Man man = new Man();Dispatcher dispatcher = new Dispatcher().register(MyRPCTestV3.Person.class.getName(),man);NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(eventExecutors,eventExecutors);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyRPCTestV3.MsgDecode());pipeline.addLast(new MyRPCTestV3.ServerRequestHandler(dispatcher));}});ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));bind.sync().channel().closeFuture().sync();}/*** 模拟consumer端*/@Testpublic void testClient() throws IOException {new Thread(()->{try {startServer();} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println("服务端已启动监听...");//多线程内自增计数器。为了控制台打印AtomicInteger atomicInteger = new AtomicInteger(0);//创建20个纤程Thread[] threads = new Thread[100];//初始化这20个线程for (int i = 0; i < threads.length; i++) {int finalI = i;threads[i] = new Thread(()->{Person man = proxyGet(Person.class);String param = "hello" + atomicInteger.incrementAndGet();String result = man.say(param);System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);});}//启动这20个线程for (Thread thread : threads) {thread.start();}System.in.read();}/*** 代理请求,使用java原生的代理Proxy类* @param interfaceInfo* @param <T>* @return*/private static <T> T proxyGet(Class<T> interfaceInfo) {ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] methodInfo = {interfaceInfo};return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 1、收集调用目标的服务。方法。参数。 封装成一个message。*/String className = interfaceInfo.getName();String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);/*** 2. 
生成requestId再加上message.发送请求 本地缓存requestId* 自定义协议:* msg-header:* msg-content:*/byte[] msgBodyByteArray = MyRPCTestV3.objToBytes(msgBody);MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);byte[] msgHeaderByteArray = MyRPCTestV3.objToBytes(msgHeader);System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);/*** 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。*/ClientFactory clientFactory = ClientFactory.getInstance();NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));/*** 4. 真正发送数据, 走I/O*/ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);//使用带返回值的线程,Callable, 处理线程的结果CompletableFuture<String> res = new CompletableFuture<>();CountDownLatch countDownLatch = new CountDownLatch(1);//注册响应事件ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);byteBuf.writeBytes(msgHeaderByteArray);byteBuf.writeBytes(msgBodyByteArray);ChannelFuture future = clientChannel.writeAndFlush(byteBuf);future.sync();//仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据/*** 5. 获得服务端响应,这里需要考虑2个问题* 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!* 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId*/return res.get(); //会阻塞}});}/*** 通过消息体封装消息头* @param msgBodyByteArray* @return*/private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {MsgHeader msgHeader = new MsgHeader();msgHeader.setFlag(0x14141414);msgHeader.setDataLen(msgBodyByteArray.length);msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));return msgHeader;}/*** 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题* @param obj* @return* @throws IOException*/private synchronized static byte[] objToBytes(Object obj) throws IOException {ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);objectOut.writeObject(obj);byte[] objByteArray = byteArrayOut.toByteArray();return objByteArray;}/*** 服务端的处理*/@Data@AllArgsConstructor@NoArgsConstructorprivate static class ServerRequestHandler extends ChannelInboundHandlerAdapter{private MyRPCTestV3.Dispatcher dispatcher;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收客户端数据MyRPCTestV3.PackageMsg clientPackageMsg = (MyRPCTestV3.PackageMsg) msg;System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());String iothreadName = Thread.currentThread().getName();/***业务处理,此处有2种方式:* 1.直接在当前线程内处理业务并返回给客户端结果数据* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法*/// ctx.executor().execute(()->{ctx.executor().parent().execute(()->{try {//业务处理 使用反射调用目标的方法System.out.println("服务端: 业务处理中...");String className = clientPackageMsg.getBody().getClassName();String methodName = clientPackageMsg.getBody().getMethodName();Object o = dispatcher.get(className);Method method = o.getClass().getMethod(methodName, clientPackageMsg.getBody().getParameterTypes());Object res = method.invoke(o, clientPackageMsg.getBody().getArgs());System.out.println("服务端: 业务处理完毕!");//响应客户端//消息体System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);byte[] serverMsgBodyByteArray = MyRPCTestV3.objToBytes(serverMsgBody);//消息头MsgHeader serverMsgHeader = new 
MsgHeader();serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());serverMsgHeader.setFlag(0x14141415);byte[] serverMsgHeaderByteArray = MyRPCTestV3.objToBytes(serverMsgHeader);//消息体+消息体写入ByteBufByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);byteBuf.writeBytes(serverMsgHeaderByteArray);byteBuf.writeBytes(serverMsgBodyByteArray);//ByteBuf写出客户端并刷新缓冲区ctx.writeAndFlush(byteBuf);} catch (Exception e) {e.printStackTrace();}});}}/*** 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象** 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,* 不然就留着让netty缓存起来加入下一次读取。*/private static class MsgDecode extends ByteToMessageDecoder{@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {while(inByteBuf.readableBytes() >= MyRPCTestV3.MSG_HEADER_LENGTH) {byte[] bytes = new byte[MyRPCTestV3.MSG_HEADER_LENGTH];inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));MsgHeader header = (MsgHeader) oin.readObject();//DECODE在2个方向都使用//通信的协议if(inByteBuf.readableBytes() >= header.getDataLen()+ MyRPCTestV3.MSG_HEADER_LENGTH){//处理指针inByteBuf.readBytes(MyRPCTestV3.MSG_HEADER_LENGTH); //移动指针到body开始的位置byte[] data = new byte[(int)header.getDataLen()];inByteBuf.readBytes(data);ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));//0x14141414 表示客户端向服务端发送//0x14141415 表示服务端向客户端发送if(header.getFlag() == 0x14141414){MsgBody body = (MsgBody) doin.readObject();out.add(new MyRPCTestV3.PackageMsg(header,body));}else if(header.getFlag() == 0x14141415){MsgBody body = (MsgBody) doin.readObject();out.add(new MyRPCTestV3.PackageMsg(header,body));}}else{break;}}}}@Dataprivate static class ServerResponseMappingCallback{//使用一个集合存储requestId与对应的任务。private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();//添加requestId与任务的映射public static void addCallback(long requestId, CompletableFuture<String> callback){mapping.putIfAbsent(requestId,callback);}//找出requestId与任务的映射,并执行任务,执行完毕后删除public static void runCallBack(MyRPCTestV3.PackageMsg packageMsg) {mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());removeCallBack(packageMsg.getHeader().getRequestId());}//删除requestId与任务的映射private static void removeCallBack(long requestId) {mapping.remove(requestId);}}/*** 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池*/private static class ClientFactory{private static final ClientFactory instance = new ClientFactory();private ClientFactory(){ }public static ClientFactory getInstance(){return instance;}ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();/*** 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。* @param address* @return*/public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {ClientPool clientPool = outboxs.get(address);//没有则初始化一个if (clientPool == null) {outboxs.putIfAbsent(address,new ClientPool(10));clientPool = 
outboxs.get(address);}//从连接池中随机取出一个连接。int i = new Random().nextInt(10);if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){return clientPool.getClients()[i];}synchronized (clientPool.getLocks()[i]){return clientPool.getClients()[i] = create(address);}}/*** 通过地址去创建一个netty的客户端* @param address* @return*/private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(clientWorker);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MsgDecode());pipeline.addLast(new ClientHandler());}});ChannelFuture connect = bootstrap.connect(address);NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;}}/*** 客戶端响应服务端的处理器*/private static class ClientHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MyRPCTestV3.PackageMsg packageMsg = (MyRPCTestV3.PackageMsg) msg;//解锁门栓,让客户端程序继续执行ServerResponseMappingCallback.runCallBack(packageMsg);}}/*** 客户端连接池:*/@Dataprivate static class ClientPool{//客户端连接数组private NioSocketChannel[] clients;//伴生锁private Object[] locks;ClientPool(int poolSize){clients = new NioSocketChannel[poolSize];locks = new Object[poolSize];for (int i = 0; i < poolSize; i++) {locks[i] = new Object();}}}/*** 封装成一个包,不要两个包了*/@Data@AllArgsConstructor@NoArgsConstructor@ToStringprivate static class PackageMsg implements Serializable{private MsgHeader header;private MsgBody body;}/*** 消息头:客户端与服务端交互的消息头*/@Data@Accessors(chain = true)@ToStringprivate static class MsgHeader implements Serializable{int flag;long dataLen;long requestId;}/*** 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。*/@Data@Accessors(chain = true)@ToStringprivate static class MsgBody implements Serializable {private String className; //服务端的类名private String methodName; //服务端的方法名private Class[] parameterTypes; //服务端的方法参数类型列表private Object[] args; //服务端的方法参数列表private Object res; //服务端方法的返回值}private static interface Person{String say(String msg);}private static class Man implements MyRPCTestV3.Person{@Overridepublic String say(String msg) {return "I'm a man";}}/*** 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map。* 这个map里面放入了一些以类名为key,以实体对象为value的键值对。* 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。*/private static class Dispatcher{private static ConcurrentHashMap<String,Object> invokeMap = new ConcurrentHashMap<>();public MyRPCTestV3.Dispatcher register(String key, Object obj){invokeMap.put(key,obj);return this;}public Object get(String key){return invokeMap.get(key);}}}
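The key change in V3 (visible in ServerRequestHandler and Dispatcher above): the server no longer echoes the request back. Concrete service objects are registered in a Dispatcher keyed by interface name, and the requested method is invoked via reflection. Distilled into a stand-alone sketch (names are local to this sketch, no networking):

```java
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

// Stand-alone model of the Dispatcher + reflective invocation used by MyRPCTestV3.
public class DispatchDemo {
    interface Person { String say(String msg); }

    static class Man implements Person {
        @Override public String say(String msg) { return "I'm a man, got: " + msg; }
    }

    // className -> service instance, as in Dispatcher.register()
    private static final ConcurrentHashMap<String, Object> INVOKE_MAP = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        INVOKE_MAP.put(Person.class.getName(), new Man());

        // These three values are what MsgBody carries over the wire.
        String className = Person.class.getName();
        String methodName = "say";
        Object[] callArgs = {"hello"};

        Object target = INVOKE_MAP.get(className);
        Method method = target.getClass().getMethod(methodName, String.class);
        System.out.println(method.invoke(target, callArgs)); // -> I'm a man, got: hello
    }
}
```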
https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=9982697997210669&vid=5285890806209632021
Watched up to: 01:34:07
4. A cleaner structure: split into layers and separate classes

4.1 client - the client (consumer) side
4.1.1 ClientFactory.java
package myrpc.client;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import myrpc.protocol.MsgBody;import myrpc.protocol.MsgHeader;import java.io.IOException;import java.net.InetSocketAddress;import java.util.Random;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ConcurrentHashMap;/*** 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client* 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池*/public class ClientFactory {private int POOL_SIZE = 5;private NioEventLoopGroup clientWorker;private ClientFactory(){}private static final ClientFactory factory = new ClientFactory();//一个consumer 可以连接很多的provider,每一个provider都有自己的pool K,Vprivate ConcurrentHashMap<InetSocketAddress, ClientPool> outboxs = new ConcurrentHashMap<>();public static ClientFactory getFactory(){return factory;}/*** 通过参数MsgBody生成MsgHeader,并将header+body写入socket中* @param body* @return* @throws IOException*/public static CompletableFuture<Object> transport(MsgBody body) throws IOException {//生成headerbyte[] bodyBytesArray = body.toByte();MsgHeader header = body.createHeader();byte[] headerBytesArray = header.toByte();//将header+body写入ByteBufByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerBytesArray.length + bodyBytesArray.length);byteBuf.writeBytes(headerBytesArray);byteBuf.writeBytes(bodyBytesArray);//使用Callable+CompletableFutureCompletableFuture<Object> future = new CompletableFuture<>();ServerResponseMappingCallback.addCallback(header.getRequestId(), future);//将ByteBuf写入SocketNioSocketChannel clientChannel = factory.getClient(new InetSocketAddress("localhost", 9090));ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);return future;}/*** 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。* @param address* @return*/public synchronized NioSocketChannel getClient(InetSocketAddress address){//TODO 在并发情况下一定要谨慎ClientPool clientPool = outboxs.get(address);//没有则初始化一个if(clientPool == null){synchronized(outboxs){if(clientPool == null){outboxs.putIfAbsent(address,new ClientPool(POOL_SIZE));clientPool = outboxs.get(address);}}}//从连接池中随机取出一个连接。int i = new Random().nextInt(POOL_SIZE);if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){return clientPool.getClients()[i];}synchronized (clientPool.getLocks()[i]){return clientPool.getClients()[i] = create(address);}}/*** 创建一个Socket* @param address* @return*/private NioSocketChannel create(InetSocketAddress address){//基于 netty 的客户端创建方式clientWorker = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(clientWorker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new ClientMsgDecode());p.addLast(new ClientHandler());}}).connect(address);try {NioSocketChannel client = (NioSocketChannel)connect.sync().channel();return client;} catch (InterruptedException e) {e.printStackTrace();}return null;}}
4.1.2 ClientHandler.java
package myrpc.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import myrpc.protocol.PackageMsg;

/**
 * 客戶端响应服务端的处理器
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        PackageMsg packageMsg = (PackageMsg) msg;
        //让客户端程序继续执行
        ServerResponseMappingCallback.runCallBack(packageMsg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("发生了一个异常,异常原因:" + cause.getMessage());
    }
}
4.1.3 ClientMsgDecode.java
package myrpc.client;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import myrpc.protocol.MsgBody;import myrpc.protocol.MsgHeader;import myrpc.protocol.PackageMsg;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.List;/*** 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象** 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,* 不然就留着让netty缓存起来加入下一次读取。*/public class ClientMsgDecode extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));MsgHeader header = (MsgHeader) oin.readObject();//DECODE在2个方向都使用//通信的协议if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){//处理指针inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置byte[] data = new byte[(int)header.getDataLen()];inByteBuf.readBytes(data);ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));//0x14141414 表示客户端向服务端发送//0x14141415 表示服务端向客户端发送if(header.getFlag() == 0x14141414){MsgBody body = (MsgBody) doin.readObject();out.add(new PackageMsg(header,body));}else if(header.getFlag() == 0x14141415){MsgBody body = (MsgBody) doin.readObject();out.add(new PackageMsg(header,body));}}else{break;}}}}
4.1.4 ClientPool.java
package myrpc.client;

import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Data;

/**
 * 客户端连接池
 */
@Data
public class ClientPool {

    private NioSocketChannel[] clients; //客户端连接数组
    private Object[] locks;             //伴生锁

    public ClientPool(int poolSize) {
        clients = new NioSocketChannel[poolSize];
        locks = new Object[poolSize];
        for (int i = 0; i < poolSize; i++) {
            locks[i] = new Object();
        }
    }
}
4.1.5 ServerResponseMappingCallback.java
package myrpc.client;

import myrpc.protocol.PackageMsg;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ServerResponseMappingCallback {

    //使用一个集合存储requestId与对应的任务。
    private static ConcurrentHashMap<Long, CompletableFuture<Object>> mapping = new ConcurrentHashMap<>();

    //添加requestId与任务的映射
    public static void addCallback(long requestId, CompletableFuture<Object> callback) {
        mapping.putIfAbsent(requestId, callback);
    }

    //找出requestId与任务的映射,并执行任务,执行完毕后删除
    public static void runCallBack(PackageMsg packageMsg) {
        CompletableFuture<Object> future = mapping.get(packageMsg.getHeader().getRequestId());
        future.complete(packageMsg.getBody().getRes().toString());
        removeCallBack(packageMsg.getHeader().getRequestId());
    }

    //删除requestId与任务的映射
    private static void removeCallBack(long requestId) {
        mapping.remove(requestId);
    }
}
4.1.6 StartClient.java
package myrpc.client;import myrpc.proxy.MyProxy;import myrpc.service.Person;import org.junit.Test;import java.io.IOException;import java.util.concurrent.atomic.AtomicInteger;/*** 模拟consumer端*/public class StartClient {//模拟comsumer端 && provider@Testpublic void get() throws IOException {//多线程内自增计数器。为了控制台打印AtomicInteger index = new AtomicInteger(0);//创建20个纤程Thread[] threads = new Thread[100];//初始化这20个线程for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{Person person = MyProxy.proxyGet(Person.class);Integer param = index.incrementAndGet();String result = person.getById(param);System.out.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);});}//启动这20个线程for (Thread thread : threads) {thread.start();}System.in.read();}@Testpublic void testRPC() {// Car car = MyProxy.proxyGet(Car.class);// Persion zhangsan = car.oxox("zhangsan", 16);// System.out.println(zhangsan);}@Testpublic void testRpcLocal() {System.out.println("server started......");// Car car = MyProxy.proxyGet(Car.class);// Persion zhangsan = car.oxox("zhangsan", 16);// System.out.println(zhangsan);}}
4.2 server - the server (provider) side
4.2.1 Dispatcher.java
package myrpc.server;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map(invokeMap)。
 * 这个map里面放入了一些以类名为key,以实体对象为value的键值对。
 * 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。
 * 单例模式
 */
public class Dispatcher {

    private static Dispatcher dis = new Dispatcher();

    public static ConcurrentHashMap<String, Object> invokeMap = new ConcurrentHashMap<>();

    //单例模式下的获取实例的静态方法
    public static Dispatcher getDis() {
        return dis;
    }

    //单例模式
    private Dispatcher() {
    }

    /**
     * 注册
     * @param k
     * @param obj
     */
    public Dispatcher register(String k, Object obj) {
        invokeMap.put(k, obj);
        return this;
    }

    /**
     * 获取
     * @param k
     * @return
     */
    public Object get(String k) {
        return invokeMap.get(k);
    }
}
4.2.2 ServerMsgDecode.java
package myrpc.server;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import myrpc.protocol.MsgBody;import myrpc.protocol.MsgHeader;import myrpc.protocol.PackageMsg;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.List;/*** 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。* 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,* 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象** 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,* 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小* 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,* 不然就留着让netty缓存起来加入下一次读取。*/public class ServerMsgDecode extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));MsgHeader header = (MsgHeader) oin.readObject();//通信的协议if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){//处理指针inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置byte[] data = new byte[(int)header.getDataLen()];inByteBuf.readBytes(data);ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));MsgBody body = (MsgBody) doin.readObject();out.add(new PackageMsg(header,body));}else{break;}}}}
4.2.3 ServerRequestHandler.java
package myrpc.server;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import lombok.*;import myrpc.protocol.*;import java.lang.reflect.Method;/*** 服务端的处理*/@Data@AllArgsConstructor@NoArgsConstructorpublic class ServerRequestHandler extends ChannelInboundHandlerAdapter {private Dispatcher dispatcher;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收客户端数据PackageMsg clientPackageMsg = (PackageMsg) msg;System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());/***业务处理,此处有2种方式:* 1.直接在当前线程内处理业务并返回给客户端结果数据* 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。* netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法*/// ctx.executor().execute(()->{ctx.executor().parent().execute(()->{try {//业务处理 使用反射调用目标的方法System.out.println("服务端: 业务处理中...");String className = clientPackageMsg.getBody().getClassName();String methodName = clientPackageMsg.getBody().getMethodName();Class[] parameterTypes = clientPackageMsg.getBody().getParameterTypes();Object[] args = clientPackageMsg.getBody().getArgs();Object object = dispatcher.get(className);Method method = object.getClass().getMethod(methodName, parameterTypes);Object res = method.invoke(object, args);System.out.println("服务端: 业务处理完毕!");//响应客户端//消息体System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);byte[] serverMsgBodyByteArray = serverMsgBody.toByte();//消息头MsgHeader serverMsgHeader = new MsgHeader();serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());serverMsgHeader.setFlag(0x14141415);byte[] serverMsgHeaderByteArray = serverMsgHeader.toByte();//消息体+消息体写入ByteBufByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);byteBuf.writeBytes(serverMsgHeaderByteArray);byteBuf.writeBytes(serverMsgBodyByteArray);//ByteBuf写出客户端并刷新缓冲区ctx.writeAndFlush(byteBuf);} catch (Exception e) {e.printStackTrace();}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.err.println("发生了一个异常,异常原因:"+cause.getMessage());}}
4.2.4 StartServer.java
package myrpc.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import myrpc.service.*;import org.junit.Test;import java.net.InetSocketAddress;/***服务端*/public class StartServer {@Testpublic void startServer() {//将服务端持有的对象和方法注入到调度器中,调度器会持有服务端的对象方法列表Man man = new Man();Dispatcher dis = Dispatcher.getDis().register(Person.class.getName(), man);NioEventLoopGroup boss = new NioEventLoopGroup(20);NioEventLoopGroup worker = boss;ServerBootstrap sbs = new ServerBootstrap();ChannelFuture bind = sbs.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ServerMsgDecode());pipeline.addLast(new ServerRequestHandler(dis));}}).bind(new InetSocketAddress("localhost", 9090));try {System.out.println("服务端已启动!!");bind.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}}
4.3 protocol - the wire protocol
4.3.1 MsgBody.java
package myrpc.protocol;import lombok.Data;import lombok.ToString;import lombok.experimental.Accessors;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;import java.io.Serializable;import java.util.UUID;/*** @author: 马士兵教育* @create: 2020-08-16 20:35*/@Data@Accessors(chain = true)@ToStringpublic class MsgBody implements Serializable {private String className; //被调用方的类名private String methodName; //被调用方的方法名private Class[] parameterTypes; //被调用方的方法参数类型列表private Object[] args; //被调用方的方法参数列表private Object res; //被调用方的方法的返回值/*** 通过消息体封装消息头* @param msgBodyByteArray* @return*/public MsgHeader createHeader() throws IOException {byte[] msgBodyByteArray = this.toByte();MsgHeader msgHeader = new MsgHeader();msgHeader.setFlag(0x14141414);msgHeader.setDataLen(msgBodyByteArray.length);msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));return msgHeader;}/*** 将当前对象转换为字节数组* @return* @throws IOException*/public byte[] toByte() throws IOException{ByteArrayOutputStream out = new ByteArrayOutputStream();ObjectOutputStream oout = new ObjectOutputStream(out);oout.writeObject(this);byte[] bytes = out.toByteArray();return bytes;}}
4.3.2 MsgHeader.java
package myrpc.protocol;import lombok.Data;import lombok.ToString;import lombok.experimental.Accessors;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.ObjectOutputStream;import java.io.Serializable;/*** 消息头:客户端与服务端交互的消息头*/@Data@Accessors(chain = true)@ToStringpublic class MsgHeader implements Serializable{int flag; //标记:0x14141414表示为远程调用, 0x14141415表示为响应long dataLen; //消息体的大小long requestId; //每次调用的唯一标识public static int MSG_HEADER_LENGTH;//初始化MsgHeader的长度static {try {MSG_HEADER_LENGTH = new MsgHeader().toByte().length;} catch (IOException e) {e.printStackTrace();}}/*** 将当前对象转换为字节数组* @return* @throws IOException*/public byte[] toByte() throws IOException{ByteArrayOutputStream out = new ByteArrayOutputStream();ObjectOutputStream oout = new ObjectOutputStream(out);oout.writeObject(this);byte[] bytes = out.toByteArray();return bytes;}}
4.3.3 PackageMsg.java
package myrpc.protocol;

import lombok.*;

import java.io.Serializable;

/**
 * 封装成一个包,不要两个包了
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class PackageMsg implements Serializable {
    private MsgHeader header;
    private MsgBody body;
}
4.4 proxy - the client-side proxy
4.4.1 MyProxy.java
package myrpc.proxy;import myrpc.client.ClientFactory;import myrpc.protocol.*;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.concurrent.CompletableFuture;/*** 代理请求,使用java原生的代理Proxy类*/public class MyProxy {public static <T>T proxyGet(Class<T> interfaceInfo){ClassLoader loader = interfaceInfo.getClassLoader();Class<?>[] methodInfo = {interfaceInfo};return (T) Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//1、收集调用目标的服务。方法。参数。 封装成一个message。String className = interfaceInfo.getName();String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);//使用带返回值的线程 Callable+CompletableFutureCompletableFuture future = ClientFactory.transport(msgBody);return future.get();//阻塞的}});}}
4.5 service - mock services
4.5.1 Man.java
package myrpc.service;

public class Man implements Person {
    @Override
    public String getById(Integer id) {
        return "I'm a man id=" + id;
    }
}
4.5.2 Person.java
package myrpc.service;

public interface Person {
    String getById(Integer id);
}
