前面介绍了Netty的线程模型及基本用法,今天我们以一个简单的例子,深入了解Netty的启动原理及主从Reactor线程模型的底层实现,以下是简单的Netty服务端程序

  1. public static void main(String[] args) throws Exception {
  2. // 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
  3. // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
  4. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  5. EventLoopGroup workerGroup = new NioEventLoopGroup(8);
  6. try {
  7. // 创建服务器端的启动对象
  8. ServerBootstrap bootstrap = new ServerBootstrap();
  9. // 使用链式编程来配置参数
  10. bootstrap.group(bossGroup, workerGroup) //设置两个线程组
  11. // 使用NioServerSocketChannel作为服务器的通道实现
  12. .channel(NioServerSocketChannel.class)
  13. // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
  14. // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
  15. .option(ChannelOption.SO_BACKLOG, 1024)
  16. .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
  17. @Override
  18. protected void initChannel(SocketChannel ch) throws Exception {
  19. //对workerGroup的SocketChannel设置处理器
  20. ch.pipeline().addLast(new NettyServerHandler());
  21. }
  22. });
  23. System.out.println("netty server start。。");
  24. // 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
  25. // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
  26. ChannelFuture cf = bootstrap.bind(9000).sync();
  27. // 等待服务端监听端口关闭,closeFuture是异步操作
  28. // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
  29. cf.channel().closeFuture().sync();
  30. } finally {
  31. bossGroup.shutdownGracefully();
  32. workerGroup.shutdownGracefully();
  33. }
  34. }
  35. -------------------------------------------------------------
  36. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  37. /**
  38. * 当客户端连接服务器完成就会触发该方法
  39. *
  40. * @param ctx
  41. * @throws Exception
  42. */
  43. @Override
  44. public void channelActive(ChannelHandlerContext ctx) {
  45. System.out.println("客户端连接通道建立完成");
  46. }
  47. /**
  48. * 读取客户端发送的数据
  49. *
  50. * @param ctx 上下文对象, 含有通道channel,管道pipeline
  51. * @param msg 就是客户端发送的数据
  52. * @throws Exception
  53. */
  54. @Override
  55. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  56. //Channel channel = ctx.channel();
  57. //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
  58. //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
  59. ByteBuf buf = (ByteBuf) msg;
  60. System.out.println("收到客户端的消息:" + buf.toString(CharsetUtil.UTF_8));
  61. }
  62. /**
  63. * 数据读取完毕处理方法
  64. *
  65. * @param ctx
  66. * @throws Exception
  67. */
  68. @Override
  69. public void channelReadComplete(ChannelHandlerContext ctx) {
  70. ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
  71. ctx.writeAndFlush(buf);
  72. }
  73. /**
  74. * 处理异常, 一般是需要关闭通道
  75. *
  76. * @param ctx
  77. * @param cause
  78. * @throws Exception
  79. */
  80. @Override
  81. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  82. ctx.close();
  83. }
  84. }

回顾一下Netty整体架构设计,看源码时要时刻想起这幅图
微信截图_20210722172442.png

neety线程模型及基本用法可以参考《Netty线程模型》

创建NioEventLoopGroup


在启动 server 时,首先需要启动两个线程组 NioEventLoopGroup

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. //若没有指定线程数量,默认是cpu核数的两倍
  3. EventLoopGroup workerGroup = new NioEventLoopGroup();

若没有指定 eventLoopGroup 的线程数,默认是CPU核数的两倍

  1. //默认线程数是cpu核数两倍
  2. private static final int DEFAULT_EVENT_LOOP_THREADS =
  3. Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  4. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  5. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  6. }

微信截图_20210824112317.png

以下是 eventLoopGroup 的具体构造方法,创建了一个线程池 executor 及线程数组childern[],数组每个元素对应一个 NioEventLoop(创建的线程池executor后续用于给NioEventLoop的父类SingleThreadEventExecutor中executor属性赋值)

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args){
  2. this.terminatedChildren = new AtomicInteger();
  3. this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
  4. if (nThreads <= 0) {
  5. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  6. } else {
  7. if (executor == null) {
  8. //传进来的executor参数默认为null,创建一个ThreadPerTaskExecutor类型的线程池
  9. executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
  10. }
  11. //创建线程数组,大小为我们指定的线程数或默认线程数
  12. this.children = new EventExecutor[nThreads];
  13. int j;
  14. for(int i = 0; i < nThreads; ++i) {
  15. boolean success = false;
  16. boolean var18 = false;
  17. try {
  18. var18 = true;
  19. //创建nThreads个NioEventLoop赋值给数组
  20. //并且传入了上面创建的线程池,后续用于给父类SingleThreadEventExecutor的executor属性赋值
  21. this.children[i] = this.newChild((Executor)executor, args);
  22. success = true;
  23. var18 = false;
  24. } catch (Exception var19) {
  25. throw new IllegalStateException("failed to create a child event loop", var19);
  26. } finally {
  27. //分支逻辑
  28. ......
  29. }
  30. if (!success) {
  31. //分支逻辑
  32. ......
  33. }
  34. }
  35. this.chooser = chooserFactory.newChooser(this.children);
  36. FutureListener<Object> terminationListener = new FutureListener<Object>() {
  37. public void operationComplete(Future<Object> future) throws Exception {
  38. if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
  39. MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
  40. }
  41. }
  42. };
  43. EventExecutor[] var24 = this.children;
  44. j = var24.length;
  45. for(int var26 = 0; var26 < j; ++var26) {
  46. EventExecutor e = var24[var26];
  47. e.terminationFuture().addListener(terminationListener);
  48. }
  49. Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
  50. Collections.addAll(childrenSet, this.children);
  51. this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
  52. }
  53. }

