总结与使用

线程池的参数的使用

新进入的线程直接以核心线程去执行,如果核心线程没有空闲,那么加入到阻塞队列当中,如果阻塞队列满了,那么启用最大线程数,开始开辟新的线程执行任务,如果最大线程数也开辟满了,那么执行拒绝策略。如果线程池空闲下来了,那么按照设置的线程存活时间来销毁线程,如果设置了核心线程会超时,那么也会清理核心线程。
几个问题:
如果采取 拒绝策略 选择 回到主线程中执行 ,那么线程异常时,是否会引起主线程中断?

worker 和task 的关系是什么样的呢?

学习的前提: 了解位移运算相关的知识

1.extends 和 implement

extends AbstractExecutorService

抽象类 实现了ExecutorService接口,

  1. public abstract class AbstractExecutorService implements ExecutorService {
  2. //返回给定的 运行线程和默认值的RunnableFuture
  3. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  4. return new FutureTask<T>(runnable, value);
  5. }
  6. //返回 给定callable 的RunnableFuture
  7. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  8. return new FutureTask<T>(callable);
  9. }
  10. //提交任务
  11. public Future<?> submit(Runnable task) {
  12. if (task == null) throw new NullPointerException();
  13. RunnableFuture<Void> ftask = newTaskFor(task, null);
  14. execute(ftask);
  15. return ftask;
  16. }
  17. //提交任务,带有默认返回值
  18. public <T> Future<T> submit(Runnable task, T result) {
  19. if (task == null) throw new NullPointerException();
  20. RunnableFuture<T> ftask = newTaskFor(task, result);
  21. execute(ftask);
  22. return ftask;
  23. }
  24. //提交callable 任务
  25. public <T> Future<T> submit(Callable<T> task) {
  26. if (task == null) throw new NullPointerException();
  27. RunnableFuture<T> ftask = newTaskFor(task);
  28. execute(ftask);
  29. return ftask;
  30. }
  31. //执行线程集合
  32. private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  33. boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
  34. if (tasks == null) throw new NullPointerException();
  35. int ntasks = tasks.size();
  36. if (ntasks == 0) throw new IllegalArgumentException();
  37. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
  38. ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
  39. try {
  40. ExecutionException ee = null;
  41. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  42. Iterator<? extends Callable<T>> it = tasks.iterator();
  43. futures.add(ecs.submit(it.next()));
  44. --ntasks;
  45. int active = 1;
  46. for (;;) {
  47. Future<T> f = ecs.poll();
  48. if (f == null) {
  49. if (ntasks > 0) {
  50. --ntasks;
  51. futures.add(ecs.submit(it.next()));
  52. ++active;
  53. }
  54. else if (active == 0)
  55. break;
  56. else if (timed) {
  57. f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  58. if (f == null)
  59. throw new TimeoutException();
  60. nanos = deadline - System.nanoTime();
  61. } else f = ecs.take();
  62. }
  63. if (f != null) {
  64. --active;
  65. try {
  66. return f.get();
  67. } catch (ExecutionException eex) {
  68. ee = eex;
  69. } catch (RuntimeException rex) {
  70. ee = new ExecutionException(rex);
  71. }
  72. }
  73. }
  74. if (ee == null)
  75. ee = new ExecutionException();
  76. throw ee;
  77. } finally {
  78. for (int i = 0, size = futures.size(); i < size; i++)
  79. futures.get(i).cancel(true);
  80. }
  81. }
  82. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  83. throws InterruptedException, ExecutionException {
  84. try {
  85. return doInvokeAny(tasks, false, 0);
  86. } catch (TimeoutException cannotHappen) {
  87. assert false;
  88. return null;
  89. }
  90. }
  91. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  92. long timeout, TimeUnit unit)
  93. throws InterruptedException, ExecutionException, TimeoutException {
  94. return doInvokeAny(tasks, true, unit.toNanos(timeout));
  95. }
  96. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  97. throws InterruptedException {
  98. if (tasks == null)
  99. throw new NullPointerException();
  100. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  101. boolean done = false;
  102. try {
  103. for (Callable<T> t : tasks) {
  104. RunnableFuture<T> f = newTaskFor(t);
  105. futures.add(f);
  106. execute(f);
  107. }
  108. for (int i = 0, size = futures.size(); i < size; i++) {
  109. Future<T> f = futures.get(i);
  110. if (!f.isDone()) {
  111. try {
  112. f.get();
  113. } catch (CancellationException ignore) {
  114. } catch (ExecutionException ignore) {
  115. }
  116. }
  117. }
  118. done = true;
  119. return futures;
  120. } finally {
  121. if (!done)
  122. for (int i = 0, size = futures.size(); i < size; i++)
  123. futures.get(i).cancel(true);
  124. }
  125. }
  126. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  127. long timeout, TimeUnit unit)
  128. throws InterruptedException {
  129. if (tasks == null)
  130. throw new NullPointerException();
  131. long nanos = unit.toNanos(timeout);
  132. ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  133. boolean done = false;
  134. try {
  135. for (Callable<T> t : tasks)
  136. futures.add(newTaskFor(t));
  137. final long deadline = System.nanoTime() + nanos;
  138. final int size = futures.size();
  139. for (int i = 0; i < size; i++) {
  140. execute((Runnable)futures.get(i));
  141. nanos = deadline - System.nanoTime();
  142. if (nanos <= 0L)
  143. return futures;
  144. }
  145. for (int i = 0; i < size; i++) {
  146. Future<T> f = futures.get(i);
  147. if (!f.isDone()) {
  148. if (nanos <= 0L)
  149. return futures;
  150. try {
  151. f.get(nanos, TimeUnit.NANOSECONDS);
  152. } catch (CancellationException ignore) {
  153. } catch (ExecutionException ignore) {
  154. } catch (TimeoutException toe) {
  155. return futures;
  156. }
  157. nanos = deadline - System.nanoTime();
  158. }
  159. }
  160. done = true;
  161. return futures;
  162. } finally {
  163. if (!done)
  164. for (int i = 0, size = futures.size(); i < size; i++)
  165. futures.get(i).cancel(true);
  166. }
  167. }
  168. }
  1. public interface ExecutorService extends Executor {
  2. //有序关闭线程池,关闭中 执行之前提交的任务,但不接受新的任务。
  3. void shutdown();
  4. //尝试停止所有的任务,并返回在等待中的任务列表
  5. List<Runnable> shutdownNow();
  6. //是否已经关闭
  7. boolean isShutdown();
  8. //关闭后,所有任务都已经完成,返回true
  9. boolean isTerminated();
  10. //阻塞, 所有任务执行完成,或者超时,或者中断(interrupt) 结束
  11. boolean awaitTermination(long timeout, TimeUnit unit)
  12. throws InterruptedException;
  13. //提交任务
  14. <T> Future<T> submit(Callable<T> task);
  15. <T> Future<T> submit(Runnable task, T result);
  16. Future<?> submit(Runnable task);
  17. //执行任务集,并返回结果
  18. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  19. throws InterruptedException;
  20. //执行任务集,所有任务执行完成,或者超时 的时候 返回执行结果
  21. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  22. long timeout, TimeUnit unit)
  23. throws InterruptedException;
  24. //执行任务,返回指定的结果
  25. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  26. throws InterruptedException, ExecutionException;
  27. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  28. long timeout, TimeUnit unit)
  29. throws InterruptedException, ExecutionException, TimeoutException;
  30. }
  1. public interface Executor {
  2. //执行线程
  3. void execute(Runnable command);
  4. }

