FC: function call (寻址调用)
RPC: remote procedure call (socket)
SC: system call 系统调用(软中断)
IPC: 进程间通信,如管道、信号量、socket

https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=10131175016633389&vid=5285890806541162145
观看至: 00:48:00

1. 基于netty的RPC框架的基本实现通讯

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.PooledByteBufAllocator;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import lombok.Data;
  10. import lombok.ToString;
  11. import lombok.experimental.Accessors;
  12. import org.junit.Test;
  13. import java.io.*;
  14. import java.lang.reflect.InvocationHandler;
  15. import java.lang.reflect.Method;
  16. import java.lang.reflect.Proxy;
  17. import java.net.InetSocketAddress;
  18. import java.util.Random;
  19. import java.util.UUID;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. import java.util.concurrent.CountDownLatch;
  22. public class MyRPCTestV1 {
  23. public void startServer() throws InterruptedException {
  24. NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);
  25. ServerBootstrap serverBootstrap = new ServerBootstrap();
  26. serverBootstrap.group(eventExecutors,eventExecutors);
  27. serverBootstrap.channel(NioServerSocketChannel.class);
  28. serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
  29. @Override
  30. protected void initChannel(NioSocketChannel ch) throws Exception {
  31. System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
  32. ChannelPipeline pipeline = ch.pipeline();
  33. pipeline.addLast(new ServerRequestHandler());
  34. }
  35. });
  36. ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
  37. bind.sync().channel().closeFuture().sync();
  38. }
/**
 * Simulates the consumer side: boots the server in a background thread,
 * then fires 20 concurrent client threads, each issuing one proxied RPC.
 */
@Test
public void testClient() throws IOException {
    new Thread(()->{
        try {
            startServer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("服务端已启动监听...");
    //create 20 worker threads (plain threads — the original comment's
    //"fiber" wording was inaccurate)
    Thread[] threads = new Thread[20];
    //initialize the threads; each obtains a dynamic proxy and makes one call
    for (int i = 0; i < threads.length; i++) {
        int finalI = i;
        threads[i] = new Thread(()->{
            ProductService product = proxyGet(ProductService.class);
            product.say("hello"+ finalI);
        });
    }
    //start all the threads
    for (Thread thread : threads) {
        thread.start();
    }
    //keep the JUnit process alive so the asynchronous I/O can finish
    System.in.read();
}
/**
 * Builds a JDK dynamic proxy for the given service interface. Every call on
 * the proxy is serialized into a header+body frame, sent through a pooled
 * netty channel, and blocks until the server's response releases the latch.
 * In V1 the call always returns null to the caller.
 * @param interfaceInfo service interface to proxy
 * @param <T> interface type
 * @return proxy instance implementing interfaceInfo
 */
private static <T> T proxyGet(Class<T> interfaceInfo) {
    ClassLoader classLoader = interfaceInfo.getClassLoader();
    Class<?>[] methodInfo = {interfaceInfo};
    return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            /**
             * 1. Collect the target service, method and arguments into one message.
             */
            String className = interfaceInfo.getName();
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
            /**
             * 2. Generate a requestId, serialize header and body separately.
             * Custom protocol layout: msg-header followed by msg-content.
             */
            ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
            ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
            objectOut.writeObject(msgBody);
            byte[] msgBodyByteArray = byteArrayOut.toByteArray();
            MsgHeader msgHeader = this.createHeaderByMsgBody(msgBodyByteArray);
            //reuse the byte buffer for the header; a fresh ObjectOutputStream is
            //required because the previous one already emitted its stream header
            byteArrayOut.reset();
            objectOut = new ObjectOutputStream(byteArrayOut);
            objectOut.writeObject(msgHeader);
            byte[] msgHeaderByteArray = byteArrayOut.toByteArray();
            System.out.printf("客户端:“发送至服务端的消息头大小为:%d”\n", msgHeaderByteArray.length);
            System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader.toString());
            System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody.toString());
            /**
             * 3. Borrow a pooled connection to the target address.
             */
            ClientFactory clientFactory = ClientFactory.getInstance();
            NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
            /**
             * 4. Actually send the data over the wire.
             */
            ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //register the callback BEFORE writing, so the response (delivered on
            //a netty I/O thread) can release this latch whenever it arrives
            ResponseSyncHandler.addCallback(msgHeader.getRequestId(), new Runnable() {
                @Override
                public void run() {
                    countDownLatch.countDown();
                }
            });
            byteBuf.writeBytes(msgHeaderByteArray);
            byteBuf.writeBytes(msgBodyByteArray);
            ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
            future.sync();
            //the write only ships the bytes; ClientHandler receives the response
            //and counts the latch down — block here until that happens
            countDownLatch.await();
            /**
             * 5. Two problems are solved here:
             * - waiting for an asynchronous response: CountDownLatch, not sleep
             * - matching a response to its own request: the requestId
             */
            return null;
        }
        /**
         * Builds the wire header for a serialized message body.
         * @param msgBodyByteArray serialized MsgBody bytes
         * @return header with the protocol flag, body length and a fresh requestId
         */
        private MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
            MsgHeader msgHeader = new MsgHeader();
            msgHeader.setFlag(0x14141414);
            msgHeader.setDataLen(msgBodyByteArray.length);
            msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
            return msgHeader;
        }
    });
}
/**
 * Server-side inbound handler (V1): reads a serialized MsgHeader (+MsgBody)
 * from the ByteBuf, logs it, and echoes the original bytes back as the
 * "response".
 */
private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        //copy kept aside so the untouched request bytes can be echoed verbatim
        //NOTE(review): if the header branch below is not taken, this copy is
        //never written or released — verify against netty leak detection
        ByteBuf copyByteBuf = byteBuf.copy();
        //read the message header
        //why 125? it is the serialized size of MsgHeader, taken from the
        //client's console output; hard-coding it is fragile (fixed in V2)
        if (byteBuf.readableBytes() >= 125){
            byte[] bytes = new byte[125];
            byteBuf.readBytes(bytes);
            ByteArrayInputStream in = new ByteArrayInputStream(bytes);
            ObjectInputStream oin = new ObjectInputStream(in);
            MsgHeader msgHeader = (MsgHeader) oin.readObject();
            System.out.printf("服务端:“收到客户端消息头:%s”\n",msgHeader.toString());
            //if more bytes remain they should be the message body; read it
            if (byteBuf.readableBytes() >= msgHeader.getDataLen()){
                byte[] data = new byte[(int)msgHeader.getDataLen()];
                byteBuf.readBytes(data);
                ByteArrayInputStream din = new ByteArrayInputStream(data);
                ObjectInputStream doin = new ObjectInputStream(din);
                MsgBody body = (MsgBody) doin.readObject();
                System.out.printf("服务端:“收到客户端消息体:%s”\n",body.toString());
            }
            System.out.println("服务端:“正在处理....”");
            // Thread.sleep(1000);
            System.out.println("服务端:“处理完毕!”");
            //echo the request bytes back unchanged as the response
            ChannelFuture future = ctx.writeAndFlush(copyByteBuf);
            System.out.printf("服务端:“发送给客户端消息头:%s”\n", msgHeader.toString());
            //NOTE(review): close() here tears the pooled connection down after a
            //single message, and sync() runs after close() — confirm intended
            ctx.close();
            future.sync();
        }
    }
}
  187. @Data
  188. private static class ResponseSyncHandler{
  189. //使用一个集合存储requestId与对应的任务。
  190. private static ConcurrentHashMap<Long,Runnable> mapping = new ConcurrentHashMap<>();
  191. //添加requestId与任务的映射
  192. public static void addCallback(long requestId, Runnable callback){
  193. mapping.putIfAbsent(requestId,callback);
  194. }
  195. //找出requestId与任务的映射,并执行任务,执行完毕后删除
  196. public static void runCallBack(long requestId) {
  197. mapping.get(requestId).run();
  198. removeCallBack(requestId);
  199. }
  200. //删除requestId与任务的映射
  201. private static void removeCallBack(long requestId) {
  202. mapping.remove(requestId);
  203. }
  204. }
  205. /**
  206. * 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
  207. * 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
  208. */
  209. private static class ClientFactory{
  210. private static final ClientFactory instance = new ClientFactory();
  211. private ClientFactory(){ }
  212. public static ClientFactory getInstance(){
  213. return instance;
  214. }
  215. ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
  216. /**
  217. * 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
  218. * @param address
  219. * @return
  220. */
  221. public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
  222. ClientPool clientPool = outboxs.get(address);
  223. //没有则初始化一个
  224. if (clientPool == null) {
  225. outboxs.putIfAbsent(address,new ClientPool(10));
  226. clientPool = outboxs.get(address);
  227. }
  228. //从连接池中随机取出一个连接。
  229. int i = new Random().nextInt(10);
  230. if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
  231. return clientPool.getClients()[i];
  232. }
  233. synchronized (clientPool.getLocks()[i]){
  234. return clientPool.getClients()[i] = create(address);
  235. }
  236. }
  237. /**
  238. * 通过地址去创建一个netty的客户端
  239. * @param address
  240. * @return
  241. */
  242. private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
  243. NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
  244. Bootstrap bootstrap = new Bootstrap();
  245. bootstrap.group(clientWorker);
  246. bootstrap.channel(NioSocketChannel.class);
  247. bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  248. @Override
  249. protected void initChannel(NioSocketChannel ch) throws Exception {
  250. ChannelPipeline pipeline = ch.pipeline();
  251. pipeline.addLast(new ClientHandler());
  252. }
  253. });
  254. ChannelFuture connect = bootstrap.connect(address);
  255. NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
  256. return client;
  257. }
  258. }
