WebSocket长连接

简介

  1. WebSocket HTML5 中的协议,是构建在 HTTP 协议之上的一个网络通信协议,其以长连接的方式实现了客户端与服务端的全双工通信。HTTP/1.1 版本协议中具有 keep-alive 属性,实现的是半双工通信。

握手原理

Netty高级应用实战篇(上) - 图1

  • Clinet端生产一个随机的Key,保存到浏览器和请求头中。
  • Clinet发送请求给Server端。
  • Server从请求头属性中判断出这事一个ws(WebSocket)请求,读到请求头中的key属性。
  • Server对key加密,放到响应头accept属性中。
  • Server将响应回应给Clinet端。
  • Clinet端从头中判断是一个ws(WebSocket)响应,读取头中的accpet属性。
  • Clinet端进行解密后,将结果保存在浏览中的key进行比较,如果匹配,则连接。否则不连接。

场景需求分析

  1. 在页面上有两个左右并排的文本域,它们的中间有一个“发送”按钮。在左侧文本域中输入文本内容后,单击发送按钮,会显示到右侧文本域中。

客户端页面定义

  1. src/main 下定义一个目录 webapp。在其中定义 html 页面。
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>index</title>
  6. </head>
  7. <script type="text/javascript">
  8. // 当前页面一打开就会执行的代码
  9. var socket;
  10. if(window.WebSocket) {
  11. // 创建一个WebSocket连接
  12. socket = new WebSocket("ws://localhost:8888/gc");
  13. // 当与服务端的ws连接创建成功后会触发onopen的执行
  14. socket.onopen = function (ev) {
  15. // 在右侧文本域中显示连接建立提示
  16. var ta = document.getElementById("responseText");
  17. ta.value = "连接已建立";
  18. }
  19. // 当接收到服务端发送的消息时会触发onmessage的执行
  20. socket.onmessage = function (ev) {
  21. // 将服务端发送来的消息在右侧文本域中显示,在原有内容基础上进行拼接
  22. var ta = document.getElementById("responseText");
  23. ta.value = ta.value + "\n" + ev.data;
  24. }
  25. // 当与服务端的ws连接断开时会触发onclose的执行
  26. socket.onclose = function (ev) {
  27. // 将连接关闭消息在右侧文本域中显示,在原有内容基础上进行拼接
  28. var ta = document.getElementById("responseText");
  29. ta.value = ta.value + "\n连接已关闭";
  30. }
  31. } else {
  32. alert("浏览器不支持WebSocket");
  33. }
  34. // 定义发送按钮的发送方法
  35. function send(msg) {
  36. // 若当前浏览器不支持WebSocket,则直接结束
  37. if(!window.WebSocket) return;
  38. // 若ws连接已打开,则向服务器发送消息
  39. if(socket.readyState == WebSocket.OPEN) {
  40. // 通过ws连接向服务器发送消息
  41. socket.send(msg);
  42. }
  43. }
  44. </script>
  45. <body>
  46. <form>
  47. <textarea id="message" style="width: 150px; height: 150px"></textarea>
  48. <input type="button" value="发送" onclick="send(this.form.message.value)">
  49. <textarea id="responseText" style="width: 150px; height: 150px"></textarea>
  50. </form>
  51. </body>
  52. </html>

服务端定义

  1. package com.gc.socket.server;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.FixedLengthFrameDecoder;
  11. import io.netty.handler.codec.LineBasedFrameDecoder;
  12. import io.netty.handler.codec.http.HttpObjectAggregator;
  13. import io.netty.handler.codec.http.HttpServerCodec;
  14. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  15. import io.netty.handler.codec.string.StringDecoder;
  16. import io.netty.handler.codec.string.StringEncoder;
  17. import io.netty.handler.logging.LogLevel;
  18. import io.netty.handler.logging.LoggingHandler;
  19. import io.netty.handler.stream.ChunkedWriteHandler;
  20. import io.netty.util.CharsetUtil;
  21. /**
  22. * @description:
  23. * @author: GC
  24. * @create: 2020-11-10 16:11
  25. **/
  26. public class Server {
  27. public static void main(String[] args) {
  28. //定义用于处理客户端连接的EventLoopGroup
  29. EventLoopGroup parentEventLoop = new NioEventLoopGroup();
  30. //定义用于处理客户端请求的EventLoopGroup
  31. EventLoopGroup childEventLoop = new NioEventLoopGroup();
  32. try{
  33. //服务端专用Bootstrap.主要用于将属绑定起来。建立关系
  34. ServerBootstrap bootstrap = new ServerBootstrap();
  35. //绑定处理客户端的EventLoopGroup对象
  36. bootstrap.group(parentEventLoop, childEventLoop)
  37. //使用NIO的方式
  38. .channel(NioServerSocketChannel.class)
  39. .handler(new LoggingHandler(LogLevel.ERROR))
  40. //绑定消息收发时的编码/解码器。 和自定义的Handler绑定
  41. .childHandler(new ChannelInitializer<SocketChannel>() {
  42. @Override
  43. protected void initChannel(SocketChannel socketChannel) throws Exception {
  44. ChannelPipeline pipeline = socketChannel.pipeline();
  45. //HttpRequestDecoder和HttpResponseEncoder的组合使
  46. pipeline.addLast(new HttpServerCodec());
  47. //可以毫无困难地发送大型数据流。
  48. pipeline.addLast(new ChunkedWriteHandler());
  49. //聚合处理器,聚合请求或者响应
  50. pipeline.addLast(new HttpObjectAggregator(111));
  51. //处理程序为你运行一个websocket服务器做了所有繁重的工作。
  52. // 它负责websocket握手以及控制框架的处理(Close,Ping,Pong)。
  53. // 文本和二进制数据帧被传递到管道中的下一个处理程序(由您执行)进行处理。
  54. pipeline.addLast(new WebSocketServerProtocolHandler("/gc"));
  55. //自定义业务处理器
  56. pipeline.addLast(new ServerHandler());
  57. }
  58. });
  59. //声明服务端绑定的端口
  60. ChannelFuture future = bootstrap.bind(8888).sync();
  61. //当Channel调用了close()方法,并且成功关闭之后,才会调用此代码
  62. future.channel().closeFuture().sync();
  63. }catch (Exception e){
  64. e.printStackTrace();
  65. }finally {
  66. //优雅的关闭方式
  67. parentEventLoop.shutdownGracefully();
  68. childEventLoop.shutdownGracefully();
  69. }
  70. }
  71. }

