初始化代码
EventLoopGroup parentGroup = new NioEventLoopGroup();EventLoopGroup childGroup = new NioEventLoopGroup();
创建源码
构造器方法 ```java public class NioEventLoopGroup extends MultithreadEventLoopGroup {
//1. 第一步创建 public NioEventLoopGroup() {
this(0);
}
//2. 第二部创建null,就是内部封装的executor public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//3. 第三步 public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
//4. 第四步,参数是 0 null public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
//5. 此处的executor仍然是null// args 包括,selectorProvider,selectStrategyFactory,rejectedExecutionHanlderprotected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {// 默认会创建 逻辑内核数量2倍的 nioEventLoopsuper(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}//6.protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}
}
- 可以看出,内部封装的executor在初始化的时候,是没有进行创建的- SelectorProvider.provider()是干什么用的???- 首先它是全局唯一的,一个JVM只有一个此对象- 后续的selector和channel都是由它进行创建的- DefaultSelectStrategyFactory.INSTANCE,默认的选择策略- 通过第5步可知,如果没有设置数据,那么NIOEventGroupLoop里面的EventLoop就是逻辑内核数量的两倍,否则就是你设置的个数。2. 最终到了MultithreadEventExecutorGroup的构造方法,也是创建过程中最重要的方法```javaprotected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}//1. 创建一个 总executor,就是这个group所包含的executor,一个group包含一个总executorif (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//2. 创建一个数组,存放该group中包含的所有eventLoop,因为eventLoop就是EventExecutorchildren = new EventExecutor[nThreads];//3. 逐个创建eventLoopfor (int i = 0; i < nThreads; i ++) {boolean success = false;try {//3.1 创建eventLoopchildren[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {//3.2 若在创建eventLoop过程中有一个发生异常,则关闭所有// 已经创建成功的eventLoop,并终止所有它们执行的任务if (!success) {// 关闭所有已经创建成功的eventLoopfor (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}// 终止所有eventLoop执行的任务,此处为优雅的终止,如果executor有任务,则等待执行完后终止for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {//阻塞方法e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}//4. 创建一个选择器,选择器将来用于从eventLoop数组中选择一个eventLoop// 此处代码,实质上只是创建一个可以优化算法的选择器chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
newDefaultThreadFactory()
- 创建了一个默认的线程工厂,定义了以后创建线程的名词等等 ```java protected ThreadFactory newDefaultThreadFactory() { //getClass()即NioEventGroup类型 return new DefaultThreadFactory(getClass()); }
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { // toPoolName() 计算出线程池名称 this(toPoolName(poolType), daemon, priority); }
- 最后交给了ThreadPerTaskExecutor,使用此Executor封装了线程工厂,此Executor会在每个eventLoop创建一个线程```javapublic ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}// 初始化线程工厂,其将来会为每个eventLoop创建一个线程this.threadFactory = threadFactory;}
newChild(executor, args)
NioEventLoopGroup的newChild方法用来,解析出arg中的参数,并且创建NioEventLoop
public class NioEventLoopGroup extends MultithreadEventLoopGroup {protected EventLoop newChild(Executor executor, Object... args) throws Exception {//从args中获取provider selectStrategyreturn new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}}
new NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {//跳转到2.2super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}//必须有,因为后面需要用此对象,来创建selector和channelprovider = selectorProvider;final SelectorTuple selectorTuple = openSelector();// Netty封装过的selectorselector = selectorTuple.selector;// 原始的NIO中的selectorunwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy;}
2.2 SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);// 创建收尾队列(收尾队列中的任务不是在所有 任务队列中的任务全部执行完毕 后才执行的// 一段时间内的任务执行完就会去执行收尾队列,相当于时间到了,其他任务暂时先不执行,执行完收尾队列任务,// 表示此阶段任务执行完毕tailTasks = newTaskQueue(maxPendingTasks);}
2.3 SingleThreadEventExecutor
//此executor是总的executor,是在EventLoopGroup中包含的那个executor//一个EventGroup包含一个总executorprotected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);// NioEventLoop所封装的executor,我们称其为 子executor// 参数中的executor是总executor,是属于EventLoopGroup// 参数中的this,表示的创建的newChild即NioEventLoop对象// 继续2.4this.executor = ThreadExecutorMap.apply(executor, this);// 创建任务队列taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}
2.4 ThreadExecutorMap.apply(executor, this);
class ThreadExecutorMap{...public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(executor, "executor");ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");// 这里创建了一个 子executorreturn new Executor() {// 这是子executor的execute()方法@Overridepublic void execute(final Runnable command) {// 这个executor是 总executor,其execute()会创建并启动一个线程// 总executor就是之前封装线程工厂的执行器ThreadPerTaskExecutor//,那么execute就是ThreadPerTaskExecutor的execute方法//ThreadPerTaskExecutor.execute见2.4.1//apply方法见2.4.2executor.execute(apply(command, eventExecutor));}};}...}
2.4.1 ThreadPerTaskExecutor.execute
目的:原来此方法的执行就是使用总的executor来创建一个新的线程执行后面的apply产生的任务,并且线程启动起来开始执行
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}// 初始化线程工厂,其将来会为每个eventLoop创建一个线程this.threadFactory = threadFactory;}// 这是总executer的execute()方法@Overridepublic void execute(Runnable command) {// 创建并启动一个线程threadFactory.newThread(command).start();}}
2.4.2 apply(command, eventExecutor)
目的:作用可以看到就是原来的runnable任务,重新创建了一个任务,内部执行的任务,就是原有的command的任务。执行他的run方法,为何要重新包装一层呢,原因在于setCurrentEventExecutor(eventExecutor); ```java //第一个参数是真正执行的任务 //第二个参数是NioEventLoop public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(command, “command”); ObjectUtil.checkNotNull(eventExecutor, “eventExecutor”); return new Runnable() {
@Overridepublic void run() {//具体用途呢???,用来作为线程隔离使用,类似于ThreadLocalsetCurrentEventExecutor(eventExecutor);try {// 真正的任务的run()是在这里被调用的command.run();} finally {setCurrentEventExecutor(null);}}
}; }
public final class ThreadExecutorMap {
//存储着正在执行所有的executor(NioEventLoop)private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();/*** Set the current {@link EventExecutor} that is used by the {@link Thread}.* 设置当前被线程使用的EventExecutor*/private static void setCurrentEventExecutor(EventExecutor executor) {mappings.set(executor);}
总结
- 总流程:NioEventLoop的创建,
- 首先创建了内部的一些属性
- 而且创建了内部的executor,此executor内部包含了要执行的任务,总executor,NioEventLoop,同时在execute方法执行的时候,会调用总executor执行execute
- 总executor执行execute的时候,会使用默认的线程工厂创建出线程,同时与NioEventLoop绑定,然后执行任务
注意点
- 此过程只是创建了NioEventLoop对象,也创建了NioEventLoop内部绑定的子executor对象,但是子executor对象,总executor还没有给他分配线程与其进行绑定,只有子executor在执行execute方法的时候,总executor才会分配线程给他,让他执行任务
总Executor,一个EventLoopGroup一个此对象,是为了在子Executor(一个NioEventLoop内部包含一个此对象,一个EventLoopGroup内部包含多个NioEventLoop)执行任务的时候,分配线程来执行任务,而不是创建的时候分配线程。切记切记
