@Author:zxw
@Email:502513206@qq.com


目录

  1. Netty源码分析(一) - 线程声明周期

    1.示例代码

    在看源码前,首先看看测试用的代码,这边为了方便测试,没有使用NioEventLoop,不过底层都是使用的父类方法,并不影响。

    1. public static void main(String[] args) {
    2. EventLoopGroup group = new DefaultEventLoopGroup(2);
    3. TestRunner testRunner = new TestRunner();
    4. for (int i = 0; i < 10; i++) {
    5. EventLoop next = group.next();
    6. next.execute(testRunner);
    7. }
    8. group.shutdownGracefully();
    9. }
    10. public static class TestRunner implements Runnable {
    11. @Override
    12. public void run() {
    13. try {
    14. Thread.sleep(5000);
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. log.info("测试线程数据");
    19. }
    20. }

    2.类结构

    我们在使用线程组的时候一般都会先创建线程池,然后调用线程池的执行方法执行具体的任务,在netty中也是进行了划分,组为EventLoopGroup,执行者EventExecutor其共同的父类是EventExecutorGroup
    image.png

    2.1 EventLoopGroup

    我们先从组的部分,看看group的类结构
    image.png
    通过继承关系可以看到,模板主要封装的逻辑在MultithreadEventExecutorGroup类中,我们知道生成group的时候如果不配置参数,那么netty会默认指定group大小为cpu核心数2倍的数量,这个逻辑则是在MultithreadEventLoopGroup类中实现,这里我们有个大概印象就行。

    2.2 EventExecutor

    看完了group,接下来就看执行者
    image.png
    在执行器中,netty的基本实现都在SingleThreadEventExecutor类,同样有个大概

    3.源码分析

    3.1 Group创建分析

    首先我们从group的源码看起,既然是EventLoopGroup是个组那么管理组元素当然就是我们的EventExecutor,并且是一对多的管理关系,在MultithreadEventExecutorGroup中通过数组表示,元数据如下。

    1. private final EventExecutor[] children;

    现在我们有了执行器,由于在group里是通过数组存储的,那么每次调用我们就需要从数组中取出一个执行器进行调用,选择的算法有很多,比如顺序取或者随机取,那这里就可以封装一个方法,用来获取执行器。netty是构建了一个选择器,元数据如下。

    1. private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    group里还有其它3个元数据,暂时只关注这两个,其它的等用到了在看。先回忆group的创建代码是怎么样的。

    1. EventLoopGroup group = new DefaultEventLoopGroup();

    可以看到group并没有指定EventExecutor入参,那么group里的数组执行器则是在类创建的时候就应该默认帮我们创建好了,之前说过group的模板逻辑都在MultithreadEventLoopGroup中,在该类中找到了初始化group的方法。对于执行器数组的大小如果指定了则使用我们设置的,没指定就用netty通过cpu核心数获取,大概流程如下
    image.png
    对于代码那就是循环线程数量,对于数组的每个位置设置对象。

    1. children = new EventExecutor[nThreads];
    2. children[i] = newChild(executor, args);

    我们使用的是默认的group,netty默认生成的执行器对象为DefaultEventExecutor,如果中途生成失败了,那么netty会将之前已生成的一个个停止。先前我们看元数据还有个选择器,看下netty怎么生成选择器的。
    chooser = chooserFactory.newChooser(children)
    netty一共提供了两种选择器,一种是通过逻辑运算符获取位置,一种通过求余获取。

    1. @Override
    2. public EventExecutor next() {
    3. return executors[idx.getAndIncrement() & executors.length - 1];
    4. }
    5. @Override
    6. public EventExecutor next() {
    7. return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    8. }

    对于group的源码分析暂时到这,group生成后可以通过next()方法获取到数组中的一个元数,然后调用executor()方法执行我们的具体逻辑,接下来我们看看EventExectuor的源码。

    3.2 Exectuor执行分析

    先前说过,netty的执行器封装的逻辑在SingleThreadEventExecutor类中,首先看看该类的元数据有哪些。在上篇文章我们讲过执行器的生命周期流程,有如下元数据

    1. private static final int ST_NOT_STARTED = 1;
    2. private static final int ST_STARTED = 2;
    3. private static final int ST_SHUTTING_DOWN = 3;
    4. private static final int ST_SHUTDOWN = 4;
    5. private static final int ST_TERMINATED = 5;
    6. private volatile int state = ST_NOT_STARTED;

    再然后我们知道netty是一个异步的框架,既然是异步框架那么肯定涉及到线程的时候,所以应该还有有一个线程池的引用

    1. private final Executor executor;

    线程池里一般存在多个线程对象的调用,有时候我们想对当前线程对象进行一些操作的话,那么也得记录一下当前线程的引用

    1. private volatile Thread thread;

    在netty中任务到达后不是立刻执行的,而是采用队列的方式从队列中获取执行,netty的队列结构使用的是Runnable对象,当然不是启动线程执行runnable,而是单纯调用run方法而已

    1. private final Queue<Runnable> taskQueue;

    涉及到队列,往队列添加元素时可能会有队列满了或者其他等因素,netty封装了一个拒绝策略,如下

    1. private final RejectedExecutionHandler rejectedExecutionHandler;

    剩下的就是一些对于时间上控制的参数

    1. private volatile long gracefulShutdownQuietPeriod;
    2. private volatile long gracefulShutdownTimeout;
    3. private long gracefulShutdownStartTime;
    4. private long lastExecutionTime;

    通过如下分析,我们大概了解到netty执行器的一些核心东西,比如状态,队列,线程池等。接下来看在具体代码逻辑中是如何使用这些参数的。

    1. EventLoop next = group.next();
    2. next.execute(testRunner);

    next()方法不用说,就是从我们的数组中取出一个EventLoop执行器,那么看下executor方法是怎么执行的。
    先前说过netty里有个队列,对于具体的逻辑执行都是通过队列取出来执行run方法
    image.png
    具体的逻辑就是

    1. addTask(task);

    加入了队列后,那么我就需要启动线程开始获取队列元素执行

    1. startThread();

    之前我们讲过EventExecutor的状态变更,那么在真正启动前状态还要变更为执行。

    1. private void startThread() {
    2. if (state == ST_NOT_STARTED) {
    3. // 状态设为start
    4. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
    5. boolean success = false;
    6. try {
    7. // 启动线程
    8. doStartThread();
    9. success = true;
    10. } finally {
    11. if (!success) {
    12. STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
    13. }
    14. }
    15. }
    16. }
    17. }

    启动后,对于队列执行肯定是采用异步的方式执行,所以这时线程池就派上用场了,可以看到最后是执行了SingleThreadEventExecutor类中的run方法,不过该方法是个抽象方法,也就是说最终是根据我们使用的子类中的run方法

    1. executor.execute(new Runnable() {
    2. @Override
    3. public void run() {
    4. thread = Thread.currentThread();
    5. if (interrupted) {
    6. thread.interrupt();
    7. }
    8. boolean success = false;
    9. updateLastExecutionTime();
    10. try {
    11. SingleThreadEventExecutor.this.run();
    12. success = true;

    在该方法的实现中,就是一个无线循环,然后取出队列中的元数 ```java protected abstract void run();

@Override protected void run() { for (;;) { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); }

  1. if (confirmShutdown()) {
  2. break;
  3. }
  4. }
  5. }

``` 最后我们再来梳理下流程

  1. 根据chooser选择器获取一个EventExecutor
  2. 将任务加入队列中
  3. 将当前EventExecutor的状态从no_start设置为start,如果成功就调用调用线程池启动抽象方法run
  4. 通过无线循环从队列中获取数据,执行本地线程run方法