创建NioEventLoop

调用NioEventLoopGroup的newChild()方法创建NioEventLoop,其构造方法会创建TaskQueue,并调用JDK中NIO底层代码,创建了多路复用器Selector(对应了上面的架构图,是不是有点感觉了?)

  1. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  2. return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
  3. }
  4. ---------------------------------------------
  5. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
  6. //调用父类的构造方法会创建我们的TaskQueue,是一个LinkedBlockingQueue
  7. super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
  8. if (selectorProvider == null) {
  9. throw new NullPointerException("selectorProvider");
  10. } else if (strategy == null) {
  11. throw new NullPointerException("selectStrategy");
  12. } else {
  13. this.provider = selectorProvider;
  14. //调用NIO底层的openSelector()方法,创建多路复用器
  15. NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
  16. this.selector = selectorTuple.selector;
  17. this.unwrappedSelector = selectorTuple.unwrappedSelector;
  18. this.selectStrategy = strategy;
  19. }
  20. }

微信截图_20210723172100.png

创建服务端启动对象


创建服务端启动对象ServerBootstrap,通过链式编程为ServerBootstrap的各种属性赋值

group()

调用group()方法设置将bossGroup赋值给group属性,将workerGroup赋值给childGroup属性

  1. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
  2. super.group(parentGroup);
  3. if (childGroup == null) {
  4. throw new NullPointerException("childGroup");
  5. } else if (this.childGroup != null) {
  6. throw new IllegalStateException("childGroup set already");
  7. } else {
  8. //将workerGroup赋值给childGroup属性
  9. this.childGroup = childGroup;
  10. return this;
  11. }
  12. }
  13. --------------------------------------
  14. public B group(EventLoopGroup group) {
  15. if (group == null) {
  16. throw new NullPointerException("group");
  17. } else if (this.group != null) {
  18. throw new IllegalStateException("group set already");
  19. } else {
  20. //将bossGroup赋值给group属性
  21. this.group = group;
  22. return this.self();
  23. }
  24. }

微信截图_20210722204053.png

channel()

调用 channel() 方法,指定使用NioServerSocketChannel(对NIO中的ServerSocketChannel进行了封装)作为服务器的通道实现
微信截图_20210722204449.png
创建channelFactory,通过反射获取NioServerSocketChannel的构造方法(空构造方法),复制给constructor属性,后续通过该构造方法创建NioServerSocketChannel

  1. public B channel(Class<? extends C> channelClass) {
  2. if (channelClass == null) {
  3. throw new NullPointerException("channelClass");
  4. } else {
  5. //创建ReflectiveChannelFactory对象,赋值给当前服务器启动对象ServerBootstrap
  6. return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
  7. }
  8. }
  9. ---------------------------------------
  10. public ReflectiveChannelFactory(Class<? extends T> clazz) {
  11. ObjectUtil.checkNotNull(clazz, "clazz");
  12. try {
  13. //通过反射获取NioSocketChannel的空构造方法,赋值给ReflectiveChannelFactory的constructor属性
  14. this.constructor = clazz.getConstructor();
  15. } catch (NoSuchMethodException var3) {
  16. throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", var3);
  17. }
  18. }

微信截图_20210722210737.png

option()

设置TCP相关参数(KEEPALIVE、RCVBUF、SNDBUF、TIMEOUT…….),把配置的参数及对应的值放到一个Map中

  1. public <T> B option(ChannelOption<T> option, T value) {
  2. if (option == null) {
  3. throw new NullPointerException("option");
  4. } else {
  5. if (value == null) {
  6. synchronized(this.options) {
  7. this.options.remove(option);
  8. }
  9. } else {
  10. synchronized(this.options) {
  11. //将配置的参数放入到Map中
  12. this.options.put(option, value);
  13. }
  14. }
  15. return this.self();
  16. }
  17. }

微信截图_20210722212509.png

childHandler()

创建通道初始化对象,重写initChannel()方法(客户端连接服务端成功会回调该方法,往pipeline中添加我们自定义的Handler)

  1. //创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
  2. childHandler(new ChannelInitializer<SocketChannel>() {
  3. @Override
  4. protected void initChannel(SocketChannel ch) throws Exception {
  5. //对workerGroup的SocketChannel设置处理器
  6. ch.pipeline().addLast(new NettyServerHandler());
  7. }
  8. });
  9. ---------------------------
  10. public ServerBootstrap childHandler(ChannelHandler childHandler) {
  11. if (childHandler == null) {
  12. throw new NullPointerException("childHandler");
  13. } else {
  14. //为childHandler赋值
  15. this.childHandler = childHandler;
  16. return this;
  17. }
  18. }

微信截图_20210722213804.png

此时服务端启动对象基本构建完毕,记住这些属性,后续在调用bind()方法中都会用到