业务处理器定义

  1. package com.gc.socket.server;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  5. import java.util.Date;
  6. import java.util.Random;
  7. import java.util.UUID;
  8. import java.util.concurrent.TimeUnit;
  9. public class ServerHandler extends ChannelInboundHandlerAdapter {
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. String text = ((TextWebSocketFrame)msg).text();
  12. ctx.channel().writeAndFlush(new TextWebSocketFrame("客户端:"+text));
  13. System.out.println("本地地址为:"+ctx.channel().localAddress());
  14. System.out.println("远程地址为:"+ctx.channel().remoteAddress()+" ---> 收到消息:"+msg);
  15. }
  16. //当发生Throwable异常时,会调用该方法
  17. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  18. ctx.fireExceptionCaught(cause);
  19. ctx.close();
  20. }
  21. }

Socket实现群聊

简介

  1. 本例要实现一个网络群聊工具。参与聊天的客户端消息是通过服务端进行广播的。

服务端定义

  1. package com.gc.socket.server;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import io.netty.handler.codec.FixedLengthFrameDecoder;
  11. import io.netty.handler.codec.LineBasedFrameDecoder;
  12. import io.netty.handler.codec.http.HttpObjectAggregator;
  13. import io.netty.handler.codec.http.HttpServerCodec;
  14. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  15. import io.netty.handler.codec.string.StringDecoder;
  16. import io.netty.handler.codec.string.StringEncoder;
  17. import io.netty.handler.logging.LogLevel;
  18. import io.netty.handler.logging.LoggingHandler;
  19. import io.netty.handler.stream.ChunkedWriteHandler;
  20. import io.netty.util.CharsetUtil;
  21. /**
  22. * @description:
  23. * @author: GC
  24. * @create: 2020-11-10 16:11
  25. **/
  26. public class Server {
  27. public static void main(String[] args) {
  28. //定义用于处理客户端连接的EventLoopGroup
  29. EventLoopGroup parentEventLoop = new NioEventLoopGroup();
  30. //定义用于处理客户端请求的EventLoopGroup
  31. EventLoopGroup childEventLoop = new NioEventLoopGroup();
  32. try{
  33. //服务端专用Bootstrap.主要用于将属绑定起来。建立关系
  34. ServerBootstrap bootstrap = new ServerBootstrap();
  35. //绑定处理客户端的EventLoopGroup对象
  36. bootstrap.group(parentEventLoop, childEventLoop)
  37. //使用NIO的方式
  38. .channel(NioServerSocketChannel.class)
  39. .handler(new LoggingHandler(LogLevel.ERROR))
  40. //绑定消息收发时的编码/解码器。 和自定义的Handler绑定
  41. .childHandler(new ChannelInitializer<SocketChannel>() {
  42. @Override
  43. protected void initChannel(SocketChannel socketChannel) throws Exception {
  44. ChannelPipeline pipeline = socketChannel.pipeline();
  45. //因为要传输Clinet端的消息,所以肯定是先收到消息,所以解码器放前面
  46. pipeline.addLast(new LineBasedFrameDecoder(2024));
  47. pipeline.addLast(new StringDecoder());
  48. pipeline.addLast(new StringEncoder());
  49. //自定义业务处理器
  50. pipeline.addLast(new ServerHandler());
  51. }
  52. });
  53. //声明服务端绑定的端口
  54. ChannelFuture future = bootstrap.bind(8888).sync();
  55. System.out.println("Server 启动成功");
  56. //当Channel调用了close()方法,并且成功关闭之后,才会调用此代码
  57. future.channel().closeFuture().sync();
  58. }catch (Exception e){
  59. e.printStackTrace();
  60. }finally {
  61. //优雅的关闭方式
  62. parentEventLoop.shutdownGracefully();
  63. childEventLoop.shutdownGracefully();
  64. }
  65. }
  66. }

