ThreadPoolExecutor
线程池的优点
- 当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new 一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。
- 是线程池也提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoo!Executor 也保留了一些基本的统计数据, 比如当前线程池完成的任务数目等。
- 线程池提供了可调参数和可扩展性接口,满足不同情境需要。
Executors是一个工具类,提供了许多静态方法,根据需要返回不同线程池示例。ThreadPoolExecutor继承了AbstractExecutorService,其成员变量ctl是一个原子类,记录线程池状态和线程池中线程个数。
线程池状态
// ctl 高29-31位表示线程池状态,0-28位表示线程数量(最多2^29-1,约5亿)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
// 线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1 0001 1111 1111 1111 1111 1111 1111 1111
// 高三位 29~31位 表示线程池状态 RUNNING < STUTDOWN < STOP < TIDYING < TERMINATED
// 111 0 0000 0000 0000 0000 0000 0000 0000 表示RUNNING
private static final int RUNNING = -1 << COUNT_BITS;
// 000 0 0000 0000 0000 0000 0000 0000 0000 表示SHUTDOWN
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 0 0000 0000 0000 0000 0000 0000 0000 表示STOP
private static final int STOP = 1 << COUNT_BITS;
// 010 0 0000 0000 0000 0000 0000 0000 0000 表示TIDYING
private static final int TIDYING = 2 << COUNT_BITS;
// 011 0 0000 0000 0000 0000 0000 0000 0000 表示TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取状态 c的高三位 CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
线程池状态含义:
- RUNNING:接受新任务并且处理阻塞队列里的任务
- SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
- STOP:拒绝新任务并抛弃阻塞队列里的任务,同时中断正在处理的任务
- TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将调用terminated方法
- TERMINATED:终止状态。terminated方法调用完成以后的状态
RUNNING -> SHUTDOWN : 显式调用shutdown () 方法, 或者隐式调了finalize()方法里面的shutdown()方法 RUNNING 或SHUTDOWN-> STOP : 显式调用shutdownNow()方法时 SHUTDOWN -> TIDY ING : 当线程池和任务队列都为空时 STOP -> TIDYING : 当线程池为空时 TIDYYIG-> TERM剧ATED : 当terminated() hook 方法执行完成时
线程池参数
/**
用于保存等待执行的任务的阻塞队列,
比如基于数组的有界ArrayBlock ingQueue 、基于链表的无界LinkedBlockingQueue 、
最多只有一个元素的同步队列SynchronousQueue 及优先级队列PriorityB lockingQueue 等。
*/
private final BlockingQueue<Runnable> workQueue;
// mainLock 是独占锁, 用来控制新增Worker线程操作的原子性。
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程调用awaitTermination时进入termination的等待条件队列
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
// 完成的任务数量
private long completedTaskCount;
// 创建线程工厂
private volatile ThreadFactory threadFactory;
/**
饱和策略, 当队列满并且线程个数达到maximunPoolSize后采取的策略,
比如AbortPolicy (抛出异常〉
CallerRunsPolicy (使用调用者所在线程来运行任务)
DiscardOldestPolicy (调用poll 丢弃一个任务,执行当前任务)
DiscardPolicy (默默丢弃,不抛出异常〉
*/
private volatile RejectedExecutionHandler handler;
// 存活时间 如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态, 则这些闲置的线程能存活的最大时间。
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
// 线程池核心线程数
private volatile int corePoolSize;
// 线程池最大线程数量
private volatile int maximumPoolSize;
ThreadPoolExecutor构造方法
构造方法重要参数:
int corePoolSize:核心线程数量
int maximumPoolSize:最大线程数量
long keepAliveTime:非核心线程空闲时的生存时间
TimeUnit unit :时间单位
BlockingQueue
TheadFactory threadFactory:线程工厂
RejectExectionHandler handler: 拒绝策略,线程个数达到maximunPoolSize并且任务队列满并且后采取的策略,jdk提供了4种,可自定义。jdk提供的4种:AbortPolicy (抛出异常〉、 CallerRunsPolicy(调用线程运行)、DiscardPolicy (丢弃任务)、 DiscardOldestPolicy (丢弃阻塞队列等待时间最长的任务,新任务加阻塞队列)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public class Code01_UserDefinedThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(2, 4, 0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5),
new MyThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
final int[] j = {i};
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 执行 任务" + j[0] + " 线程池状态:" + threadPoolExecutor.toString());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
static class MyThreadFactory implements ThreadFactory {
AtomicLong threadNum = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "线程池测试线程" + threadNum.getAndIncrement());
}
}
}
运行结果:最多4个线程、任务阻塞队列个数5,所以第10个任务(编号9)的任务被拒绝,根据设定的拒绝策略,抛出异常。
默认线程池类型
- newFixedThreadPool:创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAlivetime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
}0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// 使用自定义线程创建工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
- **newSingleThreadExecutor**:创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
```java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 使用自定义的线程创建工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- newCachedThreadPool:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步对了。keepAliveTinme=60说明当前线程在60s内空闲则回收。加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务,再继续添加任务会被阻塞。
SynchronousQueue 容量为0,用于两个线程之间传递数据,类似Exchanger。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 自定义线程创建工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
工作线程Worker
用户线程提交任务到线程池后,由Worker执行。Worker继承了AQS和Runnable接口,自己实现了不可重入的独占锁,state=0表示锁未被获取,state=1表示锁已经被获取状态,state=-1是Worker创建时的默认状态,避免在调用runWorker()方法前被中断。firstTash记录工作线程执行的第一个任务。thread是具体执行任务的线程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker(Runnable firstTask) {
setState(-1); // 在调用runWorker前禁止中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建一个线程
}
在构造函数内首先设置Worker 的状态为-1 ,这是为了避免当前Worker 在调用runWorker 方法前被中断(当其他线程调用了线程池的shutdownNow 时,如果Worker 状态>=0 则会中断该线程) 。这里设置了线程的状态为-1 ,所以该线程就不会被中断了。
这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown 后正在执行的任务被中断( shutdown 只会中断当前被阻塞挂起的线程〕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// worker自己继承了AQS,实现了非重入的独占锁
w.unlock(); // 将state设置为0,表示允许中断
boolean completedAbruptly = true;
try {
// task null 检查
while (task != null || (task = getTask()) != null) {
w.lock();
/*
1、 runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
线程池的状态 >=STOP(STOP、TIDYING、TERMINATED) 同时工作线程没有被中断
2、 Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
线程被中断 同时 线程池的状态被设置为STOP或TIDYING或TERMINATED 线程中断
*/
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前干的事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务完毕后千一些事情
afterExecute(task, thrown);
}
} finally {
task = null;
// )统计当前Worker 完成了多少个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// )执行清理工作
processWorkerExit(w, completedAbruptly);
}
}
execute
execute 方法的作用是提交任务command 到线程池进行执行。用户线程提交任务到线程池的模型图如图所示:(生产者-消费者模型,用户线程是生产者,w线程池worker线程是消费者)
public void execute(Runnable command) {
// null判断
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 获取ctl (高3位表示线程池状态,低29位表示线程数量)
// 如果线程池中线程个数小于corePoolSize,则添加新的线程执行该任务
if (workerCountOf(c) < corePoolSize) {
// 核心线程添加成功则核心线程运行这个任务
if (addWorker(command, true))
return;
// 核心线程添加失败,再次获取 ctl
c = ctl.get();
}
// 如果线程池处于RUNNING,则添加任务到阻塞队列,offer添加成功返回true,失败返回false
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查,防止其他线程改变了线程池状态
// 如果当前线程池状态不是RUNNING,则从从队列中删除任务,并执行拒绝策略
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池是否还有线程,没有则添加一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列满了,则新增工作线程(非核心线程),线程新增失败执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // ctl 高3位线程状态,低29位线程数量
int rs = runStateOf(c); // 线程池的状态
/*
RUNNING(111 xxxxx……) < SHUTDOWN(000 xxxx……) < STOP(001 xxxxx……) < TIDYING(010 xxxx……)<TERMINATED(011 xxxx……)
rs>=SHUTDOWN(当前线程池状态为STOP 、TIDYING 或TERMINATED )||
rs==SHUTDOWN && firstTask !=null(当前线程池状态为SHUTDOWN 并且己经有了第一个任务)||
rs==SHUTDOWN && workQueue.isEmpty (当前线程池状态为SHUTDOWN 并且任务队列为空)
以上情况任意一种,添加失败
*/
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
return false;
// 循环CAS增加线程个数
for (;;) {
// 获取工作线程的个数
int wc = workerCountOf(c);
/*1、如果线程个数超过限制返回false,
core是否为核心线程:
2、core=true,添加的是核心线程,当前线程数>corePoolSize,无法添加,返回false
3、core=false,添加的不是核心线程,当前线程数>maximumPoolSize,无法添加,返回false
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加线程个数,同时只有一个线程成功,添加成功跳出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失败,则看线程池状态是否变化了,变化则跳出循坏尝试获取线程池,否则继续重新CAS添加工作线程
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 运行到这表示CAS成功 线程数量添加成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建worker,第一个任务为firstTask
w = new Worker(firstTask);
final Thread t = w.thread; // t为worker执行任务的线程
if (t != null) {
// 整个线程池加独占锁,把新增的Worker 添加到工作集workers
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次检查线程池状态,避免在获取锁之前调用了shutDown
int rs = runStateOf(ctl.get());
/*
rs<SHUTDOWN 表示 线程程 RUNNING,线程池可以任务阻塞队列可以添加任务
rs == SHUTDOWN && firstTask==null,
rs == SHUTDOWN:线程池只能运行阻塞队列里的任务,不能添加
firstTask==null:第一个任务为null
*/
if (rs < SHUTDOWN || rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功后启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
shutdown
调用shutdown 方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置当前线程池状态SHUTDOWN,如采已经是SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
// 设置中断标志
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试将状态变为TERMINATED
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 没有被中断的空闲的线程,设置中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果线程池是RUNNING状态或者已经是TIDTING或TERMINATED 或者是SHUTDOWN且任务队列非空,直接返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 工作线程不等于0,将一个线程中断
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置当前线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//TIDYING 设置成功,
// 执行扩展接口terminated 在线程池状态变为TERMINATED前做一些事情
terminated();
} finally {
// 设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用添加阻塞队列 termimarion.await方法而被阻塞的所有线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow
调用shutdownNow 方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务, 正在执行的任务会被中断, 该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 设置线程池状态STOP
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 将任务列表移动的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 中断全部线程,包括空闲线程和正在执行任务的线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
awaitTermination
当线程调用awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变TERMINATED 才返回, 或者等待时间超时才返回。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 判断线程池状是否是TERMINATED
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 当前线程进入termination等待队列
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是指定一定延迟时间后或者定时进行任务调度执行的线程池。 ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor 并实现了ScheduledExecutorService 接口。 线程池队列是DelayedWorkQueue , 其和DelayedQueue 类似,是一个延迟队列。
ScheduledFutureTask
ScheduledThreadPoolExecutor 内部有一个ScheduledFutureTask类,继承了FutureTask,实现了RunnableScheduledFuture。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
FutureTask内部有一个表示任务的状态的变量 state
private volatile int state;
private static final int NEW = 0; // 初始状态
private static final int COMPLETING = 1; // 执行中
private static final int NORMAL = 2; // 正常运行结束状态
private static final int EXCEPTIONAL = 3; // 运行中异常
private static final int CANCELLED = 4; // 任务被取消
private static final int INTERRUPTING = 5; // 任务正在被中断
private static final int INTERRUPTED = 6; // 任务已经被中断
可能的任务的状态转移路径: NEW -> COMPLETING ->NORMAL NEW ->COMPLETING ->EXCEPTIONAL NEW ->CANCELLED NEW ->INTERRUPTING ->INTERRUPTED
ScheduledFutureTask内部还有一个变量period用来表示任务的类型
/**
period = 0 : 表示当前任务是一次性的,执行完毕后退出
period < 0 : 表示当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务
period > 0 : 表示当前任务为fixed-rate任务,是固定频率的定时可重复执行任务
*/
private final long period;
构造函数
由构造函数可知,ScheduledThreadPoolExecutor的任务队列是DelayedWorkedQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}
schedule
该方法的作用是提交一个延迟执行的任务, 任务从提交时间算起延迟单位为unit 的delay 时间后开始执行。提交的任务不是周期性任务,任务只会执行一次
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {
// 参数检查
if (command == null || unit == null)
throw new NullPointerException();
// 任务转换
RunnableScheduledFuture<?> t =
decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));
// 添加任务延迟队列
delayedExecute(t);
return t;
}
ScheduledFutureTask构造函数
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result); // FutureTask(Runnable runnable, V result)
this.time = ns; // 表示启动任务的时间
this.period = 0; // 任务类型 ,period=0表示一次性的任务
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(r unnable, result); // Runnable类型任务装换为Callable类型
this.state = NEW; // 设置任务状态为NEW
}
delayedExecute:将任务添加代任务队列(延迟队列),让最快要过期的元素放到队首
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池一个SHUTDOWN 则拒绝加入
if (isShutdown())
reject(task);
else {
// 添加任务到延迟队列
super.getQueue().add(task);
// 再次检查线程池状态
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 确保至少一个线程在处理任务
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 增加核心线程
if (wc < corePoolSize)
addWorker(null, true);
// 如采初始化corePoolSize= = O , 则也添加一个线程
else if (wc == 0)
addWorker(null, false);
}
ScheduledFutureTask#run
public void run() {
// 线程是否只执行一次,false表示只执行1次
boolean periodic = isPeriodic(); // return period != 0;
// 取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 只执行1次
ScheduledFutureTask.super.run(); // 调用FutureTask#run
else if (ScheduledFutureTask.super.runAndReset()) { // 定时执行
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
public void run() {
// 当前任务不是NEW 或者 当前任务是NEW但是CAS设置当前线程为任务持有者失败
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 执行Callable,再次检查任务状态为NEW
if (c != null && state == NEW) {
V result;
boolean ran;
// 正常结算设置任务状态NORMAL,抛出异常设置任务状态EXCEPTIONAL
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
// 执行成功修改任务状态
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 设置当前任务状态为NORMAL 正常结算,没有使用CA S 是因为对于同一个任务只可能有一个线程运行到这里
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
// 如果当前任务状态是NEW 设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 设置当前线程EXCEPTIONAL 非正常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
scheduleWithFixedDelay
当任务执行完毕后,让其延迟固定时间后再次运行( fixed-delay 任务)。 其中initialDelay 表示提交任务后延迟多少时间开始执行任务command, delay 表示当任务执行完毕后延长多少时间后再次运行command 任务, unit 是initialDelay 和delay 的时间单位 任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// 参数检查
if (command == null || unit == null)
throw new NullPointerException();
// 参数检查,延迟不能小于0
if (delay <= 0)
throw new IllegalArgumentException();
// 任务转换,period = -delay < 0, 表示当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 添加任务到延迟队列
delayedExecute(t);
return t;
}
执行任务是会先调用ScheduledFutureTask#run,发现period!=0,则会调用FutureTask#runAndReset,运行成功在执行 setNextRunTime()、reExecutePeriodic(outerTask);
FutureTask#runAndReset
protected boolean runAndReset() {
// 任务状态不是NEW 或者是NEW 当前设置当前线程为任务执行先失败
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
// 再次检查任务状态为NEW
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
// 执行Callable过程抛出异常,设置任务状态为异常中断
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 如果当前任务正常执行完毕并且任务状态为NEW 则返回true , 否则返回false
return ran && s == NEW;
}
ScheduledFutureTask#setNextRunTime
private void setNextRunTime() {
long p = period;
if (p > 0) // fixed-rate 类型任务
time += p;
else // fixed-delay类型任务
time = triggerTime(-p); // 设置time 为当前时间加上-p 的时间,也就是延迟-p时间后再次执行
}
scheduleAtFixedRate
该方法相对起始时间点以固定频率调用指定的任务( fixed-rate 任务) 。 当把任务提交到线程池并延迟initialDelay 时间( 时间单位为unit )后开始执行任command 然后从initialDelay+period 时间点再次执行,而后在initialDelay + 2 * period 时间点再次执行,循环往复,直到抛出异常或者调用了任务的cancel 方法取消了任务,或者关闭了线程池。 scheduleAtFixedRate 的原理与scheduleWithFixedDelay 类似,下面我们讲下它们之间的不同点。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// period > 0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
当前任务执行完毕后,调用setNextRunTime 设置任务下次执行的时间时执行的是time += p 而不再是time = triggerTime(-p) 。