/**
 * Client-side inbound handler (V1): parses the echoed response and releases
 * the thread blocked on the matching requestId.
 */
private static class ClientHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        //read the message header
        //why 125? it is the serialized size of MsgHeader, taken from the
        //console output; hard-coding it is fragile (fixed in V2)
        if (byteBuf.readableBytes() >= 125){
            byte[] bytes = new byte[125];
            byteBuf.readBytes(bytes);
            ByteArrayInputStream in = new ByteArrayInputStream(bytes);
            ObjectInputStream oin = new ObjectInputStream(in);
            MsgHeader msgHeader = (MsgHeader) oin.readObject();
            System.out.printf("客户端:“收到服务端消息头:%s”\n",msgHeader.toString());
            //release the waiting caller (counts the CountDownLatch down)
            //NOTE(review): this fires before the body below is parsed, so the
            //caller may resume while the body is still being read
            ResponseSyncHandler.runCallBack(msgHeader.getRequestId());
            //if more bytes remain they should be the message body; read it
            if (byteBuf.readableBytes() >= msgHeader.getDataLen()){
                byte[] data = new byte[(int)msgHeader.getDataLen()];
                byteBuf.readBytes(data);
                ByteArrayInputStream din = new ByteArrayInputStream(data);
                ObjectInputStream doin = new ObjectInputStream(din);
                MsgBody body = (MsgBody) doin.readObject();
                System.out.printf("客户端:“收到服务端消息体:%s”\n",body);
            }
        }
    }
}
  289. /**
  290. * 客户端连接池:
  291. */
  292. @Data
  293. private static class ClientPool{
  294. //客户端连接数组
  295. private NioSocketChannel[] clients;
  296. //伴生锁
  297. private Object[] locks;
  298. ClientPool(int poolSize){
  299. clients = new NioSocketChannel[poolSize];
  300. locks = new Object[poolSize];
  301. for (int i = 0; i < poolSize; i++) {
  302. locks[i] = new Object();
  303. }
  304. }
  305. }
  306. /**
  307. * 消息头:客户端与服务端交互的消息头
  308. */
  309. @Data
  310. @Accessors(chain = true)
  311. @ToString
  312. private static class MsgHeader implements Serializable{
  313. int flag;
  314. long dataLen;
  315. long requestId;
  316. }
  317. /**
  318. * 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
  319. */
  320. @Data
  321. @Accessors(chain = true)
  322. @ToString
  323. private static class MsgBody implements Serializable {
  324. private String className; //目标类名
  325. private String methodName; //目标方法名
  326. private Class[] parameterTypes; //目标方法参数类型列表
  327. private Object[] args; //目标方法参数列表
  328. }
/**
 * The remote service contract invoked through the dynamic proxy.
 */
interface ProductService{
    void say(String msg);
}
  335. }

控制台输出打印:

服务端已启动监听… 客户端:“发送至服务端的消息头大小为:113” 客户端:“发送至服务端的消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“发送至服务端的消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])” 服务端:“收到一个客户端连接,端口号:54623” 服务端:“收到客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 服务端:“收到客户端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])” 服务端:“正在处理….” 服务端:“处理完毕!” 服务端:“发送给客户端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“收到服务端消息头:MsgHeader(flag=336860180, dataLen=357, requestId=6236150191981682042)” 客户端:“收到服务端消息体:MsgBody(className=com.bjmashibing.system.io.mynetty.ProductService, methodName=say, parameterTypes=[class java.lang.String], args=[hello])”

Process finished with exit code 0

上面的示例虽然基本实现了RPC的基本思路,但是有一些致命的缺陷,比如在多线程并发的情况下就会出现问题。例如我们把**testClient**方法的代码改成这样:

  1. /**
  2. * 模拟consumer端
  3. */
  4. @Test
  5. public void testClient() throws IOException {
  6. new Thread(()->{
  7. try {
  8. startServer();
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }).start();
  13. System.out.println("服务端已启动监听...");
  14. //创建20个线程
  15. Thread[] threads = new Thread[100];
  16. //初始化这20个线程
  17. for (int i = 0; i < threads.length; i++) {
  18. threads[i] = new Thread(()->{
  19. ProductService product = proxyGet(ProductService.class);
  20. product.say("hell o");
  21. });
  22. }
  23. //启动这20个线程
  24. for (Thread thread : threads) {
  25. thread.start();
  26. }
  27. System.in.read();
  28. }

总结一下,4.1的代码有这些问题:

  1. 服务端的ByteBuf可能包含多个header+body,但是我们只取了一个,所以造成了有一些数据丢了。
  2. 服务端的ByteBuf包含的数据不是完整的header+body对,可能包含的是header+body+header,也可能只是个header,也可能是个body。总结来说:TCP是面向字节流的协议,netty的单次读取不能保证消息边界的完整性。

image.png

image.png

image.png

image.png

通过以上的问题发现,netty还具有拆包粘包的问题。

解决方式是:我们可以维护一个足够长的ByteBuf当作缓存,把最后的不完整的header+body放入准备好的缓存中,下次再有ByteBuf进来和缓存中的一部分header+body拼接,就是一个完整的header+body。这也是netty的解决方式,所幸的是,netty已经有内置方式解决此问题,不需要我们实现。

2. MyRPCTestV2 - Decode解决拆包粘包

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.PooledByteBufAllocator;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.ByteToMessageDecoder;
  10. import lombok.AllArgsConstructor;
  11. import lombok.Data;
  12. import lombok.NoArgsConstructor;
  13. import lombok.ToString;
  14. import lombok.experimental.Accessors;
  15. import org.junit.Test;
  16. import java.io.*;
  17. import java.lang.reflect.InvocationHandler;
  18. import java.lang.reflect.Method;
  19. import java.lang.reflect.Proxy;
  20. import java.net.InetSocketAddress;
  21. import java.util.List;
  22. import java.util.Random;
  23. import java.util.UUID;
  24. import java.util.concurrent.CompletableFuture;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. import java.util.concurrent.CountDownLatch;
  27. import java.util.concurrent.atomic.AtomicInteger;
  28. public class MyRPCTestV2 {
  29. //维护一个全局变量 消息头的大小,这样不用写死125
  30. public static int MSG_HEADER_LENGTH;
  31. static {
  32. try {
  33. MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. }
/**
 * Starts the V2 netty RPC server on localhost:9090. The pipeline now begins
 * with MsgDecode, so downstream handlers always receive complete messages.
 * @throws InterruptedException if the bind/close wait is interrupted
 */
public void startServer() throws InterruptedException {
    //10 event loops shared for both accept and I/O duties
    NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(eventExecutors,eventExecutors);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
            ChannelPipeline pipeline = ch.pipeline();
            //decoder first: turns raw bytes into complete PackageMsg objects
            pipeline.addLast(new MyRPCTestV2.MsgDecode());
            pipeline.addLast(new MyRPCTestV2.ServerRequestHandler());
        }
    });
    ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
    //wait for the bind, then block until the server channel closes
    bind.sync().channel().closeFuture().sync();
}
/**
 * Simulates the consumer side: boots the server in a background thread,
 * then fires 100 concurrent client threads, each printing its RPC result.
 */
