image.png

常用解码器

  • FixedLengthFrameDecoder:基于固定长度的解码器
  • LineBasedFrameDecoder:基于 \n,\r 的解码器
  • DelimiterBasedFrameDecoder:基于分隔符的解码器
  • LengthFieldBasedFrameDecoder:基于长度的解码器

ByteBuf

ByteBuf 的分类

  • Pooled 和 Unpooled
    • Pooled:从已经分配好的内存中取内存
    • Unpooled:新创建一块内存
  • Unsafe 和 非Unsafe
    • Unsafe:依赖 JDK 底层的 Unsafe 对象
    • 非Unsafe:不依赖 JDK 底层的 Unsafe 对象
  • Heap 和 Direct
    • Heap 底层就是 byte 数组
    • Direct 依赖于 Nio 的 ByteBuffer 创建出的 DirectByteBuffer(堆外内存)
  1. * <pre>
  2. * +-------------------+------------------+------------------+
  3. * | discardable bytes | readable bytes | writable bytes |
  4. * | | (CONTENT) | |
  5. * +-------------------+------------------+------------------+
  6. * | | | |
  7. * 0 <= readerIndex <= writerIndex <= capacity
  8. * </pre>
  1. public class TestByteBuf {
  2. public static void main(String[] args) {
  3. /*
  4. 内部使用的用:堆内内存
  5. 要写出去的用:堆外内存
  6. */
  7. // 1、创建一个非池化的 ByteBuf,大小为 10 个字节
  8. ByteBuf buf = Unpooled.buffer(10);
  9. System.out.println("原始 ByteBuf 为 =======================> " + buf.toString());
  10. System.out.println("1、ByteBuf 中的内容为 ==================> " + Arrays.toString(buf.array()) + "\n");
  11. // 2、写入一段内容(影响 widx)
  12. byte[] bytes = {1, 2, 3, 4, 5};
  13. buf.writeBytes(bytes);
  14. System.out.println("写入的 bytes 为 =======================> " + Arrays.toString(bytes));
  15. System.out.println("写入一段内容后的 ByteBuf 为 ============> " + buf.toString());
  16. System.out.println("2、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  17. // 3、读取一段内容(影响 ridx)
  18. byte b1 = buf.readByte();
  19. byte b2 = buf.readByte();
  20. System.out.println("读取的 bytes 为 =======================> " + Arrays.toString(new byte[]{b1, b2}));
  21. System.out.println("读取一段内容后的 ByteBuf 为 ============> " + buf.toString());
  22. System.out.println("3、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  23. // 4、将读取的内容丢弃(影响 ridx, widx)
  24. buf.discardReadBytes();
  25. System.out.println("将读取的内容丢弃后的 ByteBuf 为 ==========> " + buf.toString());
  26. System.out.println("4、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  27. // 5、清空读写指针(影响 ridx = 0, widx = 0)
  28. buf.clear();
  29. System.out.println("清空读写指针后的 ByteBuf 为 ============> " + buf.toString());
  30. System.out.println("5、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  31. // 6、再次写入一段内容,比第一次写入的少(影响 widx)
  32. bytes = new byte[]{1, 2, 3};
  33. buf.writeBytes(bytes);
  34. System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(bytes));
  35. System.out.println("再次写入一段内容后的 ByteBuf 为 ========> " + buf.toString());
  36. System.out.println("6、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  37. // 7、将 buf 清零(不影响指针)
  38. buf.setZero(0, buf.capacity());
  39. System.out.println("将 buf 清零后的 ByteBuf 为 ============> " + buf.toString());
  40. System.out.println("7、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  41. // 8、再次写入一段内容超过容量的内容(从 widx 开始写,并开始扩容)
  42. bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
  43. buf.writeBytes(bytes);
  44. System.out.println("再次写入的 bytes 为 ===================> " + Arrays.toString(bytes));
  45. System.out.println("再次写入一段内容后的 ByteBuf 为 ========> " + buf.toString());
  46. System.out.println("8、ByteBuf 中的内容为 =================> " + Arrays.toString(buf.array()) + "\n");
  47. }
  48. }
  49. 原始 ByteBuf =======================> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)
  50. 1ByteBuf 中的内容为 ==================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  51. 写入的 bytes =======================> [1, 2, 3, 4, 5]
  52. 写入一段内容后的 ByteBuf ============> UnpooledByteBufAllocator$(ridx: 0, widx: 5, cap: 10)
  53. 2ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
  54. 读取的 bytes =======================> [1, 2]
  55. 读取一段内容后的 ByteBuf ============> UnpooledByteBufAllocator$(ridx: 2, widx: 5, cap: 10)
  56. 3ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
  57. 将读取的内容丢弃后的 ByteBuf ==========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
  58. 4ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
  59. 清空读写指针后的 ByteBuf ============> UnpooledByteBufAllocator$(ridx: 0, widx: 0, cap: 10)
  60. 5ByteBuf 中的内容为 =================> [3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
  61. 再次写入的 bytes ===================> [1, 2, 3]
  62. 再次写入一段内容后的 ByteBuf ========> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
  63. 6ByteBuf 中的内容为 =================> [1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
  64. buf 清零后的 ByteBuf ============> UnpooledByteBufAllocator$(ridx: 0, widx: 3, cap: 10)
  65. 7ByteBuf 中的内容为 =================> [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  66. 再次写入的 bytes ===================> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
  67. 再次写入一段内容后的 ByteBuf ========> UnpooledByteBufAllocator$(ridx: 0, widx: 15, cap: 64)
  68. 8ByteBuf 中的内容为 =================> [0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ..., 0 ]

Web 客户端 Demo

模拟 Web 请求

TestServer

  1. public class TestServer {
  2. public static void main(String[] args) {
  3. // 接收客户端连接的线程组:主线程,通常给 1
  4. EventLoopGroup masterGroup = new NioEventLoopGroup(1);
  5. // 处理读写时间的线程组:工作线程,默认是 cpu 的核心数 * 2
  6. EventLoopGroup workGroup = new NioEventLoopGroup();
  7. try {
  8. // 创建服务器启动助手来配置参数
  9. ServerBootstrap serverBootstrap = new ServerBootstrap();
  10. serverBootstrap.group(masterGroup, workGroup) // 设置线程组
  11. .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道
  12. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列中等待连接的个数
  13. .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持活动连接状态
  14. .childHandler(new TestServerInitializer());
  15. ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
  16. System.out.println("Server is starting on 9090...");
  17. channelFuture.channel().closeFuture().sync(); // 等待关闭,加上 sync 起到阻塞作用
  18. } catch (Exception ex) {
  19. ex.printStackTrace();
  20. } finally {
  21. masterGroup.shutdownGracefully();
  22. workGroup.shutdownGracefully();
  23. }
  24. }
  25. }

TestServerInitializer

  1. public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
  2. /*
  3. HttpRequestEncoder:将 HttpRequest 或 HttpContent 编码成 ByteBuf
  4. HttpRequestDecoder:将 ByteBuf 解码成 HttpRequest 或 HttpContent
  5. HttpResponseEncoder:将 HttpResponse 或 HttpContent 编码成 ByteBuf
  6. HttpResponseDecoder:将 ByteBuf 解码成 HttpResponse 或 HttpContent
  7. */
  8. @Override
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. ChannelPipeline pipeline = ch.pipeline();
  11. pipeline.addLast("httpServerCodec", new HttpServerCodec()); // ChannelDuplexHandler = Inbound + Outbound
  12. pipeline.addLast("testServerHandler", new TestServerHandler()); // ChannelInboundHandler
  13. }
  14. }

TestServerHandler

  1. public class TestServerHandler extends SimpleChannelInboundHandler<HttpObject> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject) throws Exception {
  4. if (httpObject instanceof HttpRequest) {
  5. HttpRequest httpRequest = (HttpRequest) httpObject;
  6. String uri = httpRequest.uri();
  7. System.out.println("uri = " + uri);
  8. ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
  9. FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
  10. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
  11. fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
  12. ctx.writeAndFlush(fullHttpResponse);
  13. }
  14. }
  15. }

Socket Demo

模拟 Socket 请求

TestServer

  1. public class TestServer {
  2. public static void main(String[] args) {
  3. EventLoopGroup masterGroup = new NioEventLoopGroup(1);
  4. EventLoopGroup workGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. serverBootstrap.group(masterGroup, workGroup) // 设置线程组
  8. .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器通道
  9. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列中等待连接的个数
  10. .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持活动连接状态
  11. .childHandler(new TestServerInitializer());
  12. ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
  13. System.out.println("Server is starting on 9090...");
  14. channelFuture.channel().closeFuture().sync(); // 等待关闭,加上 sync 起到阻塞作用
  15. } catch (Exception ex) {
  16. ex.printStackTrace();
  17. } finally {
  18. masterGroup.shutdownGracefully();
  19. workGroup.shutdownGracefully();
  20. }
  21. }
  22. }

TestServerInitializer

  1. public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
  2. /*
  3. 常用解码器:
  4. 1、FixedLengthFrameDecoder:基于固定长度的解码器
  5. 2、LineBasedFrameDecoder:基于 \n,\r 的解码器
  6. 3、DelimiterBasedFrameDecoder:基于分隔符的解码器
  7. 4、LengthFieldBasedFrameDecoder:基于长度的解码器
  8. */
  9. @Override
  10. protected void initChannel(SocketChannel ch) throws Exception {
  11. ChannelPipeline pipeline = ch.pipeline();
  12. // ChannelInboundHandler
  13. // lengthFieldOffset: 长度字段偏差,lengthFieldLength: 长度字段占的字节数
  14. // lengthAdjustment: 添加到长度字段的补偿值,initialBytesToStrip: 从解码帧中第一次去除的字节数
  15. // 24来自客户端的问候24来自客户端的问候24来自客户端的问候 => 来自客户端的问候 来自客户端的问候 来自客户端的问候
  16. pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
  17. // ChannelOutboundHandler: 计算当前待发送消息的二进制字节长度,将该长度添加到 ByteBuf 的缓冲区头中,来标识发送字符串的长度
  18. pipeline.addLast(new LengthFieldPrepender(4));
  19. // ChannelInboundHandler: 将 byte 解码成 String 数据
  20. pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
  21. // ChannelOutboundHandler: 将 String 编码成 byte 数据
  22. pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
  23. // ChannelInboundHandler
  24. pipeline.addLast(new TestServerHandler());
  25. }
  26. }

TestServerHandler

  1. public class TestServerHandler extends SimpleChannelInboundHandler<String> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
  4. System.out.println(channelHandlerContext.channel().remoteAddress() + ", " + s);
  5. channelHandlerContext.writeAndFlush("form server: " + UUID.randomUUID());
  6. }
  7. @Override
  8. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  9. cause.printStackTrace();
  10. ctx.close();
  11. }
  12. }

