正确使用线程池姿势
线程池根据不同业务场景设置不同的线程池。不要整个系统用一个线程池。因为其他业务场景占用线程过多,影响其他业务处理
线程池方法
Executor有一个重要子接口ExecutorService,其中定义了线程池的具体行为
1.execute(Runnable command):履行Ruannable类型的任务,
2.submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
3.shutdown():在完成已提交的任务后关闭线程池,不再接管新任务,
4.shutdownNow():停止所有正在履行的任务并立刻关闭线程池。
5.isTerminated():测试所有任务是否都执行完毕。
6.isShutdown():测试是否该ExecutorService已被关闭。
一、线程与线程池性能对比
线程池:线程缓存的实现,线程是稀缺资源,避免线程被频繁的创建销毁。
例如:在web应用中,服务器接受处理请求,并为请求分配一个线程进行处理,每个新请求创建一个线程,实现简单,但存在问题。
线程池优势
复用存在的线程,减少线程创建、消亡的开销,提高性能。
可以提高响应速度。(当任务到达时,不需要等待线程创建,使用现有线程,就可以立即执行)
提高线程的可管理性。(线程池统一管理、分配线程;避免无限创建)
二、JDK自带线程池
Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue
}
问题:高并发时,可能导致大量创建线程,导致cup达到100%。因为队列不缓存任务,所有任务都需要对应一个线程。
Executors.newFixedThreadPool(10)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
}
Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
}
Executors.newScheduledThreadPool(5)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,内部使用DelayedWorkQueue延迟队列
//利用ThreadPoolExecutor初始化
public ScheduledThreadPoolExecutor(int corePoolSize) {
** **super(_corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}_
任务被封装为ScheduledFutureTask,底层执行任务依靠addWorker方法,定时依赖延迟队列
线程池中的队列
SynchronousQueue(无缓冲)
SynchronousQueue:无缓冲等待队列,是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,
拥有公平(FIFO)和非公平(LIFO)策略,非公平策略会导致一些数据永远无法被消费的情况?
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为Integer.MAX_VALUE,避免线程拒绝执行操作
LinkedBlockingQueue(无界)
按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene
LinkedBlockingQueue:无界队列,当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes参数就相当于无效),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
可指定队列大小,默认队列大小Integer.MAX_VALUE
ArrayBlockingQueue(有界)
ArrayBlockingQueue:有界队列,可以指定缓存队列的大小,
1.当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,
2.当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行
3.当线程数达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略
priorityBlockingQuene
DelayedWorkQueue
底层为RunnableScheduledFuture类型的数组,初始容量为16,
三、线程池应用
具体实现:ThreadPoolExecutor、ScheduledThreadPoolExecutor
ThreadPoolExecutor
核心参数
corePoolSize(核心线程数)
1.线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
2.如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
3.如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize(最大线程数)
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,(如果当前线程数小于maximumPoolSize)则创建新的线程执行任务,
keepAliveTime(允许的空闲时间)
线程池非核心线程允许的空闲时间。当线程池中的线程数量大于corePoolSize的时,且队列中任务时,核心线程外的线程会等待超过了keepAliveTime,对额外的线程进行销毁;
unit(单位)
workQueue(阻塞队列)
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
threadFactory(线程工厂)
继承ThreadFactory接口的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。
使用默认的ThreadFactory来创建线程时,新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置线程的名称。
handler
线程池的饱和策略,阻塞队列放满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。
也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
ScheduledThreadPoolExecutor
继承ThreadPoolExecutor,复用addWorker方法
运行机制
任务首先会被添加到队列中,队列对任务按照下次执行时间进行排序。线程获取队列中堆顶元素,进行获取
底层依赖:延迟队列(DelayQueue)、而延迟队列底层依赖优先级队列(PriorityQueue)、优先级队列底层使用堆排序保证元素顺序
执行过程
下面是对这4个步骤的说明。
1)线程1从DelayQueue中获取到期的ScheduledFutureTask(DelayQueue.take())。
到期是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将执行的时间。
4)线程1把修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
主要方法
schedule
scheduleAtFixedRate
scheduleWithFixedDelay
延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 固定周期执行;
四、源码解析
源码重点属性
//ctl=111000000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Interge.size = 32;故COUNTBITS=29
private static final int _COUNT_BITS = Integer.SIZE - 3;
//向左移动29位,0010 0000 0000 0000 0000 0000 0000 0000 - 1 = 0001 1111 1111 1111 1111 1111 1111 1111(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl是对线程池的运行状态、线程池有效线程数量(能记录2^29-1.约5亿)进行记录的一个字段。
使用Integer类型保存
高3位保存线程状态(runState),低29位保存线程数量(workerCount)
线程池状态
RUNNING[-536870912]
//111000000000000000000000000000000。
//状态说明:接受新任务或者处理队列中任务。
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN[0]
//000000000000000000000000000000000。
//状态切换:调用线程池的shutdown()方法。(RUNNING—>SHUTDOWN)
//状态说明:不再接受新任务,仅处理队列中的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP[536870912]
//001000000000000000000000000000000。
//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN—>STOP)
//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
private static final int STOP = 1 << COUNT_BITS;
TIDYING[1073741824]
//01000000000000000000000000000000。
//状态切换:
当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中工作线程数为0时,就会由 SHUTDOWN -> TIDYING。
线程池在STOP状态下,线程池中工作线程数为0时,就会由STOP -> TIDYING。
//状态说明:所有任务都已终止,workerCount(任务数)为0,状态为TIDYING的线程将运行钩子函数terminated(),(terminated()方法为空,需要用户可自己实现)。
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED[1610612736]
//01100000000000000000000000000000。
//状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
//状态说明:线程池彻底终止,就变成TERMINATED状态。
private static final int TERMINATED = 3 << COUNT_BITS;
线程池执行流程
见疑难杂文:问题三
ThreadPoolExecutor源码注释
execute()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl记录线程池状态(runState)、线程数(workCount)
int c = ctl.get();
//1.计算当前线程数
//如果当前线程数【 < 核心线程数】,则创建新的线程。并把任务添加到线程中。
if (workerCountOf(c) < corePoolSize) {
//创建新线程成功后直接返回。
//第二个参数:(ture)标示添加线程与corePoolSize比较,否则与maximumPoolSize比较
if (addWorker(command, true))
return;
c = ctl.get();//添加失败,则重新获取ctl数据
}
//2.当线程数【 >= 核心线程数】,则执行到此处
//a.校验线程池状态,是否为运行状态
//b.尝试将任务加入队列,
//c.再次校验线程池状态(防止线程加入队列时发生变化),如果线程池shutdown,则丢弃任务,如果线程池无工作线程,则添加线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//再次判断线程池运行状态,如果不是RUNNING状态,则进行移除操作。并执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)//获取线程池中有效线程数,如果为0,则执行addWorker方法。
addWorker(null, false);//第一个参数为null时,表示仅在线程池创建一个线程,用于消费队列中任务(task != null || (task = getTask()) != null)task==null,则从队列中获取任务。
}
//3.当队列放满,则执行到此处,有两种情况。
//a.线程池已经不是RUNNING状态
//b.是RUNNING状态,但线程数>核心线程数,且队列已满。(故addWorker方法,第二个参数出入false与maximumPoolSize比较)
else if (!addWorker(command, false))
reject(command);
}
execute()执行总结。
1.线程池未达到核心线程时(workerCount < corePoolSize),当有任务添加时,不断创建新线程作为核心线程;
2.当线程数达到核心线程时(workerCount >= corePoolSize)队列不满,则将任务放入队列中.
3.当队列满时(workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满),尝试创建非核心线程
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常
通过上面步骤可以看出。如果开始创建非核心线程时,队列任务被消费(即不满状态),新任务添加时,将任务添加到队列中。等队列满时,再次开始创建非核心线程
addWorker()方法
//校验是否满足增加的条件
//1.firstTask:用于指定新增的线程执行的【第一个任务】,firstTask==null时,代表仅增加执行线程。
//2.core:(ture)添加线程时与corePoolSize比较,否则与maximumPoolSize比较
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取当前线程运行状态
int c = ctl.get();
int rs = runStateOf(c);
/*
* 如果rs >= SHUTDOWN(0),则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false;即:如果队列中已经没有任务了,不需要再添加线程
*
* 但是如果队列不会空,则可以继续执行
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
//终止添加新线程
return false;
for (;;) {
//获取线程池中线程数
int wc = workerCountOf(c);
//判断线程池中线程数是否大于最大限制,或者核心(或最大线程数)。如果大于直接返回。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环(为什么修改c值?因为满足增加线程的条件,而变量c记录线程池中线程的数量)
//cas修改数据可能失败,如果失败,则继续执行内循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回外层for循环,继续执行外层for循环校验
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//通过校验,则执行增加逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//firstTask作为入参,创建Worker对象
w = new Worker(firstTask);
//每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
//防止并发问题
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// rs是RUNNING状态 || rs是SHUTDOWN状态 && firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 测试线程是否是活跃状态(即:调用了start方法,则为true)
throw new IllegalThreadStateException();
//使用set集合暂存worker对象
workers.add(w);
int s = workers.size();
// largestPoolSize记录当前线程池中最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动worker相关线程,通过run方法调用runWork方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类
//Worker类继承AQS、实现Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** 具体执行任务的线程. */
final Thread thread;
/** 初始的待任务,可能为空 */
Runnable firstTask;
/** 记录每个线程完成任务数 */
volatile long completedTasks;
/**
*构造函数中,利用ThreadFactory产生新的线程,并给新线程第一个任务(任务可能为null)
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 在运行worker前,禁止线程中断操作。(如果想对worker操作,必须获取锁,将state=0,改成state=1,设置为1,则无法获取锁)
this.firstTask = firstTask;
//创建新线程,并给新线程初始化执行逻辑(worker就是Runnable,在run方法中定义了处理逻辑)
this.thread = getThreadFactory().newThread(this);
}
/** runWorker为主要处理逻辑 */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁(cas方式0-->1)
//tryAcquire方法判断state是否为0,所以setState(-1);将state设置为-1,为了禁止在执行任务前(调用runWorker前)对线程进行中断。
//在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。(当state=0时,worker线程可以被中断)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中断当前线程
//isInterrupted():判断线程对象中是否设置了中断标识,如果设置,则返回true,但不会清除中断标识,再次调用仍为true。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
//没有设置中断,则执行中断操作
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
线程池中每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象(workers.add(w);上面代码显示),请参见JDK源码。
Worker类继承AQS、实现Runnable接口,重要的两个属性
1.firstTask用它来保存传入的任务;
2.thread是在调用构造方法时通过ThreadFactory来创建的线程,是处理任务的线程。
在调用构造方法时,传入需要执行的任务,通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程, 所以Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?
Worker中tryAcquire方法不允许重入的(当前线程同一时间仅能被一个任务占用),而ReentrantLock的tryAcquire允许重入:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;如果正在执行任务,则不应该中断线程;
- 如果线程不是独占锁的状态,即空闲的状态,说明它没有在处理任务,则可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
- 所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
Worker在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
因为AQS中默认的state是0,如果创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,即防止Worker对象未调用runWorker方法,就被中断。
runWorker()方法
//每个线程都会循环获取队列中的任务,并调用任务的run()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务,可能为空
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // (将state从-1修改为0,允许中断操作)
//是否因为异常退出循环
boolean completedAbruptly = true;
try {
//如果task=null,则获取队列中任务,即:队列任务最后执行
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池正在停止,那么要保证当前线程是中断状态;
//如果不是的话,则要保证当前线程不是中断状态;
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//中断当前线程
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//当前线程调用传入线程池任务的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
STOP
//001
//状态切换:调用线程池的shutdownNow()方法。(RUNNING、SHUTDOWN-->STOP)
//状态说明:不再接受新任务,不再处理队列中的任务,并且中断当前执行的任务。
private static final int STOP = 1 << COUNT_BITS;
runWorker方法的执行过程:
- while循环不断地通过getTask()方法获取任务,getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。getTask()方法
private Runnable getTask() {
//上次从阻塞队列中(使用poll方法)获取任务,是否超时。
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。【返回null,调用getTask处,会跳出循环,并对线程销毁处理】
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//使用CAS方式操作,将Worker数量减1
//返回null,终止runWork方法中循环
return null;
}
//wc:线程池中线程数
int wc = workerCountOf(c);
// timed:判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,即:核心线程不进行超时回收;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,执行continue,进行重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。【保证核心线程,不会被销毁的方式】。
* keepAliveTime:线程池初始化时,设置的参数
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
总结:
第二个if判断目的是控制线程池的有效线程数量。
在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,
但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit方法(进行线程回收)。
processWorkerExit()方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经完成对workerCount进行了减1操作。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
//从线程池中移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 对线程池进行判断,尝试设置线程池为最终态
//如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
tryTerminate();
int c = ctl.get();
/*
* 保留线程池中线程操作
* 当线程池是RUNNING或SHUTDOWN状态时(c<STOP),如果worker是异常结束,直接通过addWorker方式想线程池添加线程;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {//正常结束。completedAbruptly=false
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate()方法
如果(SHUTDOWN且线程池和队列为空)或(STOP且线程池为空),则转换为终止状态(TERMINATED)。
如果有资格终止,但workerCount不为零,则中断空闲工作进程,以确保停机信号传播。
必须在可能导致终止的任何操作之后调用此方法,这些操作包括减少worker计数或在关机期间从队列中删除任务。
该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//校验是否进行终止线程池操作
//1.线程池为RUNNING状态。(不能终止)
//2.线程池为TERMINATED状态(已经终止)。
//3.线程池为SHUTDOWN状态且队列不为null(不能终止,需要处理完队列任务)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//符合终止条件。判断线程池中是否还存在线程,如果存在,则设置中断标识(调用interrupt方法)
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断线程池中线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//设置线程池终止状态
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//设置线程池状态为终态。
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
ScheduledThreadPoolExecutor源码注释
scheduleWithFixedDelay�
//延迟指定时间后(initialDelay)执行一次,之后按照:上一次任务执行时长 + 固定周期(delay)执行;
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//包装任务,将任务包装成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
//装饰方式无实质操作。也许为用户预留的扩展接口
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//将自己赋值给自己的outerTask属性(sft和t是同一个对象)
sft.outerTask = t;
delayedExecute(t);
return t;
}
delayedExecute方法
//1.首先校验线程池状态。(如果已经被关停,则拒绝任务)
//2.将任务加入延迟队列。(队列通过堆排序,对任务排序)
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
//与ThreadPoolExecutor不同,【直接把任务加入延迟队列】
super.getQueue().add(task);
//如果当前状态无法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//向线程池中添加线程。和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker
ensurePrestart();
}
//向线程池添加执行线程,【任务立即加入队列中】。即使核心线程数为0,也会至少启动一个线程,添加到线程池。(ThreadPoolExecutor)
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
复用了ThreadPoolExecutor中的addWorker方法,
1.创建Worker对象。
2.调用Worker的start方法,
3.Worker本身就是一个线程,启动线程后,执行run方法,在run方法中调用runWork。
4.在阻塞队列中获取task,并执行task.run
ScheduledFutureTask类
//
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不是周期执行,则执行一次
else if (!periodic)
ScheduledFutureTask.super.run();
//周期执行任务。执行完成后,重新添加到队列中
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下次执行时间
setNextRunTime();
//将任务重新加入队列中
reExecutePeriodic(outerTask);
}
}
//将任务添加到队列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
//和上面方法相同
ensurePrestart();
}
}
run方法
public void run() {
// 是否周期性,就是判断period是否为0。
boolean periodic = isPeriodic();
// 检查任务是否可以被执行。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任务直接调用run运行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
// 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
reExecutePeriodic(outerTask);
}
fied-rate模式和fixed-delay模式区别
private void setNextRunTime() {
long p = period;
/*
* fixed-rate模式,时间设置为上一次时间+p。
* 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
* 如果这次任务还没执行完是肯定不会执行下一次的。
*/
if (p > 0)
time += p;
/**
* fixed-delay模式,计算下一次任务可以被执行的时间。
* 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
*/
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
/*
* 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
*
* 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
*/
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有这么一种情况:
* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
*
* 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
*
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
DelayedWorkQueue
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面
(注意:顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的)
DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组
假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
一个节点的父节点的索引为:p = (k - 1) / 2
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader线程
private Thread leader = null;
/**
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();
Leader-Follower模式的变体,用于减少不必要的定时等待。
对于多线程的网络模型来说:
所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。
基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//如果大于队列长度则进行扩容
if (i >= queue.length)
// 容量扩增50%。
grow();
size = i + 1;
// 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 插入堆尾。
siftUp(i, e);
}
//如果新加入的元素就是队列头,这里有两种情况
//1.这是用户提交的第一个任务
//2.新任务进行堆调整以后,排在队列头
if (queue[0] == e) {
leader = null;
// 由于原先leader已经无效被设置为null了,唤醒一个线程(未必是原先的leader)来取走堆顶任务。执行available.awaitNanos(delay)
available.signal();
}
} finally {
lock.unlock();
}
return true;
siftup方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 找到父节点的索引
while (k > 0) {
// 获取父节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
if (key.compareTo(e) >= 0)
break;
// 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
queue[k] = e;
setIndex(e, k);
// 设置索引为k
k = parent;
}
// key设置为排序后的位置中
queue[k] = key;
setIndex(key, k);
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();//队列空,则进行阻塞
else {
// 计算当前时间到执行时间的时间间隔
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
//// leader不为空,阻塞当前线程,因为只有leader才能执行
if (leader != null)
available.await();
else {
// leader为空,则把leader设置为当前线程.
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞到指定时间 (delay为还有多久执行任务时间)
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader不为空,则说明leader的线程正在执行available.awaitNanos(delay);
/ 如果queue[0] == null,说明队列为空
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take方法是什么时机?
在ThreadPoolExecutor中,工作线程调用getTask方法,会循环地从workQueue中取任务。但定时任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有到指定的执行时间的时,任务才被取走。
leader的作用?
leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader线程必须在从take()或poll()返回之前,signal其它线程,除非其他线程成为了leader。
举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。多个这时执行该段代码是没有作用的,因为只能有一个线程会从take中返回queue[0](因为有lock),其他线程这时再返回for循环执行时取的queue[0],已经不是之前的queue[0]了,然后又要继续阻塞。所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())poll方法
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
// 如果delay <= 0,说明已经到了任务执行的时间,返回
if (delay <= 0)
return finishPoll(first);
// 如果nanos <= 0,说明已经超时,返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// nanos < delay 说明需要等待的时间小于任务要执行的延迟时间
// leader != null 说明有其它线程正在对任务进行阻塞
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// timeLeft表示delay减去实际的等待时间
long timeLeft = available.awaitNanos(delay);
// 计算剩余的等待时间
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
finishPoll方法
调用了take或者poll方法能够获取到任务时,会调用该方法进行返回
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
siftDown方法
元素出队后,末尾元素上升至根节点,进行下沉操作,重新选择根节点。siftDown方法使堆从k开始向下调整private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 根据二叉树的特性,数组长度除以2,表示取有子节点的索引
int half = size >>> 1;
// 判断索引为k的节点是否有子节点
while (k < half) {
// 左子节点的索引
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// 右子节点的索引
int right = child + 1;
// 如果有右子节点并且左子节点的时间间隔大于右子节点,取时间间隔最小的节点
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果key的时间间隔小于等于c的时间间隔,跳出循环
if (key.compareTo(c) <= 0)
break;
// 设置要移除索引的节点为其子节点
queue[k] = c;
setIndex(c, k);
k = child;
}
// 将key放入索引为k的位置
queue[k] = key;
setIndex(key, k);
}
remove方法
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
// 从i开始向下调整
siftDown(i, replacement);
// 如果queue[i] == replacement,说明i是叶子节点
// 如果是这种情况,不能保证子节点的下次执行时间比父节点的大
// 这时需要进行一次向上调整
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}