构造方法
//五参数构造方法
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//七参数构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
。。。。。。
}
通过五参数构造方法可以得知
Executors.defaultThreadFactory()
指定了默认的创建线程工程工厂。defaultHandler
指定了默认的拒绝策略AbortPolicy
。
另外,七个参数的分别含义为:
corePoolSize
核心线程池数量。maximumPoolSize
最大线程池数量。keepAliveTime
线程闲置的超时时间。超时后,非核心线程就会被销毁。如果设置了allowCoreThreadTimeOut(true)
核心线程闲置时间超过设置值后,也会被销毁。unit
存活时间单位。workQueue
线程池任务阻塞队列。ThreadFactory threadFactory
创建线程的工厂。RejectedExecutionHandler handler
当前队列已满,任务不能执行的时候的拒绝策略。 一共四种CallerRunsPolicy
、AbortPolicy
、DiscardPolicy
、DiscardOldestPolicy
execute
//ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//首先判断线程池数量是否大于核心线程池数量
if (workerCountOf(c) < corePoolSize) {
//小于核心线程池数量,添加核心线程池。添加成功则 return 返回。
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池数量大于核心线程。
//判断线程池当前状态是否处于运行状态,如果是,则添加到阻塞队列 workQueue 里面。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池状态,如果不是运行状态,则将 command 从 workQueue 中移除。
if (!isRunning(recheck) && remove(command))
//然后执行拒绝策略。
reject(command);
//如果是运行状态,但是线程池数量为 0,创建一个非核心线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //如果上面添加到队列失败,队列任务爆满,则通过添加一个非核心线程去执行这个任务。
//任务执行失败,执行拒绝策略。
reject(command);
}
二次检查线程池的原因:
在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将
command
加入workqueue
是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command
永远不会执行。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//当线程池状态大于 SHUTDOWN 时,就不能继续执行新的任务。
//当前状态为 SHUTDOWN 时,只有传入的任务为 null,并且队列不为空,才会继续执行任务。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//这里判断线程数是否达到了阈值。
for (;;) {
int wc = workerCountOf(c);
//这里其实有一个隐形的线程数最大值 CAPACITY。
//下面就是根据当前是否创建的是否是核心线程,来设定阈值来判断线程数是否超出。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过 CAS 来增加线程工作数量。成功就退出 retry 这个外部大循环。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
/*
*上面 CAS 增加线程数没有成功,检测线程池状态是否发生了改变。
*改变了:重新进行 retry 大循环。
*未改变:继续内部循环,尝试 CAS 增加线程数。
*CAS 增加线程数成功后,后续才能进行增加线程执行任务。
**/
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个 worker,并将要执行的任务传进去
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//再次检测线程池状态,继续执行下去的条件是:
//1.线程池状态处于 RUNNING
//2.或者处于SHUTDOWN 状态,但是阻塞队列不为空。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//检测线程状态
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将 worker 添加到集合中
workers.add(w);
int s = workers.size();
//largestPoolSize 可以表示线程池达到的最大并发
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//执行线程。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//执行失败
addWorkerFailed(w);
}
return workerStarted;
}
在 addWorker
方法中
①首先检测了线程池状态,当线程池处于 RUNNING
或者 处于 SHUTDOWN
状态并且阻塞队列不为空时才继续进行下去。
②然后检测线程池中线程数量是否达到了阈值(阈值大小是根绝添加的线程是否是核心线程来决定的),达到了,返回 false。
③创建 Worker
,再次进行第①步的检查,符合条件,就将创建好的 Worker
添加到集合中。
④最后执行 Worker
中的 Thread。
下面看一下 Worker
内部是怎么工作的,是怎么创建线程的。最后再看一下如果执行失败,addWorkerFailed
内部逻辑是什么。
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...省略一些源码...
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//这里 getThreadFactory() 获取的 ThreadFactory 就是通过
//ThreadPoolExecutor 构造传入的 ThreadFactory
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
......
}
可以看出 Worker
本身就实现了 Runnable
接口,并且通过 构造方法 来创建了一个 Thread
,并将自身作为参数传入了 Thread
。
所以就看 run
方法中的 runWorker(this)
方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//这里获取 task 来执行。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//这里又进行了线程池状态的判断
//如果线程池状态不小于 STOP,就是处于 STOP,TIDYING,TERMINATED 这三个时,将线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//子类可以重写,执行任务之前做一些操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//这里是最终执行任务的地方。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//子类可以重写,执行任务之后做一些操作
afterExecute(task, thrown);
}
} finally {
task = null;
//增加任务执行数
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程池没有队列,并且已经超时。关闭线程。
processWorkerExit(w, completedAbruptly);
}
}
这里可以看出是有一个循环进行不断的取出任务来进行执行的。本次 task 执行完后,又通过 getTask()
取出任务来执行。从这里也可以看到线程复用的一面
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//这里又进行了线程池状态的判断
//如果线程池处于 STOP 直接返回 null,
//或者处于SHUTDOWN 但是阻塞队列workQueue 为空,也会返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//该线程是否要检测超时退出
//allowCoreThreadTimeOut 是代表核心线程是不是要超时退出,或者线程数超过核心线程数。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//同时满足下列两个条件时,返回 null ,上一层函数就退出循环。(runWorker 退出循环,线程结束。)
//①线程数超过最大数,或者当前线程允许超时并且已经超时
//②线程数大于 1,或者队列为空。(这个条件代表最后一个线程必须队列为空时才能退出)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//CAS 操作减少线程数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果满足超时退出条件,就通过 poll 方法最多等待阻塞 keepAliveTime 时间来获取 Task
//否则就通过 take 方法一直阻塞,直到有任务返回。
//这里会响应中断,一旦线程池关闭,就打断阻塞状态。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//超时,下次循环用来判断是否要返回 null,结束线程。
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- getTask 中又一次检测了线程池状态。并检测线程池数量是否达到阈值。
根据是否要满足超时退出的条件来选择是 阻塞等待一段时间通过 poll 取任务,还是一直阻塞通过 take 来取任务。
processWorkerExit
这里我们看如何结束线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//记录执行的任务数
completedTaskCount += w.completedTasks;
//将 worker 移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//线程池状态改变,尝试中止线程池
tryTerminate();
int c = ctl.get();
//如果线程池状态是 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//线程池中最小的数量取决于核心线程是否允许超时退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果队列还有任务,要至少留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//线程数量小于最小数,尝试添加线程执行任务
addWorker(null, false);
}
}
在 addWorker 中如果添加 Worker 失败,则执行
addWorkerFailed
方法private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//移除 worker
workers.remove(w);
//通过 CAS 减少 worker 数
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
总结
问答
线程池如何实现
总结就是这个问题的答案
非核心线程延迟死亡,如何实现
通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡
核心线程为什么不死
通过阻塞队列take(),让线程一直等待,直到获取到任务
如何释放核心线程
将allowCoreThreadTimeOut设置为true。
非核心线程能成为核心线程吗
线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。
Runnable在线程池里如何执行
线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。
线程数如何做选择
这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。
常见的不同类型的线程池的功效如何做到
常见的线程池有:
CachedThreadPool:适合异步任务多,但周期短的场景
FixedThreadPool: 适合有一定异步任务,周期较长的场景,能达到有效的并发状态
SingleThreadExecutor: 适合任务串行的场景
ScheduledThreadPool: 适合周期性执行任务的场景
对于如何选择线程池就要看具体的场景,其中的差异通过构造参数可以到达效果,通过之前的分析,就能知道参数的具体作用以及为什么能达到效果。取FixedThreadPool来看,抛砖引玉。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads个数量核心线程持续并发任务,没有非核心线程,如果没有任务,则通过take()阻塞等待,不允许核心线程死亡。并且阻塞队列为LinkedBlockingQueue,容量为Integer.MAX_VALUE,可以视为无界队列,更难走到拒绝添加线程逻辑。