Executors提供了几个线程池的工厂方法
Executors是线程池的静态工厂,其提供了快捷创建线程池的静态方法
- newFixedThreadPool:该方法返回一个固定数量的线程池,线程数不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中,等待有空闲的线程去执行。
- newSingleThreadExecutor: 创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
- newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在 60 秒后自动回收
- newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有延迟和周期性执行任务的功能,类似定时器。
ThreadpoolExecutor
ThreadpoolExecutor 有多个重载的构造方法,我们可以基于它最完整的构造方法来分析
先来解释一下每个参数的作用,稍后我们在分析源码的过程中再来详细了解参数的意义。
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
线程池初始化时是没有创建线程的,线程池里的线程的初始化与其他线程一样,但是在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。另外 keepAliveTime 为 0,也就是超出核心线程数量以外的线程空余存活时间
而这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限
这个线程池执行任务的流程如下:
- 线程数少于核心线程数,也就是设置的线程数时,新建线程执行任务
- 线程数等于核心线程数后,将任务加入阻塞队列
- 由于队列容量非常大,可以一直添加
执行完任务的线程反复去队列中取任务执行
用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程; 并且没有核心线程,非核心线程数无上限,但是每个空闲的时间只有 60 秒,超过后就会被回收。
它的执行流程如下:没有核心线程,直接向 SynchronousQueue 中提交任务
- 如果有空闲线程,就去取出任务执行;如果没有空闲线程,就新建一个
执行完任务的线程有 60 秒生存时间,如果在这个时间内可以接到新任务,就可以继续活下去,否则就被回收
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
可以和newFixedThreadPool类型一样设置并发数量。同时其还支持定时及周期任务执行,因此它常是用于定时任务打当中。调用schedule方法可设置定时任务。scheduleAtFixedRate 方法可设置周期任务。
线程池的实现原理分析
ThreadPoolExecutor 是线程池的核心,提供了线程池的实现。
源码分析
execute 方法提交执行的任务,不能为空
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution RejectedExecutionException是一个RuntimeException
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了
* 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)
* 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
*/
int c = ctl.get();
/**
* 1、如果当前线程数少于corePoolSize
*/
if (workerCountOf(c) < corePoolSize) {
//addWorker()成功,返回
if (addWorker(command, true))
return;
/**
* 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
* 失败的原因可能是:
* 1、线程池已经shutdown,shutdown的线程池不再接收新任务
* 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
*/
c = ctl.get();
}
/**
* 2、如果线程池RUNNING状态,且入队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//再次校验位
/**
* 再次校验放入workerQueue中的任务是否能被执行
* 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
* 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
*/
//如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
//为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢??
//只保证有一个worker线程可以从queue中获取任务执行就行了??
//因为只要还有活动的worker线程,就可以消费workerQueue中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false); //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
//第二个参数为true代表占用corePoolSize,false占用maxPoolSize
}
/**
* 3、如果线程池不是running状态 或者 无法入队列
* 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
*/
else if (!addWorker(command, false))
reject(command);
}
执行流程
1、如果线程池当前线程数量少于corePoolSize,则addWorker(command, true)创建新worker线程,如创建成功返回,如没创建成功,则执行后续步骤;
addWorker(command, true)失败的原因可能是:
A、线程池已经shutdown,shutdown的线程池不再接收新任务
B、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
2、如果线程池还在running状态,将task加入workQueue阻塞队列中,如果加入成功,进行double-check,如果加入失败(可能是队列已满),则执行后续步骤;
double-check主要目的是判断刚加入workQueue阻塞队列的task是否能被执行
A、如果线程池已经不是running状态了,应该拒绝添加新任务,从workQueue中删除任务
B、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
3、如果线程池不是running状态 或者 无法入队列,尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前commandctl的作用(原子)
在线程池中,ctl 贯穿在线程池的整个生命周期中
它是一个原子类,主要作用是用来保存线程数量和线程池的状态。
一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//-536870912
private static final int COUNT_BITS = Integer.SIZE - 3;
// 接收新任务,并执行队列中的任务
private static final int RUNNING = -1 << COUNT_BITS; //-1向左移动29位 1110 0000 0000 0000 0000 0000 0000 0000
// 不接收新任务,但是执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0000 0000 0000 0000 0000 0000 0000 0000
// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int STOP = 1 << COUNT_BITS; //0010 0000 0000 0000 0000 0000 0000 0000
//所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
private static final int TIDYING = 2 << COUNT_BITS; //0100 0000 0000 0000 0000 0000 0000 0000
// terminated()方法执行完成
private static final int TERMINATED = 3 << COUNT_BITS;//0110 0000 0000 0000 0000 0000 0000 0000
/**
CAPACITY:00011111111111111111111111111111
~CAPACITY:11100000000000000000000000000000
**/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; } // | 如果对应位都是0,则为0 否则为1
//取c的低位 c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量
private static int workerCountOf(int c) { return c & CAPACITY; } // & 如果对应位都是1,则为1 否则为0
//取c的高位 c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
addWorker —> 添加worker线程
如果工作线程数小于核心线程数的话,会调用 addWorker,顾名思义,其实就是要创建一个工作线程。
该方法做了两件事:
- 采用循环 CAS 操作来将线程数加 1;
新建一个线程并启用。
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread#start), we roll back cleanly.
* 检查根据当前线程池的状态和给定的边界(core or maximum)是否可以创建一个新的worker
* 如果是这样的话,worker的数量做相应的调整,如果可能的话,创建一个新的worker并启动,参数中的firstTask作为worker的第一个任务
* 如果方法返回false,可能因为pool已经关闭或者调用过了shutdown
* 如果线程工厂创建线程失败,也会失败,返回false
* 如果线程创建失败,要么是因为线程工厂返回null,要么是发生了OutOfMemoryError
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass(绕开) queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
// firstTask: worker线程的初始任务,可以为空
// core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限
private boolean addWorker(Runnable firstTask, boolean core) {
//goto 语句,避免死循环
retry:
//外层循环,负责判断线程池状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //状态
// Check if queue empty only if necessary.
/**
* 线程池的运行状态,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
* 1、如果线程池state已经至少是shutdown状态了
* 2、并且以下3个条件任意一个是false
* rs == SHUTDOWN (隐含:rs>=SHUTDOWN)false情况: 线程池状态已经超过shutdown,可能是stop、tidying、terminated其中一个,即线程池已经终止
* firstTask == null (隐含:rs==SHUTDOWN)false情况: firstTask不为空,rs==SHUTDOWN 且 firstTask不为空,return false,场景是在线程池已经shutdown后,还要添加新的任务,拒绝
* ! workQueue.isEmpty() (隐含:rs==SHUTDOWN,firstTask==null)false情况: workQueue为空,当firstTask为空时是为了创建一个没有任务的线程,再从workQueue中获取任务,如果workQueue已经为空,那么就没有添加新worker线程的必要了
* return false,即无法addWorker()
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环,负责worker数量+1
for (;;) {
int wc = workerCountOf(c); //worker数量
//如果worker数量>线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)
//或者( worker数量>corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//调用unsafe CAS操作,使得worker数量+1,成功则跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS worker数量+1失败,再次读取ctl
c = ctl.get(); // Re-read ctl
//如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
}
}
/**
* worker数量+1成功的后续操作
* 添加到workers Set集合,并启动worker线程
*/
//工作线程是否启动的标识
boolean workerStarted = false;
//工作线程是否已经添加成功的标识
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask); //1、设置worker这个AQS锁的同步状态state=-1
//2、将firstTask设置给worker的成员变量firstTask
//3、使用worker自身这个runnable,调用ThreadFactory创建一个线程,并设置给worker的成员变量thread
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
//--------------------------------------------这部分代码是上锁的
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 当获取到锁后,再次检查
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池在运行running<shutdown 或者 线程池已经shutdown,且firstTask==null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
//worker数量-1的操作在addWorkerFailed()
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//任务刚封装到 work 里面,还没 start,你封装的线程就是 alive
if (t.isAlive()) // precheck that t is startable 线程已经启动,抛非法线程状态异常
throw new IllegalThreadStateException();
workers.add(w);//workers是一个HashSet<Worker>
//设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
//--------------------------------------------
}
finally {
mainLock.unlock();
}
//如果往HashSet中添加worker成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果启动线程失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker(null, true) :这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行。
执行流程:
1、判断线程池当前是否为可以添加worker线程的状态,可以则继续下一步,不可以return false:
A、线程池状态>shutdown,可能为stop、tidying、terminated,不能添加worker线程
B、线程池状态==shutdown,firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务
C、线程池状态==shutdown,firstTask==null,workQueue为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取task,而workQueue为空,说明添加无任务线程已经没有意义
2、线程池当前线程数量是否超过上限(corePoolSize 或 maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步
3、在线程池的ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁,并启动worker线程,如果这一切都成功了,return true,如果添加worker入Set失败或启动失败,调用addWorkerFailed()逻辑。Worker内部类
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*
* Worker类大体上管理着运行线程的中断状态 和 一些指标
* Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁
* 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程
* 解释:
* 为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??
* 主要是为了控制中断
* 用什么控制??
* 用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁
* 只有在等待从workQueue中获取任务getTask()时才能中断
* worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁
* 因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入)
* 解释:
* setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()
* 如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:
* setMaximumPoolSize()
* setKeppAliveTime()
* allowCoreThreadTimeOut()
* shutdown()
* 此外,为了让线程真正开始后才可以中断,初始化lock状态为负值(-1),在开始runWorker()时将state置为0,而state>=0才可以中断
*
*
* Worker继承了AQS,实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread; //利用ThreadFactory和 Worker这个Runnable创建的线程对象
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
setState(-1); // inhibit interrupts until runWorker
// 在调用runWorker()前,禁止interrupt中断,在interruptIfStarted()方法中会判断 getState()>=0
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //根据当前worker创建一个线程对象
//当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,而是调用当前worker.run()时调用firstTask.run()
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this); //runWorker()是ThreadPoolExecutor的方法
}
// Lock methods
//
// The value 0 represents the unlocked state. 0代表“没被锁定”状态
// The value 1 represents the locked state. 1代表“锁定”状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试获取锁
* 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
*/
protected boolean tryAcquire(int unused) {
//尝试一次将state从0设置为1,即“锁定”状态,但由于每次都是state 0->1,而不是+1,那么说明不可重入
//且state==-1时也不会获取到锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); //设置exclusiveOwnerThread=当前线程
return true;
}
return false;
}
/**
* 尝试释放锁
* 不是state-1,而是置为0
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中断(如果运行)
* shutdownNow时会循环对worker线程执行
* 且不需要获取worker锁,即使在worker运行时也可以中断
*/
void interruptIfStarted() {
Thread t;
//如果state>=0、t!=null、且t没有被中断
//new Worker()时state==-1,说明不能中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
- 每个 worker,都是一条线程,同时里面包含了一个 firstTask,即初始化时要被首先执行的任务.
- 最终执行任务的,是 runWorker()方法
Worker 类继承了 AQS,并实现了 Runnable 接口,注意其中的 firstTask 和 thread 属性:
firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要传入任务,这里通过 getThreadFactory().newThread(this);来新建一个线程,newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,也就是一个线程,所以一个 Worker 对象在启动的时候会调用 Worker 类中的 run 方法。
Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:
lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;那么它会有以下几个作用
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态
- 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程
Worker控制中断主要有以下几方面:
1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断
不允许中断体现在:
A、shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,没发interrupt()
B、shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程
Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法
Worker和Task的区别:
Worker是线程池中的线程,而Task虽然是runnable,但是并没有真正执行,只是被Worker调用了run方法,后面会看到这部分的实现。
addWorkerFailed
addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
这个方法主要做两件事
- 如果 worker 已经构造好了,则从 workers 集合中移除这个 worker
- 原子递减核心线程数(因为在 addWorker 方法中先做了原子增加)
-
runWorker() — 执行任务
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
* 重复的从队列中获取任务并执行,同时应对一些问题:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
* 我们可能使用一个初始化任务开始,即firstTask为null
* 然后只要线程池在运行,我们就从getTask()获取任务
* 如果getTask()返回null,则worker由于改变了线程池状态或参数配置而退出
* 其它退出因为外部代码抛异常了,这会使得completedAbruptly为true,这会导致在processWorkerExit()方法中替换当前线程
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and
* clearInterruptsForTaskRun called to ensure that unless pool is
* stopping, this thread does not have its interrupt set.
* 在任何任务执行之前,都需要对worker加锁去防止在任务运行时,其它的线程池中断操作
* clearInterruptsForTaskRun保证除非线程池正在stoping,线程不会被设置中断标示
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
* 每个任务执行前会调用beforeExecute(),其中可能抛出一个异常,这种情况下会导致线程die(跳出循环,且completedAbruptly==true),没有执行任务
* 因为beforeExecute()的异常没有cache住,会上抛,跳出循环
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to
* afterExecute. We separately handle RuntimeException, Error
* (both of which the specs guarantee that we trap) and arbitrary
* Throwables. Because we cannot rethrow Throwables within
* Runnable.run, we wrap them within Errors on the way out (to the
* thread's UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
* 假定beforeExecute()正常完成,我们执行任务
* 汇总任何抛出的异常并发送给afterExecute(task, thrown)
* 因为我们不能在Runnable.run()方法中重新上抛Throwables,我们将Throwables包装到Errors上抛(会到线程的UncaughtExceptionHandler去处理)
* 任何上抛的异常都会导致线程die
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
* 任务执行结束后,调用afterExecute(),也可能抛异常,也会导致线程die
* 根据JLS Sec 14.20,这个异常(finally中的异常)会生效
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用
//Worker 类的 tryRelease()方法,将 state 置为 0,
//而 interruptIfStarted()中只有 state>=0 才允许调用中断
w.unlock(); // allow interrupts
// new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0, 而interruptIfStarted()中只有state>=0才允许调用中断
boolean completedAbruptly = true; //是否“突然完成”,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
try {
/**
* 如果task不为null,或者从阻塞队列中getTask()不为null
*/
while (task != null || (task = getTask()) != null) {
w.lock(); //上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/**
* clearInterruptsForTaskRun操作
* 确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
* 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
* 2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
* 是,再次设置中断标示,wt.interrupt()
* 否,不做操作,清除中断标示后进行后续步骤
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //当前线程调用interrupt()中断
try {
//执行前(子类实现)
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
}
catch (RuntimeException x) {
thrown = x; throw x;
}
catch (Error x) {
thrown = x; throw x;
}
catch (Throwable x) {
thrown = x; throw new Error(x);
}
finally {
//执行后(子类实现)
afterExecute(task, thrown); //这里就考验catch和finally的执行顺序了,因为要以thrown为参数
}
}
finally {
//置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask()取) + 记录该 Worker 完成任务数量 + 解锁
task = null; //task置为null
w.completedTasks++; //完成任务数+1
w.unlock(); //解锁
}
}
completedAbruptly = false;
}
finally {
//处理worker的退出
//1.将入参 worker 从数组 workers 里删除掉;
//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组workers
processWorkerExit(w, completedAbruptly);
}
}
runWorker 方法,这个方法主要做几件事
如果 task 不为空,则开始执行 task
- 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
- 执行完毕后,通过 while 循环继续 getTask()取任务
如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
执行流程:
1、Worker线程启动后,通过Worker类的run()方法调用runWorker(this)
2、执行任务之前,首先worker.unlock(),将AQS的state置为0,允许中断当前worker线程
3、开始执行firstTask,调用task.run(),在执行任务前会上锁wroker.lock(),在执行完任务后会解锁,为了防止在任务运行时被线程池一些中断操作中断
4、在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法
5、无论在beforeExecute()、task.run()、afterExecute()发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程
6、如正常执行完当前task后,会通过getTask()从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程.getTask() — 获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {//自旋
int c = ctl.get();
int rs = runStateOf(c);
/* 对线程池状态的判断,两种情况会 workerCount-1,并且返回 null
1. 线程池状态为 shutdown,且 workQueue 为空(反映了 shutdown 状态的线程池还是要执行 workQueue 中剩余的任务的)
2. 线程池状态为 stop(shutdownNow()会导致变成 STOP)(此时不用考虑 workQueue的情况)*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//返回 null,则当前 worker 线程会退出
}
int wc = workerCountOf(c);
// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*1. 线程数量超过 maximumPoolSize 可能是线程池在运行时被调用了 setMaximumPoolSize()
被改变了大小,否则已经 addWorker()成功不会超过 maximumPoolSize
2. timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中
获取任务发生了超时.其实就是体现了空闲线程的存活时间*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
keepaliveTime 时间内没有获取到任务,则返回 null.
否则通过 take 方法阻塞式获取队列中的任务*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
return r;
timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
} catch (InterruptedException retry) {
timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
}
}
}
这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。
什么时候会销毁?当然是 runWorker 方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。
getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行processWorkerExit 方法。执行流程:
1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
A、线程池状态是否满足:
(a)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
(b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
(a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
(b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:
A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程processWorkerExit() — worker线程退出
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
// worker: 要结束的worker
// completedAbruptly: 是否突然完成(是否因为异常退出)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1、worker数量-1
* 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
* 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊
decrementWorkerCount();
/**
* 2、从Workers Set中移除worker
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数
workers.remove(w); //从HashSet<Worker>中移除
} finally {
mainLock.unlock();
}
/**
* 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
* 主要是判断线程池是否满足终止的状态
* 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
* 没有线程了,更新状态为tidying->terminated
*/
tryTerminate();
/**
* 4、是否需要增加worker线程
* 线程池状态是running 或 shutdown
* 如果当前线程是突然终止的,addWorker()
* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
* 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
*/
int c = ctl.get();
//如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker
if (runStateLessThan(c, STOP)) {
//不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize
//如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//添加一个没有firstTask的worker
//只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
addWorker(null, false);
}
}
执行流程:
1、worker数量-1
A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
判断线程池是否满足终止的状态
A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
B、没有线程了,更新状态为tidying->terminated
4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
线程池状态是running 或 shutdown
A、如果当前线程是突然终止的,addWorker()
B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程拒绝策略 interface RejectedExecutionHandler -> static class RejectHandler
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略。
四种策略:线程池的默认拒绝策略为AbortPolicy,即丢弃任务并抛出RejectedExecutionException异常ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
终止线程池原理
shutdown()
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 开始一个有序的关闭,在关闭中,之前提交的任务会被执行(包含正在执行的,在阻塞队列中的),但新任务会被拒绝
* 如果线程池已经shutdown,调用此方法不会有附加效应
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //上锁
try {
//判断调用者是否有权限shutdown线程池
checkShutdownAccess();
//CAS+循环设置线程池状态为shutdown
advanceRunState(SHUTDOWN);
//中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
}
finally {
mainLock.unlock(); //解锁
}
//尝试终止线程池
tryTerminate();
}
执行流程:
1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
2、判断调用者是否有权限shutdown线程池
3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
4、中断所有空闲线程 interruptIdleWorkers()
5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
6、解锁
7、尝试终止线程池 tryTerminate()
shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown、中断所有空闲线程、tryTerminated()尝试终止线程池interruptIdleWorkers()
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
* 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case(以免) all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*
* onlyOne如果为true,最多interrupt一个worker
* 只有当终止流程已经开始,但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
* (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态)
* 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
* 为保证线程池最终能终止,这个操作总是中断一个空闲worker
* 而shutdown()中断所有空闲worker,来保证空闲线程及时退出
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //上锁
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock(); //解锁
}
}
interruptIdleWorkers() 首先会获取mainLock锁,因为要迭代workers set,在中断每个worker前,需要做两个判断:
1、线程是否已经被中断,是就什么都不做
2、worker.tryLock() 是否成功
第二个判断比较重要,因为Worker类除了实现了可执行的Runnable,也继承了AQS,本身也是一把锁,具体可见 ThreadPoolExecutor内部类Worker解析tryLock()
调用了Worker自身实现的tryAcquire()方法,这也是AQS规定子类需要实现的尝试获取锁的方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire()先尝试将AQS的state从0—>1,返回true代表上锁成功,并设置当前线程为锁的拥有者
可以看到compareAndSetState(0, 1)只尝试了一次获取锁,且不是每次state+1,而是0—>1,说明锁不是可重入的
在runWorker()方法中每次获取到task,task.run()之前都需要worker.lock()上锁,运行结束后解锁,即正在运行任务的工作线程都是上了worker锁的
在interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的
故shutdown()只有对能获取到worker锁的空闲线程(正在从workQueue中getTask(),此时worker没有加锁)发送中断信号
由此可以将worker划分为: 1、空闲worker:正在从workQueue阻塞队列中获取任务的worker 2、运行中worker:正在task.run()执行任务的worker |
---|
正阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务
捕获中断异常后,将继续循环到getTask()最开始的判断线程池状态的逻辑,当线程池是shutdown状态,且workQueue.isEmpty时,return null,进行worker线程退出逻辑
某些情况下,interruptIdleWorkers()时多个worker正在运行,不会对其发出中断信号,假设此时workQueue也不为空
那么当多个worker运行结束后,会到workQueue阻塞获取任务,获取到的执行任务,没获取到的,如果还是核心线程,会一直workQueue.take()阻塞住,线程无法终止,因为workQueue已经空了,且shutdown后不会接收新任务了
这就需要在shutdown()后,还可以发出中断信号
Doug Lea大神巧妙的在所有可能导致线程池产终止的地方安插了tryTerminated()尝试线程池终止的逻辑,并在其中判断如果线程池已经进入终止流程,没有任务等待执行了,但线程池还有线程,中断唤醒一个空闲线程
tryTerminated()方法
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*
* 在以下情况将线程池变为TERMINATED终止状态
* shutdown 且 正在运行的worker 和 workQueue队列 都empty
* stop 且 没有正在运行的worker
*
* 这个方法必须在任何可能导致线程池终止的情况下被调用,如:
* 减少worker数量
* shutdown时从queue中移除任务
*
* 这个方法不是私有的,所以允许子类ScheduledThreadPoolExecutor调用
*/
final void tryTerminate() {
//这个for循环主要是和进入关闭线程池操作的CAS判断结合使用的
for (;;) {
int c = ctl.get();
/**
* 线程池是否需要终止
* 如果以下3中情况任一为true,return,不进行终止
* 1、还在运行状态
* 2、状态是TIDYING、或 TERMINATED,已经终止过了
* 3、SHUTDOWN 且 workQueue不为空
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* 只有shutdown状态 且 workQueue为空,或者 stop状态能执行到这一步
* 如果此时线程池还有线程(正在运行任务,正在等待任务)
* 中断唤醒一个正在等任务的空闲worker
* 唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程
*/
if (workerCountOf(c) != 0) { // Eligible to terminate 资格终止
interruptIdleWorkers(ONLY_ONE); //中断workers集合中的空闲任务,参数为true,只中断一个
return;
}
/**
* 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); //需子类实现
}
finally {
ctl.set(ctlOf(TERMINATED, 0)); //将线程池的ctl变成TERMINATED
termination.signalAll(); //唤醒调用了 等待线程池终止的线程 awaitTermination()
}
return;
}
}
finally {
mainLock.unlock();
}
// else retry on failed CAS
// 如果上面的CAS判断false,再次循环
}
}
执行流程:
1、判断线程池是否需要进入终止流程(只有当shutdown状态+workQueue.isEmpty 或 stop状态,才需要)
2、判断线程池中是否还有线程,有则 interruptIdleWorkers(ONLY_ONE) 尝试中断一个空闲线程(正是这个逻辑可以再次发出中断信号,中断阻塞在获取任务的线程)
3、如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程
shutdownNow()
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
* 这个任务列表是从任务队列中排出(删除)的
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
* 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
* 除了尽力尝试停止运行中的任务,没有任何保证
* 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //上锁
try {
//判断调用者是否有权限shutdown线程池
checkShutdownAccess();
//CAS+循环设置线程池状态为stop
advanceRunState(STOP);
//中断所有线程,包括正在运行任务的
interruptWorkers();
tasks = drainQueue(); //将workQueue中的元素放入一个List并返回
}
finally {
mainLock.unlock(); //解锁
}
//尝试终止线程池
tryTerminate();
return tasks; //返回workQueue中未执行的任务
}
shutdownNow() 和 shutdown()的大体流程相似,差别是:
1、将线程池更新为stop状态
2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
interruptWorkers()
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()
需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束
awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞
1、所有任务完成执行
2、到达超时时间
3、当前线程被中断
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间
termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待
阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):
1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出
2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败
3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞
如何合理配置线程池的大小
如何合理配置线程池大小,线程池大小不是靠猜,也不是说越多越好。
在遇到这类问题时,先冷静下来分析
- 需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型
- 每个任务执行的平均时长大概是多少,这个任务的执行时长可能还跟任务处理逻辑是否涉及到网络传输以及底层系统资源依赖有关系
如果是 CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。那线程池的最大线程数可以配置为 cpu 核心数+1 如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 cpu 核心数的 2 倍。
一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/ 线程 CPU 时间 )* CPU 数目
这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间(通常使用 loadrunner测试大量运行次数求出平均值)
线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实 际中如果需要 线程池创建之 后立即创建线 程,可以通过 以下两个方法 办到:
prestartCoreThread():初始化一个核心线程; prestartAllCoreThreads():初始化所有核心线程