2.构造方法

参数:
int corePoolSize :核心线程数
int maximumPoolSize: 最大线程数
long keepAliveTime:线程未使用后的存活时间
TimeUnit unit: 时间单位
BlockingQueue workQueue: 阻塞队列
ThreadFactory threadFactory: 线程工厂
RejectedExecutionHandler handler: 线程池满的拒绝策略

  1. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
  2. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
  3. }
  4. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
  5. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
  6. }
  7. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
  8. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
  9. }
  10. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
  11. if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException();
  12. if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();
  13. this.corePoolSize = corePoolSize;
  14. this.maximumPoolSize = maximumPoolSize;
  15. this.workQueue = workQueue;
  16. this.keepAliveTime = unit.toNanos(keepAliveTime);
  17. this.threadFactory = threadFactory;
  18. this.handler = handler;
  19. }

3.属性

静态常量:

  1. // 32-3=29 位数控制
  2. private static final int COUNT_BITS = Integer.SIZE - 3;
  3. // 1 左移29位 -1 ,是 28位2进制1 ,即 2^29 -1 也就是线程池的最大容量
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  5. //线程状态
  6. //-1左移29位,10000000000000000000000000000001 -> 补码 01111111111111111111111111111110 -> 左移 11100000000000000000000000000000
  7. private static final int RUNNING = -1 << COUNT_BITS;
  8. // 0 左移,不改变 00000000000000000000000000000000
  9. private static final int SHUTDOWN = 0 << COUNT_BITS;
  10. //2^29 00100000000000000000000000000000
  11. private static final int STOP = 1 << COUNT_BITS;
  12. //2^30 01000000000000000000000000000000
  13. private static final int TIDYING = 2 << COUNT_BITS;
  14. //2^30 + 2^29 01100000000000000000000000000000
  15. private static final int TERMINATED = 3 << COUNT_BITS;
  16. //默认的拒绝策略
  17. private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
  18. //调用shutdown和shutdownNow 需要的权限
  19. private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");
  20. //表示只中断一个线程
  21. private static final boolean ONLY_ONE = true;

