1、继承结构

2、线程池状态及线程数
线程池使用一个整数ctl表示线程池状态和线程数,高3位表示线程池的运行状态,除去高3位的低位表示当前线程池中拥有的线程数量
| 线程池状态 | 说明 | 状态转换 | 二进制 | 
|---|---|---|---|
| RUNNING | 能够接收新任务,以及对已添加的任务进行处理 | 线程池的初始化状态是RUNNING | -1左移29位 11100000 00000000 00000000 00000000 | 
| SHUTDOWN | 不接收新任务,但能处理已添加的任务 | 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN | 0 左移29位 00000000 00000000 00000000 00000000 | 
| STOP | 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务 | 调用线程池的shutdownNow()接口时,线程池由(RUNNING 或者 SHUTDOWN ) -> STOP | 1 左移29位 00100000 00000000 00000000 00000000 | 
| TIDYING | 当所有的任务已终止,CTL记录线程数为0   | 
tryTerminate方法中如果线程池状态大于等于STOP或者线程池状态等于SHUTDOWN 且任务队列为空,并且线程池线程数为0,则将线程池状态修改为TIDYING | 2 左移29位 01000000 00000000 00000000 00000000 | 
| TERMINATED | 线程池彻底终止 | 线程池状态设置为TIDYING后,会执行terminated钩子方法,然后将线程池状态设置为 TERMINATED  | 
3 左移29位 01100000 00000000 00000000 00000000 | 
关于TIDYING状态详细说明
线程池修改为TIDYING状态的两种情况:
1、执行shutdown()或者shutdownNow()方法的外部线程修改,shutdown()方法中断所有空闲线程,shutdownnow()中断所有线程,之后执行tryTerminate            方法,如果所有工作线程被唤醒后将线程池线程数减1,则线程池线程数为0,外部线程抢到锁,设置线程池状态为TIDYING
2、执行shutdown()或者shutdownNow()方法的外部线程在执行tryTerminate方法时,线程池线程数不为0,之后唤醒的空闲线程执行tryTerminate方法时抢锁,设置线程池状态为TIDYING
通过一些位运算能够计算出线程的线程数和状态,并且还提供了通过线程数和状态计算ctl的方法
/*** 高3位表示线程池的运行状态* 除去高3位的低位: 表示当前线程池中拥有的线程数量*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));/*** 表示在ctl中,低COUNT_BITS位为存放线程数量的位*/private static final int COUNT_BITS = Integer.SIZE - 3;/*** 用于计算线程池状态的mask* 00011111 11111111 11111111 11111111*/private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;/*** 11100000 00000000 00000000 00000000* 运行中*/private static final int RUNNING = -1 << COUNT_BITS;/*** 00000000 00000000 00000000 00000000* 关闭 不接收新任务,但能处理已添加的任务*/private static final int SHUTDOWN = 0 << COUNT_BITS;/*** 00100000 00000000 00000000 00000000* 停止 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务*/private static final int STOP = 1 << COUNT_BITS;/*** 01000000 00000000 00000000 00000000* 所有的任务已终止*/private static final int TIDYING = 2 << COUNT_BITS;/*** 01100000 00000000 00000000 00000000* 线程池彻底终止*/private static final int TERMINATED = 3 << COUNT_BITS;/*** 获取线程池当前运行状态* COUNT_MASK : 00011111 11111111 11111111 11111111* ~COUNT_MASK : 11100000 00000000 00000000 00000000* @param c ctl* @return*/private static int runStateOf(int c) { return c & ~COUNT_MASK; }/*** 获取线程池线程数量* COUNT_MASK : 00011111 11111111 11111111 11111111* @param c ctl* @return*/private static int workerCountOf(int c) { return c & COUNT_MASK; }/*** 重置当前线程池的ctl值* rs 11100000 00000000 00000000 00000000 运行中* wc 00000000 00000000 00000000 00001010 10个线程* ctl 11100000 00000000 00000000 00001010* @param rs 线程池状态* @param wc 线程池线程数量* @return*/private static int ctlOf(int rs, int wc) { return rs | wc; }/*** 比较当前线程池ctl的状态是否小于某个状态s* 注:任何情况下 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED* @param c ctl* @param s 传入状态* @return*/private static boolean runStateLessThan(int c, int s) {return c < s;}/*** 比较当前线程池ctl的状态是否大于等于某个状态s* @param c ctl* @param s 传入状态* @return*/private static boolean runStateAtLeast(int c, int s) {return c >= s;}/*** 是否是RUNNING状态* @param c ctl* @param s 传入状态* @return*/private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** 使用CAS增加线程数量*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** 使用CAS减少线程数量*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** 将线程数量-1 一定成功 底层用while(true)一直重试*/private void decrementWorkerCount() {ctl.addAndGet(-1);}
3、属性说明
/*** 任务缓存队列 线程池中的线程达到 核心线程数时,提交的任务就会提交到缓存队列,缓存队列满了就会创建线程数到最大线程数* 一般为 :* LinkedBlockingQueue 无界队列* ArrayBlockingQueue 有界队列*/private final BlockingQueue<Runnable> workQueue;/*** 线程池全局锁 增加减少线程时,修改线程池运行状态需要获取锁*/private final ReentrantLock mainLock = new ReentrantLock();/*** 线程池中真正存放工作线程的位置*/private final HashSet<Worker> workers = new HashSet<>();/*** 当外部线程调用awaitTermination()方法时,外部线程会等待当前线程池为Termination为止** termination.await()会将调用线程阻塞* termination.signalAll()会将阻塞线程依次唤醒** 实现原理: 将外部线程封装为WaitNode放入Condition队列中,外部线程会被park 阻塞* 当线程池处于Termination状态时,通过unpark将其唤醒*/private final Condition termination = mainLock.newCondition();/*** 记录线程池生命周期内线程数最大值*/private int largestPoolSize;/*** 记录线程池完成任务总数*/private long completedTaskCount;/*** 创建线程使用线程工厂 自定义线程名*/private volatile ThreadFactory threadFactory;/*** 拒绝策略 默认AbortPolicy* AbortPolicy:不处理,直接抛出异常。* CallerRunsPolicy:若线程池还没关闭,调用当前所在线程来运行任务,r.run()执行。* DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。* DiscardPolicy:不处理,丢弃掉,不抛出异常。*/private volatile RejectedExecutionHandler handler;/*** 空闲线程存活时间* allowCoreThreadTimeOut==true时 核心线程数量内线程也会被回收* allowCoreThreadTimeOut==false时 会维持核心线程数量内线程存活*/private volatile long keepAliveTime;/*** 控制核心线程数内的线程是否可以被回收* true 可以 false 不可以*/private volatile boolean allowCoreThreadTimeOut;/*** 核心线程数量*/private volatile int corePoolSize;/*** 最大线程数量*/private volatile int maximumPoolSize;/*** 默认拒绝策略*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
4、工作线程Worker
/*** Worker使用了AQS的独占模式* 独占模式两个重要属性: state 和 ExclusiveOwnerThread* state:* state==0 未被占用* state>0 被占用* state<0 初始化状态 该状态不能抢锁* ExclusiveOwnerThread:表示独占锁的线程*/private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** worker内部封装的工作线程*/final Thread thread;/*** 如果firstTask不为空 worker启动后优先执行firstTask,执行完firstTask后会到queue中获取任务*/Runnable firstTask;/** 记录当前worker完成任务数量 */volatile long completedTasks;// TODO: switch to AbstractQueuedLongSynchronizer and move// completedTasks into the lock word./*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {//设置AQS独占模式为初始状态 该状态不能抢锁setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//使用线程工厂创建线程 并以当前worker为runnablethis.thread = getThreadFactory().newThread(this);}/** worker启动时会执行run */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state./*** 当前worker的独占锁是否被独占* @return*/protected boolean isHeldExclusively() {return getState() != 0;}/*** 尝试去占用worker独占锁* @param unused* @return*/protected boolean tryAcquire(int unused) {//使用CAS修改AQS中的state 抢占成功则设置ExclusiveOwnerThread为当前线程if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}/*** 释放锁 外部不会调用 AQS内部调用* @param unused* @return*/protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}/*** 加锁失败时会阻塞线程,直到获取锁*/public void lock() { acquire(1); }/*** 尝试去加锁 不会阻塞* 如果当前锁是未持有状态,那么加锁成功后就会返回true 否则 直接返回false* @return*/public boolean tryLock() { return tryAcquire(1); }/*** 一般情况下调用unlock要保证当前线程是持有锁的* 特殊情况,当worker state == -1时 调用unlock表示初始化state 设置state=0* 启动worker之前会调用unlock方法*/public void unlock() { release(1); }/*** 返回当前worker的lock是否被占用* @return*/public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;/***线程启动状态 且 线程不为空 且没有被中断 则执行中断*/if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
5、重要方法
5.1、execute
execute为任务提交入口方法,如果当前线程池线程数量 小于核心线程数,则会创建新的worker线程,并将本次任务作为firstTask执行。如果核心线程数达到最大值,则将任务提交到任务队列,如果任务队列已满则尝试创建非核心线程。其中还包含了多处对线程池状态的校验,避免外部线程调用shutdown或者shutdownnow方法更改线程池状态,添加任务至任务队列前后都做了线程池是否RUNNING状态校验。
只有当线程池处于RUNNING状态的时候才可以提交任务,否则执行拒绝策略 ,如果线程池处于RUNNING状态,并且线程数达到最大线程数,任务队列满了,也会执行拒绝策略。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//判断当前线程池线程数量 是否小于 核心线程数if (workerCountOf(c) < corePoolSize) {/*** 创建新的worker,并将command作为firstTask* 参数1:任务对象* 参数2: true 创建核心线程 false 创建额外线程*/if (addWorker(command, true)){//如果创建新的worker线程成功 直接返回,addWorker中创建的新worker线程会将本次任务作为firstTask执行return;}/*** 创建核心线程失败* 可能原因:* 1、存在并发,当前线程和其他线程同时执行addWorker,但是其他线程先创建成功,并且核心线程数达到设置的核心线程数量* 2、当前线程池状态发生改变 线程池为非RUNNING状态 addWorker一般会失败* 特殊情况: SHUTDOWN状态下,firstTask == null 并且queue不为空 可能创建成功**/c = ctl.get();}/*** 执行到此处可能的情况:* 1、核心线程数达到设置的核心线程最大数量* 2、addWorker失败*//*** isRunning(c) 线程池处于运行状态* workQueue.offer(command) 把任务添加到任务队列中* true 任务添加到任务队列成功* false 任务添加到任务队列失败 可能任务队列满了*/if (isRunning(c) && workQueue.offer(command)) {//任务添加到任务队列成功 再次判断线程池状态int recheck = ctl.get();/*** 条件1:! isRunning(recheck)* true 线程池处于非RUNNING状态 任务提交到队列后线程池状态被外部线程修改(外部线程调用shutdown或者shutdownnow),需要删除提交任务* false:线程池处于RUNNING状态* 条件2:remove(command)* 前置条件: 线程池处于非RUNNING状态 需要删除提交任务* true 从队列中移除任务成功 任务提交到队列后,还未被处理* false 从队列中移除任务失败,任务提交到队列后,在外部线程调用shutdown或者shutdownnow之前就被线程池的工作线程处理了*/if (! isRunning(recheck) && remove(command)){//提交任务到队列后 线程池状态为非RUNNING了,并且移除任务成功,使用拒绝策略reject(command);}/*** 执行到此处可能的情况:* 1、线程池处于RUNNING状态* 2、线程池处于非RUNNING状态 但是移除提交任务时失败** workerCountOf(recheck) == 0 表明线程池线程数为0,此处为担保机制,避免线程池处于RUNNING状态但是无线程可用*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/*** 执行到此处可能的情况:* 1、当前线程池为非RUNNING 此时addWorker(command, false)一定会返回false* 2、线程数达到配置最大核心线程数,任务队列满了,添加任务到任务队列失败,如果当前线程数没达到最大线程数,尝试创建新worker线程*/else if (!addWorker(command, false)){/** 执行拒绝策略* 1、当前线程池为非RUNNING* 2、线程数达到配置最大核心线程数,任务队列满了,并且线程数达到最大线程数*/reject(command);}}
5.2、addWorker
addWorker 添加线程池工作线程:
- 线程池处于RUNNING状态
 
