参考文章

一、线程池核心设计与实现

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理、线程管理。

任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
(1)直接申请线程执行该任务;
(2)缓冲到队列中等待线程执行;
(3)拒绝该任务。
线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
image.png
image.png

核心问题

  1. 线程池如何维护自身状态?
  2. 线程池如何管理任务?
  3. 线程池如何管理线程?

    二、生命周期管理

    线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)
    状态控制主要围绕原子整型成员变量ctl
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//-536870912

在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。其中,低29位用于存放工作线程数,而高3位用于存放线程池状态

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //-536870912
private static final int SHUTDOWN = 0 << COUNT_BITS; //0
private static final int STOP = 1 << COUNT_BITS; //536870912
private static final int TIDYING = 2 << COUNT_BITS; //1073741824
private static final int TERMINATED = 3 << COUNT_BITS; //1610612736

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

工作线程数为0的前提下,小结一下线程池的运行状态常量:

状态名称 位图 十进制值 描述
RUNNING 111-00000000000000000000000000000 -536870912 运行中状态,可以接收新的任务和执行任务队列中的任务
SHUTDOWN 000-00000000000000000000000000000 0 shutdown状态,不再接收新的任务,但是会执行任务队列中的任务
STOP 001-00000000000000000000000000000 536870912 停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务
TIDYING 010-00000000000000000000000000000 1073741824 整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated()
TERMINATED 011-00000000000000000000000000000 1610612736 终结状态,钩子方法terminated()执行完毕

三、任务执行机制

任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

    任务缓冲

    线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

    任务申请

    线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现

    任务拒绝

    任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程

    四、Worker线程管理

    线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

    Worker线程增加

    增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

    Worker线程回收

    线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

    Worker线程执行任务

    在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  5. while循环不断地通过getTask()方法获取任务。

  6. getTask()方法从阻塞队列中取任务。
  7. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  8. 执行任务。
  9. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

五、源码分析

构造方法

image.png
构造方法中以下任一情况发生,都会抛出异常

IllegalArgumentException:

  • corePoolSize < 0
  • keepAliveTime < 0
  • maximumPoolSize <= 0
  • maximumPoolSize < corePoolSize

NullPointerException

  • workQueue is null
  • threadFactory is null
  • handler is null

    关键属性

    1. private volatile int corePoolSize;
    2. private volatile int maximumPoolSize;
    3. private volatile long keepAliveTime;
    4. private final BlockingQueue<Runnable> workQueue;
    5. private volatile ThreadFactory threadFactory;
    6. private volatile RejectedExecutionHandler handler;

    corePoolSize

    表示线程池核心线程数。默认不会初始化核心线程,当有任务时,创建核心线程直到数量达到corePoolSize。核心线程默认不会被摧毁,即使它是空闲的(除非设置了allowCoreThreadTimeOut属性为true)。降低了任务一来时要创建新线程的时间和性能开销。

    maximumPoolSize

    表示最大线程数,意味着当核心线程都被用完了,那只能重新创建新的线程来执行任务,但是前提是不能超过最大线程数量,否则该任务只能进入阻塞队列进行排队等候,直到有线程空闲了,才能继续执行任务。

    keepAliveTime

    表示线程存活时间,除了核心线程外,那些被新创建出来的线程可以存活多久。意味着,这些新的线程一但完成任务,而后面都是空闲状态时,就会在一定时间后被摧毁。

    unit

    存活时间单位,将用户输入的keepAliveTime转成纳秒。

    workQueue

    表示任务的阻塞队列,由于任务可能会有很多,而线程就那么几个,所以那么还未被执行的任务就进入队列中排队,等到线程空闲了,就从阻塞队列中取出任务。

    threadFactory

    用来创建新线程的工厂。默认是DefaultThreadFactory。

    handler

    当阻塞的任务数量超过了队列容量,需要拒绝策略。默认使用AbortPolicy。
    image.png

Worker

class Worker
extends AbstractQueuedSynchronizer
implements Runnable
基于AQS实现的不可重入锁,主要用来维护执行任务的线程的中断控制状态。

image.png
setState(-1)是为了抑制中断直到线程真正开始运行任务,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)

Worker构造方法新建线程,并将worker作为工作线程的target,这样,worker.thread run的时候,实际是调用worker的run方法,而worker的方法又委托到runWorker()方法中。

runWorker方法

getTask方法

execute方法

image.png