ThreadPoolExecutor 原理
ThreadPoolExecutor
里面使用到 JUC 同步器框架 AbstractQueuedSynchronizer
(俗称 AQS)、大量的位操作、CAS 操作。ThreadPoolExecutor 提供了固定的活跃线程(核心线程)、额外线程、任务队列以及拒绝策略这几个重要的功能。
JUC 同步器框架
ThreadPoolExecutor
里面使用到了 JUC 同步器框架,主要用于 4 个方面:
- 全局锁
mainlock
成员属性,是可重入锁 ReentrantLock 类型,主要用于访问工作线程 Worker 集合和进行数据统计记录时候的加锁操作。 - 条件变量
termination
,Condition 类型,主要用于线程进行等待终结awaitTermination()
方法时的带期限阻塞。 - 任务队列
workQueue
,BlockingQueue类型,用于存放待执行的任务。 工作线程,内部类 Worker 类型,是线程池中真正的工作线程对象。
实现一个只有核心线程的线程池
这里先参考
ThreadPoolExecutor
的实现并简化,实现一个只有核心线程的线程池,要求如下:暂时不考虑任务执行异常情况下的处理;
- 任务队列为无界队列;
- 线程池容量固定为核心线程数量;
- 暂时不考虑拒绝策略。
运行结果如下:public class CoreThreadPool implements Executor {
private BlockingQueue<Runnable> workQueue;
private static final AtomicInteger COUNTER = new AtomicInteger();
private int coreSize;
private int threadCount = 0;
public CoreThreadPool(int coreSize) {
this.coreSize = coreSize;
this.workQueue = new LinkedBlockingQueue<>();
}
@Override
public void execute(Runnable command) {
// 如果当前线程数小于核心线程,新增一个 worker 线程
if (++threadCount <= coreSize) {
new Worker(command).start();
} else {
// 如果当前线程数大于核心线程,把这个任务放入队列中
try {
workQueue.put(command);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
private class Worker extends Thread {
private Runnable firstTask;
public Worker(Runnable runnable) {
super(String.format("Worker-%d", COUNTER.getAndIncrement()));
this.firstTask = runnable;
}
@Override
public void run() {
Runnable task = this.firstTask;
while (null != task || null != (task = getTask())) {
try {
task.run();
} finally {
task = null;
}
}
}
}
private Runnable getTask() {
try {
return workQueue.take();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
public static void main(String[] args) throws Exception {
CoreThreadPool pool = new CoreThreadPool(5);
IntStream.range(0, 10)
.forEach(i -> pool.execute(() ->
System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
Thread.sleep(Integer.MAX_VALUE);
}
}
设计此线程池时,核心线程是懒创建的。如果线程空闲的时候则阻塞在任务队列的Thread:Worker-0,value:0
Thread:Worker-1,value:1
Thread:Worker-0,value:5
Thread:Worker-0,value:6
Thread:Worker-0,value:7
Thread:Worker-0,value:8
Thread:Worker-0,value:9
Thread:Worker-3,value:3
Thread:Worker-4,value:4
Thread:Worker-2,value:2
take()
方法,其实对于ThreadPoolExecutor
也是类似这样实现,只是如果使用了keepAliveTime
并且允许核心线程超时(allowCoreThreadTimeOut
设置为 true)则会使用BlockingQueue#poll(keepAliveTime)
进行轮询代替永久阻塞。
任务队列的take()
方法,其实是取队列的第一个元素,如果队列此时有元素,立即返回。如果队列里没有元素,则会使线程阻塞在此。直到一个新的元素进入队列时,唤醒刚刚的线程,让它去重新获取队列中的元素。
下面是获取元素的代码:
从代码中我们可以发现,需要让public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
notEmpty
(Condition 类型)放在 while 循环里面,才能让线程正常重试。unlinkFirst()
方法获取队列里的元素,如果有就返回对应元素,没有则返回 null。其他附加功能
构建ThreadPoolExecutor
实例时,需要定义 maximumPoolSize(线程池最大线程数)和 corePoolSize(核心线程数)。如果任务队列是有界的阻塞队列,且核心线程满负载,任务队列已满的情况,会尝试创建额外的maximumPoolSize - corePoolSize
个线程去执行新提交的任务。如果创建额外线程失败,就会进入拒绝策略。keepAliveTime
一般情况下指非核心线程的存活时间,超时后线程池会回收这些非核心线程。拒绝策略
提供拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。源码分析
先分析线程池的关键属性,接着分析其控制状态,最后重点分析ThreadPoolExecutor#execute()
方法。类图
构造器
下面看参数列表最长的构造函数:public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
管理状态
线程池状态机
最后是线程池状态的跃迁图:
状态初始值
接来下分析线程池的状态变量,工作线程上限数量位的长度为private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 通过ctl值获取运行状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 通过ctl值获取工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// 通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// CAS操作线程数增加1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// CAS操作线程数减少1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 线程数直接减少1
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
COUNT_BITS
,它的值是Integer.SIZE -3
,也就是 29。小提示:int 占据 4 个字节,也就是 32位。在 ThreadPoolExecutor 实现中,使用这 32 位存放工作线程数和线程池状态。其中,低 29 位存放工作线程数,而高 3 位存放线程池状态,所以线程池状态最多有 23 种。工作线程的上限数量为 2^29-1,超过 5 亿,这个数量在短期内不会考虑超限。
位运算加减法
如何计算二进制减法
接着看工作线程的上限数量掩码 COUNT_MASK,它的值是 (1 << COUNT_BITS) -1
,也就是 1 左移 29 位,再减去 1,如果补全 32 位,它的位视图如下:
然后就是线程池的状态常量,这里只分析其中一个,其他类似,这里看 RUNNING
状态:
// -1的补码为:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10进制值为:-536870912
// 高3位111的值就是表示线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;
控制变量 ctl 的组成就是通过线程池运行状态 rs 和工作线程数 wc 的或运算得到的:
// rs=RUNNING值为:111-00000000000000000000000000000
// wc的值为0:000-00000000000000000000000000000
// rs | wc的结果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
ctl 变量有时简写为 c,running_status 简写为 rs,worker_count 简写为 wc。
那么我们怎么总 ctl 中取出高 3 位的线程池状态呢?上面源码中提供的 runStateOf() 方法就是提取运行状态:
// 先把COUNT_MASK取反(~COUNT_MASK),得到:111-00000000000000000000000000000
// ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可得到高3位xxx
private static int runStateOf(int c){
return c & ~COUNT_MASK;
}
同理,取出低 29 的工作线程数量只需要把 ctl 和 COUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。
线程池状态
工作线程数量为 0 时,小结一下线程池的运行状态常量:
状态名称 | 位图 | 十进制值 | 描述 |
---|---|---|---|
RUNNING | 111-00000000000000000000000000000 | -536870912 | 运行中状态,可以接收新的任务和执行任务队列中的任务 |
SHUTDOWN | 000-00000000000000000000000000000 | 0 | shutdown状态,不再接收新的任务,但是会执行任务队列中的任务 |
STOP | 001-00000000000000000000000000000 | 536870912 | 停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务 |
TIDYING | 010-00000000000000000000000000000 | 1073741824 | 整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated() |
TERMINATED | 011-00000000000000000000000000000 | 1610612736 | 终结状态,钩子方法terminated()执行完毕 |
这里有个比较特殊的技巧,由于运行状态放在高 3 位,所以可以直接通过十进制值(甚至可以直接忽略低 29 位,直接用 ctl 进行比较,或者使用 ctl 和线程池状态常量进行比较)来比较和判断线程池的状态:
工作线程数为0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
线程池的状态使用 ctl 的高三位表示:
状态 | 高三位值 | 含义 |
---|---|---|
running | 100 | 运行状态,数值为负数 |
shutdown | 000 | 关闭状态,调用shutdown()方法 |
stop | 001 | 关闭状态,调用shutdownNow()方法,与shutdown状态稍有不同 |
tidying | 010 | 线程池关闭的后处理状态 |
terminated | 011 | 终止状态,这才是最终关闭 |
下面这三个方法就是使用这种技巧:
// ctl和状态常量比较,判断是否小于
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// ctl和状态常量比较,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
Worker
类图
线程池中的每一个具体的工作线程被包装为内部类 Worker 实例,Worker 继承于 AbstractQueuedSynchronizer(AQS)
,实现了 Runnable 接口:
源码
Worker 的构造器
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker 的构造函数里面的逻辑十分重要,通过 ThreadFactory 创建的 Thread 实例同时传入 Worker 实例,因为 Worker 本身实现了 Runnable,可以作为任务提交到线程中执行。只要 Worker 持有的线程实例 w 调用 Thread#start()
方法就能在合适时机执行 Worker#run()
。简化逻辑如下:
// addWorker()方法中构造
Worker worker = createWorker();
// 通过线程池构造时候传入
ThreadFactory threadFactory = getThreadFactory();
// Worker构造函数中
Thread thread = threadFactory.newThread(worker);
// addWorker()方法中启动
thread.start();
Worker 继承自 AQS,这里使用了 AQS 的独占模式,有个技巧是构造 Worker 的时候,把 AQS 的资源(状态)通过 setState(-1)
设置为 -1,这是因为 Worker 实例刚创建的时候 AQS 中 state 的默认值是 0,此时线程尚未启动,不能在这个时候进行线程中断,见 Worker#interruptIfStarted()
方法。 Worker 中两个覆盖 AQS 的方法 tryAcquire()
和 tryRelease()
都没有判断外部传入的变量,前者直接 CAS(0,1),后者直接 setState(0)。
// ThreadPoolExecutor
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// ThreadPoolExecutor.Worker
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
execute
当任务提交到线程池时,都会调用这个方法。其主要目的是增加工作线程数量以执行任务。
流程图
源码
// 执行命令,其中命令(下面称任务)对象是Runnable的实例
public void execute(Runnable command) {
// 判断命令(任务)对象非空
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 判断如果当前工作线程数小于核心线程数,则创建新的核心线程并且执行传入的任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 如果创建新的核心线程成功则直接返回
return;
// 这里说明创建核心线程失败,需要更新ctl的临时变量c
c = ctl.get();
}
// 走到这里说明创建新的核心线程失败,存在两种情况
//1. 当前工作线程数大于等于corePoolSize
//2. 当前线程数<核心线程数,但是添加Worker失败,失败又有如下情况
//2.1 线程池的状态为 stop, tidying, terminated
//2.2 shutdown command 不为 null
// 2.3 shutdown command 为null,阻塞队列也为空(这种情况不会出现在这里)
// 如果是运行状态,说明属于第一种情况,那么将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
// 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例)
if (!isRunning(recheck) && remove(command))
// 调用拒绝策略处理任务 - 返回
reject(command);
// 走到下面的else if分支,说明有以下的前提:
// 0、待执行的任务已经成功加入任务队列
// 1、线程池可能是RUNNING状态
// 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
// 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回
// 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行
// 如果前工作线程数量不为0,原来应该是最后的else分支,但是可以什么也不做,
// 因为任务已经成功入队列,总会有合适的时机分配其他空闲线程去执行它
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到这里说明有以下的前提:
// 0、线程池中的工作线程总数已经大于等于corePoolSize(简单来说就是核心线程已经全部懒创建完毕)
// 1、线程池可能不是RUNNING状态
// 2、线程池可能是RUNNING状态同时任务队列已经满了
// 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行
// 创建非核心线程失败,此时需要拒绝执行任务
else if (!addWorker(command, false))
// 调用拒绝策略处理任务 - 返回
reject(command);
}
简易流程:
- 如果当前工作线程总数小于
corePoolSize
,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。 - 如果当前工作线程总数大于等于
corePoolSize
,判断线程池是否处于运行中状态,同时尝试用非阻塞的方式向任务队列放入任务。这里会二次检查线程池运行状态,如果线程池处于非运行中状态,且能成功从队列中移除,调用拒绝策略处理任务。如果当前工作线程数为 0,则创建一个非核心线程并且传入的任务对象为 null。 - 如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
- 如果创建非核心线程失败,说明线程池已经关闭或队列已满,此时需要拒绝执行任务,调用拒绝策略处理任务。
执行 shutdown() 或 shutdownNow() 后,不能够再提交新任务。
addWorker
boolean addWorker(Runnable firstTask, boolean core)
方法的第一个参数可以用于直接传入任务实例,第二个参数用于标识将要创建的工作线程是否是核心线程。方法的源码如下:
// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 注意这是一个死循环 - 最外层循环
for (int c = ctl.get();;) {
// 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
// 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0),stop,tidying,terminated
// 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
// 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 注意这也是一个死循环 - 二层循环
for (;;) {
// 这里每一轮循环都会重新获取工作线程数wc
// 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
// 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 成功通过CAS更新工作线程数wc,则break到最外层的循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
c = ctl.get(); // Re-read ctl
// 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
// 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
// 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
// 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
// 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
// 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把创建的工作线程实例添加到工作线程集合
// 所有的事情都为了这一句,添加工作线程
workers.add(w);
int s = workers.size();
// 尝试更新历史峰值工作线程数,也就是线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
// 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程集合移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 添加Worker失败
private void addWorkerFailed(Worker w) {
// 需要操作 workers,类型为线程不安全的HashSet,所以需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从工作线程集合移除之
if (w != null)
workers.remove(w);
// wc数量减1
decrementWorkerCount();
// 基于状态判断尝试终结线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
下面的代码逻辑比较复杂:
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// ....
// 代码拆分一下如下
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop){
return false;
}
Worker 实例创建的同时,在其构造函数中会通过 ThreadFactory
创建一个 Java 线程 Thread 实例,后面会加锁后二次检查是否需要把 Worker 实例添加到工作线程集合 workers 中和是否需要启动 Worker 中持有的 Thread 实例,只有启动了 Thread 实例,Worker 才真正开始运作,否则只是一个无用的临时对象。Worker 本身也实现了 Runnable 接口,它可以看成是一个 Runnable 的适配器。
runWorker
任务顺利绑定线程或加入到队列后,接下来就要去执行这些任务。addWorker 方法中执行 t.start()
,表明线程已启动,线程对象调用 Worker#run
方法,接着调用到外部类 ThreadPoolExecutor#runWorker()
。
流程图
源码
final void runWorker(Worker w) {
// 获取当前线程,实际上和Worker持有的线程实例是相同的
Thread wt = Thread.currentThread();
// 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
Runnable task = w.firstTask;
// 设置Worker中持有的初始化时传入的任务对象为null
w.firstTask = null;
// 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许锁被获取
w.unlock(); // allow interrupts
// 记录线程是否因为用户异常终结,默认是true
boolean completedAbruptly = true;
try {
// 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中)
// getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
while (task != null || (task = getTask()) != null) {
// Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1
w.lock();
// 如果状态为stop,tidying或terminated,则中断
// 如果为 running,shutdown 状态,且存在中断的话,清空中断标记
// Thread.interrupted 方法会清除中断标记
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 该中断会在 getTask 里体现
wt.interrupt();
try {
// 钩子方法,任务执行前
beforeExecute(wt, task);
try {
task.run();
// 钩子方法,任务执行后 - 正常情况
afterExecute(task, null);
} catch (Throwable ex) {
// 钩子方法,任务执行后 - 异常情况
afterExecute(task, ex);
throw ex;
}
} finally {
// 清空task临时变量,这个很重要,否则while会死循环执行同一个task
task = null;
// 累加Worker完成的任务数
w.completedTasks++;
// Worker解锁,本质是AQS释放资源,设置state为0
w.unlock();
}
}
// 走到这里说明某一次getTask()返回为null,线程正常退出
completedAbruptly = false;
} finally {
// 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
processWorkerExit(w, completedAbruptly);
}
}
小结一下上面 runWorker() 方法的核心流程:
- Worker 先执行一次解锁操作,用于解除不可中断状态。
- 通过 while 循环调用
getTask()
方法从任务队列中获取任务(当然,首轮循环最有可能是外部传入的firstTask
任务实例)。 - 如果线程池更变为
STOP
状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程不是中断状态。 - 执行任务实例
Runnable#run()
方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()
和afterExecute()
。 - while 循环跳出则意味着
runWorker()
方法结束和工作线程生命周期结束(Worker#run()
生命周期完结),会调用processWorkerExit()
处理线程退出后的后续工作。
这里重点拆解分析一下判断当前工作线程中断状态的代码:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 先简化一下判断逻辑,如下
// 判断线程池状态是否至少为STOP,rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// 判断线程池状态是否至少为STOP,同时判断当前线程的中断状态并且清空当前线程的中断状态
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if ((atLeastStop || interruptedAndAtLeastStop) && !wt.isInterrupted()){
wt.interrupt();
}
Thread.interruped()
方法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个 if 逻辑同时外部有可能调用 shutdownNow()
方法,shutdownNow()
方法中也存在中断所有 Worker 线程的逻辑,但是由于 shutdownNow()
方法中会遍历所有 Worker 做线程中断,有可能无法及时在任务提交到 Worker 执行之前进行中断,所有这个中断逻辑在 Worker 内部去判断。这里还要注意的是:STOP 状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有 Worker 线程。也就是说,即使任务 Runnable 已经 runWorker()
中前半段逻辑取出,只要还没走到调用其 Runnable#run()
,都有可能被中断。假设刚好发生了进入 if 代码块的逻辑同时外部调用了 shutdownNow()
方法,那么 if 逻辑内会判断线程中断状态并且重置,那么 shutdownNow()
方法中调用的 interruptWorkers()
就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。
getTask
getTask()
方法是工作线程在 while
死循环中获取任务队列中的任务对象的方法,如果获取不到任务就会阻塞,keepAliveTime 控制非核心线程阻塞多长时间。
阻塞又分为两种,超时阻塞(poll)和完全阻塞(take),但是这两个方法都会响应中断。响应中断后,会重新执行 getTask() 方法。
流程图
源码分析
private Runnable getTask() {
// 记录上一次从队列中拉取的时候是否超时
boolean timedOut = false; // Did the last poll() time out?
// 注意这是死循环
for (;;) {
int c = ctl.get();
// 返回空的条件,存在两种
// 1. 状态为 stop,tidying,terminated
// 2. 状态为 shutdown,且队列为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// ctl 的值减 1,这个操作一定要重视,经常找不到它
decrementWorkerCount();
return null;
}
// 跑到这里说明
//1. 线程池还处于RUNNING状态
//2. shotdown 状态,阻塞队列非空
int wc = workerCountOf(c);
// 判断是否处理超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果需要减少工作线程(线程数> maxinumPoolSize,或者处理超时并超时两种情况)
// 并且存在可以减少的线程数,那么就减少线程数,返回 null,结束一个线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
if (r != null)
return r;
// 如果超时了,重入循环体,超时标记为true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
当 allowCoreThreadTimeOut
设置为 true 的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出 keepAliveTime
的意义:
- 当允许核心线程超时,也就是
allowCoreThreadTimeOut
设置为 true 的时候,此时keepAliveTime
表示空闲的工作线程的存活周期。 - 默认情况下不允许核心线程超时,此时
keepAliveTime
表示空闲的非核心线程的存活周期。
在一下特定的场景下,配置合理的 keepAliveTime
能够更好地利用线程池的工作线程资源。
processWorkerExit 方法源码分析
processWorkerExit()
方法是为将要终结的 Worker 做一次清理和数据记录工作。因为 processWorkerExit()
方法也包裹在 runWoker()
方法 finally 代码块中其实工作线程在执行完 processWorkerExit()
方法才算是真正的终结。
它分为正常中断和非正常中断两种。
对于非正常中断的线程,需要通过新增工作线程的方法将线程补回来,对于正常中断的线程,需要保证在阻塞队列非空的情况有工作线程存在。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 因为抛出用户异常导致线程终结,直接使工作线程数减1即可
// 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 首先要获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
completedTaskCount += w.completedTasks;
// 工作线程集合中移除此将要终结的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
// 主要是传播停止信号
tryTerminate();
int c = ctl.get();
// 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
// 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
// 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
if (runStateLessThan(c, STOP)) {
// 正常终止,那么判断线程池中是否还有工作线程,如果没有工作线程了,那么新增工作线程
if (!completedAbruptly) {
// 如果允许核心线程超时,最小值为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果最小值为0,同时任务队列不空,则更新最小值为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 工作线程数大于等于最小值,直接返回不新增非核心线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 异常终止导致少了一个线程,应该将该线程补回来
addWorker(null, false);
}
}
代码的后面部分区域,会判断线程池的状态,如果线程池是 RUNNING 或者 SHUTDOWN 状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:
allowCoreThreadTimeOut
为 true,也就是允许核心线程超时的前提下,如果任务队列为空,即使线程池里没有工作线程,会通过创建一个非核心线程保持线程池至少有一个工作线程。如果队列不为空,且线程池里面有一个以上的线程,就不会创建工作线程。allowCoreThreadTimeOut
为 false,如果工作线程总数大于corePoolSize
则直接返回,否则创建一个非核心线程,也就是会趋向于保持线程池中的工作线程数量趋向于corePoolSize
。
processWorkerExit()
执行完毕后,意味着该工作线程的生命周期已经完结。
tryTerminate 方法源码分析
每个工作线程终结的时候都会调用 tryTerminate()
方法,检查是否关闭连接池:
final void tryTerminate() {
// 这是一个死循环,看到这种情况,就需要注意返回值
for (;;) {
int c = ctl.get();
// 判断线程池的状态,如果是下面三种情况下的任意一种则直接返回:
// 1.线程池处于RUNNING状态
// 2.线程池至少为TIDYING状态,也就是TIDYING或者TERMINATED状态,意味着已经走到了下面的步骤,说明没有工作线程了
// 3.线程池至少为STOP状态并且任务队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// 代码执行到这,只有2种情况:1:stop状态,2:shutdown状态,且阻塞队列为空
// 如果工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//代码执行到这,说明工作线程为0,此处可能存在两种状态变化
//1. shutdown-->tidying
//2. stop-->tidying
//3. tidying-->terminated 执行线程池关闭的后处理操作,terminated() 空方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated()
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 最后更新线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒等待线程池关闭的线程,这个变量的await()方法在awaitTermination()中调用
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
这里必须要明白为什么是中断一个空闲线程。
这是因为中断 1 个空闲线程,就会执行 processWorkerExit()
方法,而该方法内部又调用了 tryTerminated()
方法,该方法在满足关闭线程池的条件时会进而中断一个空闲线程,调用 processWorkerExit()
方法。因此只要条件满足,所有的线程都会被中断。因此该方法具有扩散中断线程的作用。
interruptIdleWorkers 方法
// 中断空闲的工作线程,onlyOne为true的时候,只会中断工作线程集合中的某一个线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 这里判断线程不是中断状态并且尝试获取锁成功的时候才进行线程中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 这里跳出循环,也就是只中断集合中第一个工作线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
如果参数为 true,则只中断一个空闲线程,tryTerminate()
方法只传入 true。
如果参数为 false,则中断所有空闲线程。
shutdown() 方法
该方法用于关闭线程池,该状态修改为 shutdown,存在以下几点特质:
- 不能提交新任务,从 execute() 方法中可以看出
- 可以继续执行完阻塞队列中的任务,从 runWorker() 方法中可以看出
- 终止所有的空闲线程
返回值为 void
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验,安全策略相关判断
checkShutdownAccess();
// 设置SHUTDOWN状态
advanceRunState(SHUTDOWN);
// 中断所有的空闲的工作线程
interruptIdleWorkers();
// 钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
// 有关闭线程的地方就有该方法
tryTerminate();
}
// 升提状态
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
// 线程池状态至少为targetState或者CAS设置状态为targetState则跳出循环
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 中断所有的空闲的工作线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
shutdownNow() 方法
该方法用于关闭线程池,将该状态修改为 STOP 状态,存在以下几点特质:
不能提交新任务,从 execute() 方法可以看出
- 线程直接中断,不再执行任务,阻塞队列的任务不再执行,从 runWorker() 和 interruptWorkers() 方法中可以看出
- 终止所有的线程。不管线程是否工作,都终止,任务的执行是原子性的,不会出现执行一半任务的情况
返回值为阻塞队列中的任务列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验,安全策略相关判断
checkShutdownAccess();
// 设置STOP状态
advanceRunState(STOP);
// 中断所有的空闲的工作线程
interruptWorkers();
// 清空工作队列并且取出所有的未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
tryTerminate();
return tasks;
}
drainQueue() 方法
使用阻塞队列的 drainTo() 方法,获取所有的任务,将任务添加到 list 中,并返回:
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//将任务添加到list中,采用该方式效率高
q.drainTo(taskList);
//如果队列中还存在元素,则一个个添加到list中
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;}
awaitTermination() 方法
最长等待 timeout 时间,等待线程池完全关闭,该方法没有新的东西,就是判断线程池的状态是否为 terminated 状态,如果是则返回,如果不是,则等待 timeout 时间并添加到全局锁的 Condition 等待队列中,如果超时返回 false。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 转换timeout的单位为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 循环等待直到线程池状态更变为TERMINATED,每轮循环等待nanos纳秒
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
线程池的拒绝策略
RejectExecutionHandler 类型,线程池的拒绝执行处理器,更多时候称为拒绝策略。拒绝策略的时机是当阻塞队列已满、没有空闲的线程(核心线程和非核心线程)并且继续提交任务。这里内置了 4 种拒绝策略实现:
AbortPolicy: 直接拒绝策略,也就是不执行任务,直接抛出 RejectExecutionException,这是默认的拒绝策略。
- DiscardPolicy: 抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现,默默消失在人海中那种)。
- DiscardOldestPolicy: 抛弃最老任务策略,也就是通过 poll 方法取出任务队列队头任务抛弃,然后执行当前提交的任务。
- CallerRunsPolicy: 调用者执行策略,也就是当前调用 Executor#execute 的线程直接调用任务 Runnable#run 方法,一般不希望任务丢失就会选择这种策略,但就从实际角度来看,原来的异步调用意图就会变为同步调用。
总结
使用线程池时,一般我们不会放任线程池中的线程和阻塞队列占用的资源无限制的使用。
因此一般会设定好线程数,设置固定大小的阻塞队列,由于阻塞队列和线程的资源都分配了固定的,就必须考虑任务的拒绝策略,至于用什么拒绝策略,根据直接的业务需求选择或者自定义拒绝策略。
对于线程数量的把握,一定要经过压测之后确定,具体业务场景具体分析。
此外,还要做好线程池的隔离,不要让一个线程池做所有的事情,避免一个功能模块出现问题影响到其它的功能。
还应该做好降级应对线程池的奔溃的情况,如果线程池奔溃,使用其它的处理逻辑取代,保证线上服务稳定可用。