- 总结与使用
- 1.extends 和 implement
- 2.构造方法
- 3.属性
- 4.方法
- native方法
- Java方法
- runStateOf(int c): 计算线程运行状态 c 是ctl , 做高位运算,得到状态值
- workerCountOf(int c):计算有效线程数量 ,c是ctl ,做低位运算,得到workcount
- ctlof(int rs,int wc): 计算 ctl值 rs 表示 runstate ,wc 表示 workcount
- runStateLessThan(int c,int s): 运行状态比较
- runStateAtLeast(int c, int s):
- isRunning(int c): 是否运行中
- compareAndIncrementWorkerCount(int expect): 比较并且增加workcount
- compareAndDecrementWorkerCount(int expect):比较并且减少workcount
- decrementWorkerCount(): 递减workcount ,线程终止时调用,do while循环,直到更新成功为止
- advanceRunState(int targetState):
- tryTerminate(): 尝试停止线程池,如果非运行中状态,会尝试停止线程池
- checkShutdownAccess(): 检查是否有关闭的权限
- interruptWorkers(): 中断所有的线程
- interruptIdleWorkers(Boolean onlyOne):中断未使用工作线程
- reject(Runnable command):执行拒绝策略
- onShutdown(): 调用shutdown 的进一步清理方法,参照ScheduledThreadPoolExecutor的重写
- isRunningOrShutdown: 是否运行状态 或者SHUTDOWN状态
- drainQueue: 将队列元素移动List中,并且删除队列元素
- addWorker: 添加线程
- addWorkerFailed: 添加线程失败,回滚相关数据
- processWorkerExit: 清理已经不使用worker,completedAbruptly 参数表示突然完成的,如执行的线程异常等
- getTask: 获取线程
- runWorker:
- execute():执行线程
- shutdown:关闭线程池,不再接受新的任务
- shutdownNow:立即关闭 状态直接变更到STOP,返回队列中的线程列表
- isShutdown: 是否shutdown 状态, 非RUNNING状态
- isTerminating():是否停止中状态 ,即 SHUTDOWN ,STOP , TINDYING
- isTerminated: 是否TERMINATED状态
- awaitTermination: 等待timeout时间,检查是否TERMINATED
- setCorePoolSize:设置核心线程数(修改)
- prestartCoreThread:预启动一个核心线程
- ensurePrestart:预启动一个核心线程,另外当核心线程数为0时也启动一个线程
- prestartAllCoreThreads:预启动所有核心线程
- allowCoreThreadTimeOut: 设置核心线程超时
- setMaximumPoolSize: 设置最大线程数
- setKeepAliveTime: 设置线程停止运行后的存活时间
- purge:删除队列中所有的future任务
- getActiveCount: 统计运行的线程数
- getTaskCount:统计运行中和运行完成的线程总数
- getCompletedTaskCount: 统计运行完成的线程总数
- 5.内部类
总结与使用
线程池的参数的使用
新进入的线程直接以核心线程去执行,如果核心线程没有空闲,那么加入到阻塞队列当中,如果阻塞队列满了,那么启用最大线程数,开始开辟新的线程执行任务,如果最大线程数也开辟满了,那么执行拒绝策略。如果线程池空闲下来了,那么按照设置的线程存活时间来销毁线程,如果设置了核心线程会超时,那么也会清理核心线程。
几个问题:
如果采取 拒绝策略 选择 回到主线程中执行 ,那么线程异常时,是否会引起主线程中断?
worker 和task 的关系是什么样的呢?
学习的前提: 了解位移运算相关的知识
1.extends 和 implement
extends AbstractExecutorService
抽象类 实现了ExecutorService接口,
public abstract class AbstractExecutorService implements ExecutorService {
//返回给定的 运行线程和默认值的RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
//返回 给定callable 的RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
//提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//提交任务,带有默认返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//提交callable 任务
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//执行线程集合
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0) throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
} else f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
public interface ExecutorService extends Executor {
//有序关闭线程池,关闭中 执行之前提交的任务,但不接受新的任务。
void shutdown();
//尝试停止所有的任务,并返回在等待中的任务列表
List<Runnable> shutdownNow();
//是否已经关闭
boolean isShutdown();
//关闭后,所有任务都已经完成,返回true
boolean isTerminated();
//阻塞, 所有任务执行完成,或者超时,或者中断(interrupt) 结束
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交任务
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//执行任务集,并返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//执行任务集,所有任务执行完成,或者超时 的时候 返回执行结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//执行任务,返回指定的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
//执行线程
void execute(Runnable command);
}
2.构造方法
参数:
int corePoolSize :核心线程数
int maximumPoolSize: 最大线程数
long keepAliveTime:线程未使用后的存活时间
TimeUnit unit: 时间单位
BlockingQueue
ThreadFactory threadFactory: 线程工厂
RejectedExecutionHandler handler: 线程池满的拒绝策略
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.属性
静态常量:
// 32-3=29 位数控制
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1 左移29位 -1 ,是 28位2进制1 ,即 2^29 -1 也就是线程池的最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程状态
//-1左移29位,10000000000000000000000000000001 -> 补码 01111111111111111111111111111110 -> 左移 11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 0 左移,不改变 00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//2^29 00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//2^30 01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//2^30 + 2^29 01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
//默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
//调用shutdown和shutdownNow 需要的权限
private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");
//表示只中断一个线程
private static final boolean ONLY_ONE = true;
常量
//运行状态控制 存放 workcount 工作线程数量 和 runState 线程池状态信息
//为什么能存放两个值,ctl 是runstate | workcount , runstate 值分布在高位, 即第30,31位, workcount 分布在1-29位 ,所以两者做|运算,两个值都能保存
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//阻塞队列,final 确保队列大小不可改变
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
//工作线程集合,mainLock访问
private final HashSet<Worker> workers = new HashSet<Worker>();
//等待条件
private final Condition termination = mainLock.newCondition();
属性
//池中最大线程数
private int largestPoolSize;
//完成的任务数
private long completedTaskCount;
//线程工厂
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//线程空闲后的存活时间
private volatile long keepAliveTime;
//是否允许核心线程超时,true表示允许,核心线程会在keepAliveTime时间后超时。false表示空闲状态,核心线程一直存活。
private volatile boolean allowCoreThreadTimeOut;
//核心线程数
private volatile int corePoolSize;
//最大线程数 不能大于CAPACITY
private volatile int maximumPoolSize;
4.方法
native方法
Java方法
runStateOf(int c): 计算线程运行状态 c 是ctl , 做高位运算,得到状态值
// c & ~CAPACITY CAPACITY = 00001111111111111111111111111111 ~CAPACITY = 111100000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
workerCountOf(int c):计算有效线程数量 ,c是ctl ,做低位运算,得到workcount
private static int workerCountOf(int c) { return c & CAPACITY; }
ctlof(int rs,int wc): 计算 ctl值 rs 表示 runstate ,wc 表示 workcount
private static int ctlOf(int rs, int wc) { return rs | wc; }
runStateLessThan(int c,int s): 运行状态比较
private static boolean runStateLessThan(int c, int s) { return c < s;}
runStateAtLeast(int c, int s):
private static boolean runStateAtLeast(int c, int s) { return c >= s;}
isRunning(int c): 是否运行中
private static boolean isRunning(int c) {return c < SHUTDOWN;}
compareAndIncrementWorkerCount(int expect): 比较并且增加workcount
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}
compareAndDecrementWorkerCount(int expect):比较并且减少workcount
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}
decrementWorkerCount(): 递减workcount ,线程终止时调用,do while循环,直到更新成功为止
private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}
advanceRunState(int targetState):
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
tryTerminate(): 尝试停止线程池,如果非运行中状态,会尝试停止线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//状态为 STOP 或者状态为SHUTDOWN 但是队列为空的时候 不执行return操作
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果还有空闲的线程,那么中断一个线程
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取锁,进行停止
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//状态更新为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有等待的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
checkShutdownAccess(): 检查是否有关闭的权限
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
interruptWorkers(): 中断所有的线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers(Boolean onlyOne):中断未使用工作线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//如果worker的线程在运行中,那么w.tryLock() 将会失败,否则可以中断worker
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
reject(Runnable command):执行拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
onShutdown(): 调用shutdown 的进一步清理方法,参照ScheduledThreadPoolExecutor的重写
void onShutdown() {};
isRunningOrShutdown: 是否运行状态 或者SHUTDOWN状态
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
// shutdownOk 为false,那么返回是否运行状态。 为true,返回是否为RUNNING和SHUTDOWN 状态
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
drainQueue: 将队列元素移动List中,并且删除队列元素
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//复制并删除队列元素
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
addWorker: 添加线程
private boolean addWorker(Runnable firstTask, boolean core) {
//循环标志
retry:
for (;;) {
//获取线程池ctl值
int c = ctl.get();
//计算线程池状态
int rs = runStateOf(c);
//如果线程池状态为SHUTDOWN,STOP,TIDYING,TERMINATED状态,
// 并且如果是SHUTDOWN状态还需要firstTask不为空,阻塞队列为空,线程才会添加失败
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
//计算线程池运行中的线程总数
int wc = workerCountOf(c);
//如果线程总数大于等于设置的容量上限,
//或者 如果线程需要加入核心线程池,核心线程数的容量小于wc,如果不加入,那线程总数要小于wc
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//自增线程数,如果失败,退出最外层for循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//如果运行状态变更,那么继续最外层for循环
if (runStateOf(c) != rs) continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根据传入的线程参数,新建一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//加锁后再次检查线程池的状态
int rs = runStateOf(ctl.get());
//线程池是RUNNING状态,或者 线程池是SHUTDOWN状态而且加入的线程不为空
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
//先检查线程是否是alive,激活的,如果已经激活,那么线程已经在执行,不能加入线程池
if (t.isAlive()) throw new IllegalThreadStateException();
//添加到工作组中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//如果添加成功,那么直接启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果添加失败,那么减少计数,并且在workers中移除计入的worker
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed: 添加线程失败,回滚相关数据
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
processWorkerExit: 清理已经不使用worker,completedAbruptly 参数表示突然完成的,如执行的线程异常等
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//递减工作线程数
if (completedAbruptly) decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加线程池完成任务数
completedTaskCount += w.completedTasks;
//移除worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//检查并尝试停止线程池
tryTerminate();
int c = ctl.get();
//如果线程池是SHUTDOWN 或者 Running状态,
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//如果没有设置核心线程数超时,那么如果worker的总数比核心线程数小,也需要添加worker
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
//如果不是异常关闭,那么需要给线程池在添加一个worker
addWorker(null, false);
}
}
getTask: 获取线程
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池状态为SHUTDOWN 或者 线程池状态为 STOP TIDYING TERMINATED并且阻塞队列为空
// 此时递减worker的计数,并且返回空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//是否可以超时 允许核心线程超时终止 或者 线程总数已经大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程总数大于池的最大线程数 这时候需要递减worker count 并且返回空Task
//如果超时了, 阻塞队列为空,那么也递减workcount ,返回空Task
//即线程池现无待执行的线程,或者线程池满无法执行
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) return null;
continue;
}
try {
//如果是核心线程超时
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取worker中的线程,并且解锁worker对象
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//当worker中task等于空时,试图从队列中获取一个任务线程
while (task != null || (task = getTask()) != null) {
//先锁定worker
w.lock();
//如果线程池处于停止中状态,确保线程中断, 如果不是确保线程不中断
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//任务执行前的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行完之后的操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//如果执行失败,需要回滚数据
processWorkerExit(w, completedAbruptly);
}
}
execute():执行线程
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
//如果工作的线程总数小于核心线程数,那么直接创建一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程总数大于核心线程数,那么添加进队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果线程池不是运行状态,并且移除队列中线程成功,那么执行阻塞策略
reject(command);
else if (workerCountOf(recheck) == 0)
//如果工作线程的总数等于0,那么创建一个worker,传入的线程为null,会总动从阻塞队列中拿取线程
addWorker(null, false);
} else if (!addWorker(command, false))
//如果阻塞队列无法添加,那么创建新的线程,数量取决于最大线程数
reject(command);
}
shutdown:关闭线程池,不再接受新的任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限
checkShutdownAccess();
//变更状态
advanceRunState(SHUTDOWN);
//中断正在等待的线程,空值默认调用 onlyOne =false
interruptIdleWorkers();
//ScheduledThreadPoolExecutor 需要执行,清理队列
onShutdown();
} finally {
mainLock.unlock();
}
//尝试停止线程池
tryTerminate();
}
shutdownNow:立即关闭 状态直接变更到STOP,返回队列中的线程列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
isShutdown: 是否shutdown 状态, 非RUNNING状态
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
isTerminating():是否停止中状态 ,即 SHUTDOWN ,STOP , TINDYING
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
isTerminated: 是否TERMINATED状态
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
awaitTermination: 等待timeout时间,检查是否TERMINATED
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
//如果已经停止,返回ture
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//如果已经超时,并且没有终止,返回false
if (nanos <= 0)
return false;
//等待timeout,纳秒单位,返回的值为timeout剩余时间 继续循环
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
setCorePoolSize:设置核心线程数(修改)
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) throw new IllegalArgumentException();
//差值
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//如果线程总数大于核心线程数 收缩workerCount
if (workerCountOf(ctl.get()) > corePoolSize)
//中断Worker,worker在运行中时获取不到中断需要的锁,所以中断的是未使用的worker
interruptIdleWorkers();
else if (delta > 0) {
//如果设置的核心线程数比之前的的大 并且 线程总数 小于核心线程数
//添加worker ,delta如果大于阻塞队列长度,那么添加worker数位阻塞队列长度数量,因为addWorker(null,true)方法创建的worker取值阻塞队列中的线程
//当阻塞队列为0时也就不会创建worker
int k = Math.min(delta, workQueue.size());
//循环创建worker
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
prestartCoreThread:预启动一个核心线程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
ensurePrestart:预启动一个核心线程,另外当核心线程数为0时也启动一个线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
//线程总数等于0 并且线程总数不小于核心线程数,说明核心线程数为0
addWorker(null, false);
}
prestartAllCoreThreads:预启动所有核心线程
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
allowCoreThreadTimeOut: 设置核心线程超时
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
//中断未使用的worker
interruptIdleWorkers();
}
}
setMaximumPoolSize: 设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//如果运行的线程总数大于最大线程数,那么需要中断未在运行的worker
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
setKeepAliveTime: 设置线程停止运行后的存活时间
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
//如果时间是减小,中断未在运行的worker
if (delta < 0)
interruptIdleWorkers();
}
purge:删除队列中所有的future任务
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate();
}
getActiveCount: 统计运行的线程数
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
getTaskCount:统计运行中和运行完成的线程总数
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
getCompletedTaskCount: 统计运行完成的线程总数
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
5.内部类
静态内部类
阻塞策略: CallerRunsPolicy 被拒绝的任务回到主线程中执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
阻塞策略: AbortPolicy 被拒绝的任务 异常抛出
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
阻塞策略: DiscardPolicy 被拒绝的任务 直接丢弃,不做任何处理
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
阻塞策略:DiscardOldestPolicy 被拒绝的任务 移除阻塞队列中的头部任务,然后执行 被拒绝的任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
内部类
Worker:
线程工作类,实现了Runable类,继承了抽象类 AbstractQueuedSynchronizer(FIFO队列,锁同步),work运行后,同时只有一个线程
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
//运行的线程
final Thread thread;
//初始任务
Runnable firstTask;
//线程任务计数器
volatile long completedTasks;
//Work 构造,根据新任务构建
Worker(Runnable firstTask) {
setState(-1); //禁止中断,直到worker开始运行
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//运行的主循环委托给外部
public void run() {runWorker(this);}
// state 0 代表无锁, 1代表已锁定
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}