常量

  1. //运行状态控制 存放 workcount 工作线程数量 和 runState 线程池状态信息
  2. //为什么能存放两个值,ctl 是runstate | workcount , runstate 值分布在高位, 即第30,31位, workcount 分布在1-29位 ,所以两者做|运算,两个值都能保存
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. //阻塞队列,final 确保队列大小不可改变
  5. private final BlockingQueue<Runnable> workQueue;
  6. private final ReentrantLock mainLock = new ReentrantLock();
  7. //工作线程集合,mainLock访问
  8. private final HashSet<Worker> workers = new HashSet<Worker>();
  9. //等待条件
  10. private final Condition termination = mainLock.newCondition();

属性

  1. //池中最大线程数
  2. private int largestPoolSize;
  3. //完成的任务数
  4. private long completedTaskCount;
  5. //线程工厂
  6. private volatile ThreadFactory threadFactory;
  7. //拒绝策略
  8. private volatile RejectedExecutionHandler handler;
  9. //线程空闲后的存活时间
  10. private volatile long keepAliveTime;
  11. //是否允许核心线程超时,true表示允许,核心线程会在keepAliveTime时间后超时。false表示空闲状态,核心线程一直存活。
  12. private volatile boolean allowCoreThreadTimeOut;
  13. //核心线程数
  14. private volatile int corePoolSize;
  15. //最大线程数 不能大于CAPACITY
  16. private volatile int maximumPoolSize;

4.方法

native方法

Java方法

runStateOf(int c): 计算线程运行状态 c 是ctl , 做高位运算,得到状态值

  1. // c & ~CAPACITY CAPACITY = 00001111111111111111111111111111 ~CAPACITY = 111100000000000000000000000
  2. private static int runStateOf(int c) { return c & ~CAPACITY; }

workerCountOf(int c):计算有效线程数量 ,c是ctl ,做低位运算,得到workcount

  1. private static int workerCountOf(int c) { return c & CAPACITY; }

ctlof(int rs,int wc): 计算 ctl值 rs 表示 runstate ,wc 表示 workcount

  1. private static int ctlOf(int rs, int wc) { return rs | wc; }

runStateLessThan(int c,int s): 运行状态比较

  1. private static boolean runStateLessThan(int c, int s) { return c < s;}

runStateAtLeast(int c, int s):

  1. private static boolean runStateAtLeast(int c, int s) { return c >= s;}

