初始化代码

  1. EventLoopGroup parentGroup = new NioEventLoopGroup();
  2. EventLoopGroup childGroup = new NioEventLoopGroup();

创建源码

  1. 构造器方法 ```java public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    //1. 第一步创建 public NioEventLoopGroup() {

    1. this(0);

    }

    //2. 第二部创建null,就是内部封装的executor public NioEventLoopGroup(int nThreads) {

    1. this(nThreads, (Executor) null);

    }

    //3. 第三步 public NioEventLoopGroup(int nThreads, Executor executor) {

    1. this(nThreads, executor, SelectorProvider.provider());

    }

    //4. 第四步,参数是 0 null public NioEventLoopGroup(

    1. int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    2. this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);

    }

  1. //5. 此处的executor仍然是null
  2. // args 包括,selectorProvider,selectStrategyFactory,rejectedExecutionHanlder
  3. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  4. // 默认会创建 逻辑内核数量2倍的 nioEventLoop
  5. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  6. }
  7. //6.
  8. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  9. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  10. }

}

  1. - 可以看出,内部封装的executor在初始化的时候,是没有进行创建的
  2. - SelectorProvider.provider()是干什么用的???
  3. - 首先它是全局唯一的,一个JVM只有一个此对象
  4. - 后续的selectorchannel都是由它进行创建的
  5. - DefaultSelectStrategyFactory.INSTANCE,默认的选择策略
  6. - 通过第5步可知,如果没有设置数据,那么NIOEventGroupLoop里面的EventLoop就是逻辑内核数量的两倍,否则就是你设置的个数。
  7. 2. 最终到了MultithreadEventExecutorGroup的构造方法,也是创建过程中最重要的方法
  8. ```java
  9. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  10. EventExecutorChooserFactory chooserFactory, Object... args) {
  11. if (nThreads <= 0) {
  12. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  13. }
  14. //1. 创建一个 总executor,就是这个group所包含的executor,一个group包含一个总executor
  15. if (executor == null) {
  16. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  17. }
  18. //2. 创建一个数组,存放该group中包含的所有eventLoop,因为eventLoop就是EventExecutor
  19. children = new EventExecutor[nThreads];
  20. //3. 逐个创建eventLoop
  21. for (int i = 0; i < nThreads; i ++) {
  22. boolean success = false;
  23. try {
  24. //3.1 创建eventLoop
  25. children[i] = newChild(executor, args);
  26. success = true;
  27. } catch (Exception e) {
  28. // TODO: Think about if this is a good exception type
  29. throw new IllegalStateException("failed to create a child event loop", e);
  30. } finally {
  31. //3.2 若在创建eventLoop过程中有一个发生异常,则关闭所有
  32. // 已经创建成功的eventLoop,并终止所有它们执行的任务
  33. if (!success) {
  34. // 关闭所有已经创建成功的eventLoop
  35. for (int j = 0; j < i; j ++) {
  36. children[j].shutdownGracefully();
  37. }
  38. // 终止所有eventLoop执行的任务,此处为优雅的终止,如果executor有任务,则等待执行完后终止
  39. for (int j = 0; j < i; j ++) {
  40. EventExecutor e = children[j];
  41. try {
  42. while (!e.isTerminated()) {
  43. //阻塞方法
  44. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  45. }
  46. } catch (InterruptedException interrupted) {
  47. // Let the caller handle the interruption.
  48. Thread.currentThread().interrupt();
  49. break;
  50. }
  51. }
  52. }
  53. }
  54. }
  55. //4. 创建一个选择器,选择器将来用于从eventLoop数组中选择一个eventLoop
  56. // 此处代码,实质上只是创建一个可以优化算法的选择器
  57. chooser = chooserFactory.newChooser(children);
  58. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  59. @Override
  60. public void operationComplete(Future<Object> future) throws Exception {
  61. if (terminatedChildren.incrementAndGet() == children.length) {
  62. terminationFuture.setSuccess(null);
  63. }
  64. }
  65. };
  66. for (EventExecutor e: children) {
  67. e.terminationFuture().addListener(terminationListener);
  68. }
  69. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  70. Collections.addAll(childrenSet, children);
  71. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  72. }

newDefaultThreadFactory()

  • 创建了一个默认的线程工厂,定义了以后创建线程的名词等等 ```java protected ThreadFactory newDefaultThreadFactory() { //getClass()即NioEventGroup类型 return new DefaultThreadFactory(getClass()); }

public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { // toPoolName() 计算出线程池名称 this(toPoolName(poolType), daemon, priority); }

  1. - 最后交给了ThreadPerTaskExecutor,使用此Executor封装了线程工厂,此Executor会在每个eventLoop创建一个线程
  2. ```java
  3. public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
  4. if (threadFactory == null) {
  5. throw new NullPointerException("threadFactory");
  6. }
  7. // 初始化线程工厂,其将来会为每个eventLoop创建一个线程
  8. this.threadFactory = threadFactory;
  9. }

