构造方法
//五参数构造方法private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}//七参数构造方法public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();。。。。。。}
通过五参数构造方法可以得知
Executors.defaultThreadFactory()指定了默认的创建线程工程工厂。defaultHandler指定了默认的拒绝策略AbortPolicy。
另外,七个参数的分别含义为:
corePoolSize核心线程池数量。maximumPoolSize最大线程池数量。keepAliveTime线程闲置的超时时间。超时后,非核心线程就会被销毁。如果设置了allowCoreThreadTimeOut(true)核心线程闲置时间超过设置值后,也会被销毁。unit存活时间单位。workQueue线程池任务阻塞队列。ThreadFactory threadFactory创建线程的工厂。RejectedExecutionHandler handler当前队列已满,任务不能执行的时候的拒绝策略。 一共四种CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy
execute
//ThreadPoolExecutor.javapublic void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//首先判断线程池数量是否大于核心线程池数量if (workerCountOf(c) < corePoolSize) {//小于核心线程池数量,添加核心线程池。添加成功则 return 返回。if (addWorker(command, true))return;c = ctl.get();}//线程池数量大于核心线程。//判断线程池当前状态是否处于运行状态,如果是,则添加到阻塞队列 workQueue 里面。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次检查线程池状态,如果不是运行状态,则将 command 从 workQueue 中移除。if (!isRunning(recheck) && remove(command))//然后执行拒绝策略。reject(command);//如果是运行状态,但是线程池数量为 0,创建一个非核心线程。else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false)) //如果上面添加到队列失败,队列任务爆满,则通过添加一个非核心线程去执行这个任务。//任务执行失败,执行拒绝策略。reject(command);}
二次检查线程池的原因:
在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将
command加入workqueue是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command永远不会执行。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();//获取线程池状态int rs = runStateOf(c);// Check if queue empty only if necessary.//当线程池状态大于 SHUTDOWN 时,就不能继续执行新的任务。//当前状态为 SHUTDOWN 时,只有传入的任务为 null,并且队列不为空,才会继续执行任务。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//这里判断线程数是否达到了阈值。for (;;) {int wc = workerCountOf(c);//这里其实有一个隐形的线程数最大值 CAPACITY。//下面就是根据当前是否创建的是否是核心线程,来设定阈值来判断线程数是否超出。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//通过 CAS 来增加线程工作数量。成功就退出 retry 这个外部大循环。if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl/**上面 CAS 增加线程数没有成功,检测线程池状态是否发生了改变。*改变了:重新进行 retry 大循环。*未改变:继续内部循环,尝试 CAS 增加线程数。*CAS 增加线程数成功后,后续才能进行增加线程执行任务。**/if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建一个 worker,并将要执行的任务传进去w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//再次检测线程池状态,继续执行下去的条件是://1.线程池状态处于 RUNNING//2.或者处于SHUTDOWN 状态,但是阻塞队列不为空。if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//检测线程状态if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//将 worker 添加到集合中workers.add(w);int s = workers.size();//largestPoolSize 可以表示线程池达到的最大并发if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//执行线程。t.start();workerStarted = true;}}} finally {if (! workerStarted)//执行失败addWorkerFailed(w);}return workerStarted;}
在 addWorker 方法中
①首先检测了线程池状态,当线程池处于 RUNNING 或者 处于 SHUTDOWN 状态并且阻塞队列不为空时才继续进行下去。
②然后检测线程池中线程数量是否达到了阈值(阈值大小是根绝添加的线程是否是核心线程来决定的),达到了,返回 false。
③创建 Worker ,再次进行第①步的检查,符合条件,就将创建好的 Worker 添加到集合中。
④最后执行 Worker 中的 Thread。
下面看一下 Worker 内部是怎么工作的,是怎么创建线程的。最后再看一下如果执行失败,addWorkerFailed 内部逻辑是什么。
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...省略一些源码.../** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//这里 getThreadFactory() 获取的 ThreadFactory 就是通过//ThreadPoolExecutor 构造传入的 ThreadFactorythis.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}......}
可以看出 Worker 本身就实现了 Runnable 接口,并且通过 构造方法 来创建了一个 Thread ,并将自身作为参数传入了 Thread。
所以就看 run 方法中的 runWorker(this) 方法。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//这里获取 task 来执行。while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//这里又进行了线程池状态的判断//如果线程池状态不小于 STOP,就是处于 STOP,TIDYING,TERMINATED 这三个时,将线程中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//子类可以重写,执行任务之前做一些操作beforeExecute(wt, task);Throwable thrown = null;try {//这里是最终执行任务的地方。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//子类可以重写,执行任务之后做一些操作afterExecute(task, thrown);}} finally {task = null;//增加任务执行数w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//线程池没有队列,并且已经超时。关闭线程。processWorkerExit(w, completedAbruptly);}}
这里可以看出是有一个循环进行不断的取出任务来进行执行的。本次 task 执行完后,又通过 getTask() 取出任务来执行。从这里也可以看到线程复用的一面
getTask
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//这里又进行了线程池状态的判断//如果线程池处于 STOP 直接返回 null,//或者处于SHUTDOWN 但是阻塞队列workQueue 为空,也会返回 nullif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?//该线程是否要检测超时退出//allowCoreThreadTimeOut 是代表核心线程是不是要超时退出,或者线程数超过核心线程数。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//同时满足下列两个条件时,返回 null ,上一层函数就退出循环。(runWorker 退出循环,线程结束。)//①线程数超过最大数,或者当前线程允许超时并且已经超时//②线程数大于 1,或者队列为空。(这个条件代表最后一个线程必须队列为空时才能退出)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//CAS 操作减少线程数if (compareAndDecrementWorkerCount(c))return null;continue;}try {//如果满足超时退出条件,就通过 poll 方法最多等待阻塞 keepAliveTime 时间来获取 Task//否则就通过 take 方法一直阻塞,直到有任务返回。//这里会响应中断,一旦线程池关闭,就打断阻塞状态。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//超时,下次循环用来判断是否要返回 null,结束线程。timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
- getTask 中又一次检测了线程池状态。并检测线程池数量是否达到阈值。
根据是否要满足超时退出的条件来选择是 阻塞等待一段时间通过 poll 取任务,还是一直阻塞通过 take 来取任务。
processWorkerExit这里我们看如何结束线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//记录执行的任务数completedTaskCount += w.completedTasks;//将 worker 移除workers.remove(w);} finally {mainLock.unlock();}//线程池状态改变,尝试中止线程池tryTerminate();int c = ctl.get();//如果线程池状态是 RUNNING 或者 SHUTDOWNif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {//线程池中最小的数量取决于核心线程是否允许超时退出int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果队列还有任务,要至少留一个线程if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}//线程数量小于最小数,尝试添加线程执行任务addWorker(null, false);}}
在 addWorker 中如果添加 Worker 失败,则执行
addWorkerFailed方法private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)//移除 workerworkers.remove(w);//通过 CAS 减少 worker 数decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}}
总结


问答
线程池如何实现
总结就是这个问题的答案
非核心线程延迟死亡,如何实现
通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡
核心线程为什么不死
通过阻塞队列take(),让线程一直等待,直到获取到任务
如何释放核心线程
将allowCoreThreadTimeOut设置为true。
非核心线程能成为核心线程吗
线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。
Runnable在线程池里如何执行
线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。
线程数如何做选择
这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。
常见的不同类型的线程池的功效如何做到
常见的线程池有:
CachedThreadPool:适合异步任务多,但周期短的场景
FixedThreadPool: 适合有一定异步任务,周期较长的场景,能达到有效的并发状态
SingleThreadExecutor: 适合任务串行的场景
ScheduledThreadPool: 适合周期性执行任务的场景
对于如何选择线程池就要看具体的场景,其中的差异通过构造参数可以到达效果,通过之前的分析,就能知道参数的具体作用以及为什么能达到效果。取FixedThreadPool来看,抛砖引玉。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
nThreads个数量核心线程持续并发任务,没有非核心线程,如果没有任务,则通过take()阻塞等待,不允许核心线程死亡。并且阻塞队列为LinkedBlockingQueue,容量为Integer.MAX_VALUE,可以视为无界队列,更难走到拒绝添加线程逻辑。
