概述
ThreadPoolExecutor 是 Java 非常重要的工具类,用好线程池可以成倍缩短业务执行时间。但是对于并发编程,用好也十分困难,它是一把双刃剑,只有勇士才能发挥最大效能。
Executor 体系结构如下图所示:
Executor 是最顶层的接口,接口定义的 API 非常简单,仅定义了一个 execute(Runnable) 接口,实现类会在未来的某一刻执行 Runnable。ExecutorService 同样也是接口,它的 API 比较丰富,通常我们也是用这里的 API 实现更高级的功能。里面包含大致有三类 API:
- 提交任务。包括单个任务提交(submit())和批量任务提交(invokeAll())。
- 状态判断。判断线程池内的任务是否已全部执行完成(isTerminated),判断线程池是否被关闭。
- 关闭线程池。比如立即关闭线程池,不会给还未执行的任务执行机会。
ExecutorService还提供了一个awaitTermination方法,用来当当前线程阻塞等待直到线程池完全关闭后被唤醒。
AbstractExecutorService 是一个抽象类,实现了 ExecutorService 部分接口,主要是提交任务。ThreadPoolExecutor 是本篇文章最重要的类,它也是我们经常使用的线程池实现类。
状态定义
Doug Lea 使用一个 32 位 int 类型变量保存线程池的状态和池中线程数量。
- 高
3位存放线程池状态。 - 低
29位保存学院路线程数量。
线程池状态解析如下:
| 状态 | 描述 |
|---|---|
| RUNNING (111) | 线程池正常运行:线程池可以接收新的任务,并且线程也正常执行任务 |
| SHUTDOWN (000) | 拒绝新任务(提交),继续执行未完成的任务(包括 workQueue 队列中的任务) |
| STOP (001) | 拒绝新任务(提交),中断正在执行的任务,不处理 workQueue 队列中的任务 |
| TIDYING (010) | 销毁所有任务,重置 workCount 为 0。当线程池状态变更为 TIDYING 时,会调用 terminated() 钩子函数。 |
| TERMINATED (011) | terminated() 方法结束后,意味着线程池已被关闭。 |
- 当
state < 0,意味着线程池正常运行。- 当
state = 0,意味着无法提交任务,但仍能继续执行未完成的任务。- 当
state > 0,意味着正在执行的任务也要立即被中断执行。
源码分析
内部类:Worker
Doug Lea 把线程池中的线程使用 Worker 内部类进行包装,而且 Worker 继承抽象灶 AQS,因此,它本身也是一把锁。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{// 每个 Worker 类都会绑定一个 Thread 对象,由 ThreadFactory 线程工厂生成final Thread thread;// Worker 首个待执行的任务,在创建 Worker 时,通过构造函数传入,可能为 null// 这里感觉可以起到一个加速作用,不一定必须得通过阻塞队列才能获得任务Runnable firstTask;// 记录此线程完成的任务数量volatile long completedTasks;// 构造函数Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// ...}
Worker 继承 AQS 以实现一个简单的排它锁,当任务需要执行时,首先需要获取锁,才会调用 Runnable#run 方法,当任务执行完后,必须要释放锁,以便下一个任务可以获得该线程的执行权。
提交单个任务 submit
我们从最简单的提交单个 Runnable 任务开始讲起。submit(Runnable) 是在抽象类 AbstractExecutorService 实现的:
// java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)/*** 向线程池提供一个Runnalbe任务,并返回一个Future凭证。* 可以通过 Future#get() 方法等待获取计算结果*/public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();// #1 包装适配类RunnableFuture<Void> ftask = newTaskFor(task, null);// #2 交给线程池执行任务execute(ftask);// #3 返回 Future 凭证return ftask;}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}
方法会将 Runnable 任务包装为 RunnableFuture 实现类 FutureTask,然后调用 Executor#execute(Runnable) 执行任务。包装这一步是为了可以从 FutureTask 获得计算结果。
任务执行:execute
// java.util.concurrent.ThreadPoolExecutor#executepublic void execute(Runnable command) {if (command == null)throw new NullPointerException();// #1 c 表示线程池状态和线程数int c = ctl.get();// #2 当前线程数小于核心线程数,同步创建一个新的Worker线程,异步执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))// #3 创建Worker线程并成功添加任务,直接返回return;// #4 返回false意味着出现错误,可能其它线程并发修改了c = ctl.get();}// #5 走到这里,说明当前线程数大于等于核心线程数,或者 addWorker() 方法调用失败// ① 判断当前线程池状态,没有异常,继续执行; ② 将任务添加到workQueue队列中if (isRunning(c) && workQueue.offer(command)) {// #6 任务入队成功,重新检查线程池状态int recheck = ctl.get();// #7 double-check 如果线程池没有处于运行状态,则从队列中移除任务if (! isRunning(recheck) && remove(command))// #8 执行拒绝策略reject(command);// #8 线程池状态正常,并且线程数为 0,那么创建新的 Worker 线程// 这里是一个兜底操作,担心任务提交到队列,但是没有活的线程可以来执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// #9 任务队列已满,offer 失败,那么创建新的 Worker 线程else if (!addWorker(command, false))// #10 Worker 数量不能超过 maxPoolSize,如果超过,则创建失败,执行拒绝策略reject(command);}
创建 Worker 任务执行线程:addWorker
这是任务提交线程调用的方法,当我们阅读源码的时候,需要明确这个方法到底是被哪个线程所调用,存在哪些共享变量。
方法执行流程:
- 检查线程池状态。如果
state >= 0,就不允许提交新的任务。 - 状态没问题,获取当前线程池存活的线程数量。并判断是否出现溢出,即超出所设置的阈值。如果一切正常,那么通过 CAS 将线程数
+1。这里可能会发生失败,如果失败,重头再来。 - 这里我们会走到
#6,意味着我们 CAS 设置成功,接下来就需要创建Worker对象。首先,我们需要获取全局锁,因为向非安全类HashSet插入Worker,而且,在获取全局锁后,我们会再次检查线程状态是否正常,如果不正常,抛出相关异常。如果一切满足要求,将Worker放入HashSet后,释放全局锁。 - 启动
Worker工作线程,至此,该线程可以从阻塞队列中获取任务并执行任务。 如果线程没有启动成功,则会重置相关变量,并调用
terminate()方法。// java.util.concurrent.ThreadPoolExecutor#addWorker/*** 检查是否可以根据当前线程池状态和给定的界限(corePoolSize 或 maxPoolSize)创建新的 Worker 任务线程。* 如果允许,则将 firstTask 作为新的 Wroker 线程的首个任务。* 如果线程池*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// #1 这里只需要记住,虽然线程状态为 SHUTDOWN,但是当任务队列workQueue不为空时// 是允许创建 Worker 工作线程。但是不允许提交新的任务(firstTask 不为null,创建失败)// 因此,这里是对 >= SHUTDOWN 状态的处理,其中存在一种特殊情况是可以允许创建新的Worker线程// 否则,直接返回 falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 死循环for (;;) {// #2 获取线程池 worker 数量int wc = workerCountOf(c);// #3 判断 wc 是否"溢出",如果超过预设值,返回 false,不允许创建 worker 线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// #4 条件都满足了,CAS 将 woker 数量 + 1if (compareAndIncrementWorkerCount(c))// CAS 成功,跳出第一层 for(;;)break retry;// #5 CAS 失败,说明有其它线程也在尝试创建新的 Worker 线程// 重新来过c = ctl.get();if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// #6 走到这里,说明 CAS 设置成功(相当于占位),现在可以慢慢创建线程,并执行任务了// 注意,这里仍是提交任务的线程在执行代码boolean workerStarted = false; // worker 线程是否已启动boolean workerAdded = false; // worker 线程是否已被添加到 HashSet 中Worker w = null; // 新创建的 Worker 线程try {// #7 二话不说,直接创建 Worker 实例对象// Wroker 对象有三个重要的变量,其中包括首个任务、由线程工作创建的线程、还有一个已完成的任务数// 别忘记,Worker 可是继承 AQS 的w = new Worker(firstTask);// #8 获取 Worker 内部的线程final Thread t = w.thread;if (t != null) { // 安全判断是否为 null// #9 这是 ThreadPoolExecutor 对象的锁,可理解为线程池全局锁// 因为关闭线程也需要持有这把锁,所以在获得锁这段时间内,可以保证线程池不会被关闭final ReentrantLock mainLock = this.mainLock;// #10 加锁mainLock.lock();try {// #11 当持有全局锁时,重新检查一下线程状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// #12 进到这里,说明线程池状态正常,满足启动Worker线程执行任务if (t.isAlive())// #13 Worker 所创建的线程出现问题,抛出异常throw new IllegalThreadStateException();// #13 将 Worker 添加到线程池全局变量中(HashSet 集合)workers.add(w);// #14 largestPoolSize 记录 workers 工作线程数量的最大值// 通过这个值可以得到当前线程池的峰值int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {// #15 释放全局锁mainLock.unlock();}// #16 启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {// #17 如果线程没有启动,做一些收尾工作,比如复原 workCount 数量等等if (! workerStarted)addWorkerFailed(w);}// #18 返回线程是否启动成功return workerStarted;}
addWorkerFailed
// java.util.concurrent.ThreadPoolExecutor#addWorkerFailed private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }tryTerminate
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }Worker 执行任务:Worker#run
Worker线程启动是由提交任务线程来触发的,调用Worker#start()方法后,Worker线程就会执行run()方法逻辑。// java.util.concurrent.ThreadPoolExecutor.Worker#run public void run() { runWorker(this); }runWorker
逻辑相对简单,执行流程:
首先打开中断响应。
- 如果 firstTask != null,则先执行。否则调用
getTask()从阻塞队列中获取任务。 - 获取独占锁
w.lock(),表示当前线程正在执行任务。任务执行完后,需要释放锁。 进入
finally,线程执行完后,为了节约系统资源,执行完任务的线程需要被回收。 ```java // java.util.concurrent.ThreadPoolExecutor#runWorker final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // #1 获取当前 Worker 首个任务 Runnable task = w.firstTask;// #2 设置 firstTask 为 null w.firstTask = null;
// #3 允许响应中断 w.unlock(); boolean completedAbruptly = true; try {
// #4 循环调用 getTask 获取任务 while (task != null || (task = getTask()) != null) { //#5 上锁,锁的范围仅限于当前 Worker 工作线程 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // #6 如果线程池状态 >= STOP,确保该线程也要被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // #7 这是一个钩子函数,留给需要的子类去实现 beforeExecute(wt, task); // #8 任务执行的过程中抛出的异常 Throwable thrown = null; try { // #9 执行任务 task.run(); // #10 捕获并记录异常 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // #10 任务执行完成,置空 task ,准备通过 getTask() 获取下一个任务 task = null; // #11 更新任务已完成计数器 w.completedTasks++; // #12 释放锁 w.unlock(); } } completedAbruptly = false;} finally {
// #13 如果执行到这里,需要执行线程关闭,一般遇到以下两种情况才会关闭Worker线程 // ① getTask() 返回 null,② 任务执行过程中出现异常 // processWorkExit 第二种情况会在这个方法中做一些清理工作 processWorkerExit(w, completedAbruptly);} }
<a name="knI4h"></a>
### getTask
从队列队列中获取任务
```java
// java.util.concurrent.ThreadPoolExecutor#getTask
/**
* 获取待执行的任务,总共有三种可能:
* ① 阻塞直到成功获得任务才返回。我们知道,存在 corePoolSize 大小的线程数不会被回收。
* ② 超时退出。keepAliveTime 起作用。如果一个线程空闲时间超过 keepAliveTime,那么它会被回收。
* ③ 如果发生以下情况,一定会返回 null
* 1. 池中存在大于 maximumPoolSize 个 worker
* 2. 线程池被关闭了。
* 3. 线程池处于 SHUTDOWN 状态,而且 workQueue = null,这种情况线程池不会接收新的任务
* 3. woker 等待超时。
*
*/
private Runnable getTask() {
// #1 poll() 操作是否超时
boolean timedOut = false; // Did the last poll() time out?
// 死循环
for (;;) {
// #2 获取线程池数量
int c = ctl.get();
int rs = runStateOf(c);
// #3 检查线程池状态,存在两种情况会返回 null
// ① rs == SHUTDOWN && workQueue.isEmpty
// ② rs >= STOP
// 这两种情况都没有任务可执行,所以直接返回
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// #4 获取池内存活的线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
// #5 allowCoreThreadTimeOut(false) ,线程数量超出 corePoolSize,
// 则会根据 keepAlive 设置来关闭线程,这是线程回收操作
// 如果允许核心线程数也加入超时等待或者池内线程池超过 corePoolSize
// 则会调用带超时时间的 poll(time)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前线程数超过 maximumPoolSize 或者超时
// 且 wc > 1 或任务队列为空
// 这里的意思是如果线程超时或超过 maximumPoolSize大小,并且池中有 > 1 的线程,或者北关区为空
// 那么当前的这个 Worker 线程就可以被回收,避免资源浪费呀
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// #6 减少 Worker 数量
if (compareAndDecrementWorkerCount(c))
// #7 返回 null ,回收当前 Worker
return null;
continue;
}
try {
// #8 等待 keepAliveTime,如果还没有得到任务,当前线程也可以被关闭
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时可退出
workQueue.take(); // 一直等待直到有任务到达
// #9 如果能得到任务,则立即返回
if (r != null)
return r;
// #10 否则,timeOut 置为 true,表示这是一次超时获取,当前线程会被回收
timedOut = true;
} catch (InterruptedException retry) {
// #11 Worker 发生中断,Worker 会进行重试操作。
timedOut = false;
}
}
}
拒绝策略
RejectedExecutionHandler 是线程池拒绝策略的接口类,所有的拒绝策略都需要实现这个接口,它只有一个方法:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
入参有两个,一个是任务 r,另一个是线程池 executor 对象。这两个正是线程池的核心对象,我们可以通过这两个对象可以获得很大的权限。
当然,ThreadPoolExecutor 中已经有四个定义好的拒绝策略,当然,我们也可以根据实际需求实现个性化拒绝策略。我们先看看自带的拒绝策略:
| 拒绝策略 | 描述 |
|---|---|
| CallerRunsPolicy | 任务被线程池给拒绝了,只要线程池没有被关闭,那么谁提交谁执行。 |
| AbortPolicy | 无论什么情况,直接抛出 RejectedException 异常 |
| DiscardPolicy | 忽略任务,不做任何处理。 |
| DiscardOldestPolicy | 丢弃阻塞队列队头的任务(即等待时间最长),然后将新任务添加到队列队列中 |
一般默认是 CallerRunsPolicy,其它拒绝策略都不可靠。
Executors
Executors 是一个工具类,可以快速创建线程池。
// 拥有恒定数量的线程池,且队列长度无限制
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newCachedThreadPool 会在很短的时间内缓存线程,以将其重用于其它任务。
// 线程池从0开始增长,数量可能会到达 MAX_VALUE,
// 线程池将会删除闲置一分钟的线程
// 而 SynchronousQueue 队列为 0,因此,如果有空闲线程,交给空闲线程处理,否则,创建一个新的线程
// 来处理该任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
总结
ThreadPoolExecutor 总共有 5 个特别关键的参数:
- corePoolSize
- maximumPoolSize
- workQueue
- keepAliveTime
- rejectedExecutionHandler
我们需要清楚,线程,即 Worker 对象创建的时机,而且如何与阻塞队列配合使用。并且,还有线程回收机制。当任务执行过程中出现异常,执行线程会被关闭。