@Test
public void testClient() throws IOException {
    new Thread(()->{
        try {
            startServer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    System.out.println("服务端已启动监听...");
    //thread-safe counter used only to label the console output
    AtomicInteger atomicInteger = new AtomicInteger(0);
    //create 100 worker threads (the original comments still said "20 fibers")
    Thread[] threads = new Thread[100];
    //initialize the threads; each obtains a proxy and makes one call
    for (int i = 0; i < threads.length; i++) {
        int finalI = i; //NOTE(review): unused in V2, leftover from V1
        threads[i] = new Thread(()->{
            ProductService product = proxyGet(ProductService.class);
            String param = "hello" + atomicInteger.incrementAndGet();
            String result = product.say(param);
            System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
        });
    }
    //start all the threads
    for (Thread thread : threads) {
        thread.start();
    }
    //keep the JUnit process alive so the asynchronous I/O can finish
    System.in.read();
}
  92. /**
  93. * 代理请求,使用java原生的代理Proxy类
  94. * @param interfaceInfo
  95. * @param <T>
  96. * @return
  97. */
  98. private static <T> T proxyGet(Class<T> interfaceInfo) {
  99. ClassLoader classLoader = interfaceInfo.getClassLoader();
  100. Class<?>[] methodInfo = {interfaceInfo};
  101. return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
  102. @Override
  103. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  104. /**
  105. * 1、收集调用目标的服务。方法。参数。 封装成一个message。
  106. */
  107. String className = interfaceInfo.getName();
  108. String methodName = method.getName();
  109. Class<?>[] parameterTypes = method.getParameterTypes();
  110. MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
  111. /**
  112. * 2. 生成requestId再加上message.发送请求 本地缓存requestId
  113. * 自定义协议:
  114. * msg-header:
  115. * msg-content:
  116. */
  117. byte[] msgBodyByteArray = MyRPCTestV2.objToBytes(msgBody);
  118. MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);
  119. byte[] msgHeaderByteArray = MyRPCTestV2.objToBytes(msgHeader);
  120. System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);
  121. System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);
  122. /**
  123. * 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。
  124. */
  125. ClientFactory clientFactory = ClientFactory.getInstance();
  126. NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
  127. /**
  128. * 4. 真正发送数据, 走I/O
  129. */
  130. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
  131. //使用带返回值的线程,Callable, 处理线程的结果
  132. CompletableFuture<String> res = new CompletableFuture<>();
  133. CountDownLatch countDownLatch = new CountDownLatch(1);
  134. //注册响应事件
  135. ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);
  136. byteBuf.writeBytes(msgHeaderByteArray);
  137. byteBuf.writeBytes(msgBodyByteArray);
  138. ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
  139. future.sync();
  140. //仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据
  141. /**
  142. * 5. 获得服务端响应,这里需要考虑2个问题
  143. * 问题1:消息发出后如何服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CountDownLatch!
  144. * 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId
  145. */
  146. return res.get(); //会阻塞
  147. }
  148. });
  149. }
  150. /**
  151. * 通过消息体封装消息头
  152. * @param msgBodyByteArray
  153. * @return
  154. */
  155. private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
  156. MsgHeader msgHeader = new MsgHeader();
  157. msgHeader.setFlag(0x14141414);
  158. msgHeader.setDataLen(msgBodyByteArray.length);
  159. msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
  160. return msgHeader;
  161. }
  162. /**
  163. * 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题
  164. * @param obj
  165. * @return
  166. * @throws IOException
  167. */
  168. private synchronized static byte[] objToBytes(Object obj) throws IOException {
  169. ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
  170. ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
  171. objectOut.writeObject(obj);
  172. byte[] objByteArray = byteArrayOut.toByteArray();
  173. return objByteArray;
  174. }
/**
 * Server-side business handler (V2). Receives a fully decoded PackageMsg
 * (the upstream MsgDecode stage guarantees completeness), hands the work off
 * to a non-I/O thread, and writes a response carrying the same requestId.
 */
private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //receive the client's decoded message
        MyRPCTestV2.PackageMsg clientPackageMsg = (MyRPCTestV2.PackageMsg) msg;
        System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
        String iothreadName = Thread.currentThread().getName();
        /**
         * Two ways to run the business logic:
         * 1. inline on the current I/O thread, replying directly, or
         * 2. on a worker pool, so the I/O thread only shuttles bytes.
         * netty supports the second via ctx.executor().execute(Runnable).
         */
        // ctx.executor().execute(()->{
        //parent() submits to the whole event-loop group, i.e. possibly to a
        //different loop than this channel's own I/O thread
        ctx.executor().parent().execute(()->{
            try {
                //simulated business processing
                System.out.println("服务端: 业务处理中...");
                Thread.sleep(500L);
                System.out.println("服务端: 业务处理完毕!");
                //build the response body
                String execThreadName = Thread.currentThread().getName();
                String res = "服务端IO线程是:"+iothreadName+", 服务端业务线程是:"+execThreadName+", 业务处理结果是: "+clientPackageMsg.getBody().getArgs()[0];
                System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
                MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
                byte[] serverMsgBodyByteArray = MyRPCTestV2.objToBytes(serverMsgBody);
                //build the response header; the requestId is echoed back so the
                //client can match this response to its pending call
                MsgHeader serverMsgHeader = new MsgHeader();
                serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
                serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
                //0x14141415 marks server->client traffic
                serverMsgHeader.setFlag(0x14141415);
                byte[] serverMsgHeaderByteArray = MyRPCTestV2.objToBytes(serverMsgHeader);
                //assemble header+body into one outbound buffer
                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
                byteBuf.writeBytes(serverMsgHeaderByteArray);
                byteBuf.writeBytes(serverMsgBodyByteArray);
                //write back to the client and flush the channel
                ctx.writeAndFlush(byteBuf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
  223. /**
  224. * 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
  225. * 这个解码器把,msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV2.PackageMsg,然后传递到下一环处理,
  226. * 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV2.PackageMsg对象
  227. *
  228. * 首先ByteBuf的可读大小必须大于MyRPCTestV2.MSG_HEADER_LENGTH才能继续一个循环,
  229. * 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
  230. * 如果MyRPCTestV2.MSG_HEADER_LENGTH+dataLen大于ByteBuf的可读大小才能真正的偏移指针读取一对header+body,
  231. * 不然就留着让netty缓存起来加入下一次读取。
  232. */
  233. private static class MsgDecode extends ByteToMessageDecoder{
  234. @Override
  235. protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
  236. while(inByteBuf.readableBytes() >= MyRPCTestV2.MSG_HEADER_LENGTH) {
  237. byte[] bytes = new byte[MyRPCTestV2.MSG_HEADER_LENGTH];
  238. inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
  239. ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
  240. MsgHeader header = (MsgHeader) oin.readObject();
  241. //DECODE在2个方向都使用
  242. //通信的协议
  243. if(inByteBuf.readableBytes() >= header.getDataLen()+MyRPCTestV2.MSG_HEADER_LENGTH){
  244. //处理指针
  245. inByteBuf.readBytes(MyRPCTestV2.MSG_HEADER_LENGTH); //移动指针到body开始的位置
  246. byte[] data = new byte[(int)header.getDataLen()];
  247. inByteBuf.readBytes(data);
  248. ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
  249. //0x14141414 表示客户端向服务端发送
  250. //0x14141415 表示服务端向客户端发送
  251. if(header.getFlag() == 0x14141414){
  252. MsgBody body = (MsgBody) doin.readObject();
  253. out.add(new MyRPCTestV2.PackageMsg(header,body));
  254. }else if(header.getFlag() == 0x14141415){
  255. MsgBody body = (MsgBody) doin.readObject();
  256. out.add(new MyRPCTestV2.PackageMsg(header,body));
  257. }
  258. }else{
  259. break;
  260. }
  261. }
  262. }
  263. }
  264. @Data
  265. private static class ServerResponseMappingCallback{
  266. //使用一个集合存储requestId与对应的任务。
  267. private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();
  268. //添加requestId与任务的映射
  269. public static void addCallback(long requestId, CompletableFuture<String> callback){
  270. mapping.putIfAbsent(requestId,callback);
  271. }
  272. //找出requestId与任务的映射,并执行任务,执行完毕后删除
  273. public static void runCallBack(MyRPCTestV2.PackageMsg packageMsg) {
  274. mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());
  275. removeCallBack(packageMsg.getHeader().getRequestId());
  276. }
  277. //删除requestId与任务的映射
  278. private static void removeCallBack(long requestId) {
  279. mapping.remove(requestId);
  280. }
  281. }
  282. /**
  283. * 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
  284. * 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
  285. */
  286. private static class ClientFactory{
  287. private static final ClientFactory instance = new ClientFactory();
  288. private ClientFactory(){ }
  289. public static ClientFactory getInstance(){
  290. return instance;
  291. }
  292. ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
  293. /**
  294. * 从连接池中取出一个连接,么有则创建一个,取出的方式为随机获取。
  295. * @param address
  296. * @return
  297. */
  298. public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
  299. ClientPool clientPool = outboxs.get(address);
  300. //没有则初始化一个
  301. if (clientPool == null) {
  302. outboxs.putIfAbsent(address,new ClientPool(10));
  303. clientPool = outboxs.get(address);
  304. }
  305. //从连接池中随机取出一个连接。
  306. int i = new Random().nextInt(10);
  307. if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
  308. return clientPool.getClients()[i];
  309. }
  310. synchronized (clientPool.getLocks()[i]){
  311. return clientPool.getClients()[i] = create(address);
  312. }
  313. }
  314. /**
  315. * 通过地址去创建一个netty的客户端
  316. * @param address
  317. * @return
  318. */
  319. private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
  320. NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
  321. Bootstrap bootstrap = new Bootstrap();
  322. bootstrap.group(clientWorker);
  323. bootstrap.channel(NioSocketChannel.class);
  324. bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  325. @Override
  326. protected void initChannel(NioSocketChannel ch) throws Exception {
  327. ChannelPipeline pipeline = ch.pipeline();
  328. pipeline.addLast(new MsgDecode());
  329. pipeline.addLast(new ClientHandler());
  330. }
  331. });
  332. ChannelFuture connect = bootstrap.connect(address);
  333. NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
  334. return client;
  335. }
  336. }
  337. /**
  338. * 客戶端响应服务端的处理器
  339. */
  340. private static class ClientHandler extends ChannelInboundHandlerAdapter{
  341. @Override
  342. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  343. MyRPCTestV2.PackageMsg packageMsg = (MyRPCTestV2.PackageMsg) msg;
  344. //解锁门栓,让客户端程序继续执行
  345. ServerResponseMappingCallback.runCallBack(packageMsg);
  346. }
  347. }
  348. /**
  349. * 客户端连接池:
  350. */
  351. @Data
  352. private static class ClientPool{
  353. //客户端连接数组
  354. private NioSocketChannel[] clients;
  355. //伴生锁
  356. private Object[] locks;
  357. ClientPool(int poolSize){
  358. clients = new NioSocketChannel[poolSize];
  359. locks = new Object[poolSize];
  360. for (int i = 0; i < poolSize; i++) {
  361. locks[i] = new Object();
  362. }
  363. }
  364. }
