@Author:zxw
@Email:502513206@qq.com
目录
-
1.示例代码
在看源码前,首先看看测试用的代码,这边为了方便测试,没有使用NioEventLoop,不过底层都是使用的父类方法,并不影响。
public static void main(String[] args) {EventLoopGroup group = new DefaultEventLoopGroup(2);TestRunner testRunner = new TestRunner();for (int i = 0; i < 10; i++) {EventLoop next = group.next();next.execute(testRunner);}group.shutdownGracefully();}public static class TestRunner implements Runnable {@Overridepublic void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}log.info("测试线程数据");}}
2.类结构
我们在使用线程组的时候一般都会先创建线程池,然后调用线程池的执行方法执行具体的任务,在netty中也是进行了划分,组为
EventLoopGroup,执行者EventExecutor其共同的父类是EventExecutorGroup
2.1 EventLoopGroup
我们先从组的部分,看看group的类结构

通过继承关系可以看到,模板主要封装的逻辑在MultithreadEventExecutorGroup类中,我们知道生成group的时候如果不配置参数,那么netty会默认指定group大小为cpu核心数2倍的数量,这个逻辑则是在MultithreadEventLoopGroup类中实现,这里我们有个大概印象就行。2.2 EventExecutor
看完了group,接下来就看执行者

在执行器中,netty的基本实现都在SingleThreadEventExecutor类,同样有个大概3.源码分析
3.1 Group创建分析
首先我们从group的源码看起,既然是
EventLoopGroup是个组那么管理组元素当然就是我们的EventExecutor,并且是一对多的管理关系,在MultithreadEventExecutorGroup中通过数组表示,元数据如下。private final EventExecutor[] children;
现在我们有了执行器,由于在group里是通过数组存储的,那么每次调用我们就需要从数组中取出一个执行器进行调用,选择的算法有很多,比如顺序取或者随机取,那这里就可以封装一个方法,用来获取执行器。netty是构建了一个选择器,元数据如下。
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
group里还有其它3个元数据,暂时只关注这两个,其它的等用到了在看。先回忆group的创建代码是怎么样的。
EventLoopGroup group = new DefaultEventLoopGroup();
可以看到group并没有指定
EventExecutor入参,那么group里的数组执行器则是在类创建的时候就应该默认帮我们创建好了,之前说过group的模板逻辑都在MultithreadEventLoopGroup中,在该类中找到了初始化group的方法。对于执行器数组的大小如果指定了则使用我们设置的,没指定就用netty通过cpu核心数获取,大概流程如下
对于代码那就是循环线程数量,对于数组的每个位置设置对象。children = new EventExecutor[nThreads];children[i] = newChild(executor, args);
我们使用的是默认的group,netty默认生成的执行器对象为
DefaultEventExecutor,如果中途生成失败了,那么netty会将之前已生成的一个个停止。先前我们看元数据还有个选择器,看下netty怎么生成选择器的。chooser = chooserFactory.newChooser(children)
netty一共提供了两种选择器,一种是通过逻辑运算符获取位置,一种通过求余获取。@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}@Overridepublic EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];}
对于group的源码分析暂时到这,group生成后可以通过
next()方法获取到数组中的一个元数,然后调用executor()方法执行我们的具体逻辑,接下来我们看看EventExectuor的源码。3.2 Exectuor执行分析
先前说过,netty的执行器封装的逻辑在
SingleThreadEventExecutor类中,首先看看该类的元数据有哪些。在上篇文章我们讲过执行器的生命周期流程,有如下元数据private static final int ST_NOT_STARTED = 1;private static final int ST_STARTED = 2;private static final int ST_SHUTTING_DOWN = 3;private static final int ST_SHUTDOWN = 4;private static final int ST_TERMINATED = 5;private volatile int state = ST_NOT_STARTED;
再然后我们知道netty是一个异步的框架,既然是异步框架那么肯定涉及到线程的时候,所以应该还有有一个线程池的引用
private final Executor executor;
线程池里一般存在多个线程对象的调用,有时候我们想对当前线程对象进行一些操作的话,那么也得记录一下当前线程的引用
private volatile Thread thread;
在netty中任务到达后不是立刻执行的,而是采用队列的方式从队列中获取执行,netty的队列结构使用的是Runnable对象,当然不是启动线程执行runnable,而是单纯调用run方法而已
private final Queue<Runnable> taskQueue;
涉及到队列,往队列添加元素时可能会有队列满了或者其他等因素,netty封装了一个拒绝策略,如下
private final RejectedExecutionHandler rejectedExecutionHandler;
剩下的就是一些对于时间上控制的参数
private volatile long gracefulShutdownQuietPeriod;private volatile long gracefulShutdownTimeout;private long gracefulShutdownStartTime;private long lastExecutionTime;
通过如下分析,我们大概了解到netty执行器的一些核心东西,比如状态,队列,线程池等。接下来看在具体代码逻辑中是如何使用这些参数的。
EventLoop next = group.next();next.execute(testRunner);
next()方法不用说,就是从我们的数组中取出一个EventLoop执行器,那么看下executor方法是怎么执行的。
先前说过netty里有个队列,对于具体的逻辑执行都是通过队列取出来执行run方法
具体的逻辑就是addTask(task);
加入了队列后,那么我就需要启动线程开始获取队列元素执行
startThread();
之前我们讲过
EventExecutor的状态变更,那么在真正启动前状态还要变更为执行。private void startThread() {if (state == ST_NOT_STARTED) {// 状态设为startif (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 启动线程doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
启动后,对于队列执行肯定是采用异步的方式执行,所以这时线程池就派上用场了,可以看到最后是执行了
SingleThreadEventExecutor类中的run方法,不过该方法是个抽象方法,也就是说最终是根据我们使用的子类中的run方法executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;
在该方法的实现中,就是一个无线循环,然后取出队列中的元数 ```java protected abstract void run();
@Override protected void run() { for (;;) { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); }
if (confirmShutdown()) {break;}}}
``` 最后我们再来梳理下流程
- 根据chooser选择器获取一个EventExecutor
- 将任务加入队列中
- 将当前EventExecutor的状态从no_start设置为start,如果成功就调用调用线程池启动抽象方法run
- 通过无线循环从队列中获取数据,执行本地线程run方法
