NioEventLoopGroup

NioEventLoopGroup是一组NioEventLoop,通常,我们创建一个Netty server都采用以下的代码:

  1. EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
  2. EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
  3. ServerBootstrap serverBootstrap = new ServerBootstrap();
  4. serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
  5. .channel(NioServerSocketChannel.class)
  6. .childHandler(new ChannelInitializer<SocketChannel>(){
  7. @Override
  8. protected void initChannel(SocketChannel ch){
  9. ChannelPipeline pipeline = ch.pipeline();
  10. pipeline.addLast(...);//添加channelHandler
  11. ...
  12. }
  13. });

NioEventLoopGroup是MultithreadEventLoopGroup类的子类,它用于基于NIO多路复用器(Selector)的channel。同时它的继承关系中父类还有ScheduledExecutorService类,也就是说,它也是一个用来执行定时任务的线程池。

上面的代码中使用NioEventLoopGroup的无参构造函数构造bossEventLoopGroup和workerEventLoopGroup,其调用过程如下:

  1. public NioEventLoopGroup() {
  2. this(0);
  3. }
  4. public NioEventLoopGroup(int nThreads) {
  5. this(nThreads, (Executor) null);
  6. }
  7. public NioEventLoopGroup(int nThreads, Executor executor) {
  8. //调用SelectorProvider.provider()获取SelectorProvider,关于SelectorProvider可以参考https://www.yuque.com/arkvie/uo8gda/duh1bg#741f3129
  9. this(nThreads, executor, SelectorProvider.provider());
  10. }
  11. public NioEventLoopGroup(
  12. int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  13. //默认的选择策略,在之前的NioEventLoop中有介绍https://www.yuque.com/arkvie/uo8gda/rv8h61#NioEventLoop
  14. this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
  15. }
  16. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
  17. final SelectStrategyFactory selectStrategyFactory) {
  18. //使用默认拒绝的拒绝策略 调用父类MultithreadEventLoopGroup的构造函数
  19. super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  20. }
  21. protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
  22. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
  23. }
  24. protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
  25. this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
  26. }
  27. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  28. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  29. }
  30. /**
  31. * 创建一个实例
  32. * @param nThreads 线程数量
  33. * @param executor 线程池(用来执行 {@link Runnable}的执行器)
  34. * @param chooserFactory
  35. * @param args 调用{@link #newChild(Executor, Object...)}的其他参数
  36. */
  37. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  38. EventExecutorChooserFactory chooserFactory, Object... args) {
  39. if (nThreads <= 0) {
  40. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  41. }
  42. if (executor == null) {
  43. //如果executor为null,实例化一个 {@link ThreadPerTaskExecutor} 它的execut方法每次都会new一个线程去执行 {@link Runnable}任务
  44. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  45. }
  46. //实例化children数组
  47. children = new EventExecutor[nThreads];
  48. for (int i = 0; i < nThreads; i ++) {
  49. boolean success = false;
  50. try {
  51. //实例化每个child 在NioEventLoopGroup的实现中是构建一个NioEventLoop对象
  52. children[i] = newChild(executor, args);
  53. success = true;
  54. } catch (Exception e) {
  55. // TODO: Think about if this is a good exception type
  56. throw new IllegalStateException("failed to create a child event loop", e);
  57. } finally {
  58. //省略部分代码
  59. }
  60. }
  61. //根据数组children的length即线程的数量创建chooser
  62. // 如果legnth是2的次方会创建一个PowerOfTwoEventExecutorChooser否则创建GenericEventExecutorChooser,
  63. // 这两个都是轮询的算法但是PowerOfTwoEventExecutorChooser使用位运算在细节上性能更好
  64. chooser = chooserFactory.newChooser(children);
  65. //构建terminationListener
  66. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  67. @Override
  68. /**
  69. * 操作完成时触发
  70. */
  71. public void operationComplete(Future<Object> future) throws Exception {
  72. //terminatedChildren自增
  73. if (terminatedChildren.incrementAndGet() == children.length) {
  74. //如果全部都完成了,就标记为成功
  75. terminationFuture.setSuccess(null);
  76. }
  77. }
  78. };
  79. //给每个child添加listener,在这个EventExecutor执行完之前会调用listener的operationComplete方法
  80. for (EventExecutor e: children) {
  81. e.terminationFuture().addListener(terminationListener);
  82. }
  83. //使用children构建一个不可变的set视图赋值给readonlyChildren
  84. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  85. Collections.addAll(childrenSet, children);
  86. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  87. }

