1. 概况
线程池作用:
- 减少频繁创建、销毁线程的资源损耗
- 任务来了,可以立即执行,响应快
- 避免对资源的过度使用,超过预期的请求能够拒绝
2. 类定义
- ExecutorService 扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future的 方法;提供了管控线程池的方法,比如停止线程池的运行。
- AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
public class ThreadPoolExecutor extends AbstractExecutorService {
}
public abstract class AbstractExecutorService implements ExecutorService {
}
public interface ExecutorService extends Executor {
}
public interface Executor {
void execute(Runnable command);
}
3. 成员变量
3.1 ctl
用一个变量去表示两种状态,避免为了维护两者的一致使用额外的锁资源。高 3 位表示线程池的状态 runState,低 29 位表示有效工作线程数量 workerCount。
// ctl 高 3 位表示线程状态 RunState,低 29 位表示线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算 runState
private static int workerCountOf(int c) { return c & CAPACITY; } // 计算 workerCount
private static int ctlOf(int rs, int wc) { return rs | wc; } // 通过 runState 和 workerCount 计算出 ctl
3.2 RunState
线程池的运行状态,值从小到大排序: RUNNING < SHUTDOW < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010,所有的任务已经终止,workerCount = 0,将会执行 terminated() 的 hook 方法
private static final int TERMINATED = 3 << COUNT_BITS;// 011
线程池状态转换:
- RUNNING -> SHUTDOWN,调用 shutdown() 或者 finalize()
- (RUNNING or SHUTDOWN) -> STOP,调用 shutdownNow()
- SHUTDOWN -> TIDYING,队列中的任务和线程池中的线程都为空
- STOP -> TIDYING,线程池线程为空
- TIDYING -> TERMINATED,调用 terminated() 方法
3.3 其它成员变量
// 保存任务的队列,阻塞式
private final BlockingQueue<Runnable> workQueue;
// 主锁,后面防止并发的时候都会用到这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池中的工作线程放到 worker 中,将任务与工作线程分开
private final HashSet<Worker> workers = new HashSet<Worker>();
// 调用 awaitTermination() 方法时,线程会等待这个条件
private final Condition termination = mainLock.newCondition();
// 线程池中线程数曾经的最大值
private int largestPoolSize;
// 线程池完成的任务数
private long completedTaskCount;
阻塞队列可选:
3.4 构造函数所需变量
// 创建线程的工厂类,在里面可以设置名称
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 超过 coresize 的线程空闲等待时长,nanoseconds 级别
private volatile long keepAliveTime;
// core 线程空闲标志,默认为 false,若设为 true ,core 线程将等待 keepAliveTime 之后被销魂
private volatile boolean allowCoreThreadTimeOut;
// 线程池的最大数量,一般为 CAPACITY
private volatile int maximumPoolSize;
// 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
5. 成员方法
线程池创建之后,一般就会调用线程池的 execute 方法执行任务
5.1 execute
通过构造函数创建线程池之后,使用 execute 方法执行放到线程池中的任务,他的执行过程参考下图:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 获取当前 ctl 的值,高 3 位是线程状态,低 29 位是线程数
if (workerCountOf(c) < corePoolSize) { // 工作线程数小于 corePoolSize,通过 addWorker 添加核心线程,把当前任务 command 作为该线程的 firstTask
if (addWorker(command, true)) // 添加核心线程,addWorker 里面有执行任务的逻辑
return;
c = ctl.get();
}
// 走到这里,表示线程数大于 corePoolSize 或者 添加工作线程失败了
// 线程池是 running 状态,并且工作队列没满(能够添加任务)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 线程池不是 RUNNING,移除刚刚添加的任务,执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0) // 此时没有 worker ,添加 worker
addWorker(null, false); // 添加非核心线程
}
// 添加工作线程失败了
else if (!addWorker(command, false)) // 队列满了,并且添加非核心线程返回 false(线程数达到 maximumPoolSize),需要执行拒绝策略
reject(command); // 执行拒绝策略
}
}
5.2 shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查权限
advanceRunState(SHUTDOWN); // CAS 方式将线程池状态改为 SHUTDOWN
interruptIdleWorkers(); // 中断空闲的线程
onShutdown(); // 调用 shuntdown 的 hook 方法
} finally {
mainLock.unlock();
}
tryTerminate(); // 进行后续的终止操作
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false); // 中断所有的闲置线程
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 获取锁成功,表明线程没有在执行任务,此时可以进行中断
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
5.3 tryTerminate
将线程池设置为 TERMINATED 状态,调用 shutdown 方法或者减少 worker count 的时候该方法会被调用。
- runWorker 当线程池获取任务失败时,跳出 while 循环,执行 processWorkerExit 方法,里面调用 tryTerminate() 方法,此时阻塞的任务会被中断。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 若线程池是 RUNNING 状态,那么不能 terminate
// 或者线程池状态 >= TIDYING,也就是 TIDYING TERMINATED 其中之一,也不能 terminate
// 或者线程池状态 == SHUTDOWN,并且工作队列不为空,也不能 terminate
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果此时线程池中有工作线程
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中断闲置的 worker,只中断一个 worker
// 若此 worker 阻塞在 take 方法中,那么就可以响应中断,并把中断信息传递到外面,runWorker 里面有调用
interruptIdleWorkers(ONLY_ONE); // ONLY_ONE = true
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试将 runstate 设为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 设置成功,执行 terminated()
terminated();
} finally {
// 将 runState 设为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); // 通知 awaitTermination 的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
6. 内部类 Worker
- firstTask,保存传入的第一个任务,若 firstTask != null,那么线程启动时就会执行这个任务,否则只是增加一个 worker(创建非核心线程)
- Worker 继承 AQS,通过 lock 方法获取独占锁,表示线程正在运行;若线程没有获取独占锁(空闲状态),那么他就可以被回收。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable // 继承自 AQS,只用实现 tryAquire 和 tryRelease 就可以实现线程间的同步
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
// 若 firstTask 不为空,那么 thread 启动的时候就会执行这个任务;否则 thread 会去调用 getTask() 方法去获取执行的任务
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 初始化的时候禁止中断响应
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通过工厂方法新建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this); // 执行具体的任务逻辑
}
// Lock methods 加锁办法,熟悉 AQS 下面的办法应该会很熟悉
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) { // 抢占式,直接设定 state
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
addWorker
线程池的执行,主要是通过 addWorker 方法去增加工作线程,线程池根据 addWorker 方法的返回判断是否需要执行拒绝策略。
// firstTask 可能为 null
// core == ture 表示以 corePoolSize 为界限,工作线程不能超过这个值;core == false 表示以 maximumPoolSize 为界限,不能超过该值
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池处于 SHUTDOW and firstTask == null and ! workQueue.isEmpty() 情况下可以创建线程。若线程池处于 STOP,TIDYING,TERMINATED 情况下,不允许添加线程
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
)
return false;
for (;;) {
int wc = workerCountOf(c); // 线程池中正在运行的线程数
if (wc >= CAPACITY || // 正在运行的线程超过 5 亿,或者如果是添加核心线程超过 corePoolSize ,添加非核心线程超过 maximumPoolSize,直接返回不能添加
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // 以 CAS 方式将当前的工作线程数增加 1,跳出循环
break retry;
c = ctl.get(); // 存在并发重新读取
if (runStateOf(c) != rs) // CAS 失败,线程池状态发生了变更(其它线程关闭线程池),重新循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 经过上面循环 workerCount 已经增加了 1 个
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将 Runnable 对象封装到 Worker 对象中,届时能够进行加锁执行某些方法
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 全局锁,持有这个锁期间,线程池不会关闭,关闭线程池需要该锁
try {
// 加锁之后再检查 state
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果是 running 状态,或者是 shutdown ,此时不接受新任务,但是可以处理队列中已有的任务
if (t.isAlive()) // worker 里面的 thread,不应该是启动状态
throw new IllegalThreadStateException();
workers.add(w); // 添加到 Work<Set> 中
int s = workers.size();
if (s > largestPoolSize) // 维护一个全局达到过的最大线程数计数器
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// worker 成功加入到 HashSet
if (workerAdded) {
t.start(); // 执行任务的 run 方法
workerStarted = true; // 标记添加到队列里面的 worker 启动成功
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 刚添加到队列的 worker 没有启动成功,需要移除刚添加到 HashSet 里面的 worker
}
// 返回线程是否启动成功
return workerStarted;
}
addWorker 四种调用方式:
- addWorker(command, true),线程数 < coreSize时,将 task 放入 workers,如果线程数 >= coreSize,返回 false;
- addWorker(command, false),当阻塞对列已满,尝试将新的 task 放入 workers,如果线程数 >= maximumPoolSize,返回 false;
- addWorker(null, false),放入一个空的 task 到 workers,此时线程数的限制是 maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
- addWorker(null, true)放入一个空的 task 到 workers,线程数限制是需要小于 coreSize,否则返回 false。实际的使用是在prestartCoreThread()等方法,有兴趣的读者可以自行阅读,在此不做详细赘述。
addWorkerFailed
- 删除 workers 里面的 worker
- workCount 减 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 移除 HashSet<Worker> 中的这个 worker
decrementWorkerCount(); // worderCount 减 1
tryTerminate(); // 尝试终止线程池
} finally {
mainLock.unlock();
}
}
runWorker
addWorker 里面执行 start() 方法之后,线程开始执行 run() 逻辑,而 worker 里面的 run() 调用的是 runWorker(),下面来看具体的任务执行过程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 线程的第一个任务,可以不从队列中获取
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 释放锁,允许响应中断
boolean completedAbruptly = true;
try {
// worker 的 firsttask 不为空或者从 workeQueue 中获取到任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 若线程池是非 RUNNING 状态,要保证该线程是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // wt 线程执行 task,做一些执行前的操作,WorkerPoolExecutor 继承 ThreadPoolExecutor,有对这个方法具体实现
Throwable thrown = null;
try {
task.run(); // 执行具体的任务处理逻辑
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 任务正常执行完成,此已完成任务置为null,进行下一次循环执行 getTask()
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false; // 完成任务,没有被中断
} finally {
// 走到这里,说明上面循环结束
// 1. getTask 返回 null
// 2. 发生了异常
processWorkerExit(w, completedAbruptly); // 回收线程
}
}
getTask
从任务队列中阻塞获取任务,并且会回收多余的 worker
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. rs == SHUTDOWN && workQueue.isEmpty
// 2. rs >= STOP
// 以上两种情况需要减少 工作线程数
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 若设置了核心线程数可超时 或者 此时工作线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 减少 workCount
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 超时获取或者阻塞获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 没获取到任务,表示超时,继续 for 循环获取
timedOut = true;
} catch (InterruptedException retry) {
// 获取任务过程中被中断
timedOut = false;
}
}
}
processWorkerExit
回收 worker
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 更新完成的任务数量
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 线程池处于 RUNNING 或者 SHUTDOWN 状态
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}