TestClient

public class TestClient {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
        }
    }
}

TestClientInitializer

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        /*
         * 1) lengthFieldOffset:长度字段的偏差
         * 2) lengthFieldLength:长度字段占的字节数
         * 3) lengthAdjustment:添加到长度字段的补偿值
         * 4) initialBytesToStrip:从解码帧中第一次去除的字节数
         */
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));// 计算当前待发送消息的二进制字节长度,将该长度添加到ByteBuf的缓冲区头中
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));  // 将byte数据解码成String
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));  // 将字符串编码成byte数据
        pipeline.addLast(new TestClientHandler());
    }
}

TestClientHandler

public class TestClientHandler extends SimpleChannelInboundHandler<String> {

    // 读取服务端数据
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(channelHandlerContext.channel().remoteAddress() + ", client output = " + s);
        // channelHandlerContext.writeAndFlush("form client"+ LocalDateTime.now());
    }

    // 通道就绪
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush("来自客户端的问候" + i);
        }
    }

    //有异常发生
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.channel().close();
    }
}

聊天室

模拟多人聊天室

TestServer

public class TestServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
            System.out.println("Server is starting on 8989...");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

TestServerInitializer

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // ChannelInboundHandler,基于分隔符的解码器
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 字节 转 String
        pipeline.addLast(new TestServerHandler());

        // ChannelOutboundHandler
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // String 转 字节
    }
}