/**
 * Wrapper pairing a MsgHeader with its MsgBody, so the decoder can hand the
 * pipeline one complete message object instead of two separate halves.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
private static class PackageMsg implements Serializable{
    private MsgHeader header; //wire header (flag, dataLen, requestId)
    private MsgBody body;     //payload describing the invocation or its result
}
  376. /**
  377. * 消息头:客户端与服务端交互的消息头
  378. */
  379. @Data
  380. @Accessors(chain = true)
  381. @ToString
  382. private static class MsgHeader implements Serializable{
  383. int flag;
  384. long dataLen;
  385. long requestId;
  386. }
  387. /**
  388. * 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
  389. */
  390. @Data
  391. @Accessors(chain = true)
  392. @ToString
  393. private static class MsgBody implements Serializable {
  394. private String className; //目标类名
  395. private String methodName; //目标方法名
  396. private Class[] parameterTypes; //目标方法参数类型列表
  397. private Object[] args; //目标方法参数列表
  398. private Object res; //方法的返回值
  399. }
/**
 * The remote service contract invoked through the dynamic proxy; V2 returns
 * the server's result string to the caller.
 */
interface ProductService{
    String say(String msg);
}

这次的优化还是很大的,具体优化如下:

  1. 使用一个解码器**MsgDecode**解决了拆包粘包的问题,保证服务端与客户端每次从ByteBuf读取到的数据是完整的一对header+body,这是本次最大的改动。
  2. 使用一个全局静态变量MSG_HEADER_LENGTH来维护消息头序列化后的字节长度, 这样就不用每次都写死125了。
  3. 新增了一个静态方法objToBytes(),把对象转为字节数组,不需要重复造轮子,前提是需要实现Serializable
  4. 新增了一个类 PackageMsg 来包装 MsgHeader 和 MsgBody
  5. CountDownLatch换成了CompletableFuture,并在客户端新增了返回数据的读取+打印。