isRunning(int c): 是否运行中

  1. private static boolean isRunning(int c) {return c < SHUTDOWN;}

compareAndIncrementWorkerCount(int expect): 比较并且增加workcount

  1. private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}

compareAndDecrementWorkerCount(int expect):比较并且减少workcount

  1. private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}

decrementWorkerCount(): 递减workcount ,线程终止时调用,do while循环,直到更新成功为止

  1. private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}

advanceRunState(int targetState):

  1. private void advanceRunState(int targetState) {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (runStateAtLeast(c, targetState) ||
  5. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  6. break;
  7. }
  8. }

tryTerminate(): 尝试停止线程池,如果非运行中状态,会尝试停止线程池

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. //状态为 STOP 或者状态为SHUTDOWN 但是队列为空的时候 不执行return操作
  5. if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  6. return;
  7. //如果还有空闲的线程,那么中断一个线程
  8. if (workerCountOf(c) != 0) {
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. //获取锁,进行停止
  13. final ReentrantLock mainLock = this.mainLock;
  14. mainLock.lock();
  15. try {
  16. //状态更新为TIDYING
  17. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  18. try {
  19. terminated();
  20. } finally {
  21. ctl.set(ctlOf(TERMINATED, 0));
  22. //唤醒所有等待的线程
  23. termination.signalAll();
  24. }
  25. return;
  26. }
  27. } finally {
  28. mainLock.unlock();
  29. }
  30. }
  31. }

checkShutdownAccess(): 检查是否有关闭的权限

  1. private void checkShutdownAccess() {
  2. SecurityManager security = System.getSecurityManager();
  3. if (security != null) {
  4. security.checkPermission(shutdownPerm);
  5. final ReentrantLock mainLock = this.mainLock;
  6. mainLock.lock();
  7. try {
  8. for (Worker w : workers)
  9. security.checkAccess(w.thread);
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. }
  14. }

interruptWorkers(): 中断所有的线程

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

interruptIdleWorkers(Boolean onlyOne):中断未使用工作线程

  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. //如果worker的线程在运行中,那么w.tryLock() 将会失败,否则可以中断worker
  8. if (!t.isInterrupted() && w.tryLock()) {
  9. try {
  10. t.interrupt();
  11. } catch (SecurityException ignore) {
  12. } finally {
  13. w.unlock();
  14. }
  15. }
  16. if (onlyOne)
  17. break;
  18. }
  19. } finally {
  20. mainLock.unlock();
  21. }
  22. }

reject(Runnable command):执行拒绝策略

  1. final void reject(Runnable command) {
  2. handler.rejectedExecution(command, this);
  3. }

onShutdown(): 调用shutdown 的进一步清理方法,参照ScheduledThreadPoolExecutor的重写

  1. void onShutdown() {};

isRunningOrShutdown: 是否运行状态 或者SHUTDOWN状态

  1. final boolean isRunningOrShutdown(boolean shutdownOK) {
  2. int rs = runStateOf(ctl.get());
  3. // shutdownOk 为false,那么返回是否运行状态。 为true,返回是否为RUNNING和SHUTDOWN 状态
  4. return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
  5. }

drainQueue: 将队列元素移动List中,并且删除队列元素

  1. private List<Runnable> drainQueue() {
  2. BlockingQueue<Runnable> q = workQueue;
  3. ArrayList<Runnable> taskList = new ArrayList<Runnable>();
  4. //复制并删除队列元素
  5. q.drainTo(taskList);
  6. if (!q.isEmpty()) {
  7. for (Runnable r : q.toArray(new Runnable[0])) {
  8. if (q.remove(r))
  9. taskList.add(r);
  10. }
  11. }
  12. return taskList;
  13. }

