1. 概况
线程池作用:
- 减少频繁创建、销毁线程的资源损耗
- 任务来了,可以立即执行,响应快
- 避免对资源的过度使用,超过预期的请求能够拒绝
2. 类定义

- ExecutorService 扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future的 方法;提供了管控线程池的方法,比如停止线程池的运行。
- AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
public class ThreadPoolExecutor extends AbstractExecutorService {}public abstract class AbstractExecutorService implements ExecutorService {}public interface ExecutorService extends Executor {}public interface Executor {void execute(Runnable command);}
3. 成员变量
3.1 ctl
用一个变量去表示两种状态,避免为了维护两者的一致使用额外的锁资源。高 3 位表示线程池的状态 runState,低 29 位表示有效工作线程数量 workerCount。
// ctl 高 3 位表示线程状态 RunState,低 29 位表示线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; } // 计算 runStateprivate static int workerCountOf(int c) { return c & CAPACITY; } // 计算 workerCountprivate static int ctlOf(int rs, int wc) { return rs | wc; } // 通过 runState 和 workerCount 计算出 ctl
3.2 RunState
线程池的运行状态,值从小到大排序: RUNNING < SHUTDOW < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS; // 111private static final int SHUTDOWN = 0 << COUNT_BITS; // 000private static final int STOP = 1 << COUNT_BITS; // 001private static final int TIDYING = 2 << COUNT_BITS; // 010,所有的任务已经终止,workerCount = 0,将会执行 terminated() 的 hook 方法private static final int TERMINATED = 3 << COUNT_BITS;// 011

线程池状态转换:
- RUNNING -> SHUTDOWN,调用 shutdown() 或者 finalize()
- (RUNNING or SHUTDOWN) -> STOP,调用 shutdownNow()
- SHUTDOWN -> TIDYING,队列中的任务和线程池中的线程都为空
- STOP -> TIDYING,线程池线程为空
- TIDYING -> TERMINATED,调用 terminated() 方法