3. MyRPCTestV3 - 进一步优化

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.PooledByteBufAllocator;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.ByteToMessageDecoder;
  10. import lombok.AllArgsConstructor;
  11. import lombok.Data;
  12. import lombok.NoArgsConstructor;
  13. import lombok.ToString;
  14. import lombok.experimental.Accessors;
  15. import org.junit.Test;
  16. import java.io.*;
  17. import java.lang.reflect.InvocationHandler;
  18. import java.lang.reflect.Method;
  19. import java.lang.reflect.Proxy;
  20. import java.net.InetSocketAddress;
  21. import java.util.List;
  22. import java.util.Random;
  23. import java.util.UUID;
  24. import java.util.concurrent.CompletableFuture;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. import java.util.concurrent.CountDownLatch;
  27. import java.util.concurrent.atomic.AtomicInteger;
  28. public class MyRPCTestV3 {
  29. //维护一个全局变量 消息头的大小,这样不用写死125
  30. public static int MSG_HEADER_LENGTH;
  31. static {
  32. try {
  33. MSG_HEADER_LENGTH = objToBytes(new MsgHeader()).length;
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. /**
  39. * 启动server端
  40. * @throws InterruptedException
  41. */
  42. public void startServer() throws InterruptedException {
  43. Man man = new Man();
  44. Dispatcher dispatcher = new Dispatcher().register(MyRPCTestV3.Person.class.getName(),man);
  45. NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10);
  46. ServerBootstrap serverBootstrap = new ServerBootstrap();
  47. serverBootstrap.group(eventExecutors,eventExecutors);
  48. serverBootstrap.channel(NioServerSocketChannel.class);
  49. serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
  50. @Override
  51. protected void initChannel(NioSocketChannel ch) throws Exception {
  52. System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
  53. ChannelPipeline pipeline = ch.pipeline();
  54. pipeline.addLast(new MyRPCTestV3.MsgDecode());
  55. pipeline.addLast(new MyRPCTestV3.ServerRequestHandler(dispatcher));
  56. }
  57. });
  58. ChannelFuture bind = serverBootstrap.bind(new InetSocketAddress("localhost", 9090));
  59. bind.sync().channel().closeFuture().sync();
  60. }
  61. /**
  62. * 模拟consumer端
  63. */
  64. @Test
  65. public void testClient() throws IOException {
  66. new Thread(()->{
  67. try {
  68. startServer();
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. }).start();
  73. System.out.println("服务端已启动监听...");
  74. //多线程内自增计数器。为了控制台打印
  75. AtomicInteger atomicInteger = new AtomicInteger(0);
  76. //创建100个线程
  77. Thread[] threads = new Thread[100];
  78. //初始化这100个线程
  79. for (int i = 0; i < threads.length; i++) {
  80. int finalI = i;
  81. threads[i] = new Thread(()->{
  82. Person man = proxyGet(Person.class);
  83. String param = "hello" + atomicInteger.incrementAndGet();
  84. String result = man.say(param);
  85. System.err.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
  86. });
  87. }
  88. //启动这100个线程
  89. for (Thread thread : threads) {
  90. thread.start();
  91. }
  92. System.in.read();
  93. }
  94. /**
  95. * 代理请求,使用java原生的代理Proxy类
  96. * @param interfaceInfo
  97. * @param <T>
  98. * @return
  99. */
  100. private static <T> T proxyGet(Class<T> interfaceInfo) {
  101. ClassLoader classLoader = interfaceInfo.getClassLoader();
  102. Class<?>[] methodInfo = {interfaceInfo};
  103. return (T)Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
  104. @Override
  105. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  106. /**
  107. * 1、收集调用目标的服务。方法。参数。 封装成一个message。
  108. */
  109. String className = interfaceInfo.getName();
  110. String methodName = method.getName();
  111. Class<?>[] parameterTypes = method.getParameterTypes();
  112. MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
  113. /**
  114. * 2. 生成requestId再加上message.发送请求 本地缓存requestId
  115. * 自定义协议:
  116. * msg-header:
  117. * msg-content:
  118. */
  119. byte[] msgBodyByteArray = MyRPCTestV3.objToBytes(msgBody);
  120. MsgHeader msgHeader = createHeaderByMsgBody(msgBodyByteArray);
  121. byte[] msgHeaderByteArray = MyRPCTestV3.objToBytes(msgHeader);
  122. System.out.printf("客户端:“发送至服务端的消息头:%s”\n", msgHeader);
  123. System.out.printf("客户端:“发送至服务端的消息体:%s”\n", msgBody);
  124. /**
  125. * 3. 需要维护一个连接池,从线程池中取得一个与目标地址的客户端连接。
  126. */
  127. ClientFactory clientFactory = ClientFactory.getInstance();
  128. NioSocketChannel clientChannel = clientFactory.getClient(new InetSocketAddress("localhost",9090));
  129. /**
  130. * 4. 真正发送数据, 走I/O
  131. */
  132. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeaderByteArray.length + msgBodyByteArray.length);
  133. //使用CompletableFuture异步等待并接收服务端返回的结果
  134. CompletableFuture<String> res = new CompletableFuture<>();
  135. CountDownLatch countDownLatch = new CountDownLatch(1);
  136. //注册响应事件
  137. ServerResponseMappingCallback.addCallback(msgHeader.getRequestId(), res);
  138. byteBuf.writeBytes(msgHeaderByteArray);
  139. byteBuf.writeBytes(msgBodyByteArray);
  140. ChannelFuture future = clientChannel.writeAndFlush(byteBuf);
  141. future.sync();
  142. //仅仅是把数据写出去了。需要一个服务端接收数据并返回响应数据。所以客户端需要一个handler处理服务端返回的数据
  143. /**
  144. * 5. 获得服务端响应,这里需要考虑2个问题
  145. * 问题1:消息发出后服务端的响应是异步的,如何等待消息返回?Thread.sleep()吗?使用CompletableFuture阻塞等待!
  146. * 问题2:即使服务端有消息返回,如何判断响应数据就是本次请求发出的?使用requestId
  147. */
  148. return res.get(); //会阻塞
  149. }
  150. });
  151. }
  152. /**
  153. * 通过消息体封装消息头
  154. * @param msgBodyByteArray
  155. * @return
  156. */
  157. private static MsgHeader createHeaderByMsgBody(byte[] msgBodyByteArray) {
  158. MsgHeader msgHeader = new MsgHeader();
  159. msgHeader.setFlag(0x14141414);
  160. msgHeader.setDataLen(msgBodyByteArray.length);
  161. msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
  162. return msgHeader;
  163. }
  164. /**
  165. * 对象转换为字节流,前提是参数类要实现序列化Serializable,不然会出问题
  166. * @param obj
  167. * @return
  168. * @throws IOException
  169. */
  170. private synchronized static byte[] objToBytes(Object obj) throws IOException {
  171. ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
  172. ObjectOutputStream objectOut = new ObjectOutputStream(byteArrayOut);
  173. objectOut.writeObject(obj);
  174. byte[] objByteArray = byteArrayOut.toByteArray();
  175. return objByteArray;
  176. }
  177. /**
  178. * 服务端的处理
  179. */
  180. @Data
  181. @AllArgsConstructor
  182. @NoArgsConstructor
  183. private static class ServerRequestHandler extends ChannelInboundHandlerAdapter{
  184. private MyRPCTestV3.Dispatcher dispatcher;
  185. @Override
  186. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  187. //接收客户端数据
  188. MyRPCTestV3.PackageMsg clientPackageMsg = (MyRPCTestV3.PackageMsg) msg;
  189. System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
  190. String iothreadName = Thread.currentThread().getName();
  191. /**
  192. *业务处理,此处有2种方式:
  193. * 1.直接在当前线程内处理业务并返回给客户端结果数据
  194. * 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。
  195. * netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法
  196. */
  197. // ctx.executor().execute(()->{
  198. ctx.executor().parent().execute(()->{
  199. try {
  200. //业务处理 使用反射调用目标的方法
  201. System.out.println("服务端: 业务处理中...");
  202. String className = clientPackageMsg.getBody().getClassName();
  203. String methodName = clientPackageMsg.getBody().getMethodName();
  204. Object o = dispatcher.get(className);
  205. Method method = o.getClass().getMethod(methodName, clientPackageMsg.getBody().getParameterTypes());
  206. Object res = method.invoke(o, clientPackageMsg.getBody().getArgs());
  207. System.out.println("服务端: 业务处理完毕!");
  208. //响应客户端
  209. //消息体
  210. System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
  211. MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
  212. byte[] serverMsgBodyByteArray = MyRPCTestV3.objToBytes(serverMsgBody);
  213. //消息头
  214. MsgHeader serverMsgHeader = new MsgHeader();
  215. serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
  216. serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
  217. serverMsgHeader.setFlag(0x14141415);
  218. byte[] serverMsgHeaderByteArray = MyRPCTestV3.objToBytes(serverMsgHeader);
  219. //消息头+消息体写入ByteBuf
  220. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
  221. byteBuf.writeBytes(serverMsgHeaderByteArray);
  222. byteBuf.writeBytes(serverMsgBodyByteArray);
  223. //ByteBuf写出客户端并刷新缓冲区
  224. ctx.writeAndFlush(byteBuf);
  225. } catch (Exception e) {
  226. e.printStackTrace();
  227. }
  228. });
  229. }
  230. }
  231. /**
  232. * 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
  233. * 这个解码器把msgHeader 和 msgBody 合并为一个他们俩的包装对象MyRPCTestV3.PackageMsg,然后传递到下一环处理,
  234. * 下一环接收到的数据已经不是header或者body了。而是一个MyRPCTestV3.PackageMsg对象
  235. *
  236. * 首先ByteBuf的可读大小必须不小于MyRPCTestV3.MSG_HEADER_LENGTH才能继续一个循环,
  237. * 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
  238. * 只有当MyRPCTestV3.MSG_HEADER_LENGTH+dataLen不大于ByteBuf的可读大小时,才能真正地偏移指针读取一对header+body,
  239. * 不然就留着让netty缓存起来加入下一次读取。
  240. */
  241. private static class MsgDecode extends ByteToMessageDecoder{
  242. @Override
  243. protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
  244. while(inByteBuf.readableBytes() >= MyRPCTestV3.MSG_HEADER_LENGTH) {
  245. byte[] bytes = new byte[MyRPCTestV3.MSG_HEADER_LENGTH];
  246. inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
  247. ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
  248. MsgHeader header = (MsgHeader) oin.readObject();
  249. //DECODE在2个方向都使用
  250. //通信的协议
  251. if(inByteBuf.readableBytes() >= header.getDataLen()+ MyRPCTestV3.MSG_HEADER_LENGTH){
  252. //处理指针
  253. inByteBuf.readBytes(MyRPCTestV3.MSG_HEADER_LENGTH); //移动指针到body开始的位置
  254. byte[] data = new byte[(int)header.getDataLen()];
  255. inByteBuf.readBytes(data);
  256. ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
  257. //0x14141414 表示客户端向服务端发送
  258. //0x14141415 表示服务端向客户端发送
  259. if(header.getFlag() == 0x14141414){
  260. MsgBody body = (MsgBody) doin.readObject();
  261. out.add(new MyRPCTestV3.PackageMsg(header,body));
  262. }else if(header.getFlag() == 0x14141415){
  263. MsgBody body = (MsgBody) doin.readObject();
  264. out.add(new MyRPCTestV3.PackageMsg(header,body));
  265. }
  266. }else{
  267. break;
  268. }
  269. }
  270. }
  271. }
  272. @Data
  273. private static class ServerResponseMappingCallback{
  274. //使用一个集合存储requestId与对应的任务。
  275. private static ConcurrentHashMap<Long,CompletableFuture<String>> mapping = new ConcurrentHashMap<>();
  276. //添加requestId与任务的映射
  277. public static void addCallback(long requestId, CompletableFuture<String> callback){
  278. mapping.putIfAbsent(requestId,callback);
  279. }
  280. //找出requestId与任务的映射,并执行任务,执行完毕后删除
  281. public static void runCallBack(MyRPCTestV3.PackageMsg packageMsg) {
  282. mapping.get(packageMsg.getHeader().getRequestId()).complete(packageMsg.getBody().getRes().toString());
  283. removeCallBack(packageMsg.getHeader().getRequestId());
  284. }
  285. //删除requestId与任务的映射
  286. private static void removeCallBack(long requestId) {
  287. mapping.remove(requestId);
  288. }
  289. }
  290. /**
  291. * 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
  292. * 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
  293. */
  294. private static class ClientFactory{
  295. private static final ClientFactory instance = new ClientFactory();
  296. private ClientFactory(){ }
  297. public static ClientFactory getInstance(){
  298. return instance;
  299. }
  300. ConcurrentHashMap<InetSocketAddress,ClientPool> outboxs = new ConcurrentHashMap<>();
  301. /**
  302. * 从连接池中取出一个连接,没有则创建一个,取出的方式为随机获取。
  303. * @param address
  304. * @return
  305. */
  306. public synchronized NioSocketChannel getClient(InetSocketAddress address) throws InterruptedException {
  307. ClientPool clientPool = outboxs.get(address);
  308. //没有则初始化一个
  309. if (clientPool == null) {
  310. outboxs.putIfAbsent(address,new ClientPool(10));
  311. clientPool = outboxs.get(address);
  312. }
  313. //从连接池中随机取出一个连接。
  314. int i = new Random().nextInt(10);
  315. if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
  316. return clientPool.getClients()[i];
  317. }
  318. synchronized (clientPool.getLocks()[i]){
  319. return clientPool.getClients()[i] = create(address);
  320. }
  321. }
  322. /**
  323. * 通过地址去创建一个netty的客户端
  324. * @param address
  325. * @return
  326. */
  327. private NioSocketChannel create(InetSocketAddress address) throws InterruptedException {
  328. NioEventLoopGroup clientWorker = new NioEventLoopGroup(1);
  329. Bootstrap bootstrap = new Bootstrap();
  330. bootstrap.group(clientWorker);
  331. bootstrap.channel(NioSocketChannel.class);
  332. bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
  333. @Override
  334. protected void initChannel(NioSocketChannel ch) throws Exception {
  335. ChannelPipeline pipeline = ch.pipeline();
  336. pipeline.addLast(new MsgDecode());
  337. pipeline.addLast(new ClientHandler());
  338. }
  339. });
  340. ChannelFuture connect = bootstrap.connect(address);
  341. NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
  342. return client;
  343. }
  344. }
  345. /**
  346. * 客戶端响应服务端的处理器
  347. */
  348. private static class ClientHandler extends ChannelInboundHandlerAdapter{
  349. @Override
  350. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  351. MyRPCTestV3.PackageMsg packageMsg = (MyRPCTestV3.PackageMsg) msg;
  352. //完成对应requestId的CompletableFuture,让阻塞等待的客户端程序继续执行
  353. ServerResponseMappingCallback.runCallBack(packageMsg);
  354. }
  355. }
  356. /**
  357. * 客户端连接池:
  358. */
  359. @Data
  360. private static class ClientPool{
  361. //客户端连接数组
  362. private NioSocketChannel[] clients;
  363. //伴生锁
  364. private Object[] locks;
  365. ClientPool(int poolSize){
  366. clients = new NioSocketChannel[poolSize];
  367. locks = new Object[poolSize];
  368. for (int i = 0; i < poolSize; i++) {
  369. locks[i] = new Object();
  370. }
  371. }
  372. }
  373. /**
  374. * 把 MsgHeader 和 MsgBody 封装成一个包装对象,不再分成两个对象传递
  375. */
  376. @Data
  377. @AllArgsConstructor
  378. @NoArgsConstructor
  379. @ToString
  380. private static class PackageMsg implements Serializable{
  381. private MsgHeader header;
  382. private MsgBody body;
  383. }
  384. /**
  385. * 消息头:客户端与服务端交互的消息头
  386. */
  387. @Data
  388. @Accessors(chain = true)
  389. @ToString
  390. private static class MsgHeader implements Serializable{
  391. int flag;
  392. long dataLen;
  393. long requestId;
  394. }
  395. /**
  396. * 消息体:客户端与服务端交互的消息体,里面封装的是目标方法的内容。
  397. */
  398. @Data
  399. @Accessors(chain = true)
  400. @ToString
  401. private static class MsgBody implements Serializable {
  402. private String className; //服务端的类名
  403. private String methodName; //服务端的方法名
  404. private Class[] parameterTypes; //服务端的方法参数类型列表
  405. private Object[] args; //服务端的方法参数列表
  406. private Object res; //服务端方法的返回值
  407. }
  408. private static interface Person{
  409. String say(String msg);
  410. }
  411. private static class Man implements MyRPCTestV3.Person{
  412. @Override
  413. public String say(String msg) {
  414. return "I'm a man";
  415. }
  416. }
  417. /**
  418. * 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map。
  419. * 这个map里面放入了一些以类名为key,以实体对象为value的键值对。
  420. * 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。
  421. */
  422. private static class Dispatcher{
  423. private static ConcurrentHashMap<String,Object> invokeMap = new ConcurrentHashMap<>();
  424. public MyRPCTestV3.Dispatcher register(String key, Object obj){
  425. invokeMap.put(key,obj);
  426. return this;
  427. }
  428. public Object get(String key){
  429. return invokeMap.get(key);
  430. }
  431. }
  432. }

