一、实例要求

实例要求:使用 IDEA 创建 Netty 项目

  1. Netty 服务器在 6668 端口监听,客户端能发送消息给服务器”hello,服务器~”
  2. 服务器可以回复消息给客户端”hello,客户端~”
  3. 目的:对 Netty 线程模型有一个初步认识,便于理解 Netty 模型理论
    1. 编写服务端
    2. 编写客户端
    3. 对 netty 程序进行分析,看看 netty 模型特点
    4. 说明:创建 Maven 项目,并引入 Netty 包

      二、实例

      1、服务端

      NettyServer

      ```java package com.supkingx.netty.simple;

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

  • @description:
  • @Author: wangchao
  • @Date: 2021/12/6 */ public class NettyServer { public static void main(String[] args) throws InterruptedException {

    1. // 创建 BossGroup 和 WorkerGroup
    2. // 说明
    3. // 1. 创建两个线程组 bossGroup 和 workerGroup
    4. // 2. bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
    5. // 3. 两个都是无限循环
    6. // 4. bossgroup 和 workGroup 含有的子线程(NIOEventGroup)的个数 默认实际是cpu的核数
    7. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    8. // 12核cpu,默认是24个线程 NettyRuntime.availableProcessors() * 2

    // NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    1. // 方便测试,设置为 3 个线程
    2. NioEventLoopGroup workerGroup = new NioEventLoopGroup(3);
    3. try {
    4. // 创建服务器端的启用对象,配置参数
    5. ServerBootstrap bootstrap = new ServerBootstrap();
    6. // 使用链式编程来进行设置
    7. bootstrap.group(bossGroup, workerGroup)// 设置两个现场组
    8. .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 作为服务器的通道实现
    9. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接数
    10. .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
    11. .childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道测试对象(匿名)
    12. // 给 pipeline 设置处理器
    13. @Override
    14. protected void initChannel(SocketChannel ch) throws Exception {
    15. // 在管道的最后添加一个处理器(自定义的)
    16. ch.pipeline().addLast(new NettyServerHandler());
    17. }
    18. }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
    19. System.out.println(".....服务器 is ready");
    20. // 绑定一个端口并且同步,生成一个 channelFuture 对象
    21. // 启动服务器并绑定端口
    22. ChannelFuture cf = bootstrap.bind(6677).sync();
    23. // 对关闭通道进行监听
    24. cf.channel().closeFuture().sync();
    25. } finally {
    26. bossGroup.shutdownGracefully();
    27. workerGroup.shutdownGracefully();
    28. }

    } } ```

    NettyServerHandler

    ```java package com.supkingx.netty.simple;

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;

/**

  • 说明
  • 1、我们自定义一个 Handler 需要继续netty 规定好的谋和 handlerAdapter(规范)
  • 2、这时我们自定义一个Handler,才能称为一个handler *
  • @description:
  • @Author: wangchao
  • @Date: 2021/12/6 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // 读取数据实际(这里我们可以继续读取客户端发送的消息)

    /**

    • @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
    • @param msg 就是客户端发送的数据 默认Object
    • @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(“服务器读取线程 “ + Thread.currentThread().getName()); System.out.println(“server ctx=” + ctx); System.out.println(“看看channel 和 pipeline的关系”); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline();
  1. // 将 msg 转化成一个 ByteBuffer
  2. // ByteBuf 是 Netty 提供的,不是NIO 的 ByteBuffer
  3. ByteBuf buf = (ByteBuf) msg;
  4. System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
  5. System.out.println("客户端地址:" + ctx.channel().remoteAddress());
  6. }
  7. /**
  8. * 数据读取完毕
  9. * 可以在这里向通道写回数据
  10. *
  11. * @param ctx
  12. * @throws Exception
  13. */
  14. @Override
  15. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  16. // 是 write + flush
  17. // 将数据写入到缓存并刷新
  18. // 一般讲,我们对发送的数据进行编码
  19. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,client", CharsetUtil.UTF_8));
  20. }
  21. /**
  22. * 处理异常
  23. *
  24. * @param ctx
  25. * @param cause
  26. * @throws Exception
  27. */
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  30. System.out.println("服务端异常了");
  31. cause.printStackTrace();
  32. ctx.close();
  33. }

}

  1. <a name="zUi9H"></a>
  2. ## 2、客户端
  3. <a name="YGdkr"></a>
  4. ### NettyClient
  5. ```java
  6. package com.supkingx.netty.simple;
  7. import io.netty.bootstrap.Bootstrap;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.nio.NioEventLoopGroup;
  11. import io.netty.channel.socket.SocketChannel;
  12. import io.netty.channel.socket.nio.NioSocketChannel;
  13. /**
  14. * @description:
  15. * @Author: wangchao
  16. * @Date: 2021/12/6
  17. */
  18. public class NettyClient {
  19. public static void main(String[] args) throws InterruptedException {
  20. // 客户端需要一个事件循环跑
  21. NioEventLoopGroup group = new NioEventLoopGroup();
  22. try {
  23. // 创建客户端启动对象
  24. // 注意客户端使用的不是 serverBootstrap 而是 BootStrap
  25. Bootstrap bootstrap = new Bootstrap();
  26. // 设置相关参数
  27. bootstrap.group(group)
  28. .channel(NioSocketChannel.class)
  29. .handler(new ChannelInitializer<SocketChannel>() {
  30. @Override
  31. protected void initChannel(SocketChannel channel) throws Exception {
  32. // 在管道的最后添加一个处理器(自定义的)
  33. channel.pipeline().addLast(new NettyClientHandler());
  34. }
  35. });
  36. System.out.println("客户端。。ok");
  37. // 启动客户端去连接服务器端
  38. // 关于 channelFuture 要分析,涉及到 netty 的异步模型
  39. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6677).sync();
  40. // 给关闭通道进行监听
  41. channelFuture.channel().closeFuture().sync();
  42. } finally {
  43. // 优雅的关闭
  44. group.shutdownGracefully();
  45. }
  46. }
  47. }

