概述
ThreadPoolExecutor 是 Java 非常重要的工具类,用好线程池可以成倍缩短业务执行时间。但是对于并发编程,用好也十分困难,它是一把双刃剑,只有勇士才能发挥最大效能。
Executor 体系结构如下图所示:Executor
是最顶层的接口,接口定义的 API 非常简单,仅定义了一个 execute(Runnable)
接口,实现类会在未来的某一刻执行 Runnable
。ExecutorService
同样也是接口,它的 API 比较丰富,通常我们也是用这里的 API 实现更高级的功能。里面包含大致有三类 API:
- 提交任务。包括单个任务提交(submit())和批量任务提交(invokeAll())。
- 状态判断。判断线程池内的任务是否已全部执行完成(isTerminated),判断线程池是否被关闭。
- 关闭线程池。比如立即关闭线程池,不会给还未执行的任务执行机会。
ExecutorService
还提供了一个awaitTermination
方法,用来当当前线程阻塞等待直到线程池完全关闭后被唤醒。
AbstractExecutorService
是一个抽象类,实现了 ExecutorService
部分接口,主要是提交任务。ThreadPoolExecutor
是本篇文章最重要的类,它也是我们经常使用的线程池实现类。
状态定义
Doug Lea 使用一个 32 位 int
类型变量保存线程池的状态和池中线程数量。
- 高
3
位存放线程池状态。 - 低
29
位保存学院路线程数量。
线程池状态解析如下:
状态 | 描述 |
---|---|
RUNNING (111) | 线程池正常运行:线程池可以接收新的任务,并且线程也正常执行任务 |
SHUTDOWN (000) | 拒绝新任务(提交),继续执行未完成的任务(包括 workQueue 队列中的任务) |
STOP (001) | 拒绝新任务(提交),中断正在执行的任务,不处理 workQueue 队列中的任务 |
TIDYING (010) | 销毁所有任务,重置 workCount 为 0。当线程池状态变更为 TIDYING 时,会调用 terminated() 钩子函数。 |
TERMINATED (011) | terminated() 方法结束后,意味着线程池已被关闭。 |
- 当
state < 0
,意味着线程池正常运行。- 当
state = 0
,意味着无法提交任务,但仍能继续执行未完成的任务。- 当
state > 0
,意味着正在执行的任务也要立即被中断执行。
源码分析
内部类:Worker
Doug Lea 把线程池中的线程使用 Worker
内部类进行包装,而且 Worker 继承抽象灶 AQS,因此,它本身也是一把锁。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 每个 Worker 类都会绑定一个 Thread 对象,由 ThreadFactory 线程工厂生成
final Thread thread;
// Worker 首个待执行的任务,在创建 Worker 时,通过构造函数传入,可能为 null
// 这里感觉可以起到一个加速作用,不一定必须得通过阻塞队列才能获得任务
Runnable firstTask;
// 记录此线程完成的任务数量
volatile long completedTasks;
// 构造函数
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// ...
}
Worker
继承 AQS
以实现一个简单的排它锁,当任务需要执行时,首先需要获取锁,才会调用 Runnable#run
方法,当任务执行完后,必须要释放锁,以便下一个任务可以获得该线程的执行权。
提交单个任务 submit
我们从最简单的提交单个 Runnable
任务开始讲起。submit(Runnable)
是在抽象类 AbstractExecutorService
实现的:
// java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
/**
* 向线程池提供一个Runnalbe任务,并返回一个Future凭证。
* 可以通过 Future#get() 方法等待获取计算结果
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// #1 包装适配类
RunnableFuture<Void> ftask = newTaskFor(task, null);
// #2 交给线程池执行任务
execute(ftask);
// #3 返回 Future 凭证
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
方法会将 Runnable
任务包装为 RunnableFuture
实现类 FutureTask
,然后调用 Executor#execute(Runnable)
执行任务。包装这一步是为了可以从 FutureTask
获得计算结果。
任务执行:execute
// java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// #1 c 表示线程池状态和线程数
int c = ctl.get();
// #2 当前线程数小于核心线程数,同步创建一个新的Worker线程,异步执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// #3 创建Worker线程并成功添加任务,直接返回
return;
// #4 返回false意味着出现错误,可能其它线程并发修改了
c = ctl.get();
}
// #5 走到这里,说明当前线程数大于等于核心线程数,或者 addWorker() 方法调用失败
// ① 判断当前线程池状态,没有异常,继续执行; ② 将任务添加到workQueue队列中
if (isRunning(c) && workQueue.offer(command)) {
// #6 任务入队成功,重新检查线程池状态
int recheck = ctl.get();
// #7 double-check 如果线程池没有处于运行状态,则从队列中移除任务
if (! isRunning(recheck) && remove(command))
// #8 执行拒绝策略
reject(command);
// #8 线程池状态正常,并且线程数为 0,那么创建新的 Worker 线程
// 这里是一个兜底操作,担心任务提交到队列,但是没有活的线程可以来执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// #9 任务队列已满,offer 失败,那么创建新的 Worker 线程
else if (!addWorker(command, false))
// #10 Worker 数量不能超过 maxPoolSize,如果超过,则创建失败,执行拒绝策略
reject(command);
}
创建 Worker 任务执行线程:addWorker
这是任务提交线程调用的方法,当我们阅读源码的时候,需要明确这个方法到底是被哪个线程所调用,存在哪些共享变量。
方法执行流程:
- 检查线程池状态。如果
state >= 0
,就不允许提交新的任务。 - 状态没问题,获取当前线程池存活的线程数量。并判断是否出现溢出,即超出所设置的阈值。如果一切正常,那么通过 CAS 将线程数
+1
。这里可能会发生失败,如果失败,重头再来。 - 这里我们会走到
#6
,意味着我们 CAS 设置成功,接下来就需要创建Worker
对象。首先,我们需要获取全局锁,因为向非安全类HashSet
插入Worker
,而且,在获取全局锁后,我们会再次检查线程状态是否正常,如果不正常,抛出相关异常。如果一切满足要求,将Worker
放入HashSet
后,释放全局锁。 - 启动
Worker
工作线程,至此,该线程可以从阻塞队列中获取任务并执行任务。 如果线程没有启动成功,则会重置相关变量,并调用
terminate()
方法。// java.util.concurrent.ThreadPoolExecutor#addWorker
/**
* 检查是否可以根据当前线程池状态和给定的界限(corePoolSize 或 maxPoolSize)创建新的 Worker 任务线程。
* 如果允许,则将 firstTask 作为新的 Wroker 线程的首个任务。
* 如果线程池
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// #1 这里只需要记住,虽然线程状态为 SHUTDOWN,但是当任务队列workQueue不为空时
// 是允许创建 Worker 工作线程。但是不允许提交新的任务(firstTask 不为null,创建失败)
// 因此,这里是对 >= SHUTDOWN 状态的处理,其中存在一种特殊情况是可以允许创建新的Worker线程
// 否则,直接返回 false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 死循环
for (;;) {
// #2 获取线程池 worker 数量
int wc = workerCountOf(c);
// #3 判断 wc 是否"溢出",如果超过预设值,返回 false,不允许创建 worker 线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// #4 条件都满足了,CAS 将 woker 数量 + 1
if (compareAndIncrementWorkerCount(c))
// CAS 成功,跳出第一层 for(;;)
break retry;
// #5 CAS 失败,说明有其它线程也在尝试创建新的 Worker 线程
// 重新来过
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// #6 走到这里,说明 CAS 设置成功(相当于占位),现在可以慢慢创建线程,并执行任务了
// 注意,这里仍是提交任务的线程在执行代码
boolean workerStarted = false; // worker 线程是否已启动
boolean workerAdded = false; // worker 线程是否已被添加到 HashSet 中
Worker w = null; // 新创建的 Worker 线程
try {
// #7 二话不说,直接创建 Worker 实例对象
// Wroker 对象有三个重要的变量,其中包括首个任务、由线程工作创建的线程、还有一个已完成的任务数
// 别忘记,Worker 可是继承 AQS 的
w = new Worker(firstTask);
// #8 获取 Worker 内部的线程
final Thread t = w.thread;
if (t != null) { // 安全判断是否为 null
// #9 这是 ThreadPoolExecutor 对象的锁,可理解为线程池全局锁
// 因为关闭线程也需要持有这把锁,所以在获得锁这段时间内,可以保证线程池不会被关闭
final ReentrantLock mainLock = this.mainLock;
// #10 加锁
mainLock.lock();
try {
// #11 当持有全局锁时,重新检查一下线程状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// #12 进到这里,说明线程池状态正常,满足启动Worker线程执行任务
if (t.isAlive())
// #13 Worker 所创建的线程出现问题,抛出异常
throw new IllegalThreadStateException();
// #13 将 Worker 添加到线程池全局变量中(HashSet 集合)
workers.add(w);
// #14 largestPoolSize 记录 workers 工作线程数量的最大值
// 通过这个值可以得到当前线程池的峰值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// #15 释放全局锁
mainLock.unlock();
}
// #16 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// #17 如果线程没有启动,做一些收尾工作,比如复原 workCount 数量等等
if (! workerStarted)
addWorkerFailed(w);
}
// #18 返回线程是否启动成功
return workerStarted;
}
addWorkerFailed
// java.util.concurrent.ThreadPoolExecutor#addWorkerFailed private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
tryTerminate
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
Worker 执行任务:Worker#run
Worker
线程启动是由提交任务线程来触发的,调用Worker#start()
方法后,Worker
线程就会执行run()
方法逻辑。// java.util.concurrent.ThreadPoolExecutor.Worker#run public void run() { runWorker(this); }
runWorker
逻辑相对简单,执行流程:
首先打开中断响应。
- 如果 firstTask != null,则先执行。否则调用
getTask()
从阻塞队列中获取任务。 - 获取独占锁
w.lock()
,表示当前线程正在执行任务。任务执行完后,需要释放锁。 进入
finally
,线程执行完后,为了节约系统资源,执行完任务的线程需要被回收。 ```java // java.util.concurrent.ThreadPoolExecutor#runWorker final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // #1 获取当前 Worker 首个任务 Runnable task = w.firstTask;// #2 设置 firstTask 为 null w.firstTask = null;
// #3 允许响应中断 w.unlock(); boolean completedAbruptly = true; try {
// #4 循环调用 getTask 获取任务 while (task != null || (task = getTask()) != null) { //#5 上锁,锁的范围仅限于当前 Worker 工作线程 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // #6 如果线程池状态 >= STOP,确保该线程也要被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // #7 这是一个钩子函数,留给需要的子类去实现 beforeExecute(wt, task); // #8 任务执行的过程中抛出的异常 Throwable thrown = null; try { // #9 执行任务 task.run(); // #10 捕获并记录异常 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // #10 任务执行完成,置空 task ,准备通过 getTask() 获取下一个任务 task = null; // #11 更新任务已完成计数器 w.completedTasks++; // #12 释放锁 w.unlock(); } } completedAbruptly = false;
} finally {
// #13 如果执行到这里,需要执行线程关闭,一般遇到以下两种情况才会关闭Worker线程 // ① getTask() 返回 null,② 任务执行过程中出现异常 // processWorkExit 第二种情况会在这个方法中做一些清理工作 processWorkerExit(w, completedAbruptly);
} }
<a name="knI4h"></a>
### getTask
从队列队列中获取任务
```java
// java.util.concurrent.ThreadPoolExecutor#getTask
/**
* 获取待执行的任务,总共有三种可能:
* ① 阻塞直到成功获得任务才返回。我们知道,存在 corePoolSize 大小的线程数不会被回收。
* ② 超时退出。keepAliveTime 起作用。如果一个线程空闲时间超过 keepAliveTime,那么它会被回收。
* ③ 如果发生以下情况,一定会返回 null
* 1. 池中存在大于 maximumPoolSize 个 worker
* 2. 线程池被关闭了。
* 3. 线程池处于 SHUTDOWN 状态,而且 workQueue = null,这种情况线程池不会接收新的任务
* 3. woker 等待超时。
*
*/
private Runnable getTask() {
// #1 poll() 操作是否超时
boolean timedOut = false; // Did the last poll() time out?
// 死循环
for (;;) {
// #2 获取线程池数量
int c = ctl.get();
int rs = runStateOf(c);
// #3 检查线程池状态,存在两种情况会返回 null
// ① rs == SHUTDOWN && workQueue.isEmpty
// ② rs >= STOP
// 这两种情况都没有任务可执行,所以直接返回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// #4 获取池内存活的线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
// #5 allowCoreThreadTimeOut(false) ,线程数量超出 corePoolSize,
// 则会根据 keepAlive 设置来关闭线程,这是线程回收操作
// 如果允许核心线程数也加入超时等待或者池内线程池超过 corePoolSize
// 则会调用带超时时间的 poll(time)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数超过 maximumPoolSize 或者超时
// 且 wc > 1 或任务队列为空
// 这里的意思是如果线程超时或超过 maximumPoolSize大小,并且池中有 > 1 的线程,或者北关区为空
// 那么当前的这个 Worker 线程就可以被回收,避免资源浪费呀
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// #6 减少 Worker 数量
if (compareAndDecrementWorkerCount(c))
// #7 返回 null ,回收当前 Worker
return null;
continue;
}
try {
// #8 等待 keepAliveTime,如果还没有得到任务,当前线程也可以被关闭
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时可退出
workQueue.take(); // 一直等待直到有任务到达
// #9 如果能得到任务,则立即返回
if (r != null)
return r;
// #10 否则,timeOut 置为 true,表示这是一次超时获取,当前线程会被回收
timedOut = true;
} catch (InterruptedException retry) {
// #11 Worker 发生中断,Worker 会进行重试操作。
timedOut = false;
}
}
}
拒绝策略
RejectedExecutionHandler
是线程池拒绝策略的接口类,所有的拒绝策略都需要实现这个接口,它只有一个方法:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
入参有两个,一个是任务 r
,另一个是线程池 executor
对象。这两个正是线程池的核心对象,我们可以通过这两个对象可以获得很大的权限。
当然,ThreadPoolExecutor 中已经有四个定义好的拒绝策略,当然,我们也可以根据实际需求实现个性化拒绝策略。我们先看看自带的拒绝策略:
拒绝策略 | 描述 |
---|---|
CallerRunsPolicy | 任务被线程池给拒绝了,只要线程池没有被关闭,那么谁提交谁执行。 |
AbortPolicy | 无论什么情况,直接抛出 RejectedException 异常 |
DiscardPolicy | 忽略任务,不做任何处理。 |
DiscardOldestPolicy | 丢弃阻塞队列队头的任务(即等待时间最长),然后将新任务添加到队列队列中 |
一般默认是 CallerRunsPolicy
,其它拒绝策略都不可靠。
Executors
Executors 是一个工具类,可以快速创建线程池。
// 拥有恒定数量的线程池,且队列长度无限制
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newCachedThreadPool 会在很短的时间内缓存线程,以将其重用于其它任务。
// 线程池从0开始增长,数量可能会到达 MAX_VALUE,
// 线程池将会删除闲置一分钟的线程
// 而 SynchronousQueue 队列为 0,因此,如果有空闲线程,交给空闲线程处理,否则,创建一个新的线程
// 来处理该任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
总结
ThreadPoolExecutor 总共有 5 个特别关键的参数:
- corePoolSize
- maximumPoolSize
- workQueue
- keepAliveTime
- rejectedExecutionHandler
我们需要清楚,线程,即 Worker
对象创建的时机,而且如何与阻塞队列配合使用。并且,还有线程回收机制。当任务执行过程中出现异常,执行线程会被关闭。