https://ke.qq.com/webcourse/index.html#cid=398381&term_id=100475149&taid=9982697997210669&vid=5285890806209632021
观看至:01:34:07

4. 逻辑清晰版分层分类拆分

image.png

4.1 client 客户端

4.1.1 ClientFactory.java

  1. package myrpc.client;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.PooledByteBufAllocator;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.ChannelPipeline;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import myrpc.protocol.MsgBody;
  11. import myrpc.protocol.MsgHeader;
  12. import java.io.IOException;
  13. import java.net.InetSocketAddress;
  14. import java.util.Random;
  15. import java.util.concurrent.CompletableFuture;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. /**
  18. * 客户端工厂: 单例模式,从连接池中取出连接。 构造目标地址,生成client
  19. * 一个客户端可以连接多个服务端,每个服务端都会维护一个连接池
  20. */
  21. public class ClientFactory {
  22. private int POOL_SIZE = 5;
  23. private NioEventLoopGroup clientWorker;
  24. private ClientFactory(){}
  25. private static final ClientFactory factory = new ClientFactory();
  26. //一个consumer 可以连接很多的provider,每一个provider都有自己的pool K,V
  27. private ConcurrentHashMap<InetSocketAddress, ClientPool> outboxs = new ConcurrentHashMap<>();
  28. public static ClientFactory getFactory(){
  29. return factory;
  30. }
  31. /**
  32. * 通过参数MsgBody生成MsgHeader,并将header+body写入socket中
  33. * @param body
  34. * @return
  35. * @throws IOException
  36. */
  37. public static CompletableFuture<Object> transport(MsgBody body) throws IOException {
  38. //生成header
  39. byte[] bodyBytesArray = body.toByte();
  40. MsgHeader header = body.createHeader();
  41. byte[] headerBytesArray = header.toByte();
  42. //将header+body写入ByteBuf
  43. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerBytesArray.length + bodyBytesArray.length);
  44. byteBuf.writeBytes(headerBytesArray);
  45. byteBuf.writeBytes(bodyBytesArray);
  46. //使用CompletableFuture异步接收服务端返回的结果
  47. CompletableFuture<Object> future = new CompletableFuture<>();
  48. ServerResponseMappingCallback.addCallback(header.getRequestId(), future);
  49. //将ByteBuf写入Socket
  50. NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress("localhost", 9090));
  51. ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
  52. return future;
  53. }
  54. /**
  55. * 从连接池中取出一个连接,没有则创建一个,取出的方式为随机获取。
  56. * @param address
  57. * @return
  58. */
  59. public synchronized NioSocketChannel getClient(InetSocketAddress address){
  60. //TODO 在并发情况下一定要谨慎
  61. ClientPool clientPool = outboxs.get(address);
  62. //没有则初始化一个
  63. if(clientPool == null){
  64. synchronized(outboxs){
  65. if(clientPool == null){
  66. outboxs.putIfAbsent(address,new ClientPool(POOL_SIZE));
  67. clientPool = outboxs.get(address);
  68. }
  69. }
  70. }
  71. //从连接池中随机取出一个连接。
  72. int i = new Random().nextInt(POOL_SIZE);
  73. if (clientPool.getClients()[i] != null && clientPool.getClients()[i].isActive()){
  74. return clientPool.getClients()[i];
  75. }
  76. synchronized (clientPool.getLocks()[i]){
  77. return clientPool.getClients()[i] = create(address);
  78. }
  79. }
  80. /**
  81. * 创建一个Socket
  82. * @param address
  83. * @return
  84. */
  85. private NioSocketChannel create(InetSocketAddress address){
  86. //基于 netty 的客户端创建方式
  87. clientWorker = new NioEventLoopGroup(1);
  88. Bootstrap bs = new Bootstrap();
  89. ChannelFuture connect = bs.group(clientWorker)
  90. .channel(NioSocketChannel.class)
  91. .handler(new ChannelInitializer<NioSocketChannel>() {
  92. @Override
  93. protected void initChannel(NioSocketChannel ch) throws Exception {
  94. ChannelPipeline p = ch.pipeline();
  95. p.addLast(new ClientMsgDecode());
  96. p.addLast(new ClientHandler());
  97. }
  98. }).connect(address);
  99. try {
  100. NioSocketChannel client = (NioSocketChannel)connect.sync().channel();
  101. return client;
  102. } catch (InterruptedException e) {
  103. e.printStackTrace();
  104. }
  105. return null;
  106. }
  107. }

4.1.2 ClientHandler.java

  1. package myrpc.client;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import myrpc.protocol.PackageMsg;
  5. /**
  6. * 客戶端响应服务端的处理器
  7. */
  8. public class ClientHandler extends ChannelInboundHandlerAdapter{
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. PackageMsg packageMsg = (PackageMsg) msg;
  12. //让客户端程序继续执行
  13. ServerResponseMappingCallback.runCallBack(packageMsg);
  14. }
  15. @Override
  16. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  17. System.err.println("发生了一个异常,异常原因:"+cause.getMessage());
  18. }
  19. }

4.1.3 ClientMsgDecode.java

  1. package myrpc.client;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import myrpc.protocol.MsgBody;
  6. import myrpc.protocol.MsgHeader;
  7. import myrpc.protocol.PackageMsg;
  8. import java.io.ByteArrayInputStream;
  9. import java.io.ObjectInputStream;
  10. import java.util.List;
  11. /**
  12. * 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
  13. * 这个解码器把msgHeader 和 msgBody 合并为一个他们俩的包装对象PackageMsg,然后传递到下一环处理,
  14. * 下一环接收到的数据已经不是header或者body了。而是一个PackageMsg对象
  15. *
  16. * 首先ByteBuf的可读大小必须不小于MsgHeader.MSG_HEADER_LENGTH才能继续一个循环,
  17. * 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
  18. * 只有当MsgHeader.MSG_HEADER_LENGTH+dataLen不大于ByteBuf的可读大小时,才能真正地偏移指针读取一对header+body,
  19. * 不然就留着让netty缓存起来加入下一次读取。
  20. */
  21. public class ClientMsgDecode extends ByteToMessageDecoder {
  22. @Override
  23. protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
  24. while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {
  25. byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];
  26. inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
  27. ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
  28. MsgHeader header = (MsgHeader) oin.readObject();
  29. //DECODE在2个方向都使用
  30. //通信的协议
  31. if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){
  32. //处理指针
  33. inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置
  34. byte[] data = new byte[(int)header.getDataLen()];
  35. inByteBuf.readBytes(data);
  36. ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
  37. //0x14141414 表示客户端向服务端发送
  38. //0x14141415 表示服务端向客户端发送
  39. if(header.getFlag() == 0x14141414){
  40. MsgBody body = (MsgBody) doin.readObject();
  41. out.add(new PackageMsg(header,body));
  42. }else if(header.getFlag() == 0x14141415){
  43. MsgBody body = (MsgBody) doin.readObject();
  44. out.add(new PackageMsg(header,body));
  45. }
  46. }else{
  47. break;
  48. }
  49. }
  50. }
  51. }