服务端业务处理器定义

  1. package com.gc.socket.server;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.channel.group.ChannelGroup;
  6. import io.netty.channel.group.DefaultChannelGroup;
  7. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  8. import io.netty.util.concurrent.EventExecutor;
  9. import io.netty.util.concurrent.GlobalEventExecutor;
  10. import java.util.Date;
  11. import java.util.Random;
  12. import java.util.UUID;
  13. import java.util.concurrent.TimeUnit;
  14. public class ServerHandler extends ChannelInboundHandlerAdapter {
  15. //消息组,聊天的人都在这个组里面。类似社交软件的群聊
  16. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  17. /**
  18. * 读取到发送的消息
  19. * @param ctx
  20. * @param msg
  21. * @throws Exception
  22. */
  23. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  24. Channel channel = ctx.channel();
  25. channelGroup.forEach(ch -> {
  26. if(ch != channel)//不是自己,显示别人的地址
  27. ch.writeAndFlush(channel.remoteAddress()+":"+msg+"\n");
  28. else//是自己,不需要显示
  29. ch.writeAndFlush(msg+"\n");
  30. });
  31. }
  32. /**
  33. * 用户上线,会触发此方法
  34. * @param ctx
  35. * @throws Exception
  36. */
  37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  38. Channel channel = ctx.channel();
  39. channelGroup.writeAndFlush(channel.remoteAddress()+"上线"+"\n");
  40. channelGroup.add(channel);
  41. }
  42. /**
  43. * 用户下线,会触发此犯法
  44. * @param ctx
  45. * @throws Exception
  46. */
  47. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  48. Channel channel = ctx.channel();
  49. channelGroup.writeAndFlush(channel.remoteAddress()+"下线。当前在线人数:"+channelGroup.size()+"\n");
  50. //当触发此方法时,会自动从channelGroup中删除。以下代码是当channel在线时,将它踢出channelGroup
  51. //channelGroup.remove(channel);
  52. }
  53. //当发生Throwable异常时,会调用该方法
  54. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  55. ctx.fireExceptionCaught(cause);
  56. ctx.close();
  57. }
  58. }

客户端定义

  1. package com.gc.socket.clinet;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.FixedLengthFrameDecoder;
  8. import io.netty.handler.codec.LineBasedFrameDecoder;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. import io.netty.handler.codec.string.StringEncoder;
  11. import io.netty.util.CharsetUtil;
  12. import java.io.*;
  13. import java.net.SocketAddress;
  14. public class Clinet {
  15. public static void main(String[] args) throws Exception {
  16. //客户端处理服务端的EventLoopGroup
  17. EventLoopGroup clint = new NioEventLoopGroup();
  18. //Bootstrap专门用于客户端的属性设置以及关系绑定
  19. Bootstrap bootstrap = new Bootstrap();
  20. bootstrap.group(clint)
  21. //使用NIO客户端的Channel
  22. .channel(NioSocketChannel.class)
  23. //绑定编码器/解码器。以及自定义的Handler
  24. .handler(new ChannelInitializer<SocketChannel>(){
  25. @Override
  26. protected void initChannel(SocketChannel socketChannel) throws Exception {
  27. ChannelPipeline pipeline = socketChannel.pipeline();
  28. //当上线后,Client端会收到Server端的提示信息,所以这里解码器放前面
  29. pipeline.addLast(new LineBasedFrameDecoder(2048));
  30. pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
  31. pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
  32. pipeline.addLast(new ClinetHandler());
  33. }
  34. });
  35. //指定连接服务端的端口
  36. ChannelFuture future = bootstrap.connect("127.0.0.1",8888).sync();
  37. System.out.println("Clinet 启动成功");
  38. Channel channel = future.channel();
  39. InputStream in;
  40. InputStreamReader is = new InputStreamReader(System.in , "UTF-8");
  41. BufferedReader br = new BufferedReader(is);
  42. while (true) {
  43. channel.writeAndFlush(br.readLine()+"\n");
  44. }
  45. //这里不关闭资源,因为客户端要持续输入。
  46. }
  47. }

客户端业务处理器定义

  1. package com.gc.socket.clinet;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import java.util.UUID;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * ChannelInboundHandlerAdapter 类中的 channelRead方法不会自动释放资源。所以会不断的发送消息
  9. * SimpleChannelInboundHandler 类中的 channelRead0()方法不会自动释放资源。所以只要Clinet端触发了消息发送, 双方两段就会一直死循环发送消息。
  10. *
  11. */
  12. public class ClinetHandler extends SimpleChannelInboundHandler<String> {
  13. /**
  14. * 读取服务端发送的消息
  15. * @param ctx
  16. * @param msg
  17. * @throws Exception
  18. */
  19. public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  20. System.err.println("Clinet:"+msg);
  21. }
  22. //当发生异常,捕获并关闭
  23. @Override
  24. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  25. cause.printStackTrace();
  26. ctx.close();
  27. }
  28. }

读写空闲监测

服务端修改

Netty高级应用实战篇(上) - 图2

服务端业务处理器修改

Netty高级应用实战篇(上) - 图3

心跳机制

简介

  1. 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还“活着”, 以确保 TCP 连接的有效性。

