NioEventLoopGroup
NioEventLoopGroup是一组NioEventLoop,通常,我们创建一个Netty server都采用以下的代码:
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel ch){ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(...);//添加channelHandler...}});
NioEventLoopGroup是MultithreadEventLoopGroup类的子类,它用于基于NIO多路复用器(Selector)的channel。同时它的继承关系中父类还有ScheduledExecutorService类,也就是说,它也是一个用来执行定时任务的线程池。
上面的代码中使用NioEventLoopGroup的无参构造函数构造bossEventLoopGroup和workerEventLoopGroup,其调用过程如下:
public NioEventLoopGroup() {this(0);}public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);}public NioEventLoopGroup(int nThreads, Executor executor) {//调用SelectorProvider.provider()获取SelectorProvider,关于SelectorProvider可以参考https://www.yuque.com/arkvie/uo8gda/duh1bg#741f3129this(nThreads, executor, SelectorProvider.provider());}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {//默认的选择策略,在之前的NioEventLoop中有介绍https://www.yuque.com/arkvie/uo8gda/rv8h61#NioEventLoopthis(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {//使用默认拒绝的拒绝策略 调用父类MultithreadEventLoopGroup的构造函数super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);}protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}/*** 创建一个实例* @param nThreads 线程数量* @param executor 线程池(用来执行 {@link Runnable}的执行器)* @param chooserFactory* @param args 调用{@link #newChild(Executor, Object...)}的其他参数*/protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {//如果executor为null,实例化一个 {@link ThreadPerTaskExecutor} 它的execut方法每次都会new一个线程去执行 {@link Runnable}任务executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//实例化children数组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//实例化每个child 在NioEventLoopGroup的实现中是构建一个NioEventLoop对象children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {//省略部分代码}}//根据数组children的length即线程的数量创建chooser// 如果legnth是2的次方会创建一个PowerOfTwoEventExecutorChooser否则创建GenericEventExecutorChooser,// 这两个都是轮询的算法但是PowerOfTwoEventExecutorChooser使用位运算在细节上性能更好chooser = chooserFactory.newChooser(children);//构建terminationListenerfinal FutureListener<Object> terminationListener = new FutureListener<Object>() {@Override/*** 操作完成时触发*/public void operationComplete(Future<Object> future) throws Exception {//terminatedChildren自增if (terminatedChildren.incrementAndGet() == children.length) {//如果全部都完成了,就标记为成功terminationFuture.setSuccess(null);}}};//给每个child添加listener,在这个EventExecutor执行完之前会调用listener的operationComplete方法for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}//使用children构建一个不可变的set视图赋值给readonlyChildrenSet<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
以上是一个NioEventLoopGroup构造函数的调用过程,在NioEventLoopGroup中的newChild()方法如下实现,调用NioEventLoop的构造方法:
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);}
EventExecutorChooserFactory和EventExecutorChooser
在MultithreadEventExecutorGroup中使用EventExecutorChooser来获取下一个Executor,EventExecutorChooser就是一个选择器,其中保存了MultithreadEventExecutorGroup中的成员变量——children数组,在EventExecutorChooser中对应的属性是executors,EventExecutorChooser自身还有一个AtomicInteger类型的成员变量idx。每次调用next方法idx会自增1,然后根据idx去数组executors中获取一个Executor。
GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser是EventExecutorChooser的两个实现类,GenericEventExecutorChooser是通用的选择器,其next()方法是去idx与数组executors的长度的余数取绝对值,这也就是轮询的方式。代码如下:
@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
而PowerOfTwoEventExecutorChooser适用于executors的长度为2的次方的情况,其代码如下:
@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
在PowerOfTwoEventExecutorChooser中是取idx与executors.length - 1的逻辑与运算,executors.length是2的n次方的话,那么executors.length - 1就是最右边的n-1位都为1,其余为0,得到的结果最右边的n-1位为idx的最右边n-1位的值,其余为0,也就是将idx的高位都置0,其实也就是除以2的n次方的余数。所以PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser本质上的结果都是轮询,但PowerOfTwoEventExecutorChooser只是用与executors的长度为2的次方的情况,且效率更高。
EventExecutorChooserFactory是获取EventExecutorChooser的工厂类,DefaultEventExecutorChooserFactory是EventExecutorChooserFactory的默认实现类。其newChooser方法就是根据executors的长度获取一个选择器:
@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}
EventLoopGroup
EventLoopGroup能够注册Channel,注册的channel后续会处理事件循环中的各个事件。NioEventLoopGroup的父类MultithreadEventLoopGroup以及之前提到的EventLoop都实现/继承了EventLoopGroup接口。EventLoopGroup是一组EventLoop,而EventLoop是一种特殊的EventLoopGroup,因为这个组里面只有一个EventLoop。EventLoopGroup的next方法就是用来获取下一个EventLoop:
/*** Return the next {@link EventLoop} to use*/@OverrideEventLoop next();
next方法在MultithreadEventLoopGroup中的实现如下:
@Overridepublic EventLoop next() {return (EventLoop) super.next();}/*** MultithreadEventLoopGroup父类MultithreadEventExecutorGroup中的方法* 因为EventExecutorGroup和EventLoopGroup都定义了next方法但返回值不同,* 父类中实现了EventExecutorGroup的next方法,而这个chooser中维护的executors* 就是构建NioEventLoopGroup时创建NioEventLoop数组children,* 所以这里返回的是一个NioEventLoop对象,NioEventLoop既实现了EventExecutor接口* 也实现了EventLoop接口*/@Overridepublic EventExecutor next() {return chooser.next();}
显而易见MultithreadEventLoopGroup是轮询其中的children获取的下一个EventLoop对象;而在EventLoop接口的实现类SingleThreadEventLoop中next方法实现如下:
@Overridepublic EventLoop next() {return (EventLoop) super.next();}/*** SingleThreadEventLoop父类AbstractEventExecutor中的方法,返回自身*/@Overridepublic EventExecutor next() {return this;}
SingleThreadEventLoop中获取下一个EventLoop就是返回自身。
next方法在哪里会使用到呢?其中之一就是NioEventLoopGroup绑定channel的时候:
/*** MultithreadEventLoopGroup中的实现*/@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
NioEventLoopGroup注册一个channel就是获取下一个NioEventLoop来绑定channel。
关于NioEventLoopGroup这个类的解析先告一段落,后面会找时间补充一个流程图。