4.1.4 ClientPool.java

  1. package myrpc.client;
  2. import io.netty.channel.socket.nio.NioSocketChannel;
  3. import lombok.Data;
  4. /**
  5. * 客户端连接池:
  6. */
  7. @Data
  8. public class ClientPool{
  9. private NioSocketChannel[] clients; //客户端连接数组
  10. private Object[] locks; //伴生锁
  11. public ClientPool(int poolSize){
  12. clients = new NioSocketChannel[poolSize];
  13. locks = new Object[poolSize];
  14. for (int i = 0; i < poolSize; i++) {
  15. locks[i] = new Object();
  16. }
  17. }
  18. }

4.1.5 ServerResponseMappingCallback.java

  1. package myrpc.client;
  2. import myrpc.protocol.PackageMsg;
  3. import java.util.concurrent.CompletableFuture;
  4. import java.util.concurrent.ConcurrentHashMap;
  5. public class ServerResponseMappingCallback {
  6. //使用一个集合存储requestId与对应的任务。
  7. private static ConcurrentHashMap<Long,CompletableFuture<Object>> mapping = new ConcurrentHashMap<>();
  8. //添加requestId与任务的映射
  9. public static void addCallback(long requestId, CompletableFuture<Object> callback){
  10. mapping.putIfAbsent(requestId,callback);
  11. }
  12. //找出requestId与任务的映射,并执行任务,执行完毕后删除
  13. public static void runCallBack(PackageMsg packageMsg) {
  14. CompletableFuture<Object> future = mapping.get(packageMsg.getHeader().getRequestId());
  15. future.complete(packageMsg.getBody().getRes().toString());
  16. removeCallBack(packageMsg.getHeader().getRequestId());
  17. }
  18. //删除requestId与任务的映射
  19. private static void removeCallBack(long requestId) {
  20. mapping.remove(requestId);
  21. }
  22. }

4.1.6 StartClient.java

  1. package myrpc.client;
  2. import myrpc.proxy.MyProxy;
  3. import myrpc.service.Person;
  4. import org.junit.Test;
  5. import java.io.IOException;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /**
  8. * 模拟consumer端
  9. */
  10. public class StartClient {
  11. //模拟consumer端 && provider
  12. @Test
  13. public void get() throws IOException {
  14. //多线程内自增计数器。为了控制台打印
  15. AtomicInteger index = new AtomicInteger(0);
  16. //创建100个线程
  17. Thread[] threads = new Thread[100];
  18. //初始化这100个线程
  19. for (int i = 0; i < threads.length; i++) {
  20. threads[i] = new Thread(()->{
  21. Person person = MyProxy.proxyGet(Person.class);
  22. Integer param = index.incrementAndGet();
  23. String result = person.getById(param);
  24. System.out.printf("客户端:'服务端响应结果: %s', 客户端参数:%s\n", result,param);
  25. });
  26. }
  27. //启动这100个线程
  28. for (Thread thread : threads) {
  29. thread.start();
  30. }
  31. System.in.read();
  32. }
  33. @Test
  34. public void testRPC() {
  35. // Car car = MyProxy.proxyGet(Car.class);
  36. // Persion zhangsan = car.oxox("zhangsan", 16);
  37. // System.out.println(zhangsan);
  38. }
  39. @Test
  40. public void testRpcLocal() {
  41. System.out.println("server started......");
  42. // Car car = MyProxy.proxyGet(Car.class);
  43. // Persion zhangsan = car.oxox("zhangsan", 16);
  44. // System.out.println(zhangsan);
  45. }
  46. }

4.2 server 服务端

4.2.1 Dispatcher.java

  1. package myrpc.server;
  2. import java.util.concurrent.ConcurrentHashMap;
  3. /**
  4. * 这是一个可以并发访问的调度器,里面维护了一个可并发访问的map(invokeMap)。
  5. * 这个map里面放入了一些以类名为key,以实体对象为value的键值对。
  6. * 既然是RPC调用,就要像调用本地方法那样调用远程服务器方法。远程客户端发起对此类对象的调用的时候能从这个集合内找到这个对象。
  7. * 单例模式
  8. */
  9. public class Dispatcher {
  10. private static Dispatcher dis = new Dispatcher();
  11. public static ConcurrentHashMap<String,Object> invokeMap = new ConcurrentHashMap<>();
  12. //单例模式下的获取实例的静态方法
  13. public static Dispatcher getDis(){
  14. return dis;
  15. }
  16. //单例模式
  17. private Dispatcher(){
  18. }
  19. /**
  20. * 注册
  21. * @param k
  22. * @param obj
  23. */
  24. public Dispatcher register(String k,Object obj){
  25. invokeMap.put(k,obj);
  26. return this;
  27. }
  28. /**
  29. * 获取
  30. * @param k
  31. * @return
  32. */
  33. public Object get(String k){
  34. return invokeMap.get(k);
  35. }
  36. }

4.2.2 ServerMsgDecode.java

  1. package myrpc.server;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import myrpc.protocol.MsgBody;
  6. import myrpc.protocol.MsgHeader;
  7. import myrpc.protocol.PackageMsg;
  8. import java.io.ByteArrayInputStream;
  9. import java.io.ObjectInputStream;
  10. import java.util.List;
  11. /**
  12. * 消息的解码器, 解码器是在一系列处理器中的一环, 负责对数据的统一处理,将处理后的数据流入下一环。
  13. * 这个解码器把msgHeader 和 msgBody 合并为一个他们俩的包装对象PackageMsg,然后传递到下一环处理,
  14. * 下一环接收到的数据已经不是header或者body了。而是一个PackageMsg对象
  15. *
  16. * 首先ByteBuf的可读大小必须不小于MsgHeader.MSG_HEADER_LENGTH才能继续一个循环,
  17. * 在一个循环内先使用ByteBuf.getBytes()方法获得msgHeader,这样读取指针不会偏移,然后获得msgHeader内部的变量dataLen,这是msgBody的大小
  18. * 只有当MsgHeader.MSG_HEADER_LENGTH+dataLen不大于ByteBuf的可读大小时,才能真正地偏移指针读取一对header+body,
  19. * 不然就留着让netty缓存起来加入下一次读取。
  20. */
  21. public class ServerMsgDecode extends ByteToMessageDecoder {
  22. @Override
  23. protected void decode(ChannelHandlerContext ctx, ByteBuf inByteBuf, List<Object> out) throws Exception {
  24. while(inByteBuf.readableBytes() >= MsgHeader.MSG_HEADER_LENGTH) {
  25. byte[] bytes = new byte[MsgHeader.MSG_HEADER_LENGTH];
  26. inByteBuf.getBytes(inByteBuf.readerIndex(),bytes); //从哪里读取,读多少,但是readindex不变
  27. ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes));
  28. MsgHeader header = (MsgHeader) oin.readObject();
  29. //通信的协议
  30. if(inByteBuf.readableBytes() >= header.getDataLen()+ MsgHeader.MSG_HEADER_LENGTH){
  31. //处理指针
  32. inByteBuf.readBytes(MsgHeader.MSG_HEADER_LENGTH); //移动指针到body开始的位置
  33. byte[] data = new byte[(int)header.getDataLen()];
  34. inByteBuf.readBytes(data);
  35. ObjectInputStream doin = new ObjectInputStream(new ByteArrayInputStream(data));
  36. MsgBody body = (MsgBody) doin.readObject();
  37. out.add(new PackageMsg(header,body));
  38. }else{
  39. break;
  40. }
  41. }
  42. }
  43. }