客户端定义

  1. package com.gc.socket.clinet;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.FixedLengthFrameDecoder;
  8. import io.netty.handler.codec.LineBasedFrameDecoder;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. import io.netty.handler.codec.string.StringEncoder;
  11. import io.netty.util.CharsetUtil;
  12. import java.io.*;
  13. import java.net.SocketAddress;
  14. public class Clinet {
  15. public static void main(String[] args) throws Exception {
  16. //客户端处理服务端的EventLoopGroup
  17. EventLoopGroup clint = new NioEventLoopGroup();
  18. //Bootstrap专门用于客户端的属性设置以及关系绑定
  19. Bootstrap bootstrap = new Bootstrap();
  20. bootstrap.group(clint)
  21. //使用NIO客户端的Channel
  22. .channel(NioSocketChannel.class)
  23. //绑定编码器/解码器。以及自定义的Handler
  24. .handler(new ChannelInitializer<SocketChannel>(){
  25. @Override
  26. protected void initChannel(SocketChannel socketChannel) throws Exception {
  27. ChannelPipeline pipeline = socketChannel.pipeline();
  28. //当上线后,Client端会收到Server端的提示信息,所以这里解码器放前面
  29. pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
  30. pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
  31. pipeline.addLast(new ClinetHandler(bootstrap));
  32. }
  33. });
  34. //指定连接服务端的端口
  35. ChannelFuture future = bootstrap.connect("127.0.0.1",8888).sync();
  36. System.out.println("Clinet 启动成功");
  37. }
  38. }

客户端业务处理器

  1. package com.gc.socket.clinet;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.util.concurrent.GenericFutureListener;
  8. import io.netty.util.concurrent.ScheduledFuture;
  9. import java.util.Random;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * ChannelInboundHandlerAdapter 类中的 channelRead方法不会自动释放资源。所以会不断的发送消息
  13. * SimpleChannelInboundHandler 类中的 channelRead0()方法不会自动释放资源。所以只要Clinet端触发了消息发送, 双方两段就会一直死循环发送消息。
  14. *
  15. */
  16. public class ClinetHandler extends ChannelInboundHandlerAdapter {
  17. //用于添加任务定时触发
  18. private ScheduledFuture<?> scheduled;
  19. //用于监听任务过期
  20. private GenericFutureListener listener;
  21. //用于客户端断链重连
  22. private Bootstrap bootstrap;
  23. public ClinetHandler(Bootstrap bootstrap){
  24. this.bootstrap = bootstrap;
  25. }
  26. /**
  27. * 用户上线后,触发此方法
  28. * @param ctx
  29. * @throws Exception
  30. */
  31. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  32. setRandom(ctx.channel());
  33. }
  34. /**
  35. * 当用户下线后,触发此方法
  36. * @param ctx
  37. * @throws Exception
  38. */
  39. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  40. //下线后,删除监听任务。 否则数据量大了,可能会导致OOM
  41. scheduled.removeListener(listener);
  42. System.out.println("已断链, 准备重连。");
  43. bootstrap.connect("localhost", 8888).sync();
  44. }
  45. /**
  46. * 心跳监测时间
  47. * @throws Exception
  48. */
  49. public void setRandom(Channel ctx) throws Exception {
  50. //随机下次像Server端发送心跳的时间
  51. int random = new Random().nextInt(5)+1;
  52. System.out.println(random+"S后发送心跳");
  53. scheduled = ctx.eventLoop().schedule(()->{
  54. //当前channel处于活动状态, 说明没死,发送心跳监测
  55. if(ctx.isActive()){
  56. System.out.println("向服务端发送心跳");
  57. ctx.writeAndFlush("ping");
  58. }else{
  59. //已经死了
  60. System.out.println("心跳超时,连接断开");
  61. }
  62. },random, TimeUnit.SECONDS);
  63. //给当前方法定义监听器
  64. listener = future1 -> {
  65. setRandom(ctx);
  66. };
  67. //添加一个定时监听
  68. scheduled.addListener(listener);
  69. }
  70. //当发生异常,捕获并关闭
  71. @Override
  72. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  73. cause.printStackTrace();
  74. ctx.close();
  75. }
  76. }