以上是一个NioEventLoopGroup构造函数的调用过程,在NioEventLoopGroup中的newChild()方法如下实现,调用NioEventLoop的构造方法:

  1. @Override
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  3. EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
  4. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  5. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
  6. }

EventExecutorChooserFactory和EventExecutorChooser

在MultithreadEventExecutorGroup中使用EventExecutorChooser来获取下一个Executor,EventExecutorChooser就是一个选择器,其中保存了MultithreadEventExecutorGroup中的成员变量——children数组,在EventExecutorChooser中对应的属性是executors,EventExecutorChooser自身还有一个AtomicInteger类型的成员变量idx。每次调用next方法idx会自增1,然后根据idx去数组executors中获取一个Executor。
GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser是EventExecutorChooser的两个实现类,GenericEventExecutorChooser是通用的选择器,其next()方法是去idx与数组executors的长度的余数取绝对值,这也就是轮询的方式。代码如下:

  1. @Override
  2. public EventExecutor next() {
  3. return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  4. }

而PowerOfTwoEventExecutorChooser适用于executors的长度为2的次方的情况,其代码如下:

  1. @Override
  2. public EventExecutor next() {
  3. return executors[idx.getAndIncrement() & executors.length - 1];
  4. }

在PowerOfTwoEventExecutorChooser中是取idx与executors.length - 1的逻辑与运算,executors.length是2的n次方的话,那么executors.length - 1就是最右边的n-1位都为1,其余为0,得到的结果最右边的n-1位为idx的最右边n-1位的值,其余为0,也就是将idx的高位都置0,其实也就是除以2的n次方的余数。所以PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser本质上的结果都是轮询,但PowerOfTwoEventExecutorChooser只是用与executors的长度为2的次方的情况,且效率更高。

EventExecutorChooserFactory是获取EventExecutorChooser的工厂类,DefaultEventExecutorChooserFactory是EventExecutorChooserFactory的默认实现类。其newChooser方法就是根据executors的长度获取一个选择器:

  1. @Override
  2. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  3. if (isPowerOfTwo(executors.length)) {
  4. return new PowerOfTwoEventExecutorChooser(executors);
  5. } else {
  6. return new GenericEventExecutorChooser(executors);
  7. }
  8. }

EventLoopGroup

EventLoopGroup能够注册Channel,注册的channel后续会处理事件循环中的各个事件。NioEventLoopGroup的父类MultithreadEventLoopGroup以及之前提到的EventLoop都实现/继承了EventLoopGroup接口。EventLoopGroup是一组EventLoop,而EventLoop是一种特殊的EventLoopGroup,因为这个组里面只有一个EventLoop。EventLoopGroup的next方法就是用来获取下一个EventLoop:

  1. /**
  2. * Return the next {@link EventLoop} to use
  3. */
  4. @Override
  5. EventLoop next();

next方法在MultithreadEventLoopGroup中的实现如下:

  1. @Override
  2. public EventLoop next() {
  3. return (EventLoop) super.next();
  4. }
  5. /**
  6. * MultithreadEventLoopGroup父类MultithreadEventExecutorGroup中的方法
  7. * 因为EventExecutorGroup和EventLoopGroup都定义了next方法但返回值不同,
  8. * 父类中实现了EventExecutorGroup的next方法,而这个chooser中维护的executors
  9. * 就是构建NioEventLoopGroup时创建NioEventLoop数组children,
  10. * 所以这里返回的是一个NioEventLoop对象,NioEventLoop既实现了EventExecutor接口
  11. * 也实现了EventLoop接口
  12. */
  13. @Override
  14. public EventExecutor next() {
  15. return chooser.next();
  16. }

显而易见MultithreadEventLoopGroup是轮询其中的children获取的下一个EventLoop对象;而在EventLoop接口的实现类SingleThreadEventLoop中next方法实现如下:

  1. @Override
  2. public EventLoop next() {
  3. return (EventLoop) super.next();
  4. }
  5. /**
  6. * SingleThreadEventLoop父类AbstractEventExecutor中的方法,返回自身
  7. */
  8. @Override
  9. public EventExecutor next() {
  10. return this;
  11. }

SingleThreadEventLoop中获取下一个EventLoop就是返回自身。

next方法在哪里会使用到呢?其中之一就是NioEventLoopGroup绑定channel的时候:

  1. /**
  2. * MultithreadEventLoopGroup中的实现
  3. */
  4. @Override
  5. public ChannelFuture register(Channel channel) {
  6. return next().register(channel);
  7. }

NioEventLoopGroup注册一个channel就是获取下一个NioEventLoop来绑定channel。
关于NioEventLoopGroup这个类的解析先告一段落,后面会找时间补充一个流程图。