初始化代码
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
创建源码
构造器方法 ```java public class NioEventLoopGroup extends MultithreadEventLoopGroup {
//1. 第一步创建 public NioEventLoopGroup() {
this(0);
}
//2. 第二部创建null,就是内部封装的executor public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//3. 第三步 public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
//4. 第四步,参数是 0 null public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
//5. 此处的executor仍然是null
// args 包括,selectorProvider,selectStrategyFactory,rejectedExecutionHanlder
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 默认会创建 逻辑内核数量2倍的 nioEventLoop
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//6.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
}
- 可以看出,内部封装的executor在初始化的时候,是没有进行创建的
- SelectorProvider.provider()是干什么用的???
- 首先它是全局唯一的,一个JVM只有一个此对象
- 后续的selector和channel都是由它进行创建的
- DefaultSelectStrategyFactory.INSTANCE,默认的选择策略
- 通过第5步可知,如果没有设置数据,那么NIOEventGroupLoop里面的EventLoop就是逻辑内核数量的两倍,否则就是你设置的个数。
2. 最终到了MultithreadEventExecutorGroup的构造方法,也是创建过程中最重要的方法
```java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//1. 创建一个 总executor,就是这个group所包含的executor,一个group包含一个总executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//2. 创建一个数组,存放该group中包含的所有eventLoop,因为eventLoop就是EventExecutor
children = new EventExecutor[nThreads];
//3. 逐个创建eventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//3.1 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//3.2 若在创建eventLoop过程中有一个发生异常,则关闭所有
// 已经创建成功的eventLoop,并终止所有它们执行的任务
if (!success) {
// 关闭所有已经创建成功的eventLoop
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 终止所有eventLoop执行的任务,此处为优雅的终止,如果executor有任务,则等待执行完后终止
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
//阻塞方法
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//4. 创建一个选择器,选择器将来用于从eventLoop数组中选择一个eventLoop
// 此处代码,实质上只是创建一个可以优化算法的选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newDefaultThreadFactory()
- 创建了一个默认的线程工厂,定义了以后创建线程的名词等等 ```java protected ThreadFactory newDefaultThreadFactory() { //getClass()即NioEventGroup类型 return new DefaultThreadFactory(getClass()); }
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { // toPoolName() 计算出线程池名称 this(toPoolName(poolType), daemon, priority); }
- 最后交给了ThreadPerTaskExecutor,使用此Executor封装了线程工厂,此Executor会在每个eventLoop创建一个线程
```java
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
// 初始化线程工厂,其将来会为每个eventLoop创建一个线程
this.threadFactory = threadFactory;
}
newChild(executor, args)
NioEventLoopGroup的newChild方法用来,解析出arg中的参数,并且创建NioEventLoop
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//从args中获取provider selectStrategy
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}
new NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//跳转到2.2
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
//必须有,因为后面需要用此对象,来创建selector和channel
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
// Netty封装过的selector
selector = selectorTuple.selector;
// 原始的NIO中的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
2.2 SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 创建收尾队列(收尾队列中的任务不是在所有 任务队列中的任务全部执行完毕 后才执行的
// 一段时间内的任务执行完就会去执行收尾队列,相当于时间到了,其他任务暂时先不执行,执行完收尾队列任务,
// 表示此阶段任务执行完毕
tailTasks = newTaskQueue(maxPendingTasks);
}
2.3 SingleThreadEventExecutor
//此executor是总的executor,是在EventLoopGroup中包含的那个executor
//一个EventGroup包含一个总executor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// NioEventLoop所封装的executor,我们称其为 子executor
// 参数中的executor是总executor,是属于EventLoopGroup
// 参数中的this,表示的创建的newChild即NioEventLoop对象
// 继续2.4
this.executor = ThreadExecutorMap.apply(executor, this);
// 创建任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
2.4 ThreadExecutorMap.apply(executor, this);
class ThreadExecutorMap{
...
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 这里创建了一个 子executor
return new Executor() {
// 这是子executor的execute()方法
@Override
public void execute(final Runnable command) {
// 这个executor是 总executor,其execute()会创建并启动一个线程
// 总executor就是之前封装线程工厂的执行器ThreadPerTaskExecutor
//,那么execute就是ThreadPerTaskExecutor的execute方法
//ThreadPerTaskExecutor.execute见2.4.1
//apply方法见2.4.2
executor.execute(apply(command, eventExecutor));
}
};
}
...
}
2.4.1 ThreadPerTaskExecutor.execute
目的:原来此方法的执行就是使用总的executor来创建一个新的线程执行后面的apply产生的任务,并且线程启动起来开始执行
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
// 初始化线程工厂,其将来会为每个eventLoop创建一个线程
this.threadFactory = threadFactory;
}
// 这是总executer的execute()方法
@Override
public void execute(Runnable command) {
// 创建并启动一个线程
threadFactory.newThread(command).start();
}
}
2.4.2 apply(command, eventExecutor)
目的:作用可以看到就是原来的runnable任务,重新创建了一个任务,内部执行的任务,就是原有的command的任务。执行他的run方法,为何要重新包装一层呢,原因在于setCurrentEventExecutor(eventExecutor); ```java //第一个参数是真正执行的任务 //第二个参数是NioEventLoop public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(command, “command”); ObjectUtil.checkNotNull(eventExecutor, “eventExecutor”); return new Runnable() {
@Override
public void run() {
//具体用途呢???,用来作为线程隔离使用,类似于ThreadLocal
setCurrentEventExecutor(eventExecutor);
try {
// 真正的任务的run()是在这里被调用的
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
}; }
public final class ThreadExecutorMap {
//存储着正在执行所有的executor(NioEventLoop)
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
/**
* Set the current {@link EventExecutor} that is used by the {@link Thread}.
* 设置当前被线程使用的EventExecutor
*/
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
总结
- 总流程:NioEventLoop的创建,
- 首先创建了内部的一些属性
- 而且创建了内部的executor,此executor内部包含了要执行的任务,总executor,NioEventLoop,同时在execute方法执行的时候,会调用总executor执行execute
- 总executor执行execute的时候,会使用默认的线程工厂创建出线程,同时与NioEventLoop绑定,然后执行任务
注意点
- 此过程只是创建了NioEventLoop对象,也创建了NioEventLoop内部绑定的子executor对象,但是子executor对象,总executor还没有给他分配线程与其进行绑定,只有子executor在执行execute方法的时候,总executor才会分配线程给他,让他执行任务
总Executor,一个EventLoopGroup一个此对象,是为了在子Executor(一个NioEventLoop内部包含一个此对象,一个EventLoopGroup内部包含多个NioEventLoop)执行任务的时候,分配线程来执行任务,而不是创建的时候分配线程。切记切记