服务端定义

  1. package com.gc.socket.server;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import io.netty.handler.logging.LogLevel;
  10. import io.netty.handler.logging.LoggingHandler;
  11. import io.netty.handler.timeout.IdleStateHandler;
  12. /**
  13. * @description:
  14. * @author: GC
  15. * @create: 2020-11-10 16:11
  16. **/
  17. public class Server {
  18. public static void main(String[] args) {
  19. //定义用于处理客户端连接的EventLoopGroup
  20. EventLoopGroup parentEventLoop = new NioEventLoopGroup();
  21. //定义用于处理客户端请求的EventLoopGroup
  22. EventLoopGroup childEventLoop = new NioEventLoopGroup();
  23. try{
  24. //服务端专用Bootstrap.主要用于将属绑定起来。建立关系
  25. ServerBootstrap bootstrap = new ServerBootstrap();
  26. //绑定处理客户端的EventLoopGroup对象
  27. bootstrap.group(parentEventLoop, childEventLoop)
  28. //使用NIO的方式
  29. .channel(NioServerSocketChannel.class)
  30. .handler(new LoggingHandler(LogLevel.ERROR))
  31. //绑定消息收发时的编码/解码器。 和自定义的Handler绑定
  32. .childHandler(new ChannelInitializer<SocketChannel>() {
  33. @Override
  34. protected void initChannel(SocketChannel socketChannel) throws Exception {
  35. ChannelPipeline pipeline = socketChannel.pipeline();
  36. pipeline.addLast(new StringDecoder());
  37. pipeline.addLast(new StringEncoder());
  38. /**
  39. * @prarm1:在指定的时间内,服务端没发生读操作(时间单位:S)
  40. * @prarm2:在指定的时间内,服务端没发生写操作(时间单位:S)
  41. * @prarm3:在指定的时间内,服务端即要发生读操作,也要发生写操作。如果只出现了两个操作中的一个,时间到达时,还是会被连接中断(时间单位:S)
  42. */
  43. pipeline.addLast(new IdleStateHandler(3,0,0));
  44. //自定义业务处理器
  45. pipeline.addLast(new ServerHandler());
  46. }
  47. });
  48. //声明服务端绑定的端口
  49. ChannelFuture future = bootstrap.bind(8888).sync();
  50. System.out.println("Server 启动成功");
  51. //当Channel调用了close()方法,并且成功关闭之后,才会调用此代码
  52. future.channel().closeFuture().sync();
  53. }catch (Exception e){
  54. e.printStackTrace();
  55. }finally {
  56. //优雅的关闭方式
  57. parentEventLoop.shutdownGracefully();
  58. childEventLoop.shutdownGracefully();
  59. }
  60. }
  61. }

服务端业务处理器定义

  1. package com.gc.socket.server;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. public class ServerHandler extends ChannelInboundHandlerAdapter {
  8. /**
  9. * 读取客户端发送的消息
  10. * @param ctx
  11. * @param msg
  12. * @throws Exception
  13. */
  14. @Override
  15. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  16. System.out.println("接收到Client发送的消息:" + msg);
  17. }
  18. /**
  19. * 异常捕捉
  20. * @param ctx
  21. * @param cause
  22. * @throws Exception
  23. */
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  26. ctx.close();
  27. }
  28. /**
  29. * 用于捕获当前Server中的各种事件
  30. * @param ctx
  31. * @param evt
  32. * @throws Exception
  33. */
  34. @Override
  35. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  36. if (evt instanceof IdleStateEvent) {
  37. IdleState state = ((IdleStateEvent) evt).state();
  38. if (state == IdleState.READER_IDLE) {
  39. System.out.println("将要断开连接");
  40. ctx.close();
  41. } else {
  42. super.userEventTriggered(ctx, evt);
  43. }
  44. }
  45. }
  46. }

手写Tomcat

简介

  1. 确切地说,这里要手写的是一个 Web 容器,一个类似于 Tomcat 的容器,用于处理 HTTP请求。该Web 容器没有实现 JavaEE Servlet 规范,不是一个 Servlet 容器。但其是模拟着Tomcat 来写的,这里定义了自己的请求、响应及 Servlet,分别命名为了 NettyRequestNettyResponse Servnet
  2. 我们这里要定义一个 Tomcat,这个 Web 容器提供给用户后,用户只需要按照使用步骤就可以将其自定义的 Servnet 发布到该 Tomcat 中。我们现在给出用户对于该 Tomcat 的使用步骤:
  • 用户只需将自定义的 Servnet 放入到指定的包中。例如,com.abc.webapp 包中。
  • 用户在访问时,需要将自定义的 Servnet 的简单类名全小写后的字符串作为该 Servnet的 Name 进行访问。
  • 若没有指定的 Servnet,则访问默认的 Servnet。

思路定义

  • Servnet规范包

    1. NettyRequest
      包含了传统HttpServletRequest中的方法。如:获取请求路径,获取URL中的参数,获取方法名称,获取参数等等。

    2. NettyResponse
      包含了传统HttpServletResponse中的方法。如:读/写操作。

    3. Servnet
      和Servlet一样,有doPost(),doGet()

  • Tomcat内核包

    1. main()
      用于加载指定目录下的接口。

    2. DefaultNettyRequest,DefaultNettyResponse
      我们自定义NettyResponse,NettyResponse的具体实现。

    3. DefaultServnet
      我们自定义的Servnet 具体实现。

    4. TomcatHandler
      请求解析业务处理器,主要用于解析接收请求。

    5. TomcatServer
      当启动容器时,初始化资源到本地缓存。

  • webapp
    要访问的接口,都在该目录下。

项目结构

Netty高级应用实战篇(上) - 图4

编码实现

定义NettyRequest规范

  1. package com.gc.servnet;
  2. import com.sun.deploy.net.HttpRequest;
  3. import java.util.List;
  4. import java.util.Map;
  5. /**
  6. * @description:
  7. * @author: GC
  8. * @create: 2020-11-13 15:56
  9. **/
  10. public interface NettyRequest {
  11. //获取URL中的参数
  12. String getUri();
  13. //获取方法名称
  14. String getMethod();
  15. //获取请求路径
  16. String getPath();
  17. //获取本次请求中所有的参数
  18. Map<String, List<String>> getParameters();
  19. //获取指定名称的参数值
  20. List<String> getParameters(String name);
  21. //指定参数名称获取参数值(第一个值)
  22. String getParameter(String name);
  23. }