newChild(executor, args)

  1. NioEventLoopGroup的newChild方法用来,解析出arg中的参数,并且创建NioEventLoop

    1. public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    3. //从args中获取provider selectStrategy
    4. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
    5. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    6. }
    7. }
  2. new NioEventLoop

    1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
    2. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    3. //跳转到2.2
    4. super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    5. if (selectorProvider == null) {
    6. throw new NullPointerException("selectorProvider");
    7. }
    8. if (strategy == null) {
    9. throw new NullPointerException("selectStrategy");
    10. }
    11. //必须有,因为后面需要用此对象,来创建selector和channel
    12. provider = selectorProvider;
    13. final SelectorTuple selectorTuple = openSelector();
    14. // Netty封装过的selector
    15. selector = selectorTuple.selector;
    16. // 原始的NIO中的selector
    17. unwrappedSelector = selectorTuple.unwrappedSelector;
    18. selectStrategy = strategy;
    19. }

    2.2 SingleThreadEventLoop

    1. protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
    2. boolean addTaskWakesUp, int maxPendingTasks,
    3. RejectedExecutionHandler rejectedExecutionHandler) {
    4. super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    5. // 创建收尾队列(收尾队列中的任务不是在所有 任务队列中的任务全部执行完毕 后才执行的
    6. // 一段时间内的任务执行完就会去执行收尾队列,相当于时间到了,其他任务暂时先不执行,执行完收尾队列任务,
    7. // 表示此阶段任务执行完毕
    8. tailTasks = newTaskQueue(maxPendingTasks);
    9. }

    2.3 SingleThreadEventExecutor

    1. //此executor是总的executor,是在EventLoopGroup中包含的那个executor
    2. //一个EventGroup包含一个总executor
    3. protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
    4. boolean addTaskWakesUp, int maxPendingTasks,
    5. RejectedExecutionHandler rejectedHandler) {
    6. super(parent);
    7. this.addTaskWakesUp = addTaskWakesUp;
    8. this.maxPendingTasks = Math.max(16, maxPendingTasks);
    9. // NioEventLoop所封装的executor,我们称其为 子executor
    10. // 参数中的executor是总executor,是属于EventLoopGroup
    11. // 参数中的this,表示的创建的newChild即NioEventLoop对象
    12. // 继续2.4
    13. this.executor = ThreadExecutorMap.apply(executor, this);
    14. // 创建任务队列
    15. taskQueue = newTaskQueue(this.maxPendingTasks);
    16. rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    17. }

    2.4 ThreadExecutorMap.apply(executor, this);

    1. class ThreadExecutorMap{
    2. ...
    3. public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    4. ObjectUtil.checkNotNull(executor, "executor");
    5. ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    6. // 这里创建了一个 子executor
    7. return new Executor() {
    8. // 这是子executor的execute()方法
    9. @Override
    10. public void execute(final Runnable command) {
    11. // 这个executor是 总executor,其execute()会创建并启动一个线程
    12. // 总executor就是之前封装线程工厂的执行器ThreadPerTaskExecutor
    13. //,那么execute就是ThreadPerTaskExecutor的execute方法
    14. //ThreadPerTaskExecutor.execute见2.4.1
    15. //apply方法见2.4.2
    16. executor.execute(apply(command, eventExecutor));
    17. }
    18. };
    19. }
    20. ...
    21. }

    2.4.1 ThreadPerTaskExecutor.execute

  • 目的:原来此方法的执行就是使用总的executor来创建一个新的线程执行后面的apply产生的任务,并且线程启动起来开始执行

    1. public final class ThreadPerTaskExecutor implements Executor {
    2. private final ThreadFactory threadFactory;
    3. public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    4. if (threadFactory == null) {
    5. throw new NullPointerException("threadFactory");
    6. }
    7. // 初始化线程工厂,其将来会为每个eventLoop创建一个线程
    8. this.threadFactory = threadFactory;
    9. }
    10. // 这是总executer的execute()方法
    11. @Override
    12. public void execute(Runnable command) {
    13. // 创建并启动一个线程
    14. threadFactory.newThread(command).start();
    15. }
    16. }

    2.4.2 apply(command, eventExecutor)

  • 目的:作用可以看到就是原来的runnable任务,重新创建了一个任务,内部执行的任务,就是原有的command的任务。执行他的run方法,为何要重新包装一层呢,原因在于setCurrentEventExecutor(eventExecutor); ```java //第一个参数是真正执行的任务 //第二个参数是NioEventLoop public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(command, “command”); ObjectUtil.checkNotNull(eventExecutor, “eventExecutor”); return new Runnable() {

    1. @Override
    2. public void run() {
    3. //具体用途呢???,用来作为线程隔离使用,类似于ThreadLocal
    4. setCurrentEventExecutor(eventExecutor);
    5. try {
    6. // 真正的任务的run()是在这里被调用的
    7. command.run();
    8. } finally {
    9. setCurrentEventExecutor(null);
    10. }
    11. }

    }; }

public final class ThreadExecutorMap {

  1. //存储着正在执行所有的executor(NioEventLoop)
  2. private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
  3. /**
  4. * Set the current {@link EventExecutor} that is used by the {@link Thread}.
  5. * 设置当前被线程使用的EventExecutor
  6. */
  7. private static void setCurrentEventExecutor(EventExecutor executor) {
  8. mappings.set(executor);
  9. }

} ```

总结
  1. 总流程:NioEventLoop的创建,
    1. 首先创建了内部的一些属性
    2. 而且创建了内部的executor,此executor内部包含了要执行的任务,总executor,NioEventLoop,同时在execute方法执行的时候,会调用总executor执行execute
      1. 总executor执行execute的时候,会使用默认的线程工厂创建出线程,同时与NioEventLoop绑定,然后执行任务
  2. 注意点

    1. 此过程只是创建了NioEventLoop对象,也创建了NioEventLoop内部绑定的子executor对象,但是子executor对象,总executor还没有给他分配线程与其进行绑定,只有子executor在执行execute方法的时候,总executor才会分配线程给他,让他执行任务
  3. 总Executor,一个EventLoopGroup一个此对象,是为了在子Executor(一个NioEventLoop内部包含一个此对象,一个EventLoopGroup内部包含多个NioEventLoop)执行任务的时候,分配线程来执行任务,而不是创建的时候分配线程。切记切记