线程池核心线程数或者最大线程数未达到配置最大值,调用增加核心线程或者非核心线程。线程池核心线程数小于最大核心线程数时,提交线程池任务时调用addWorker
并给与初始任务,核心线程数达到配置值后,提交线程池任务时直接提交到任务队列,如果任务队列满了则调用addworker创建非核心线程并给与初始任务
- 线程池处于SHUTDOWN状态
 
1、外部线程调用shutdown,中断空闲线程。执行退出逻辑processWorkerExit,如果此时任务队列还有数据,则需要调用**addWorker(null, false)**方法维护最少线程数(allowCoreThreadTimeOut配置为true即可以回收核心线程,此时任务队列有任务最少线程为1,allowCoreThreadTimeOut配置为false则最少线程数为配置最大核心线程数)
2、外部线程调用shutdown,此时任务队列有任务,任务执行过程中线程异常退出,此时需要调用**addWorker(null, false)**方法维持线程数。

/*** 创建worker线程* @param firstTask 创建worker线程绑定的第一个任务* @param core 是否创建的核心线程* @return 创建worker是否成功* 创建worker失败情况:* 1、线程池状态大于SHUTDOWN,为STOP,TIDYING或者TERMINATED状态* 2、线程池状态为SHUTDOWN但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空* 3、线程池核心线程数或者最大线程数达到配置最大值* 4、开发人员实现的ThreadFactory创建线程为null* 创建worker成功情况:* 1、线程池状态为RUNNING状态且线程池核心线程数或者最大线程数未达到配置最大值* 2、线程池处于SHUTDOWN状态且任务队列不为空 firstTask为空,并且线程池核心线程数或者最大线程数未达到配置最大值*/private boolean addWorker(Runnable firstTask, boolean core) {//外层for循环判断线程池状态retry:for (int c = ctl.get();;) {/**判断当前线程池状态是否还允许添加工作线程* 条件1:runStateAtLeast(c, SHUTDOWN)* true 线程池处于非RUNNING状态, 为SHUTDOWN,STOP,TIDYING或者TERMINATED状态* false 线程池处于RUNNING状态* 条件2:runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty())* 线程池处于STOP,TIDYING或者TERMINATED状态 或者 firstTask不为null(addworker可能不是execute调用) 或者 任务队列为空***为了排除这个情况: 线程池处于SHUTDOWN状态 但是队列任务没有处理完 这个时候还是允许添加worker,但是不允许再次提交task** 返回false的情况:* 1、线程池状态为非RUNNING状态,线程池状态为STOP,TIDYING或者TERMINATED状态* 2、线程池状态为SHUTDOWN 但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空*/if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;//内层for循环CAS增加线程池线程数for (;;) {/*** 判断是否还能创建核心线程或者额外线程* core== true时* 判断当前线程数是否达到核心线程数* core== false时* 判断当前线程数是否达到最大线程数*/if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;/*** CAS增加线程数* true 线程数增加成功* false CAS冲突线程数增加失败* 1、其他线程也修改了线程数* 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化*/if (compareAndIncrementWorkerCount(c)){//CAS成功 直接跳出外层循环到下面的创建workerbreak retry;}c = ctl.get();/*** CAS失败 增加线程数失败,两种可能* 1、CAS并发,其他线程也修改了线程数导致CAS失败* 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化*//*** 判断线程池状态是否为非RUNNING状态* 验证第二种CAS失败的可能 外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化*/if (runStateAtLeast(c, SHUTDOWN)){//回到RETRY的下一步循环 让其去验证线程池状态continue retry;}//CAS失败由其他线程修改了线程数导致CAS失败 内层for循环再次尝试CAS增加线程数}}/*** 执行到此处说明允许创建线程*///表示创建的worker是否启动boolean workerStarted = false;//表示创建的worker是否添加到线程池boolean workerAdded = false;Worker w = null;try {//创建workerw = new Worker(firstTask);final Thread t = w.thread;//防止ThreadFactory存在bug,ThreadFactory是开发人员自己实现的if (t != null) {final ReentrantLock mainLock = this.mainLock;//持有全局锁 可能会阻塞 加锁后其他线程是无法更改线程池状态的mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();/***条件1:isRunning(c) 当前线程池是否处于RUNNING状态** 条件2:runStateLessThan(c, STOP) && firstTask == null* true:SHUTDOWN状态 并且 firstTask为空 (前置条件为当前线程池为非RUNNING状态)**/if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {//判断线程池线程是否是NEW状态 防止实现ThreadFactory的开发人员将线程启动了if (t.getState() != Thread.State.NEW)throw new IllegalThreadStateException();//把新建的worker放入线程池workers.add(w);//更改创建的worker是否添加到线程池的标识workerAdded = true;//记录线程池生命周期内线程数最大值int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;}} finally {//解锁mainLock.unlock();}/*** workerAdded* true 添加线程成功* false 在lock之前 外部线程更改了线程池状态导致添加失败*/if (workerAdded) {//将线程放入线程池后 启动线程t.start();workerStarted = true;}}} finally {/*** 如果添加线程失败 需要做清理工作* 1、减少增加的线程数* 2、将worker移出线程池线程集合* 3、*/if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
5.3、runWorker
任务线程执行任务的核心逻辑,如果firstTask不为空执行firstTask,否则去任务队列获取任务,如果获取任务成功则获取工作线程的全局锁执行任务,执行完毕后释放全局锁,
再次去任务队列阻塞获取任务。如果从阻塞队列获取的任务为null(多种原因,在getTask方法细讲) 或者任务执行过程中抛出异常,则执行线程退出逻辑。
/*** 新建worker线程的时候会把Worker自身传进去,thread.start方法时,线程启动后会调用Worker的run方法,Worker的run方法中调用了本方法runWorker(this)* @param w*/final void runWorker(Worker w) {//Worker对应的线程Thread wt = Thread.currentThread();//初始任务Runnable task = w.firstTask;w.firstTask = null;//当worker state == -1时 调用unlock表示初始化state 设置state=0 和 exclusiveOwnerThread == nullw.unlock(); // allow interrupts//表示是否突然退出boolean completedAbruptly = true;try {/*** 条件1: task != null firstTask不为null则直接执行循环体** 条件2: (task = getTask()) != null* getTask()是一个会阻塞的方法 如果getTask()返回null说明要执行退出逻辑* true 当前线程从任务队列中获取任务成功*/while (task != null || (task = getTask()) != null) {/*** worker加锁 设置独占锁为当前线程* 当线程池shutdown时时通过独占锁判断当前worker的状态 根据独占锁是否空闲判断当前worker是否在工作 worker处理完后会释放锁*/w.lock();/*** @TODO* 本质是强制刷新当前线程的中断标记位,因为可能上次执行task时,业务代码将当前线程的中断标记位设置为true 并且没有处理,此处强制刷新一下,不会影响到后面的task** 1、 线程池处于STOP ,TIDYING或者TERMINATED状态并且线程没有设置中断状态,此时要给线程一个中断信号* 2、 第一次判断线程池状态处于RUNNING或者SHUTDOWN状态后,外部线程调用shutdown或者shutdownnow修改了线程池状态,并且当前线程被中断,清除中断标记位后**/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行前钩子方法 留给子类去实现beforeExecute(wt, task);try {//执行任务task.run();//执行后钩子方法 留给子类去实现afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;//更新当前worker完成任务数量w.completedTasks++;//worker处理完一个任务后释放独占锁 正常情况下会回到getTask()获取任务w.unlock();}}/*** completedAbruptly 表明正常退出* 代码执行到此处的情况* getTask()返回null时 当前线程要执行退出逻辑*/completedAbruptly = false;} finally {// task.run()内部出现异常时 直接从 w.unlock()跳到该行processWorkerExit(w, completedAbruptly);}}
5.4、getTask
getTask是从任务队列阻塞获取任务,根据allowCoreThreadTimeOut配置和当前线程池线程情况决定阻塞获取任务是否设置超时时间,如果allowCoreThreadTimeOut配置为true,则核心线程数内线程也可以回收,如果allowCoreThreadTimeOut配置为false,可以回收非核心线程,如果当前线程可以回收,则设置阻塞获取任务的超时时间为线程池的最大空闲时间。超时后返回null,线程进入退出逻辑,被回收;当前线程无法回收则一直阻塞等待获取任务。
返回null的四种情况
  1、 线程池处于STOP,TIDYING或者TERMINATED状态
             2、线程池处于SHUTDOWN状态 并且 任务队列为空
             3、线程池的线程数量大于最大线程数量限制
              4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null
/*** 获取任务队列的任务 返回null时 调用getTask()的线程会退出*什么情况下会返回null* 1、 线程池处于STOP,TIDYING或者TERMINATED状态* 2、线程池处于SHUTDOWN状态 并且 任务队列为空* 3、线程池的线程数量大于最大线程数量限制* 4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null* @return*/private Runnable getTask() {//当前线程获取任务是否超时boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();/*** 条件1 runStateAtLeast(c, SHUTDOWN)* true: 线程池处于非RUNNING* 条件2 runStateAtLeast(c, STOP) || workQueue.isEmpty()* 线程池处于STOP,TIDYING或者TERMINATED状态 或者 任务队列为空** 条件成立两种可能:* 1、线程池处于STOP,TIDYING或者TERMINATED状态* 2、线程池处于SHUTDOWN状态 并且 任务队列为空*/if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {//执行退出逻辑 使用CAS减少线程数decrementWorkerCount();return null;}/*** 执行到这里的情况* 1、线程池处于RUNNING状态* 2、线程池处于SHUTDOWN状态 但是任务队列不为空*/int wc = workerCountOf(c);/*** timed ==true 当前线程获取task时是支持超时机制的 workQueue.poll设置超时时间 否则一直阻塞等待** 条件1: allowCoreThreadTimeOut 控制核心线程数内的线程是否可以被回收 true可以false不可以* 条件2:wc > corePoolSize 当前线程数是否大于核心线程数* 前置条件:allowCoreThreadTimeOut配置为false 核心线程数内线程不可以回收*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 条件1为true表示线程可以被回收** 条件1: wc > maximumPoolSize || (timed && timedOut)* 条件1.1 wc > maximumPoolSize* true 当前线程数大于最大线程数 外部线程可能调用setMaximumPoolSize方法将线程池的最大线程数设置的比初始化的小* 条件1.2 timed && timedOut* 当前线程获取task时是支持超时机制 并且当前线程上一次循环获取task已经超时** 条件2:wc > 1 || workQueue.isEmpty()* 前置条件:当前线程可以被回收,但是还需要进一步判断* 条件 2.1 wc > 1* true 说明当前线程池还有其他线程 当前线程可以直接被回收 返回null* 条件 2.2 workQueue.isEmpty()* 前置条件:wc = 1 本线程为最后一个线程* true 任务队列已经没有任务 最后一个线程也可以被回收 返回null***/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {/*** 将线程数-1* 当前CAS可能会失败,失败后再次循环上去,检查线程池状态,CAS减少线程数* 1、可能其他线程也退出回收* 2、线程池状态改变了*/if (compareAndDecrementWorkerCount(c))return null;continue;}try {//获取任务逻辑 根据是否支持超时机制的选择poll方法Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//当前线程获取任务超时了 走循环 检测到超时 返回nulltimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
5.5、processWorkerExit
空闲线程或者异常线程退出逻辑
/**** @param w 当前要退出的worker* @param completedAbruptly 是否突然退出*/private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** 条件成立 表明当前worker是发生异常退出的,task任务执行过程中向上抛出了异常*/if (completedAbruptly){//将线程数量-1decrementWorkerCount();}//获取全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//将当前线程完成的任务数累加到线程池完成的任务总数上completedTaskCount += w.completedTasks;//从线程池移除该线程workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();//当前线程池状态为RUNNING或者SHUTDOWNif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {//线程正常退出//线程池最低持有的线程数量 如果核心线程数内的线程是也可以被回收 则是 0 否则是核心线程数int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty()){//如果最小线程数可以为0 但是任务队列还有任务 线程池至少要留一个线程min = 1;}/*** 如果线程池线程数拥有足够的线程 线程退出 否则新添加一个线程*/if (workerCountOf(c) >= min)return;}/*** 执行该代码的情况* 1、当前线程在执行task时异常退出 此处新创建一个新worker加入线程池* 2、allowCoreThreadTimeOut配置为true,当前线程池状态为RUNNING或者SHUTDOWN,任务队列还有任务,线程池至少要留一个线程* 3、allowCoreThreadTimeOut配置为false,当前线程池状态为RUNNING或者SHUTDOWN,当前线程数量小于corePoolSize,需要维持核心线程数量*/addWorker(null, false);}}
5.6、shutdown
shutdown会先将线程池设置为SHUTDOWN状态,然后中断所有的空闲工作线程
public void shutdown() {final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {//判断有没有权限checkShutdownAccess();//设置线程池状态为SHUTDOWNadvanceRunState(SHUTDOWN);//中断空闲线程interruptIdleWorkers();//扩展使用onShutdown();} finally {mainLock.unlock();}tryTerminate();}
中断空闲线程方法逻辑
private void interruptIdleWorkers() {interruptIdleWorkers(false);}/**** @param onlyOne true 只中断一个线程 false中断所有线程*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//循环所有workerfor (Worker w : workers) {Thread t = w.thread;/*** 条件1: !t.isInterrupted()* true: 线程没有中断* 条件2:w.tryLock()* true:说明当前worker处于空闲状态,目前线程在queue.poll或者queue.take 阻塞中*/if (!t.isInterrupted() && w.tryLock()) {try {//给空闲worker一个中断信号 处于queue阻塞的线程会被唤醒 return null,执行退出逻辑t.interrupt();} catch (SecurityException ignore) {} finally {//释放线程独占锁w.unlock();}}if (onlyOne)break;}} finally {//释放全局锁mainLock.unlock();}}
5.7、shutdownnow
shutdownnow会先将线程池设置为STOP状态,然后中断所有的工作线程,最后导出未处理任务
public List<Runnable> shutdownNow() {//返回值List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;//获取全局锁mainLock.lock();try {checkShutdownAccess();//将线程池状态改为STOPadvanceRunState(STOP);//中断线程池中所有线程interruptWorkers();//导出未处理的tasktasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
中断所有线程逻辑
private void interruptWorkers() {// assert mainLock.isHeldByCurrentThread();for (Worker w : workers)w.interruptIfStarted();}void interruptIfStarted() {Thread t;/***线程启动状态 且 线程不为空 且没有被中断 则执行中断*/if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
5.8、tryTerminate
尝试将线程池设置为TERMINATED
调用对象可能为两种:
1、外部线程调用shutdown或者shutdownnow ,调用tryTerminate
2、空闲工作线程退出或者异常工作线程退出,调用tryTerminate
final void tryTerminate() {for (;;) {int c = ctl.get();/*** 条件1: isRunning(c)* 线程池处于RUNNING状态* 条件2: runStateAtLeast(c, TIDYING)* 线程池状态大于等于TIDYING,说明有其他线程在执行TIDYING -> TERMINATED状态了* 条件3:runStateLessThan(c, STOP) && ! workQueue.isEmpty()* 线程池处于SHUTDOWN(此处执行前提条件就是 线程池不处于RUNNING状态) 且任务队列不为空*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))return;/**** 执行到此处的情况:* 1、线程池状态大于等于STOP* 2、线程池状态等于SHUTDOWN 且任务队列为空*///当前线程池线程数大于0if (workerCountOf(c) != 0) { // Eligible to terminate//中断一个空闲线程/*** 1、中断的空闲线程在getTask中 pool|task 阻塞,唤醒后,返回null* 2、在runWorker方法中执行退出逻辑,退出逻辑中会调用tryTerminate方法 唤醒下一个空闲线程* 3、因为线程池状态为(线程池状态大于等于STOP | 线程池状态等于SHUTDOWN 且任务队列为空)最终调用addWorker失败,最终空闲线程都会在这儿退出* 非空闲线程执行执行task抛出异常也会调用 tryTerminate,最终退出*/interruptIdleWorkers(ONLY_ONE);return;}/*** 执行到这里说明当前线程为线程池最后一个线程* (1) 线程池处于STOP,TIDYING或者TERMINATED状态 (2) 线程池处于SHUTDOWN状态 并且 任务队列为空* 满足这两种状态时 getTask被唤醒后会减少线程池线程数 所以上面最后一个线程执行workerCountOf(c) != 0时通过了*/final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//设置线程池状态为TIDYINGif (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {//调用钩子方法terminated();} finally {//设置线程池状态为TERMINATEDctl.set(ctlOf(TERMINATED, 0));//唤醒调用awaitTermination()方法的线程termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