NettyClientHandler

  1. package com.supkingx.netty.simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import java.nio.charset.StandardCharsets;
  8. /**
  9. * @description:
  10. * @Author: wangchao
  11. * @Date: 2021/12/6
  12. */
  13. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  14. /**
  15. * 当通道就绪时,就会触发这个方法
  16. *
  17. * @param ctx
  18. * @throws Exception
  19. */
  20. @Override
  21. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  22. System.out.println("client " + ctx);
  23. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!", StandardCharsets.UTF_8));
  24. }
  25. /**
  26. * 当通道有时间读取时,会触发
  27. *
  28. * @param ctx
  29. * @param msg
  30. * @throws Exception
  31. */
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  34. ByteBuf buf = (ByteBuf) msg;
  35. System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
  36. System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
  37. }
  38. @Override
  39. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  40. System.out.println("客户端异常了");
  41. cause.printStackTrace();
  42. ctx.close();
  43. }
  44. }

三、源码简单分析

NioEventLoopGroup

1.默认线程数是0
image.png
2.进入这个方法内部一直深入,直到找到如下方法,可以看到当线程数=0的时候,返回一个默认的线程
image.png
3.默认线程如下,即当前CPU核心数(NettyRuntime.availableProcessors())*2
image.png
4.debug继续,由于我的电脑是12核心,所以这时的 NioEventLoopGroup 使用 24 线程,即有 24个children
image.png
children部分展示如下,有24个
image.png

验证workgroup每次都会启动一个新线程

  1. 这里我们启动了两个client,一个server,可以看到server使用了两个线程来对应每个客户端。

image.png

  1. 由于我们24个线程,启动24个客户端过于繁琐,这里将workGroup的线程数设置为3,便于测试。