addWorker: 添加线程

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. //循环标志
  3. retry:
  4. for (;;) {
  5. //获取线程池ctl值
  6. int c = ctl.get();
  7. //计算线程池状态
  8. int rs = runStateOf(c);
  9. //如果线程池状态为SHUTDOWN,STOP,TIDYING,TERMINATED状态,
  10. // 并且如果是SHUTDOWN状态还需要firstTask不为空,阻塞队列为空,线程才会添加失败
  11. if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
  12. return false;
  13. for (;;) {
  14. //计算线程池运行中的线程总数
  15. int wc = workerCountOf(c);
  16. //如果线程总数大于等于设置的容量上限,
  17. //或者 如果线程需要加入核心线程池,核心线程数的容量小于wc,如果不加入,那线程总数要小于wc
  18. if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
  19. return false;
  20. //自增线程数,如果失败,退出最外层for循环
  21. if (compareAndIncrementWorkerCount(c))
  22. break retry;
  23. c = ctl.get();
  24. //如果运行状态变更,那么继续最外层for循环
  25. if (runStateOf(c) != rs) continue retry;
  26. }
  27. }
  28. boolean workerStarted = false;
  29. boolean workerAdded = false;
  30. Worker w = null;
  31. try {
  32. //根据传入的线程参数,新建一个worker
  33. w = new Worker(firstTask);
  34. final Thread t = w.thread;
  35. if (t != null) {
  36. final ReentrantLock mainLock = this.mainLock;
  37. mainLock.lock();
  38. try {
  39. //加锁后再次检查线程池的状态
  40. int rs = runStateOf(ctl.get());
  41. //线程池是RUNNING状态,或者 线程池是SHUTDOWN状态而且加入的线程不为空
  42. if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
  43. //先检查线程是否是alive,激活的,如果已经激活,那么线程已经在执行,不能加入线程池
  44. if (t.isAlive()) throw new IllegalThreadStateException();
  45. //添加到工作组中
  46. workers.add(w);
  47. int s = workers.size();
  48. if (s > largestPoolSize) largestPoolSize = s;
  49. workerAdded = true;
  50. }
  51. } finally {
  52. mainLock.unlock();
  53. }
  54. if (workerAdded) {
  55. //如果添加成功,那么直接启动线程
  56. t.start();
  57. workerStarted = true;
  58. }
  59. }
  60. } finally {
  61. if (! workerStarted)
  62. //如果添加失败,那么减少计数,并且在workers中移除计入的worker
  63. addWorkerFailed(w);
  64. }
  65. return workerStarted;
  66. }

addWorkerFailed: 添加线程失败,回滚相关数据

  1. private void addWorkerFailed(Worker w) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. if (w != null)
  6. workers.remove(w);
  7. decrementWorkerCount();
  8. tryTerminate();
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

processWorkerExit: 清理已经不使用worker,completedAbruptly 参数表示突然完成的,如执行的线程异常等

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. //递减工作线程数
  3. if (completedAbruptly) decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. //增加线程池完成任务数
  8. completedTaskCount += w.completedTasks;
  9. //移除worker
  10. workers.remove(w);
  11. } finally {
  12. mainLock.unlock();
  13. }
  14. //检查并尝试停止线程池
  15. tryTerminate();
  16. int c = ctl.get();
  17. //如果线程池是SHUTDOWN 或者 Running状态,
  18. if (runStateLessThan(c, STOP)) {
  19. if (!completedAbruptly) {
  20. //如果没有设置核心线程数超时,那么如果worker的总数比核心线程数小,也需要添加worker
  21. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  22. if (min == 0 && ! workQueue.isEmpty())
  23. min = 1;
  24. if (workerCountOf(c) >= min)
  25. return;
  26. }
  27. //如果不是异常关闭,那么需要给线程池在添加一个worker
  28. addWorker(null, false);
  29. }
  30. }