4.2.3 ServerRequestHandler.java

  1. package myrpc.server;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.PooledByteBufAllocator;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import lombok.*;
  7. import myrpc.protocol.*;
  8. import java.lang.reflect.Method;
  9. /**
  10. * 服务端的处理
  11. */
  12. @Data
  13. @AllArgsConstructor
  14. @NoArgsConstructor
  15. public class ServerRequestHandler extends ChannelInboundHandlerAdapter {
  16. private Dispatcher dispatcher;
  17. @Override
  18. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  19. //接收客户端数据
  20. PackageMsg clientPackageMsg = (PackageMsg) msg;
  21. System.out.printf("服务端:“接收到客户端发来的消息: %s”\n", clientPackageMsg.toString());
  22. /**
  23. *业务处理,此处有2种方式:
  24. * 1.直接在当前线程内处理业务并返回给客户端结果数据
  25. * 2.使用一个线程池作为业务处理线程池,把数据丢给线程池处理。当前线程只管接收数据。
  26. * netty已经帮我们准备好了第二种方式。 使用ctx.executor().execute(Runnable command)方法
  27. */
  28. // ctx.executor().execute(()->{
  29. ctx.executor().parent().execute(()->{
  30. try {
  31. //业务处理 使用反射调用目标的方法
  32. System.out.println("服务端: 业务处理中...");
  33. String className = clientPackageMsg.getBody().getClassName();
  34. String methodName = clientPackageMsg.getBody().getMethodName();
  35. Class[] parameterTypes = clientPackageMsg.getBody().getParameterTypes();
  36. Object[] args = clientPackageMsg.getBody().getArgs();
  37. Object object = dispatcher.get(className);
  38. Method method = object.getClass().getMethod(methodName, parameterTypes);
  39. Object res = method.invoke(object, args);
  40. System.out.println("服务端: 业务处理完毕!");
  41. //响应客户端
  42. //消息体
  43. System.out.printf("服务端:“响应客户端的结果为: %s”\n", res);
  44. MsgBody serverMsgBody = clientPackageMsg.getBody().setRes(res);
  45. byte[] serverMsgBodyByteArray = serverMsgBody.toByte();
  46. //消息头
  47. MsgHeader serverMsgHeader = new MsgHeader();
  48. serverMsgHeader.setDataLen(serverMsgBodyByteArray.length);
  49. serverMsgHeader.setRequestId(clientPackageMsg.getHeader().getRequestId());
  50. serverMsgHeader.setFlag(0x14141415);
  51. byte[] serverMsgHeaderByteArray = serverMsgHeader.toByte();
  52. //消息头+消息体写入ByteBuf
  53. ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(serverMsgBodyByteArray.length+serverMsgHeaderByteArray.length);
  54. byteBuf.writeBytes(serverMsgHeaderByteArray);
  55. byteBuf.writeBytes(serverMsgBodyByteArray);
  56. //ByteBuf写出客户端并刷新缓冲区
  57. ctx.writeAndFlush(byteBuf);
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. }
  61. });
  62. }
  63. @Override
  64. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  65. System.err.println("发生了一个异常,异常原因:"+cause.getMessage());
  66. }
  67. }

4.2.4 StartServer.java

  1. package myrpc.server;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import myrpc.service.*;
  10. import org.junit.Test;
  11. import java.net.InetSocketAddress;
  12. /**
  13. *服务端
  14. */
  15. public class StartServer {
  16. @Test
  17. public void startServer() {
  18. //将服务端持有的对象和方法注入到调度器中,调度器会持有服务端的对象方法列表
  19. Man man = new Man();
  20. Dispatcher dis = Dispatcher.getDis().register(Person.class.getName(), man);
  21. NioEventLoopGroup boss = new NioEventLoopGroup(20);
  22. NioEventLoopGroup worker = boss;
  23. ServerBootstrap sbs = new ServerBootstrap();
  24. ChannelFuture bind = sbs.group(boss, worker)
  25. .channel(NioServerSocketChannel.class)
  26. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  27. @Override
  28. protected void initChannel(NioSocketChannel ch) throws Exception {
  29. System.out.printf("服务端:“收到一个客户端连接,端口号:%d”\n",ch.remoteAddress().getPort());
  30. ChannelPipeline pipeline = ch.pipeline();
  31. pipeline.addLast(new ServerMsgDecode());
  32. pipeline.addLast(new ServerRequestHandler(dis));
  33. }
  34. }).bind(new InetSocketAddress("localhost", 9090));
  35. try {
  36. System.out.println("服务端已启动!!");
  37. bind.sync().channel().closeFuture().sync();
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }

4.3 protocol 协议

4.3.1 MsgBody.java

  1. package myrpc.protocol;
  2. import lombok.Data;
  3. import lombok.ToString;
  4. import lombok.experimental.Accessors;
  5. import java.io.ByteArrayOutputStream;
  6. import java.io.IOException;
  7. import java.io.ObjectOutputStream;
  8. import java.io.Serializable;
  9. import java.util.UUID;
  10. /**
  11. * @author: 马士兵教育
  12. * @create: 2020-08-16 20:35
  13. */
  14. @Data
  15. @Accessors(chain = true)
  16. @ToString
  17. public class MsgBody implements Serializable {
  18. private String className; //被调用方的类名
  19. private String methodName; //被调用方的方法名
  20. private Class[] parameterTypes; //被调用方的方法参数类型列表
  21. private Object[] args; //被调用方的方法参数列表
  22. private Object res; //被调用方的方法的返回值
  23. /**
  24. * 通过消息体封装消息头
  25. * @param msgBodyByteArray
  26. * @return
  27. */
  28. public MsgHeader createHeader() throws IOException {
  29. byte[] msgBodyByteArray = this.toByte();
  30. MsgHeader msgHeader = new MsgHeader();
  31. msgHeader.setFlag(0x14141414);
  32. msgHeader.setDataLen(msgBodyByteArray.length);
  33. msgHeader.setRequestId(Math.abs(UUID.randomUUID().getLeastSignificantBits()));
  34. return msgHeader;
  35. }
  36. /**
  37. * 将当前对象转换为字节数组
  38. * @return
  39. * @throws IOException
  40. */
  41. public byte[] toByte() throws IOException{
  42. ByteArrayOutputStream out = new ByteArrayOutputStream();
  43. ObjectOutputStream oout = new ObjectOutputStream(out);
  44. oout.writeObject(this);
  45. byte[] bytes = out.toByteArray();
  46. return bytes;
  47. }
  48. }

4.3.2 MsgHeader.java

  1. package myrpc.protocol;
  2. import lombok.Data;
  3. import lombok.ToString;
  4. import lombok.experimental.Accessors;
  5. import java.io.ByteArrayOutputStream;
  6. import java.io.IOException;
  7. import java.io.ObjectOutputStream;
  8. import java.io.Serializable;
  9. /**
  10. * 消息头:客户端与服务端交互的消息头
  11. */
  12. @Data
  13. @Accessors(chain = true)
  14. @ToString
  15. public class MsgHeader implements Serializable{
  16. int flag; //标记:0x14141414表示为远程调用, 0x14141415表示为响应
  17. long dataLen; //消息体的大小
  18. long requestId; //每次调用的唯一标识
  19. public static int MSG_HEADER_LENGTH;
  20. //初始化MsgHeader的长度
  21. static {
  22. try {
  23. MSG_HEADER_LENGTH = new MsgHeader().toByte().length;
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. /**
  29. * 将当前对象转换为字节数组
  30. * @return
  31. * @throws IOException
  32. */
  33. public byte[] toByte() throws IOException{
  34. ByteArrayOutputStream out = new ByteArrayOutputStream();
  35. ObjectOutputStream oout = new ObjectOutputStream(out);
  36. oout.writeObject(this);
  37. byte[] bytes = out.toByteArray();
  38. return bytes;
  39. }
  40. }

4.3.3 PackageMsg.java

  1. package myrpc.protocol;
  2. import lombok.*;
  3. import java.io.Serializable;
/**
 * Envelope pairing a header with its body, so the pipeline passes one
 * decoded object downstream instead of two separate packets.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class PackageMsg implements Serializable {
    private MsgHeader header; // fixed-size header (flag, dataLen, requestId)
    private MsgBody body;     // variable-size payload described by the header
}

4.4 proxy代理

4.4.1 MyProxy.java

  1. package myrpc.proxy;
  2. import myrpc.client.ClientFactory;
  3. import myrpc.protocol.*;
  4. import java.lang.reflect.InvocationHandler;
  5. import java.lang.reflect.Method;
  6. import java.lang.reflect.Proxy;
  7. import java.util.concurrent.CompletableFuture;
  8. /**
  9. * 代理请求,使用java原生的代理Proxy类
  10. */
  11. public class MyProxy {
  12. public static <T>T proxyGet(Class<T> interfaceInfo){
  13. ClassLoader loader = interfaceInfo.getClassLoader();
  14. Class<?>[] methodInfo = {interfaceInfo};
  15. return (T) Proxy.newProxyInstance(loader, methodInfo, new InvocationHandler() {
  16. @Override
  17. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  18. //1、收集调用目标的服务。方法。参数。 封装成一个message。
  19. String className = interfaceInfo.getName();
  20. String methodName = method.getName();
  21. Class<?>[] parameterTypes = method.getParameterTypes();
  22. MsgBody msgBody = new MsgBody().setClassName(className).setMethodName(methodName).setParameterTypes(parameterTypes).setArgs(args);
  23. //使用带返回值的线程 Callable+CompletableFuture
  24. CompletableFuture future = ClientFactory.transport(msgBody);
  25. return future.get();//阻塞的
  26. }
  27. });
  28. }
  29. }

4.5 service模拟服务

4.5.1 Man.java

  1. package myrpc.service;
  2. public class Man implements Person{
  3. @Override
  4. public String getById(Integer id) {
  5. return "I'm a man id="+id;
  6. }
  7. }

4.5.2 Person.java

  1. package myrpc.service;
/**
 * Service contract exposed over the RPC framework: implemented by Man on
 * the server and proxied on the client via MyProxy.
 */
public interface Person {
    /** Looks up a person description by id. */
    String getById(Integer id);
}