3.3 其它成员变量
// 保存任务的队列,阻塞式private final BlockingQueue<Runnable> workQueue;// 主锁,后面防止并发的时候都会用到这个锁private final ReentrantLock mainLock = new ReentrantLock();// 线程池中的工作线程放到 worker 中,将任务与工作线程分开private final HashSet<Worker> workers = new HashSet<Worker>();// 调用 awaitTermination() 方法时,线程会等待这个条件private final Condition termination = mainLock.newCondition();// 线程池中线程数曾经的最大值private int largestPoolSize;// 线程池完成的任务数private long completedTaskCount;
阻塞队列可选:
3.4 构造函数所需变量
// 创建线程的工厂类,在里面可以设置名称private volatile ThreadFactory threadFactory;// 拒绝策略private volatile RejectedExecutionHandler handler;// 超过 coresize 的线程空闲等待时长,nanoseconds 级别private volatile long keepAliveTime;// core 线程空闲标志,默认为 false,若设为 true ,core 线程将等待 keepAliveTime 之后被销魂private volatile boolean allowCoreThreadTimeOut;// 线程池的最大数量,一般为 CAPACITYprivate volatile int maximumPoolSize;// 默认的拒绝策略private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
5. 成员方法
线程池创建之后,一般就会调用线程池的 execute 方法执行任务
5.1 execute
通过构造函数创建线程池之后,使用 execute 方法执行放到线程池中的任务,他的执行过程参考下图:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get(); // 获取当前 ctl 的值,高 3 位是线程状态,低 29 位是线程数if (workerCountOf(c) < corePoolSize) { // 工作线程数小于 corePoolSize,通过 addWorker 添加核心线程,把当前任务 command 作为该线程的 firstTaskif (addWorker(command, true)) // 添加核心线程,addWorker 里面有执行任务的逻辑return;c = ctl.get();}// 走到这里,表示线程数大于 corePoolSize 或者 添加工作线程失败了// 线程池是 running 状态,并且工作队列没满(能够添加任务)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) // 线程池不是 RUNNING,移除刚刚添加的任务,执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0) // 此时没有 worker ,添加 workeraddWorker(null, false); // 添加非核心线程}// 添加工作线程失败了else if (!addWorker(command, false)) // 队列满了,并且添加非核心线程返回 false(线程数达到 maximumPoolSize),需要执行拒绝策略reject(command); // 执行拒绝策略}}
5.2 shutdown
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 检查权限advanceRunState(SHUTDOWN); // CAS 方式将线程池状态改为 SHUTDOWNinterruptIdleWorkers(); // 中断空闲的线程onShutdown(); // 调用 shuntdown 的 hook 方法} finally {mainLock.unlock();}tryTerminate(); // 进行后续的终止操作}private void interruptIdleWorkers() {interruptIdleWorkers(false); // 中断所有的闲置线程}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) { // 获取锁成功,表明线程没有在执行任务,此时可以进行中断try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
5.3 tryTerminate
将线程池设置为 TERMINATED 状态,调用 shutdown 方法或者减少 worker count 的时候该方法会被调用。
- runWorker 当线程池获取任务失败时,跳出 while 循环,执行 processWorkerExit 方法,里面调用 tryTerminate() 方法,此时阻塞的任务会被中断。
final void tryTerminate() {for (;;) {int c = ctl.get();// 若线程池是 RUNNING 状态,那么不能 terminate// 或者线程池状态 >= TIDYING,也就是 TIDYING TERMINATED 其中之一,也不能 terminate// 或者线程池状态 == SHUTDOWN,并且工作队列不为空,也不能 terminateif (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果此时线程池中有工作线程if (workerCountOf(c) != 0) { // Eligible to terminate// 中断闲置的 worker,只中断一个 worker// 若此 worker 阻塞在 take 方法中,那么就可以响应中断,并把中断信息传递到外面,runWorker 里面有调用interruptIdleWorkers(ONLY_ONE); // ONLY_ONE = truereturn;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 尝试将 runstate 设为 TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 设置成功,执行 terminated()terminated();} finally {// 将 runState 设为 TERMINATEDctl.set(ctlOf(TERMINATED, 0));termination.signalAll(); // 通知 awaitTermination 的线程}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
6. 内部类 Worker
- firstTask,保存传入的第一个任务,若 firstTask != null,那么线程启动时就会执行这个任务,否则只是增加一个 worker(创建非核心线程)
- Worker 继承 AQS,通过 lock 方法获取独占锁,表示线程正在运行;若线程没有获取独占锁(空闲状态),那么他就可以被回收。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable // 继承自 AQS,只用实现 tryAquire 和 tryRelease 就可以实现线程间的同步{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;// 若 firstTask 不为空,那么 thread 启动的时候就会执行这个任务;否则 thread 会去调用 getTask() 方法去获取执行的任务Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // 初始化的时候禁止中断响应this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 通过工厂方法新建线程}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this); // 执行具体的任务逻辑}// Lock methods 加锁办法,熟悉 AQS 下面的办法应该会很熟悉//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { // 抢占式,直接设定 statesetExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
addWorker
线程池的执行,主要是通过 addWorker 方法去增加工作线程,线程池根据 addWorker 方法的返回判断是否需要执行拒绝策略。
// firstTask 可能为 null// core == ture 表示以 corePoolSize 为界限,工作线程不能超过这个值;core == false 表示以 maximumPoolSize 为界限,不能超过该值private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池处于 SHUTDOW and firstTask == null and ! workQueue.isEmpty() 情况下可以创建线程。若线程池处于 STOP,TIDYING,TERMINATED 情况下,不允许添加线程if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c); // 线程池中正在运行的线程数if (wc >= CAPACITY || // 正在运行的线程超过 5 亿,或者如果是添加核心线程超过 corePoolSize ,添加非核心线程超过 maximumPoolSize,直接返回不能添加wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) // 以 CAS 方式将当前的工作线程数增加 1,跳出循环break retry;c = ctl.get(); // 存在并发重新读取if (runStateOf(c) != rs) // CAS 失败,线程池状态发生了变更(其它线程关闭线程池),重新循环continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 经过上面循环 workerCount 已经增加了 1 个boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 将 Runnable 对象封装到 Worker 对象中,届时能够进行加锁执行某些方法w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 全局锁,持有这个锁期间,线程池不会关闭,关闭线程池需要该锁try {// 加锁之后再检查 stateint rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { // 如果是 running 状态,或者是 shutdown ,此时不接受新任务,但是可以处理队列中已有的任务if (t.isAlive()) // worker 里面的 thread,不应该是启动状态throw new IllegalThreadStateException();workers.add(w); // 添加到 Work<Set> 中int s = workers.size();if (s > largestPoolSize) // 维护一个全局达到过的最大线程数计数器largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// worker 成功加入到 HashSetif (workerAdded) {t.start(); // 执行任务的 run 方法workerStarted = true; // 标记添加到队列里面的 worker 启动成功}}} finally {if (! workerStarted)addWorkerFailed(w); // 刚添加到队列的 worker 没有启动成功,需要移除刚添加到 HashSet 里面的 worker}// 返回线程是否启动成功return workerStarted;}

addWorker 四种调用方式:
- addWorker(command, true),线程数 < coreSize时,将 task 放入 workers,如果线程数 >= coreSize,返回 false;
- addWorker(command, false),当阻塞对列已满,尝试将新的 task 放入 workers,如果线程数 >= maximumPoolSize,返回 false;
- addWorker(null, false),放入一个空的 task 到 workers,此时线程数的限制是 maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
- addWorker(null, true)放入一个空的 task 到 workers,线程数限制是需要小于 coreSize,否则返回 false。实际的使用是在prestartCoreThread()等方法,有兴趣的读者可以自行阅读,在此不做详细赘述。
addWorkerFailed
- 删除 workers 里面的 worker
- workCount 减 1
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w); // 移除 HashSet<Worker> 中的这个 workerdecrementWorkerCount(); // worderCount 减 1tryTerminate(); // 尝试终止线程池} finally {mainLock.unlock();}}
runWorker
addWorker 里面执行 start() 方法之后,线程开始执行 run() 逻辑,而 worker 里面的 run() 调用的是 runWorker(),下面来看具体的任务执行过程

final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 线程的第一个任务,可以不从队列中获取Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 释放锁,允许响应中断boolean completedAbruptly = true;try {// worker 的 firsttask 不为空或者从 workeQueue 中获取到任务while (task != null || (task = getTask()) != null) {w.lock();// 若线程池是非 RUNNING 状态,要保证该线程是中断状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); // wt 线程执行 task,做一些执行前的操作,WorkerPoolExecutor 继承 ThreadPoolExecutor,有对这个方法具体实现Throwable thrown = null;try {task.run(); // 执行具体的任务处理逻辑} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {// 任务正常执行完成,此已完成任务置为null,进行下一次循环执行 getTask()task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false; // 完成任务,没有被中断} finally {// 走到这里,说明上面循环结束// 1. getTask 返回 null// 2. 发生了异常processWorkerExit(w, completedAbruptly); // 回收线程}}
getTask
从任务队列中阻塞获取任务,并且会回收多余的 worker

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// 1. rs == SHUTDOWN && workQueue.isEmpty// 2. rs >= STOP// 以上两种情况需要减少 工作线程数if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 若设置了核心线程数可超时 或者 此时工作线程数大于核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 减少 workCountif (compareAndDecrementWorkerCount(c))return null;continue;}try {// 超时获取或者阻塞获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// 没获取到任务,表示超时,继续 for 循环获取timedOut = true;} catch (InterruptedException retry) {// 获取任务过程中被中断timedOut = false;}}}
processWorkerExit
回收 worker
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 更新完成的任务数量workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) { // 线程池处于 RUNNING 或者 SHUTDOWN 状态if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}