getTask: 获取线程

  1. private Runnable getTask() {
  2. boolean timedOut = false;
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. //如果线程池状态为SHUTDOWN 或者 线程池状态为 STOP TIDYING TERMINATED并且阻塞队列为空
  7. // 此时递减worker的计数,并且返回空
  8. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9. decrementWorkerCount();
  10. return null;
  11. }
  12. int wc = workerCountOf(c);
  13. //是否可以超时 允许核心线程超时终止 或者 线程总数已经大于核心线程数
  14. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  15. //线程总数大于池的最大线程数 这时候需要递减worker count 并且返回空Task
  16. //如果超时了, 阻塞队列为空,那么也递减workcount ,返回空Task
  17. //即线程池现无待执行的线程,或者线程池满无法执行
  18. if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
  19. if (compareAndDecrementWorkerCount(c)) return null;
  20. continue;
  21. }
  22. try {
  23. //如果是核心线程超时
  24. Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
  25. if (r != null)
  26. return r;
  27. timedOut = true;
  28. } catch (InterruptedException retry) {
  29. timedOut = false;
  30. }
  31. }
  32. }

runWorker:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. //获取worker中的线程,并且解锁worker对象
  4. Runnable task = w.firstTask;
  5. w.firstTask = null;
  6. w.unlock();
  7. boolean completedAbruptly = true;
  8. try {
  9. //当worker中task等于空时,试图从队列中获取一个任务线程
  10. while (task != null || (task = getTask()) != null) {
  11. //先锁定worker
  12. w.lock();
  13. //如果线程池处于停止中状态,确保线程中断, 如果不是确保线程不中断
  14. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
  15. wt.interrupt();
  16. try {
  17. //任务执行前的操作
  18. beforeExecute(wt, task);
  19. Throwable thrown = null;
  20. try {
  21. task.run();
  22. } catch (RuntimeException x) {
  23. thrown = x; throw x;
  24. } catch (Error x) {
  25. thrown = x; throw x;
  26. } catch (Throwable x) {
  27. thrown = x; throw new Error(x);
  28. } finally {
  29. //任务执行完之后的操作
  30. afterExecute(task, thrown);
  31. }
  32. } finally {
  33. task = null;
  34. w.completedTasks++;
  35. w.unlock();
  36. }
  37. }
  38. completedAbruptly = false;
  39. } finally {
  40. //如果执行失败,需要回滚数据
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

execute():执行线程

  1. public void execute(Runnable command) {
  2. if (command == null) throw new NullPointerException();
  3. int c = ctl.get();
  4. //如果工作的线程总数小于核心线程数,那么直接创建一个线程
  5. if (workerCountOf(c) < corePoolSize) {
  6. if (addWorker(command, true))
  7. return;
  8. c = ctl.get();
  9. }
  10. //如果线程总数大于核心线程数,那么添加进队列中
  11. if (isRunning(c) && workQueue.offer(command)) {
  12. int recheck = ctl.get();
  13. if (! isRunning(recheck) && remove(command))
  14. //如果线程池不是运行状态,并且移除队列中线程成功,那么执行阻塞策略
  15. reject(command);
  16. else if (workerCountOf(recheck) == 0)
  17. //如果工作线程的总数等于0,那么创建一个worker,传入的线程为null,会总动从阻塞队列中拿取线程
  18. addWorker(null, false);
  19. } else if (!addWorker(command, false))
  20. //如果阻塞队列无法添加,那么创建新的线程,数量取决于最大线程数
  21. reject(command);
  22. }

shutdown:关闭线程池,不再接受新的任务

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. //检查权限
  6. checkShutdownAccess();
  7. //变更状态
  8. advanceRunState(SHUTDOWN);
  9. //中断正在等待的线程,空值默认调用 onlyOne =false
  10. interruptIdleWorkers();
  11. //ScheduledThreadPoolExecutor 需要执行,清理队列
  12. onShutdown();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. //尝试停止线程池
  17. tryTerminate();
  18. }

shutdownNow:立即关闭 状态直接变更到STOP,返回队列中的线程列表

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. interruptWorkers();
  9. tasks = drainQueue();
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks;
  15. }

isShutdown: 是否shutdown 状态, 非RUNNING状态

  1. public boolean isShutdown() {
  2. return ! isRunning(ctl.get());
  3. }

isTerminating():是否停止中状态 ,即 SHUTDOWN ,STOP , TINDYING

  1. public boolean isTerminating() {
  2. int c = ctl.get();
  3. return ! isRunning(c) && runStateLessThan(c, TERMINATED);
  4. }

