1、继承结构
2、线程池状态及线程数
线程池使用一个整数ctl表示线程池状态和线程数,高3位表示线程池的运行状态,除去高3位的低位表示当前线程池中拥有的线程数量
线程池状态 | 说明 | 状态转换 | 二进制 |
---|---|---|---|
RUNNING | 能够接收新任务,以及对已添加的任务进行处理 | 线程池的初始化状态是RUNNING | -1左移29位 11100000 00000000 00000000 00000000 |
SHUTDOWN | 不接收新任务,但能处理已添加的任务 | 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN | 0 左移29位 00000000 00000000 00000000 00000000 |
STOP | 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务 | 调用线程池的shutdownNow()接口时,线程池由(RUNNING 或者 SHUTDOWN ) -> STOP | 1 左移29位 00100000 00000000 00000000 00000000 |
TIDYING | 当所有的任务已终止,CTL记录线程数为0 |
tryTerminate方法中如果线程池状态大于等于STOP或者线程池状态等于SHUTDOWN 且任务队列为空,并且线程池线程数为0,则将线程池状态修改为TIDYING | 2 左移29位 01000000 00000000 00000000 00000000 |
TERMINATED | 线程池彻底终止 | 线程池状态设置为TIDYING后,会执行terminated钩子方法,然后将线程池状态设置为 TERMINATED |
3 左移29位 01100000 00000000 00000000 00000000 |
关于TIDYING状态详细说明
线程池修改为TIDYING状态的两种情况:
1、执行shutdown()或者shutdownNow()方法的外部线程修改,shutdown()方法中断所有空闲线程,shutdownnow()中断所有线程,之后执行tryTerminate 方法,如果所有工作线程被唤醒后将线程池线程数减1,则线程池线程数为0,外部线程抢到锁,设置线程池状态为TIDYING
2、执行shutdown()或者shutdownNow()方法的外部线程在执行tryTerminate方法时,线程池线程数不为0,之后唤醒的空闲线程执行tryTerminate方法时抢锁,设置线程池状态为TIDYING
通过一些位运算能够计算出线程的线程数和状态,并且还提供了通过线程数和状态计算ctl的方法
/**
* 高3位表示线程池的运行状态
* 除去高3位的低位: 表示当前线程池中拥有的线程数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 表示在ctl中,低COUNT_BITS位为存放线程数量的位
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 用于计算线程池状态的mask
* 00011111 11111111 11111111 11111111
*/
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
/**
* 11100000 00000000 00000000 00000000
* 运行中
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 00000000 00000000 00000000 00000000
* 关闭 不接收新任务,但能处理已添加的任务*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 00100000 00000000 00000000 00000000
* 停止 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 01000000 00000000 00000000 00000000
* 所有的任务已终止
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 01100000 00000000 00000000 00000000
* 线程池彻底终止
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/**
* 获取线程池当前运行状态
* COUNT_MASK : 00011111 11111111 11111111 11111111
* ~COUNT_MASK : 11100000 00000000 00000000 00000000
* @param c ctl
* @return
*/
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
/**
* 获取线程池线程数量
* COUNT_MASK : 00011111 11111111 11111111 11111111
* @param c ctl
* @return
*/
private static int workerCountOf(int c) { return c & COUNT_MASK; }
/**
* 重置当前线程池的ctl值
* rs 11100000 00000000 00000000 00000000 运行中
* wc 00000000 00000000 00000000 00001010 10个线程
* ctl 11100000 00000000 00000000 00001010
* @param rs 线程池状态
* @param wc 线程池线程数量
* @return
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 比较当前线程池ctl的状态是否小于某个状态s
* 注:任何情况下 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
* @param c ctl
* @param s 传入状态
* @return
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/**
* 比较当前线程池ctl的状态是否大于等于某个状态s
* @param c ctl
* @param s 传入状态
* @return
*/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 是否是RUNNING状态
* @param c ctl
* @param s 传入状态
* @return
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS增加线程数量
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS减少线程数量
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 将线程数量-1 一定成功 底层用while(true)一直重试
*/
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
3、属性说明
/**
* 任务缓存队列 线程池中的线程达到 核心线程数时,提交的任务就会提交到缓存队列,缓存队列满了就会创建线程数到最大线程数
* 一般为 :
* LinkedBlockingQueue 无界队列
* ArrayBlockingQueue 有界队列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 线程池全局锁 增加减少线程时,修改线程池运行状态需要获取锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中真正存放工作线程的位置
*/
private final HashSet<Worker> workers = new HashSet<>();
/**
* 当外部线程调用awaitTermination()方法时,外部线程会等待当前线程池为Termination为止
*
* termination.await()会将调用线程阻塞
* termination.signalAll()会将阻塞线程依次唤醒
*
* 实现原理: 将外部线程封装为WaitNode放入Condition队列中,外部线程会被park 阻塞
* 当线程池处于Termination状态时,通过unpark将其唤醒
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录线程池生命周期内线程数最大值
*/
private int largestPoolSize;
/**
* 记录线程池完成任务总数
*/
private long completedTaskCount;
/**
* 创建线程使用线程工厂 自定义线程名
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略 默认AbortPolicy
* AbortPolicy:不处理,直接抛出异常。
* CallerRunsPolicy:若线程池还没关闭,调用当前所在线程来运行任务,r.run()执行。
* DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。
* DiscardPolicy:不处理,丢弃掉,不抛出异常。
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程存活时间
* allowCoreThreadTimeOut==true时 核心线程数量内线程也会被回收
* allowCoreThreadTimeOut==false时 会维持核心线程数量内线程存活
*/
private volatile long keepAliveTime;
/**
* 控制核心线程数内的线程是否可以被回收
* true 可以 false 不可以
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程数量
*/
private volatile int corePoolSize;
/**
* 最大线程数量
*/
private volatile int maximumPoolSize;
/**
* 默认拒绝策略
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
4、工作线程Worker
/**
* Worker使用了AQS的独占模式
* 独占模式两个重要属性: state 和 ExclusiveOwnerThread
* state:
* state==0 未被占用
* state>0 被占用
* state<0 初始化状态 该状态不能抢锁
* ExclusiveOwnerThread:表示独占锁的线程
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** worker内部封装的工作线程*/
final Thread thread;
/**
* 如果firstTask不为空 worker启动后优先执行firstTask,执行完firstTask后会到queue中获取任务
*/
Runnable firstTask;
/** 记录当前worker完成任务数量 */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置AQS独占模式为初始状态 该状态不能抢锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用线程工厂创建线程 并以当前worker为runnable
this.thread = getThreadFactory().newThread(this);
}
/** worker启动时会执行run */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
/**
* 当前worker的独占锁是否被独占
* @return
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试去占用worker独占锁
* @param unused
* @return
*/
protected boolean tryAcquire(int unused) {
//使用CAS修改AQS中的state 抢占成功则设置ExclusiveOwnerThread为当前线程
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁 外部不会调用 AQS内部调用
* @param unused
* @return
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 加锁失败时会阻塞线程,直到获取锁
*/
public void lock() { acquire(1); }
/**
* 尝试去加锁 不会阻塞
* 如果当前锁是未持有状态,那么加锁成功后就会返回true 否则 直接返回false
* @return
*/
public boolean tryLock() { return tryAcquire(1); }
/**
* 一般情况下调用unlock要保证当前线程是持有锁的
* 特殊情况,当worker state == -1时 调用unlock表示初始化state 设置state=0
* 启动worker之前会调用unlock方法
*/
public void unlock() { release(1); }
/**
* 返回当前worker的lock是否被占用
* @return
*/
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
/**
*线程启动状态 且 线程不为空 且没有被中断 则执行中断
*/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
5、重要方法
5.1、execute
execute为任务提交入口方法,如果当前线程池线程数量 小于核心线程数,则会创建新的worker线程,并将本次任务作为firstTask执行。如果核心线程数达到最大值,则将任务提交到任务队列,如果任务队列已满则尝试创建非核心线程。其中还包含了多处对线程池状态的校验,避免外部线程调用shutdown或者shutdownnow方法更改线程池状态,添加任务至任务队列前后都做了线程池是否RUNNING状态校验。
只有当线程池处于RUNNING状态的时候才可以提交任务,否则执行拒绝策略 ,如果线程池处于RUNNING状态,并且线程数达到最大线程数,任务队列满了,也会执行拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前线程池线程数量 是否小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
/**
* 创建新的worker,并将command作为firstTask
* 参数1:任务对象
* 参数2: true 创建核心线程 false 创建额外线程
*/
if (addWorker(command, true)){
//如果创建新的worker线程成功 直接返回,addWorker中创建的新worker线程会将本次任务作为firstTask执行
return;
}
/**
* 创建核心线程失败
* 可能原因:
* 1、存在并发,当前线程和其他线程同时执行addWorker,但是其他线程先创建成功,并且核心线程数达到设置的核心线程数量
* 2、当前线程池状态发生改变 线程池为非RUNNING状态 addWorker一般会失败
* 特殊情况: SHUTDOWN状态下,firstTask == null 并且queue不为空 可能创建成功
*
*/
c = ctl.get();
}
/**
* 执行到此处可能的情况:
* 1、核心线程数达到设置的核心线程最大数量
* 2、addWorker失败
*/
/**
* isRunning(c) 线程池处于运行状态
* workQueue.offer(command) 把任务添加到任务队列中
* true 任务添加到任务队列成功
* false 任务添加到任务队列失败 可能任务队列满了
*/
if (isRunning(c) && workQueue.offer(command)) {
//任务添加到任务队列成功 再次判断线程池状态
int recheck = ctl.get();
/**
* 条件1:! isRunning(recheck)
* true 线程池处于非RUNNING状态 任务提交到队列后线程池状态被外部线程修改(外部线程调用shutdown或者shutdownnow),需要删除提交任务
* false:线程池处于RUNNING状态
* 条件2:remove(command)
* 前置条件: 线程池处于非RUNNING状态 需要删除提交任务
* true 从队列中移除任务成功 任务提交到队列后,还未被处理
* false 从队列中移除任务失败,任务提交到队列后,在外部线程调用shutdown或者shutdownnow之前就被线程池的工作线程处理了
*/
if (! isRunning(recheck) && remove(command)){
//提交任务到队列后 线程池状态为非RUNNING了,并且移除任务成功,使用拒绝策略
reject(command);
}
/**
* 执行到此处可能的情况:
* 1、线程池处于RUNNING状态
* 2、线程池处于非RUNNING状态 但是移除提交任务时失败
*
* workerCountOf(recheck) == 0 表明线程池线程数为0,此处为担保机制,避免线程池处于RUNNING状态但是无线程可用
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 执行到此处可能的情况:
* 1、当前线程池为非RUNNING 此时addWorker(command, false)一定会返回false
* 2、线程数达到配置最大核心线程数,任务队列满了,添加任务到任务队列失败,如果当前线程数没达到最大线程数,尝试创建新worker线程
*/
else if (!addWorker(command, false)){
/** 执行拒绝策略
* 1、当前线程池为非RUNNING
* 2、线程数达到配置最大核心线程数,任务队列满了,并且线程数达到最大线程数
*/
reject(command);
}
}
5.2、addWorker
addWorker 添加线程池工作线程:
- 线程池处于RUNNING状态
线程池核心线程数或者最大线程数未达到配置最大值,调用增加核心线程或者非核心线程。线程池核心线程数小于最大核心线程数时,提交线程池任务时调用addWorker
并给与初始任务,核心线程数达到配置值后,提交线程池任务时直接提交到任务队列,如果任务队列满了则调用addworker创建非核心线程并给与初始任务
- 线程池处于SHUTDOWN状态
1、外部线程调用shutdown,中断空闲线程。执行退出逻辑processWorkerExit,如果此时任务队列还有数据,则需要调用**addWorker(null, false)**
方法维护最少线程数(allowCoreThreadTimeOut配置为true即可以回收核心线程,此时任务队列有任务最少线程为1,allowCoreThreadTimeOut配置为false则最少线程数为配置最大核心线程数)
2、外部线程调用shutdown,此时任务队列有任务,任务执行过程中线程异常退出,此时需要调用**addWorker(null, false)**
方法维持线程数。
/**
* 创建worker线程
* @param firstTask 创建worker线程绑定的第一个任务
* @param core 是否创建的核心线程
* @return 创建worker是否成功
* 创建worker失败情况:
* 1、线程池状态大于SHUTDOWN,为STOP,TIDYING或者TERMINATED状态
* 2、线程池状态为SHUTDOWN但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空
* 3、线程池核心线程数或者最大线程数达到配置最大值
* 4、开发人员实现的ThreadFactory创建线程为null
* 创建worker成功情况:
* 1、线程池状态为RUNNING状态且线程池核心线程数或者最大线程数未达到配置最大值
* 2、线程池处于SHUTDOWN状态且任务队列不为空 firstTask为空,并且线程池核心线程数或者最大线程数未达到配置最大值
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//外层for循环判断线程池状态
retry:
for (int c = ctl.get();;) {
/**判断当前线程池状态是否还允许添加工作线程
* 条件1:runStateAtLeast(c, SHUTDOWN)
* true 线程池处于非RUNNING状态, 为SHUTDOWN,STOP,TIDYING或者TERMINATED状态
* false 线程池处于RUNNING状态
* 条件2:runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty())
* 线程池处于STOP,TIDYING或者TERMINATED状态 或者 firstTask不为null(addworker可能不是execute调用) 或者 任务队列为空
*
*
*为了排除这个情况: 线程池处于SHUTDOWN状态 但是队列任务没有处理完 这个时候还是允许添加worker,但是不允许再次提交task
*
* 返回false的情况:
* 1、线程池状态为非RUNNING状态,线程池状态为STOP,TIDYING或者TERMINATED状态
* 2、线程池状态为SHUTDOWN 但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
//内层for循环CAS增加线程池线程数
for (;;) {
/**
* 判断是否还能创建核心线程或者额外线程
* core== true时
* 判断当前线程数是否达到核心线程数
* core== false时
* 判断当前线程数是否达到最大线程数
*/
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
/**
* CAS增加线程数
* true 线程数增加成功
* false CAS冲突线程数增加失败
* 1、其他线程也修改了线程数
* 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
*/
if (compareAndIncrementWorkerCount(c)){
//CAS成功 直接跳出外层循环到下面的创建worker
break retry;
}
c = ctl.get();
/**
* CAS失败 增加线程数失败,两种可能
* 1、CAS并发,其他线程也修改了线程数导致CAS失败
* 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
*/
/**
* 判断线程池状态是否为非RUNNING状态
* 验证第二种CAS失败的可能 外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
*/
if (runStateAtLeast(c, SHUTDOWN)){
//回到RETRY的下一步循环 让其去验证线程池状态
continue retry;
}
//CAS失败由其他线程修改了线程数导致CAS失败 内层for循环再次尝试CAS增加线程数
}
}
/**
* 执行到此处说明允许创建线程
*/
//表示创建的worker是否启动
boolean workerStarted = false;
//表示创建的worker是否添加到线程池
boolean workerAdded = false;
Worker w = null;
try {
//创建worker
w = new Worker(firstTask);
final Thread t = w.thread;
//防止ThreadFactory存在bug,ThreadFactory是开发人员自己实现的
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//持有全局锁 可能会阻塞 加锁后其他线程是无法更改线程池状态的
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
/**
*条件1:isRunning(c) 当前线程池是否处于RUNNING状态
*
* 条件2:runStateLessThan(c, STOP) && firstTask == null
* true:SHUTDOWN状态 并且 firstTask为空 (前置条件为当前线程池为非RUNNING状态)
*
*/
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
//判断线程池线程是否是NEW状态 防止实现ThreadFactory的开发人员将线程启动了
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//把新建的worker放入线程池
workers.add(w);
//更改创建的worker是否添加到线程池的标识
workerAdded = true;
//记录线程池生命周期内线程数最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
//解锁
mainLock.unlock();
}
/**
* workerAdded
* true 添加线程成功
* false 在lock之前 外部线程更改了线程池状态导致添加失败
*/
if (workerAdded) {
//将线程放入线程池后 启动线程
t.start();
workerStarted = true;
}
}
} finally {
/**
* 如果添加线程失败 需要做清理工作
* 1、减少增加的线程数
* 2、将worker移出线程池线程集合
* 3、
*/
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
5.3、runWorker
任务线程执行任务的核心逻辑,如果firstTask不为空执行firstTask,否则去任务队列获取任务,如果获取任务成功则获取工作线程的全局锁执行任务,执行完毕后释放全局锁,
再次去任务队列阻塞获取任务。如果从阻塞队列获取的任务为null(多种原因,在getTask方法细讲) 或者任务执行过程中抛出异常,则执行线程退出逻辑。
/**
* 新建worker线程的时候会把Worker自身传进去,thread.start方法时,线程启动后会调用Worker的run方法,Worker的run方法中调用了本方法runWorker(this)
* @param w
*/
final void runWorker(Worker w) {
//Worker对应的线程
Thread wt = Thread.currentThread();
//初始任务
Runnable task = w.firstTask;
w.firstTask = null;
//当worker state == -1时 调用unlock表示初始化state 设置state=0 和 exclusiveOwnerThread == null
w.unlock(); // allow interrupts
//表示是否突然退出
boolean completedAbruptly = true;
try {
/**
* 条件1: task != null firstTask不为null则直接执行循环体
*
* 条件2: (task = getTask()) != null
* getTask()是一个会阻塞的方法 如果getTask()返回null说明要执行退出逻辑
* true 当前线程从任务队列中获取任务成功
*/
while (task != null || (task = getTask()) != null) {
/**
* worker加锁 设置独占锁为当前线程
* 当线程池shutdown时时通过独占锁判断当前worker的状态 根据独占锁是否空闲判断当前worker是否在工作 worker处理完后会释放锁
*/
w.lock();
/**
* @TODO
* 本质是强制刷新当前线程的中断标记位,因为可能上次执行task时,业务代码将当前线程的中断标记位设置为true 并且没有处理,此处强制刷新一下,不会影响到后面的task
*
* 1、 线程池处于STOP ,TIDYING或者TERMINATED状态并且线程没有设置中断状态,此时要给线程一个中断信号
* 2、 第一次判断线程池状态处于RUNNING或者SHUTDOWN状态后,外部线程调用shutdown或者shutdownnow修改了线程池状态,并且当前线程被中断,清除中断标记位后
*
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前钩子方法 留给子类去实现
beforeExecute(wt, task);
try {
//执行任务
task.run();
//执行后钩子方法 留给子类去实现
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
//更新当前worker完成任务数量
w.completedTasks++;
//worker处理完一个任务后释放独占锁 正常情况下会回到getTask()获取任务
w.unlock();
}
}
/**
* completedAbruptly 表明正常退出
* 代码执行到此处的情况
* getTask()返回null时 当前线程要执行退出逻辑
*/
completedAbruptly = false;
} finally {
// task.run()内部出现异常时 直接从 w.unlock()跳到该行
processWorkerExit(w, completedAbruptly);
}
}
5.4、getTask
getTask是从任务队列阻塞获取任务,根据allowCoreThreadTimeOut配置和当前线程池线程情况决定阻塞获取任务是否设置超时时间,如果allowCoreThreadTimeOut配置为true,则核心线程数内线程也可以回收,如果allowCoreThreadTimeOut配置为false,可以回收非核心线程,如果当前线程可以回收,则设置阻塞获取任务的超时时间为线程池的最大空闲时间。超时后返回null,线程进入退出逻辑,被回收;当前线程无法回收则一直阻塞等待获取任务。
返回null的四种情况
1、 线程池处于STOP,TIDYING或者TERMINATED状态
2、线程池处于SHUTDOWN状态 并且 任务队列为空
3、线程池的线程数量大于最大线程数量限制
4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null
/**
* 获取任务队列的任务 返回null时 调用getTask()的线程会退出
*什么情况下会返回null
* 1、 线程池处于STOP,TIDYING或者TERMINATED状态
* 2、线程池处于SHUTDOWN状态 并且 任务队列为空
* 3、线程池的线程数量大于最大线程数量限制
* 4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null
* @return
*/
private Runnable getTask() {
//当前线程获取任务是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
/**
* 条件1 runStateAtLeast(c, SHUTDOWN)
* true: 线程池处于非RUNNING
* 条件2 runStateAtLeast(c, STOP) || workQueue.isEmpty()
* 线程池处于STOP,TIDYING或者TERMINATED状态 或者 任务队列为空
*
* 条件成立两种可能:
* 1、线程池处于STOP,TIDYING或者TERMINATED状态
* 2、线程池处于SHUTDOWN状态 并且 任务队列为空
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
//执行退出逻辑 使用CAS减少线程数
decrementWorkerCount();
return null;
}
/**
* 执行到这里的情况
* 1、线程池处于RUNNING状态
* 2、线程池处于SHUTDOWN状态 但是任务队列不为空
*/
int wc = workerCountOf(c);
/**
* timed ==true 当前线程获取task时是支持超时机制的 workQueue.poll设置超时时间 否则一直阻塞等待
*
* 条件1: allowCoreThreadTimeOut 控制核心线程数内的线程是否可以被回收 true可以false不可以
* 条件2:wc > corePoolSize 当前线程数是否大于核心线程数
* 前置条件:allowCoreThreadTimeOut配置为false 核心线程数内线程不可以回收
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 条件1为true表示线程可以被回收
*
* 条件1: wc > maximumPoolSize || (timed && timedOut)
* 条件1.1 wc > maximumPoolSize
* true 当前线程数大于最大线程数 外部线程可能调用setMaximumPoolSize方法将线程池的最大线程数设置的比初始化的小
* 条件1.2 timed && timedOut
* 当前线程获取task时是支持超时机制 并且当前线程上一次循环获取task已经超时
*
* 条件2:wc > 1 || workQueue.isEmpty()
* 前置条件:当前线程可以被回收,但是还需要进一步判断
* 条件 2.1 wc > 1
* true 说明当前线程池还有其他线程 当前线程可以直接被回收 返回null
* 条件 2.2 workQueue.isEmpty()
* 前置条件:wc = 1 本线程为最后一个线程
* true 任务队列已经没有任务 最后一个线程也可以被回收 返回null
*
*
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/**
* 将线程数-1
* 当前CAS可能会失败,失败后再次循环上去,检查线程池状态,CAS减少线程数
* 1、可能其他线程也退出回收
* 2、线程池状态改变了
*/
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务逻辑 根据是否支持超时机制的选择poll方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//当前线程获取任务超时了 走循环 检测到超时 返回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
5.5、processWorkerExit
空闲线程或者异常线程退出逻辑
/**
*
* @param w 当前要退出的worker
* @param completedAbruptly 是否突然退出
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 条件成立 表明当前worker是发生异常退出的,task任务执行过程中向上抛出了异常
*/
if (completedAbruptly){
//将线程数量-1
decrementWorkerCount();
}
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//将当前线程完成的任务数累加到线程池完成的任务总数上
completedTaskCount += w.completedTasks;
//从线程池移除该线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
//当前线程池状态为RUNNING或者SHUTDOWN
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//线程正常退出
//线程池最低持有的线程数量 如果核心线程数内的线程是也可以被回收 则是 0 否则是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()){
//如果最小线程数可以为0 但是任务队列还有任务 线程池至少要留一个线程
min = 1;
}
/**
* 如果线程池线程数拥有足够的线程 线程退出 否则新添加一个线程
*/
if (workerCountOf(c) >= min)
return;
}
/**
* 执行该代码的情况
* 1、当前线程在执行task时异常退出 此处新创建一个新worker加入线程池
* 2、allowCoreThreadTimeOut配置为true,当前线程池状态为RUNNING或者SHUTDOWN,任务队列还有任务,线程池至少要留一个线程
* 3、allowCoreThreadTimeOut配置为false,当前线程池状态为RUNNING或者SHUTDOWN,当前线程数量小于corePoolSize,需要维持核心线程数量
*/
addWorker(null, false);
}
}
5.6、shutdown
shutdown会先将线程池设置为SHUTDOWN状态,然后中断所有的空闲工作线程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//判断有没有权限
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
//扩展使用
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
中断空闲线程方法逻辑
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
*
* @param onlyOne true 只中断一个线程 false中断所有线程
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//循环所有worker
for (Worker w : workers) {
Thread t = w.thread;
/**
* 条件1: !t.isInterrupted()
* true: 线程没有中断
* 条件2:w.tryLock()
* true:说明当前worker处于空闲状态,目前线程在queue.poll或者queue.take 阻塞中
*/
if (!t.isInterrupted() && w.tryLock()) {
try {
//给空闲worker一个中断信号 处于queue阻塞的线程会被唤醒 return null,执行退出逻辑
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//释放线程独占锁
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
//释放全局锁
mainLock.unlock();
}
}
5.7、shutdownnow
shutdownnow会先将线程池设置为STOP状态,然后中断所有的工作线程,最后导出未处理任务
public List<Runnable> shutdownNow() {
//返回值
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
checkShutdownAccess();
//将线程池状态改为STOP
advanceRunState(STOP);
//中断线程池中所有线程
interruptWorkers();
//导出未处理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
中断所有线程逻辑
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
void interruptIfStarted() {
Thread t;
/**
*线程启动状态 且 线程不为空 且没有被中断 则执行中断
*/
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
5.8、tryTerminate
尝试将线程池设置为TERMINATED
调用对象可能为两种:
1、外部线程调用shutdown或者shutdownnow ,调用tryTerminate
2、空闲工作线程退出或者异常工作线程退出,调用tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 条件1: isRunning(c)
* 线程池处于RUNNING状态
* 条件2: runStateAtLeast(c, TIDYING)
* 线程池状态大于等于TIDYING,说明有其他线程在执行TIDYING -> TERMINATED状态了
* 条件3:runStateLessThan(c, STOP) && ! workQueue.isEmpty()
* 线程池处于SHUTDOWN(此处执行前提条件就是 线程池不处于RUNNING状态) 且任务队列不为空
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
/**
*
* 执行到此处的情况:
* 1、线程池状态大于等于STOP
* 2、线程池状态等于SHUTDOWN 且任务队列为空
*/
//当前线程池线程数大于0
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个空闲线程
/**
* 1、中断的空闲线程在getTask中 pool|task 阻塞,唤醒后,返回null
* 2、在runWorker方法中执行退出逻辑,退出逻辑中会调用tryTerminate方法 唤醒下一个空闲线程
* 3、因为线程池状态为(线程池状态大于等于STOP | 线程池状态等于SHUTDOWN 且任务队列为空)最终调用addWorker失败,最终空闲线程都会在这儿退出
* 非空闲线程执行执行task抛出异常也会调用 tryTerminate,最终退出
*/
interruptIdleWorkers(ONLY_ONE);
return;
}
/**
* 执行到这里说明当前线程为线程池最后一个线程
* (1) 线程池处于STOP,TIDYING或者TERMINATED状态 (2) 线程池处于SHUTDOWN状态 并且 任务队列为空
* 满足这两种状态时 getTask被唤醒后会减少线程池线程数 所以上面最后一个线程执行workerCountOf(c) != 0时通过了
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用钩子方法
terminated();
} finally {
//设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination()方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}