TestServerHandler

public class TestServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // channel 助手类(拦截器)的添加
    // 对于上线:handlerAdded 先执行,channelActive 后执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // group.writeAndFlush 会发送给组内的所有人
        group.writeAndFlush(channel.remoteAddress() + " 加入了\n");
        group.add(channel); // 不转发给自己,所以这行代码在下面
    }

    // channel活跃 通道准备就绪事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 上线了");
        System.out.println("在线人数 = " + group.size() + "\n");
    }

    // channel读取数据
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 因为自己已经在群组里了,所以要循环判断,不发送给自己
        group.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush(channel.remoteAddress() + ":" + s + "\r\n");
            }
        });
        // 向下传播
        channelHandlerContext.fireChannelRead(s);
    }

    // channel不活跃  通道关闭事件
    // 对于下线:channelInactive 先执行,handlerRemoved 后执行
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // ctx.channel 会自动从 group 中 remove 掉 channel
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + " 下线了");
        System.out.println("在线人数 = " + group.size() + "\n");
    }

    // channel 助手类(拦截器)移除
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        group.writeAndFlush(channel.remoteAddress() + " 离开了\n");
    }

    // channel注册事件
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }

    // channel取消注册事件
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
        super.channelUnregistered(ctx);
    }

    // 发生异常回调
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