isTerminated: 是否TERMINATED状态

  1. public boolean isTerminated() {
  2. return runStateAtLeast(ctl.get(), TERMINATED);
  3. }

awaitTermination: 等待timeout时间,检查是否TERMINATED

  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (;;) {
  8. //如果已经停止,返回ture
  9. if (runStateAtLeast(ctl.get(), TERMINATED))
  10. return true;
  11. //如果已经超时,并且没有终止,返回false
  12. if (nanos <= 0)
  13. return false;
  14. //等待timeout,纳秒单位,返回的值为timeout剩余时间 继续循环
  15. nanos = termination.awaitNanos(nanos);
  16. }
  17. } finally {
  18. mainLock.unlock();
  19. }
  20. }

setCorePoolSize:设置核心线程数(修改)

  1. public void setCorePoolSize(int corePoolSize) {
  2. if (corePoolSize < 0) throw new IllegalArgumentException();
  3. //差值
  4. int delta = corePoolSize - this.corePoolSize;
  5. this.corePoolSize = corePoolSize;
  6. //如果线程总数大于核心线程数 收缩workerCount
  7. if (workerCountOf(ctl.get()) > corePoolSize)
  8. //中断Worker,worker在运行中时获取不到中断需要的锁,所以中断的是未使用的worker
  9. interruptIdleWorkers();
  10. else if (delta > 0) {
  11. //如果设置的核心线程数比之前的的大 并且 线程总数 小于核心线程数
  12. //添加worker ,delta如果大于阻塞队列长度,那么添加worker数位阻塞队列长度数量,因为addWorker(null,true)方法创建的worker取值阻塞队列中的线程
  13. //当阻塞队列为0时也就不会创建worker
  14. int k = Math.min(delta, workQueue.size());
  15. //循环创建worker
  16. while (k-- > 0 && addWorker(null, true)) {
  17. if (workQueue.isEmpty())
  18. break;
  19. }
  20. }
  21. }

prestartCoreThread:预启动一个核心线程

  1. public boolean prestartCoreThread() {
  2. return workerCountOf(ctl.get()) < corePoolSize &&
  3. addWorker(null, true);
  4. }

ensurePrestart:预启动一个核心线程,另外当核心线程数为0时也启动一个线程

  1. void ensurePrestart() {
  2. int wc = workerCountOf(ctl.get());
  3. if (wc < corePoolSize)
  4. addWorker(null, true);
  5. else if (wc == 0)
  6. //线程总数等于0 并且线程总数不小于核心线程数,说明核心线程数为0
  7. addWorker(null, false);
  8. }

prestartAllCoreThreads:预启动所有核心线程

  1. public int prestartAllCoreThreads() {
  2. int n = 0;
  3. while (addWorker(null, true))
  4. ++n;
  5. return n;
  6. }

allowCoreThreadTimeOut: 设置核心线程超时

  1. public void allowCoreThreadTimeOut(boolean value) {
  2. if (value && keepAliveTime <= 0)
  3. throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  4. if (value != allowCoreThreadTimeOut) {
  5. allowCoreThreadTimeOut = value;
  6. if (value)
  7. //中断未使用的worker
  8. interruptIdleWorkers();
  9. }
  10. }

setMaximumPoolSize: 设置最大线程数

  1. public void setMaximumPoolSize(int maximumPoolSize) {
  2. if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
  3. throw new IllegalArgumentException();
  4. this.maximumPoolSize = maximumPoolSize;
  5. //如果运行的线程总数大于最大线程数,那么需要中断未在运行的worker
  6. if (workerCountOf(ctl.get()) > maximumPoolSize)
  7. interruptIdleWorkers();
  8. }