定义NettyResponse规范

  1. package com.gc.servnet;
  2. /**
  3. * @description:
  4. * @author: GC
  5. * @create: 2020-11-13 15:56
  6. **/
  7. public interface NettyResponse {
  8. void write(String content) throws Exception;
  9. }

定义Servnet规范

  1. package com.gc.servnet;
  2. public abstract class Servnet {
  3. //定义Servlet中的doGet
  4. public abstract void doGet(NettyRequest request, NettyResponse response) throws Exception;
  5. //定义Servlet中的doPost
  6. public abstract void doPost(NettyRequest request, NettyResponse response) throws Exception;
  7. }

DefaultNettyRequest具体实现

  1. package com.gc.tomcat;
  2. import com.gc.servnet.NettyRequest;
  3. import io.netty.handler.codec.http.HttpRequest;
  4. import io.netty.handler.codec.http.QueryStringDecoder;
  5. import java.util.List;
  6. import java.util.Map;
  7. /**
  8. * @description:
  9. * @author: GC
  10. * @create: 2020-11-13 16:25
  11. **/
  12. public class DefaultNettyRequest implements NettyRequest {
  13. HttpRequest request;
  14. public DefaultNettyRequest(HttpRequest request){
  15. this.request = request;
  16. }
  17. @Override
  18. public String getUri() {
  19. return request.uri();
  20. }
  21. @Override
  22. public String getMethod() {
  23. return request.method().name();
  24. }
  25. @Override
  26. public String getPath() {
  27. QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
  28. return decoder.path();
  29. }
  30. @Override
  31. public Map<String, List<String>> getParameters() {
  32. QueryStringDecoder decoder = new QueryStringDecoder(getUri());
  33. return decoder.parameters();
  34. }
  35. private int count = 0;
  36. @Override
  37. public List<String> getParameters(String name) {
  38. Map<String, List<String>> map = getParameters();
  39. return map.get(name);
  40. }
  41. @Override
  42. public String getParameter(String name) {
  43. List<String> list = getParameters(name);
  44. if(null == list){
  45. System.out.println("要获取的参数不存在");
  46. return null;
  47. }
  48. return list.get(0);
  49. }
  50. }

DefaultNettyResponse具体实现

  1. package com.gc.tomcat;
  2. import com.gc.servnet.NettyResponse;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.http.*;
  6. import io.netty.util.internal.StringUtil;
  7. /**
  8. * @description:
  9. * @author: GC
  10. * @create: 2020-11-13 16:34
  11. **/
  12. public class DefaultNettyResponse implements NettyResponse {
  13. private HttpRequest request;
  14. private ChannelHandlerContext context;
  15. public DefaultNettyResponse(HttpRequest request, ChannelHandlerContext context) {
  16. this.request = request;
  17. this.context = context;
  18. }
  19. @Override
  20. public void write(String content) throws Exception {
  21. // 处理content为空的情况
  22. if (StringUtil.isNullOrEmpty(content)) {
  23. return;
  24. }
  25. // 创建响应对象
  26. FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
  27. HttpResponseStatus.OK,
  28. // 根据响应体内容大小为response对象分配存储空间
  29. Unpooled.wrappedBuffer(content.getBytes("UTF-8")));
  30. // 获取响应头
  31. HttpHeaders headers = response.headers();
  32. // 设置响应体类型
  33. headers.set(HttpHeaderNames.CONTENT_TYPE, "text/json");
  34. // 设置响应体长度
  35. headers.set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
  36. // 设置缓存过期时间
  37. headers.set(HttpHeaderNames.EXPIRES, 0);
  38. // 若HTTP请求是长连接,则响应也使用长连接
  39. if (HttpUtil.isKeepAlive(request)) {
  40. headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
  41. }
  42. context.writeAndFlush(response);
  43. }
  44. }

DefaultServnet具体实现

  1. package com.gc.tomcat;
  2. import com.gc.servnet.NettyRequest;
  3. import com.gc.servnet.NettyResponse;
  4. import com.gc.servnet.Servnet;
  5. /**
  6. * @description:
  7. * @author: GC
  8. * @create: 2020-11-13 16:40
  9. **/
  10. public class DefaultServnet extends Servnet {
  11. //定义Servlet中的doGet
  12. @Override
  13. public void doGet(NettyRequest request, NettyResponse response) throws Exception {
  14. String sernetName = request.getUri().split("/")[1];
  15. response.write("404 - no this servnet : " + sernetName);
  16. }
  17. //定义Servlet中的doPost
  18. @Override
  19. public void doPost(NettyRequest request, NettyResponse response) throws Exception {
  20. doGet(request, response);
  21. }
  22. }