TestClient

public class TestClient {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(bossGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new TestClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8989).sync();

            Channel channel = channelFuture.channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            for (; ; ) {
                channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
        }
    }
}

TestClientInitializer

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));//基于分隔符的解码器
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new TestClientHandler());
    }
}

TestClientHandler

public class TestClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim() + "\n");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

心跳检测

利用 netty 自带的 IdleStateHandler 来做心跳检测机制

TestServer

public class TestServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // 接收客户端连接的线程组
        EventLoopGroup workGroup = new NioEventLoopGroup(); // 真正处理读写事件的线程组

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

TestServerInitializer

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 2. 配置handler
        // 2.1 websocket 基于 http 协议,需要 http 编码和解码工具
        pipeline.addLast(new HttpServerCodec());
        // 2.2 对于大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        // 2.3 对于 http 的消息进行聚合,聚合成 FullHttpRequest 或者 FullHttpResponse
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));
        // 针对客户端,如果 1 分钟之内没有向服务器发送读写心跳,则主动断开
        pipeline.addLast(new IdleStateHandler(40, 50, 45));
        // 自定义的读写空闲状态检测
        pipeline.addLast(new HeartBeatHandler());
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 定义自己的handler,主要是对请求进行处理和发送
        pipeline.addLast(new ChatHandler());
    }
}

TestServerHandler

public class TestServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            String str = "";
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    str = "读空闲";
                    System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
                    break;
                case WRITER_IDLE:
                    str = "写空闲";
                    System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
                    break;
                case ALL_IDLE:
                    str = "读写空闲";
                    System.out.println(ctx.channel().remoteAddress() + "----超时事件-----" + str);
                    ctx.channel().close();
                    break;
            }
        }
    }
}

HeartBeatHandler

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    /**
     * 用户事件触发的处理器
     *
     * @param ctx 上下文
     * @param evt 事件
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 判断 evt 是否属于 IdleStateEvent,用于触发用户事件,包含读空闲,写空闲,读写空闲
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // 读空闲,不做处理
                System.out.println("进入读空闲");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // 写空闲,不做处理
                System.out.println("进入写空闲");
            } else if (event.state() == IdleState.ALL_IDLE) {
                System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
                // 关闭channel
                Channel channel = ctx.channel();
                channel.close();
                System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
            }

        }
    }
}