可以看到,启动了4个客户端之后,workGroup会依次使用线程,第四个客户端会去使用第一个线程。
image.png

chanel 和 pipeline

通过下图可以简单的发现。

  • channel中有pipeline,pipeline中有channel,这两个相互对应
  • pipeline 是一个双向链表

image.png
image.png

四、上诉实例能不能再次优化?

1、缺陷

可以在 NettyServerHandler 类的 channelRead 方法中添加如下代码,可以发现当接受客户端的消息时,在这里被 阻塞 了,这样是不行的,本文将会介绍如何优化。

  1. try {
  2. Thread.sleep(10 * 1000);
  3. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));
  4. } catch (InterruptedException e) {
  5. System.out.println("发生异常" + e.getMessage());
  6. }

image.png

2、简单优化

可以在 NettyServerHandler 类的 channelRead 中添加异步任务,即可解决阻塞问题。

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. // 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中
  4. // 解决方案1 用哪个户程序自定义的普通任务
  5. ctx.channel().eventLoop().execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. Thread.sleep(10 * 1000);
  10. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端,你好!", CharsetUtil.UTF_8));
  11. } catch (InterruptedException e) {
  12. System.out.println("发生异常" + e.getMessage());
  13. }
  14. }
  15. });
  16. System.out.println("server....go");
  17. }

上述方案的异步原理是,将异步任务放到 ChannelHandlerContext(DefaultChannelHandlerContext
�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中,等待执行。

3、探究 taskQueue

疑问:上诉优化中,假如 channelRead 有两个异步任务,这两个异步任务会怎么执行?
例如:在 channelRead 方法中有两个线程,第一个任务阻塞 10s,第二个任务阻塞 20s,当程序运行,客户端发送消息过来时,会是怎么样的?

假如客户端 2:00:00 发送消息到客户端,两种情况 1、第一个任务 2:00:10执行,第二个任务2:00:20执行 2、第一个任务 2:00:10执行,第二个任务2:00:30执行

答案是 第二种情况,当有两个线程在 taskQueue 中时,这两个线程会顺序执行。代码如下

com.supkingx.netty.simple2.NettyServerHandler#channelRead

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. // 比如我们有一个耗时很长的任务 -> 异步执行 -> 提交该channel 对应的 NIOEventloop 的 taskQueue中
  4. // 解决方案1 用哪个户程序自定义的普通任务
  5. ctx.channel().eventLoop().execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. Thread.sleep(10 * 1000);
  10. String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  11. ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));
  12. } catch (InterruptedException e) {
  13. System.out.println("发生异常" + e.getMessage());
  14. }
  15. }
  16. });
  17. ctx.channel().eventLoop().execute(new Runnable() {
  18. @Override
  19. public void run() {
  20. try {
  21. Thread.sleep(20 * 1000);
  22. String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(now + " hello,客户端,你好!", CharsetUtil.UTF_8));
  24. } catch (InterruptedException e) {
  25. System.out.println("发生异常" + e.getMessage());
  26. }
  27. }
  28. });
  29. String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  30. System.out.println("server....go," + now);
  31. }

执行代码并验证

服务端在 2021-12-09 00:03:22 收到客户端的消息,由于是异步,所以先输出了 server….go
image.png
等待 10s至00:03:32时,第一个任务执行并向客户端发送消息,客户端收到消息
等待 30s至00:03:52时,第二个任务执行并向客户端发送消息,客户端收到消息
image.png
如上可验证结果。

总结

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
  4. NioEventLoopGroup 下包含多个 NioEventLoop
  5. 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  6. 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  7. 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  8. 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
  9. ChannelHandlerContext(DefaultChannelHandlerContext

�类型)—>pipeline(DefaultChannelPipeline�类型) —> channel(NioSocketChannel�类型) —> evenLoop(NioEventLoop类型)—>taskQueue(NioEventLoop�类型) 中的 taskQueue 是串行设计。