编写自定义TomcatHandler处理器

  1. package com.gc.tomcat;
  2. import com.gc.servnet.NettyRequest;
  3. import com.gc.servnet.NettyResponse;
  4. import com.gc.servnet.Servnet;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.handler.codec.http.HttpRequest;
  8. import java.util.Map;
  9. /**
  10. * @description:
  11. * @author: GC
  12. * @create: 2020-11-13 16:47
  13. **/
  14. public class TomcatHandler extends ChannelInboundHandlerAdapter {
  15. //启动时候加载到这个Map
  16. private Map<String, Servnet> nameToServnetMap;
  17. //如果代码被放射的方式或者其它的方式修改过,则重写加载
  18. private Map<String, String> nameToClassNameMap;
  19. //定义一个构造器,当Netty启动时候加载到Netty的Handler中被Netty管理
  20. public TomcatHandler(Map<String, Servnet> nameToServnetMap, Map<String, String> nameToClassNameMap){
  21. this.nameToServnetMap = nameToServnetMap;
  22. this.nameToClassNameMap = nameToClassNameMap;
  23. }
  24. @Override
  25. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  26. //如果是Http请求,则进入解析
  27. if(msg instanceof HttpRequest){
  28. HttpRequest request = (HttpRequest) msg;
  29. //拿到Servnet名称
  30. String serverName = request.uri().split("/")[1];
  31. serverName = serverName.substring(0,serverName.indexOf("?"));
  32. //获得客户端要访问的Servnet
  33. Servnet servnet = new DefaultServnet();
  34. //一级缓存是否有这个key
  35. if(nameToServnetMap.containsKey(serverName)){
  36. //直接拿出去
  37. servnet = nameToServnetMap.get(serverName);
  38. }else if(nameToClassNameMap.containsKey(serverName)){//一级缓存没有,二级缓存是否有这个key
  39. //双重检测,防止并发请求时,重复创建
  40. if(!nameToServnetMap.containsKey(serverName)) {
  41. synchronized (this) {
  42. if(!nameToServnetMap.containsKey(serverName)) {
  43. //有,获取到类的全路径限定路径
  44. String classPath = nameToClassNameMap.get(serverName);
  45. servnet = (Servnet) Class.forName(classPath).newInstance();
  46. nameToServnetMap.put(serverName, servnet);
  47. }
  48. }
  49. }
  50. }
  51. //开始根据路径处理请求
  52. //用于获取当前请求中的参数,路径等信息
  53. NettyRequest defultRequest = new DefaultNettyRequest(request);
  54. //用于获取当前请求中的上下文数据
  55. NettyResponse defultResponse = new DefaultNettyResponse(request, ctx);
  56. // 根据不同的请求类型,调用servnet实例的不同方法
  57. String methodName = request.method().name();
  58. if (methodName.equalsIgnoreCase("GET")) {
  59. servnet.doGet(defultRequest, defultResponse);
  60. } else if(methodName.equalsIgnoreCase("POST")) {
  61. servnet.doPost(defultRequest, defultResponse);
  62. }
  63. ctx.close();
  64. }
  65. }
  66. @Override
  67. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  68. cause.printStackTrace();
  69. ctx.close();
  70. }
  71. }

编写容器启动时初始化TomcatServer

  1. TomcatServerpackage com.gc.tomcat;
  2. import com.gc.servnet.Servnet;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.http.HttpServerCodec;
  8. import io.netty.handler.codec.string.StringDecoder;
  9. import io.netty.handler.codec.string.StringEncoder;
  10. import java.io.File;
  11. import java.io.UnsupportedEncodingException;
  12. import java.net.URL;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.concurrent.ConcurrentHashMap;
  16. /**
  17. * @description:
  18. * @author: GC
  19. * @create: 2020-11-13 17:24
  20. **/
  21. public class TomcatServer {
  22. // key为servnet的简单类名,value为对应servnet实例
  23. private Map<String, Servnet> nameToServnetMap = new ConcurrentHashMap<>();
  24. // key为servnet的简单类名,value为对应servnet类的全限定性类名
  25. private Map<String, String> nameToClassNameMap = new HashMap<>();
  26. //加载该路径下的类为Servnet
  27. private String basePackage;
  28. public TomcatServer(String basePackage) {
  29. this.basePackage = basePackage;
  30. }
  31. // 启动tomcat
  32. public void start() throws Exception {
  33. // 加载指定包中的所有Servnet的类名
  34. cacheClassName(basePackage);
  35. //资源准备完毕,启动Netty
  36. runServer();
  37. }
  38. private void cacheClassName(String basePackage) throws UnsupportedEncodingException {
  39. //获取指定路径下的类
  40. URL resource = this.getClass().getClassLoader().getResource(basePackage.replaceAll("\\.", "/"));
  41. if(null == resource){
  42. System.out.println("没有找到访问资源");
  43. return;
  44. }
  45. //将取到的资源转成一个File
  46. File dir = new File(resource.getPath());
  47. //找出该路径下所有的类
  48. for (File file : dir.listFiles()) {
  49. if (file.isDirectory()) {
  50. // 若当前遍历的file为目录,则递归调用当前方法
  51. cacheClassName(basePackage + "." + file.getName());
  52. } else if (file.getName().endsWith(".class")) {
  53. //读到了将文件后缀去掉
  54. String simpleClassName = file.getName().replace(".class", "").trim();
  55. nameToClassNameMap.put(simpleClassName.toLowerCase(), basePackage+"."+simpleClassName);
  56. }
  57. }
  58. System.out.println(nameToClassNameMap);
  59. }
  60. private void runServer() {
  61. EventLoopGroup parentLoopGroup = new NioEventLoopGroup();
  62. EventLoopGroup childLoopGroup = new NioEventLoopGroup();
  63. try {
  64. ServerBootstrap serverBootstrap = new ServerBootstrap();
  65. serverBootstrap.group(parentLoopGroup, childLoopGroup)
  66. //当出现并发时,设置这个队列的上限长度
  67. .option(ChannelOption.SO_BACKLOG, 1024)
  68. // 指定是否启用心跳机制来检测长连接的存活性,即客户端的存活性
  69. .childOption(ChannelOption.SO_KEEPALIVE, true)
  70. .channel(NioServerSocketChannel.class)
  71. .childHandler(new ChannelInitializer() {
  72. @Override
  73. protected void initChannel(Channel channel) throws Exception {
  74. ChannelPipeline channelPipeline = channel.pipeline();
  75. channelPipeline.addLast(new HttpServerCodec());
  76. channelPipeline.addLast(new TomcatHandler(nameToServnetMap, nameToClassNameMap));
  77. }
  78. });
  79. ChannelFuture future = serverBootstrap.bind(8888).sync();
  80. System.out.println("启动成功");
  81. future.channel().closeFuture().sync();
  82. }catch (Exception e){
  83. e.printStackTrace();
  84. }finally {
  85. parentLoopGroup.shutdownGracefully();
  86. childLoopGroup.shutdownGracefully();
  87. }
  88. }
  89. }