setKeepAliveTime: 设置线程停止运行后的存活时间

  1. public void setKeepAliveTime(long time, TimeUnit unit) {
  2. if (time < 0)
  3. throw new IllegalArgumentException();
  4. if (time == 0 && allowsCoreThreadTimeOut())
  5. throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  6. long keepAliveTime = unit.toNanos(time);
  7. long delta = keepAliveTime - this.keepAliveTime;
  8. this.keepAliveTime = keepAliveTime;
  9. //如果时间是减小,中断未在运行的worker
  10. if (delta < 0)
  11. interruptIdleWorkers();
  12. }

purge:删除队列中所有的future任务

  1. public void purge() {
  2. final BlockingQueue<Runnable> q = workQueue;
  3. try {
  4. Iterator<Runnable> it = q.iterator();
  5. while (it.hasNext()) {
  6. Runnable r = it.next();
  7. if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
  8. it.remove();
  9. }
  10. } catch (ConcurrentModificationException fallThrough) {
  11. for (Object r : q.toArray())
  12. if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
  13. q.remove(r);
  14. }
  15. tryTerminate();
  16. }

getActiveCount: 统计运行的线程数

  1. public int getActiveCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. int n = 0;
  6. for (Worker w : workers)
  7. if (w.isLocked())
  8. ++n;
  9. return n;
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. }

getTaskCount:统计运行中和运行完成的线程总数

  1. public long getTaskCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. long n = completedTaskCount;
  6. for (Worker w : workers) {
  7. n += w.completedTasks;
  8. if (w.isLocked())
  9. ++n;
  10. }
  11. return n + workQueue.size();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. }

getCompletedTaskCount: 统计运行完成的线程总数

  1. public long getCompletedTaskCount() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. long n = completedTaskCount;
  6. for (Worker w : workers)
  7. n += w.completedTasks;
  8. return n;
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

5.内部类

静态内部类

阻塞策略: CallerRunsPolicy 被拒绝的任务回到主线程中执行

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. r.run();
  6. }
  7. }
  8. }

阻塞策略: AbortPolicy 被拒绝的任务 异常抛出

  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() +
  5. " rejected from " +
  6. e.toString());
  7. }
  8. }

阻塞策略: DiscardPolicy 被拒绝的任务 直接丢弃,不做任何处理

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

阻塞策略:DiscardOldestPolicy 被拒绝的任务 移除阻塞队列中的头部任务,然后执行 被拒绝的任务

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. e.getQueue().poll();
  6. e.execute(r);
  7. }
  8. }
  9. }

内部类

Worker:

线程工作类,实现了Runable类,继承了抽象类 AbstractQueuedSynchronizer(FIFO队列,锁同步),work运行后,同时只有一个线程

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  2. private static final long serialVersionUID = 6138294804551838833L;
  3. //运行的线程
  4. final Thread thread;
  5. //初始任务
  6. Runnable firstTask;
  7. //线程任务计数器
  8. volatile long completedTasks;
  9. //Work 构造,根据新任务构建
  10. Worker(Runnable firstTask) {
  11. setState(-1); //禁止中断,直到worker开始运行
  12. this.firstTask = firstTask;
  13. this.thread = getThreadFactory().newThread(this);
  14. }
  15. //运行的主循环委托给外部
  16. public void run() {runWorker(this);}
  17. // state 0 代表无锁, 1代表已锁定
  18. protected boolean isHeldExclusively() {
  19. return getState() != 0;
  20. }
  21. //尝试获取锁
  22. protected boolean tryAcquire(int unused) {
  23. if (compareAndSetState(0, 1)) {
  24. setExclusiveOwnerThread(Thread.currentThread());
  25. return true;
  26. }
  27. return false;
  28. }
  29. //尝试释放锁
  30. protected boolean tryRelease(int unused) {
  31. setExclusiveOwnerThread(null);
  32. setState(0);
  33. return true;
  34. }
  35. public void lock() { acquire(1); }
  36. public boolean tryLock() { return tryAcquire(1); }
  37. public void unlock() { release(1); }
  38. public boolean isLocked() { return isHeldExclusively(); }
  39. void interruptIfStarted() {
  40. Thread t;
  41. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  42. try {
  43. t.interrupt();
  44. } catch (SecurityException ignore) {
  45. }
  46. }
  47. }
  48. }