线程池
先看继承体系

顶层接口 Executor
public interface Executor {//仅定义一个执行线程的方法void execute(Runnable command);}
二级接口ExecutorService
/**
* 定义一些修改线程池状态的方法,以及执行线程的方法
*/
public interface ExecutorService extends Executor {
// 关闭线程池
void shutdown();
// 立即关闭线程池
List<Runnable> shutdownNow();
//是否关闭了了
boolean isShutdown();
//是否terminate了
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 有返回值的提交线程
<T> Future<T> submit(Callable<T> task);
// 重载有返回值的提交线程
<T> Future<T> submit(Runnable task, T result);
// 返回值为null的提交现场
Future<?> submit(Runnable task);
// 通过invoke形式执行所有线程 并有返回值,用的少
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 通过invoke形式执行带有超时的所有线程 并有返回值,用的少
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// invoke执行线程 一个成功就行了,用的少
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// invoke执行带有超时的线程 一个成功就行了,用的少
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
三级抽象类 AbstractExecutorService
重点关注一些方法~
// 将runbale及返回值封装为FutureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 进行重载
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// submit提交的任务是具有返回值的,此返回值为null,封装成FutureTask,然后调用execute来真正执行
// RunnableFuture的子类是FutureTask
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// submit提交的任务是具有返回值的,此返回值为result,封装成FutureTask,然后调用execute来真正执行
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 重载,和上述方法一个意思
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 综上,可以看去执行线程都是通过execute,而返回值是你传啥就是啥
四个构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 最完全的线程池构造器,其余构造器均对此进行重载
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
/**
* 如果核心线程数小于0
* 如果最大线程数<=0
* 如果最大线程数小于核心线程数,此可以理解为最大线程数必须 >= 核心线程数
* 如果空闲存活时间 < 0
* 进行抛异常
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 如果队列为空 或者 线程工厂为孔 或者 异常处理类为空,进行抛异常
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 权限相关的东西, 算了就, 不耽误
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
先引入一下位运算小知识
| & | 与 | 都为1则是1 |
|---|---|---|
| | | 或 | 都是0则是0 |
| ^ | 异或 | 相同为0 不同为1 |
| ~ | 取反 | 0变1 1变0 |
| >> | 右移 | 低位补0 高位丢弃 |
| << | 左移 | 高位补0 低位丢弃 |
线程池状态
通过ctl来表示
// ctl = 1110 0000 0000 0000 0000 0000 0000 0000
// 高三位 表示 线程池的运行状态, 低29位表示线程池的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS = 29,ctl保存线程数量的 位
private static final int COUNT_BITS = Integer.SIZE - 3;
//(1 << COUNT_BITS) = 0010 0000 0000 0000 0000 0000 0000 0000
//(1 << COUNT_BITS) -1 = 0001 1111 1111 1111 1111 1111 1111 1111 -> 5亿多最大的线程数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
/**
* 对于一下5个运行状态 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED.
*/
//111 补29个0 -> 高位1,是个负数,接受新的任务,处理等待队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//000 补29个0,不接受新的任务提交,但是会继续处理等待队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001 补29个0,不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
private static final int STOP = 1 << COUNT_BITS;
//010 补29个0,所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//011 补29个0,terminated() 方法结束后,线程池的状态变为此
private static final int TERMINATED = 3 << COUNT_BITS;
计算线程池状态
/**
* c = ctl
* 求线程池的状态
* CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
* ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000
* 假设 c = 1110 0000 0000 0000 0000 0000 0000 1111
* & ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000 -> RUNNING
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
计算线程池线程数量
/**
* 计算线程池有多少线程
* 假设 c: 1110 0000 0000 0000 0000 0000 0000 1111
* CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
* 0000 0000 0000 0000 0000 0000 0000 1111 -> 15个
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
初始化/计算 ctl 的值
/**
* 初始化 ctl 的值
* 如:
* private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
* RUNNING | 0
* RS -> RUNNING: 111 0 0000 0000 0000 0000 0000 0000 0000
* WC -> 0 : 000 0 0000 0000 0000 0000 0000 0000 0000
* 结果为111 0 0000 0000 0000 0000 0000 0000 0000 ,是个复数
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
围绕ctl计算的一些方法
/**
* c = ctl
* 求线程池的状态
* CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
* ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000
* 假设 c = 1110 0000 0000 0000 0000 0000 0000 1111
* & ~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000 -> RUNNING
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* 计算线程池有多少线程
* 假设 c: 1110 0000 0000 0000 0000 0000 0000 1111
* CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
* 0000 0000 0000 0000 0000 0000 0000 1111 -> 15个
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 初始化 ctl 的值
* 如:
* private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
* RUNNING | 0
* RS -> RUNNING: 111 0 0000 0000 0000 0000 0000 0000 0000
* WC -> 0 : 000 0 0000 0000 0000 0000 0000 0000 0000
* 结果为111 0 0000 0000 0000 0000 0000 0000 0000 ,是个复数
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
//线程池的运行状态是否小于某个状态值
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//线程池的运行状态是否大于等于某个状态值
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 线程状态是否是运行中,需要小于SHUTDOWN
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// CAS 将 ctl + 1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// CAS 将 ctl - 1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 强制保证ctl -1 成功
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
一些其它属性或者方法
// 用于存储 任务 的 任务队列,核心线程数在满了之后会提交至此队列
private final BlockingQueue<Runnable> workQueue; // 后续会通过offer()方法进行添加
// 操作线程池需要获取锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中的线程 真正存放的位置,将线程Thread封装为Worker对象
private final HashSet<Worker> workers = new HashSet<Worker>();
// 伴随 mainLock ,等待或唤醒锁的条件,其有await(),signal()等方法
private final Condition termination = mainLock.newCondition();
// 在获取此值的时刻,线程池中线程的最大值
private int largestPoolSize;
// 线程池中已经完成的任务数,完成的任务会累加上去
private long completedTaskCount;
// 线程工厂类
private volatile ThreadFactory threadFactory;
// 拒绝策略,可自己实现此接口,默认有4种实现
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程的存活时间,空闲线程包括核心线程 + 非核心线程
* allowCoreThreadTimeOut 为 false 时,核心线程不动
* 为 true 时,核心线程也会被回收
*/
private volatile long keepAliveTime;
// 核心线程是否会超时,配合 keepAliveTime
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大的线程数,5亿多
private volatile int maximumPoolSize;
// 默认的拒绝策略。AbortPolicy -> 直接拒绝抛异常,策略有4种随后进行分析
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
queue的一些方法
| add | 添加一个元素 | 如果满了 抛异常 |
|---|---|---|
| offer | 添加一个元素 | 如果满了 返回false |
| element | 返回头部节点的值,但不移除 | 为空,则抛异常 |
| peek | 返回头部节点的值,但不移除 | 为空,返回null |
| remove | 返回头部节点的值,并移除 | 为空,抛异常 |
| poll | 返回头部节点的值,并移除 | 为空,返回null |
| put | 添加一个元素 | 队列满了 则阻塞 |
| take | 返回头部节点的值,并移除 | 为空,则阻塞 |
线程池的拒绝策略(ThreadPoolExecutor内部默认实现了4种)
- 拒绝策略顶层接口
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
AbortPolicy 默认的拒绝策略直接抛异常
``` public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2. CallerRunsPolicy 由线程池启动的线程进行执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池没有SHUTDOWN
if (!e.isShutdown()) {
//直接执行
r.run();
}
}
}
3. DiscardPolicy 直接丢弃啥也不干
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
// 啥也不做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
4. DiscardOldestPolicy 直接丢弃最早的任务,慎用
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池状态不是SHUTDOWN
if (!e.isShutdown()) {
// 把头部的取出来不管
e.getQueue().poll();
// 用线程池去执行线程
e.execute(r);
}
}
}
<a name="12b6b713"></a>
#### 线程执行的核心方法 execute
// 线程池执行方法,submit()也是调用此execute方法, public void execute(Runnable command) { if (command == null) throw new NullPointerException();
// 先获取ctl线程池的状态,高3位表状态,低29表线程池数量
int c = ctl.get();
// 如果当前线程池的线程数量,小于核心线程数,则表示可以去新起核心线程
if (workerCountOf(c) < corePoolSize) {
// 将线程 封装 进Worker,以核心线程为边界
if (addWorker(command, true))
return;
// 如果封装进worker失败,表示可能出现了并发,达到了核心线程数 或者 线程池状态改变了不允许放入线程
//再次获取ctl线程池的状态
c = ctl.get();
}
//逻辑至此,首先可以明确的是核心线程数满了 或者 addWoker 失败了
// 线程是运行时状态 且 线程入队成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl的状态
int recheck = ctl.get();
// 如果线程状态被改变(如shutdown掉),不是运行状态了,就移除刚才入队的任务线程,此if逻辑是有可能false的
if (! isRunning(recheck) && remove(command))
// 直接走拒绝策略
reject(command);
//代码至此表示,1:线程池状态是running,2:线程池状态不是running,但是remove(command)失败
//检查线程池中的线程数量是否为0 ,如果为0说明没线程存活去干活了,搞一个以非核心线程作为边界创建个worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 代码至此 ,表明线程池非running状态不能添加任务 或者 队列满了入队失败 或者核心线程数已满
// 所以以非核心线程数作为边界去添加任务,如果也添加失败即表示maxiumPoolSize也已经满了,则抛出异常
else if (!addWorker(command, false))
reject(command);
}
<a name="3df54512"></a>
#### 线程执行的核心方法addWorker
//入队逻辑 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 自旋 for (;;) { // 获取线程池状态 int c = ctl.get(); // 获取线程池的运行状态 int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 运行状态>= 停止
* 运行状态是停止 && firstTask是空 && queue不是空 (此种情况允许创建worker)
*/
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池的线程数量
int wc = workerCountOf(c);
// 如果大于最大容量 或者 已经达到核心线程数 | 最大线程数,则不允许添加worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 此处为通过CAS 进行线程数量 + 1,成功则跳出循环,继续向下执行创建worker的逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
// cas进行线程数量+1失败了,再次获取ctl的值
c = ctl.get(); // Re-read ctl
// 判断是否与刚进来时拿到的rs不同,不同则表示线程池状态已经被修改
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 至此,判断条件已经全部通过了,要新建Worker了
boolean workerStarted = false; // worker是否启动
boolean workerAdded = false; // worker是否新增成功
Worker w = null;
try {
// 先新建个Worker给引用w
w = new Worker(firstTask);
// worker对象
final Thread t = w.thread;
// t为空的话,后续t.isAlive()/start()将报错
if (t != null) {
// 全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取线程池的状态rs
int rs = runStateOf(ctl.get());
//条件1: 如果rs小于SHUTDOWN表明为running状态,该干嘛干嘛
//条件2: rs == SHUTDOWN 且 firstTask 是空(此种情况允许创建worker)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//此时线程t本没有启动,如果启动了 说明出现了问题
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//一系列判断后,将新建的worker add到HashSet<Worker> workers对象中
workers.add(w);
// 重新记录下最大的线程的数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//worker 新增成功改其标志
workerAdded = true;
}
} finally {
//解全局锁
mainLock.unlock();
}
//成功则将worker 启动
if (workerAdded) {
//调用worker.run() -> runWorker()
t.start();
workerStarted = true;
}
}
} finally {
//如果worker添加失败,需将线程数量-1,因为上面+1了,还需将worker从workerSet中清除
if (! workerStarted)
addWorkerFailed(w);
}
// 创建worker成功正常启动 返回true
// 线程池状态 > SHUTDOWN 返回false
// 线程池状态 == SHUTDOWN,firstTask不为空,队列为空 返回false
// 线程数量已经达到核心和最大线程数 返回false
// ThreadFactory工厂类创建的线程为null 返回false
return workerStarted;
}
<a name="5b648905"></a>
#### 线程执行的核心方法 runWorker
final void runWorker(Worker w) { //当前线程对象 Thread wt = Thread.currentThread(); //取出worker对象中firstTask Runnable task = w.firstTask; //置为null w.firstTask = null; // 此操作为调用AQS -> release(1) -> : /**
* setExclusiveOwnerThread(null); //将独占线程设为null
* setState(0); //state置为0
*/
w.unlock(); // allow interrupts
// 是否突然完成了(正常、非正常)
boolean completedAbruptly = true;
try {
/**
* 1. (task == firstTask) != null
* 2. firstTask == null了,则调用getTask去queue的队首中获取任务 getTask()会调用take(),是阻塞的
* 如果为null 表示没有任务需要被执行了已经
*/
while (task != null || (task = getTask()) != null) {
// 调用worker的lock()方法,设置独占线程为当前线程并将state设为1, 即获取到锁
w.lock();
/**
* 1: 看ctl的状态是否是>=STOP的,STOP不在接受新的任务,也不再处理queue中的任务,中断正在执行的任务
* 2:(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
* 2.1 线程是被打断了的 注意interrupted()会在判断后重置中断标记,
* 2.2 看ctl的状态是否是>=STOP的,
* 3: 当前线程没有被中断
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 空方法 子类可实现
beforeExecute(wt, task);
// 任务执行异常的引用
Throwable thrown = null;
try {
// 执行线程
// submit执行的则结果封装为FutureTask
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法 子类可实现
afterExecute(task, thrown);
}
} finally {
// task 执行结束置为null
task = null;
// 执行的task数量+1
w.completedTasks++;
// worker处理完后 释放锁
w.unlock();
}
}
// 正常执行结束,getTask拿到的是null,正常退出
completedAbruptly = false;
} finally {
// 线程池退出逻辑
// 正常 |未正常执行结束,进行相应处理
processWorkerExit(w, completedAbruptly);
}
}
<a name="f97981bf"></a>
#### 线程执行的核心方法 getTask
// getTask()是从队列中获取任务 private Runnable getTask() { // timeOut = false.不会超时 boolean timedOut = false;
for (;;) {
// 获取ctl的值
int c = ctl.get();
// 获取线程池状态 rs
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 1.线程池状态>=SHUTDOWN
* 2.线程池状态 >= STOP 或者 队列为空
* 表示已经没有可以执行的任务了, 准备退出了
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程池数量 - 1
decrementWorkerCount();
return null;
}
/**
* 执行到此 说明 rs是running 或者 rs == SHUTDOWN,队列不是null
*/
// 获取线程池数量
int wc = workerCountOf(c);
/**
* 1. allowCoreThreadTimeOut == true 表示核心线程也会被回收,通过poll(),false则维护核心线程数
* 如果线程数量 > 核心线程数 表示所有线程会通过poll()来获取任务,poll()不到 线程准备退出
* 就是来判断超时机制,有此机制可以在queue里获取任务时进行超时(poll()方法) 否则一直阻塞等待任务(take()方法),
*
* 在自旋时,wc > corePoolSize 是可能出现false的
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1.1 按道理线程数量不该大于maximumPoolSize,因为addWorker()的逻辑已经进行了逻辑判断,
// 是因为有个setMaximumPoolSize()方法可以修改maximumPoolSize的值,继而会出现这种情况
// 1.2 (timed && timedOut) 后续代码中 通过poll()获取任务超时会将timedOut改为true
// 2.1 线程池中还有线程 当前线程可以被回收
// 2.2 wc == 1 而且队列为null,最后一个线程也可以退出
// 上述是在判断线程池是否可以退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 线程数 - 1,失败可能是并发下其他线程率先执行了,或者 线程池状态变化
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 代码至此,表示还能取任务
try {
// 获取任务允许超时的进行poll() 不超时的进行take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 成功获取进行返回
if (r != null)
return r;
// 当前线程出现了超时,继续自旋
timedOut = true;
} catch (InterruptedException retry) {
// 获取时出现异常,就自旋再获取
timedOut = false;
}
}
}
<a name="5d608156"></a>
#### 线程池退出逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果是突然的异常退出,则将ctl的线程数量必须保证-1 if (completedAbruptly) // If abrupt, then workerCount wasn’t adjusted decrementWorkerCount();
// 全局锁
final ReentrantLock mainLock = this.mainLock;
// 枷锁
mainLock.lock();
try {
// 将worker完成的任务数汇总到completedTaskCount
completedTaskCount += w.completedTasks;
// 移除worker
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 改线程池状态为TERMINATE的钩子方法
tryTerminate();
// 获取ctl的值
int c = ctl.get();
// 如果小于STOP 状态, 即为 RUNNING , SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果是正常退出
if (!completedAbruptly) {
// min 表示线程池最小的线程数
// 如果没开启回收核心线程,则将现在的核心线程数赋给min,开启则置为0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 假设线程数是0了,但是还有任务未执行完,则留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数是大于等于 min 的,表示有线程去执行队列的剩下的任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.非正常退出,创建个worker出来作为补充
// 2.queue还有任务没执行完,也要创建一个
// 3.当前的线程数量 < 核心线程数, 维护其数量
addWorker(null, false);
}
}
<a name="9465f60d"></a>
#### 修改线程池状态为最终的TERMINATE
final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果线程池状态是running 则不能修改 // 如果线程池状态已经是>=TIDYING了,表示马上就要变成TERMINATE了 //(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) 表示线程池是SHUTDOWN,但是queue不是空 // 还有任务没搞完 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 已经没有线程需要执行了 if (workerCountOf(c) != 0) { // Eligible to terminate // ONLY_ONE == TRUE,只打断一个空闲worker interruptIdleWorkers(ONLY_ONE); return; }
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置线程池状态为TIDYING成功
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类实现,终止时执行的方法
terminated();
} finally {
// 设为TERMINATE
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有等待线程
termination.signalAll();
}
return;
}
} finally {
// 释放锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
<a name="57af449d"></a>
#### 线程池SHUTDOWN
public void shutdown() { // 先拿全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查权限相关 checkShutdownAccess(); // 去改线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲的线程 interruptIdleWorkers(); // 子类实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试修改线程池装为Terminate tryTerminate(); }
private void advanceRunState(int targetState) { for (;;) { // 获取线程池状态 int c = ctl.get(); /**
* 条件一:判断当前线程池状态是否>=SHUTDOWN ,TRUE表示已经是SHUTDOWN,STOP,TIDYING,TERMINATION了
* 条件二:如果1为false(线程池状态为RUNNING),则需要改线程池状态,如果ctl状态没有变化,则修改其状态为ctlOf(targetState, workerCountOf(c))
* ctlOf(targetState, workerCountOf(c))
* SHUTDOWN -> 000 000000000000000000000000000
* 假设线程数量为 000 00000000000000000000000011 -> 5个
* | 运算 000 00000000000000000000000011 ->
* 就是SHUTDOWN了,不接受新的任务提交,但是会继续处理等待队列中的任务
*/
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// shutdown() 调用时,onlynoe默认传false,onlyone为true只中断一个???
private void interruptIdleWorkers(boolean onlyOne) {
// 需要获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历真正存放worker的set
for (Worker w : workers) {
//取出worker中真正的线程~
Thread t = w.thread;
// 如果worker还没被设置中断标记,并且尝试获取到了锁(tryLock()是有可能阻塞起来的,因为是通过CAS的)
if (!t.isInterrupted() && w.tryLock()) {
try {
// 将其中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放锁
w.unlock();
}
}
// 当为false ,意在将workers全部中断,true就是中断一个就行
if (onlyOne)
break;
}
} finally {
//释放锁
mainLock.unlock();
}
}
<a name="093a744a"></a>
#### 线程池关闭方法 shuotDownNow
public List
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
```