编写需要访问接口(Servnet)

  1. package com.gc.webapp;
  2. import com.gc.servnet.NettyRequest;
  3. import com.gc.servnet.NettyResponse;
  4. import com.gc.servnet.Servnet;
  5. /**
  6. * @description:
  7. * @author: GC
  8. * @create: 2020-11-13 17:44
  9. **/
  10. public class GC1 extends Servnet {
  11. @Override
  12. public void doGet(NettyRequest request, NettyResponse response) throws Exception {
  13. String uri = request.getUri();
  14. String path = request.getPath();
  15. String method = request.getMethod();
  16. String name = request.getParameter("name");
  17. String content = "uri = " + uri + "\n" +
  18. "path = " + path + "\n" +
  19. "method = " + method + "\n" +
  20. "param = " + name;
  21. response.write(content);
  22. }
  23. @Override
  24. public void doPost(NettyRequest request, NettyResponse response) throws Exception {
  25. doGet(request, response);
  26. }
  27. }

启动类

  1. package com.gc.tomcat;
  2. /**
  3. * @description:
  4. * @author: GC
  5. * @create: 2020-11-13 16:24
  6. **/
  7. public class RunTomcat {
  8. public static void main(String[] args) throws Exception {
  9. TomcatServer tomcatServer = new TomcatServer("com.gc.webapp");
  10. tomcatServer.start();
  11. }
  12. }

手写Tomcat总结

  1. 在写之前,我们把思路整理了一下,然后把思路落地。

Servnet规范包

  1. Servnet包下,我们把传统的Servlet的基本规则和常用的方法功能以接口的形式定义出来了。包括Request中的获取参数,方法名称,访问路径等基本的操作。
  2. 以及Response向客户端的写操作定义。
  3. 还有Servnet中的doGetdoPos,我们这里只定义具体的声明,把具体的业务逻辑实现交给用户自己灵活编写。

Tomcat核心包

  1. 这里面主要是重写各个接口和抽象类的方法,把它们的定义的方法变成我们自己的具体业务实现。
  1. 我们使用DefaultNettyRequest和DefaultNettyResponse实现了NettyRequest和NettyResponse,在这里我们把定义实现了具体的操作。
  2. 我们使用DefaultServnet继承了Servent,分别处理get货post请求。让用户可以在自己的Servent中灵活的做业务逻辑。
  3. 我们编写了TomcatHandler来用于接受客户端的请求解析并处理。在TomcatHandler类中我们具体做的事情:

    • 接受用户的http请求。
    • 从一级缓存中拿到用户要访问的Servnet实例,如果没有则去二级缓存拿到类的全路径限定名,使用反射机制来动态创建出一个Servnet实例提供给本次请求需要用到的实例,创建好后把实例放到一级缓存。值得一提的是,我们怕当客户端出现并发访问出现线程安全问题可能会导致Map中的Servnet实例被重复创建,所以使用了双重检查锁来预防这个问题。
    • 当Servnet实例创建好后,我们将组装Request和Response,其中Respone中我们传递了本次客户端请求的上下文,从该上下文中拿到相关信息组装成一个请求头。
    • 然后获取到请求方式,分别执行响应的方法。
  4. TomcatServer的主要作用有两点:

    • 当容器启动时,模仿真正的Tomcat加载指定目录下的类(我们也可以称为具体的接口地址),到Tomcat的二级缓存中。其中Map的Key为具体的Servnet实例名称,Value为Servent源文件的具体路径。
    • 第二件事是我们在这个类中定义了Netty的启动代码。主要定义了:

      1. 当出现并发时,队列可缓存的最大成功。
      2. 启用心跳机制来检测客户端的存活性,及时的释放链接,减轻服务器的负载压力。
      3. 定义了Http的编码/解码为一体的编解码器。以及实现我们自己的Handler。
  5. 最后,我们定义了启动方法main() 。在这里面我们指定了需要扫描加载成Servlet实例的具体包名路径。