Java ThreadPoolExecutor

线程池的作用

● 利用线程池管理并复用线程、控制最大并发数等既然使用了线程池就需要确保线程池是在复用的,每次new一个线程池出来可能比不用线程池还糟糕。如果没有直接声明线程池而是使用其他人提供的类库来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。
● 实现任务线程队列缓存策略和拒绝机制。
● 实现某些与时间相关的功能,如定时执行、周期执行等
● 隔离线程环境比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔离开,避免各服务线程相互影响。
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序 都可以使用线程池。合理地使用线程池能够带来3个好处:
1、降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2、提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
3、提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的主要处理流程

2021-05-07-13-33-03-966222.png

接口定义和实现类

类型 名称 描述
接口 Executor 最上层的接口,定义了执行任务的方法execute
接口 ExecutorService 继承了Executor接口,拓展了CallableFuture、关闭方法
接口 ScheduledExecutorService 继承了ExecutorService,增加了定时任务相关方法
实现类 ThreadPoolExecutor 基础、标准的线程池实现
实现类 ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,实现了ScheduledExecutorService中相关定时任务的方法

ThreadPoolExecutor 类图

2021-05-07-13-33-05-636628.png
java中的线程池都是基于ThreadPoolExecutor 来实现的。
可以认为ScheduledThreadPoolExecutor是最丰富的实现类。

ExecutorService 方法定义

  1. public interface ExecutorService extends Executor {
  2. /**
  3. * 在之前提交的,需要被执行的任务中,有序的进行关闭操作,并且此时不会再接受新的任务
  4. * 如果此时所有的任务已经关闭的话,那么就不会起到什么效果,因为已经没有任务可关闭了
  5. */
  6. void shutdown();
  7. /**
  8. * 尝试关闭所有正在执行的任务,并且中断正在等待要执行的任务,返回一个包含正在等待的任务的列表
  9. * @return
  10. */
  11. List<Runnable> shutdownNow();
  12. /**
  13. * 如果线程已经关闭了,就返回true
  14. * @return
  15. */
  16. boolean isShutdown();
  17. /**
  18. * 如果所有的线程任务已经关闭了,就返回true
  19. * @return
  20. */
  21. boolean isTerminated();
  22. /**
  23. * 只有当所有的任务都成功执行,否则会一直处于阻塞状态,只有当一下情况发生时,才会中断阻塞
  24. * 例如收到一个关闭的请求,或者超时发生、或者当前的线程被中断后
  25. * @param timeout
  26. * @param unit
  27. * @return
  28. * @throws InterruptedException
  29. */
  30. boolean awaitTermination(long timeout, TimeUnit unit)
  31. throws InterruptedException;
  32. /**
  33. * 提交一个需要返回结果的任务去执行,返回一个有结果的消息体,只有成功执行后,才会返回结果
  34. * @param task
  35. * @param <T>
  36. * @return
  37. */
  38. <T> Future<T> submit(Callable<T> task);
  39. /**
  40. * 只有当任务成功被执行后,才会返回给定的结果
  41. * @param task
  42. * @param result
  43. * @param <T>
  44. * @return
  45. */
  46. <T> Future<T> submit(Runnable task, T result);
  47. /**
  48. * 提交一个Runnable任务用于执行,和返回代表任务的Future。
  49. * Future的get方法成功执行后,返回null
  50. */
  51. Future<?> submit(Runnable task);
  52. /**
  53. * 提交一批任务,并返回一批任务的结果列表
  54. * @param tasks
  55. * @param <T>
  56. * @return
  57. * @throws InterruptedException
  58. */
  59. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  60. throws InterruptedException;
  61. /**
  62. * 执行给定的任务集合,执行完毕或者超时后,返回结果,其他任务终止
  63. *
  64. */
  65. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  66. long timeout, TimeUnit unit)
  67. throws InterruptedException;
  68. /**
  69. * 提交一批任务信息,当其中一个成功的执行,没有返回异常的时候,就返回结果
  70. * @param tasks
  71. * @param <T>
  72. * @return
  73. * @throws InterruptedException
  74. * @throws ExecutionException
  75. */
  76. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  77. throws InterruptedException, ExecutionException;
  78. /**
  79. * 执行给定的任务集合,任意一个执行成功或超时后,返回结果,其他任务终止
  80. */
  81. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  82. long timeout, TimeUnit unit)
  83. throws InterruptedException, ExecutionException, TimeoutException;
  84. }

ScheduledExecutorService

  1. public interface ScheduledExecutorService extends ExecutorService {
  2. //创建并执行一个一次性任务, 过了延迟时间就会被执行
  3. public ScheduledFuture<?> schedule(Runnable command,
  4. long delay, TimeUnit unit);
  5. //创建并执行一个一次性任务, 过了延迟时间就会被执行
  6. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  7. long delay, TimeUnit unit);
  8. //创建并执行一个周期性任务
  9. //过了给定的初始延迟时间,会第一次被执行
  10. //执行过程中发生了异常,那么任务就停止
  11. //一次任务 执行时长超过了周期时间,下一次任务会等到该次任务执行结束后,立刻执行,
  12. //这也是它和scheduleWithFixedDelay的重要区别
  13. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  14. long initialDelay,
  15. long period,
  16. TimeUnit unit);
  17. //创建并执行一个周期性任务
  18. //过了初始延迟时间,第一次被执行,后续以给定的周期时间执行
  19. //执行过程中发生了异常,那么任务就停止
  20. //一次任务执行时长超过了周期时间,下一次任务会在该次任务执行结束的时间基础上,计算执行延时。
  21. //对于超过周期的长时间处理任务的不同处理方式,这是它和scheduleAtFixedRate的重要区别。
  22. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  23. long initialDelay,
  24. long delay,
  25. TimeUnit unit);
  26. }

Executors工具类常用方法

可以自己实例化线程池,也可用Executors创建线程池的工厂类,推荐自己实例化线程池。
ExecutorService 的抽象类AbstractExecutorService提供了submit、invokeAll 等方法的实现,但是核心方法Executor.execute()并没有在这里实现。因为所有的任务都在该方法执行,不同实现会带来不同的执行策略。
通过Executors的静态工厂方法可以创建三个线程池的包装对象

  • ForkJoinPool
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

    Executors.newWorkStealingPool

    JDK8 引入,创建持有足够线程的线程池支持给定的并行度,并通过使用多个队列减少竞争,构造方法中把CPU数量设置为默认的并行度。返回ForkJoinPool ( JDK7引入)对象,它也是AbstractExecutorService 的子类

    1. public static ExecutorService newWorkStealingPool(int parallelism) {
    2. return new ForkJoinPool
    3. (parallelism,
    4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    5. null, true);
    6. }

    Executors.newCachedThreadPool

    创建一个无界的缓冲线程池,它的任务队列是一个同步队列。任务加入到池中

  • 若池中有空闲线程,则用空闲线程执行

  • 若无,则创建新线程执行
  • 池中的线程空闲超过60秒,将被销毁。线程数随任务的多少变化。适用于执行耗时较小的异步任务。
  • 线程池的核心线程数=0最大线程数= Integer.MAX_ _VALUE
  • maximumPoolSize 最大可至Integer.MAX_VALUE,是高度可伸缩的线程池。若达到该上限,没有服务器能够继续工作,直接OOM。
  • keepAliveTime默认为60秒;
  • 工作线程处于空闲状态,则回收工作线程;如果任务数增加,再次创建出新线程处理任务。
    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }

    Executors.newScheduledThreadPool

    能定时执行任务的线程池。该池的核心线程数由参数指定,线程数最大至Integer.MAX_ VALUE,与上述一样存在OOM风险。ScheduledExecutorService接口的实现类,支持定时及周期性任务执行;相比TimerScheduledExecutorService 更安全,功能更强大。与newCachedThreadPool的区别是不回收工作线程。
    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2. return new ScheduledThreadPoolExecutor(corePoolSize);
    3. }

    Executors.newSingleThreadExecutor

    创建一个单线程的线程池,相当于单线程串行执行所有任务,保证按任务的提交顺序依次执行。只有1个线程来执行无界任务队列的单-线程池。该线程池确保任务按加入的顺序一个一个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1)的区别在于,单线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。
    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }

    Executors.newFixedThreadPool

    创建一个固定大小任务队列容量无界的线程池,输入的参数即是固定线程数;既是核心线程数也是最大线程数;不存在空闲线程,所以keepAliveTime等于0。
    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }

    ThreadPoolExecutor 核心属性

    ```java // 状态控制属性:高3位表示线程池的运行状态,剩下的29位表示当前有效的线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务, // 即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于 // 线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, // 线程池会提前创建并启动所有基本线程。 private volatile int corePoolSize;

// 线程池线程最大数量,如果队列满了,并且已创建的线程数小于最大线程数, // 则线程池会再创建新的线程执行任务。如果使用了无界的任务队列这个参数就没什么效果。 private volatile int maximumPoolSize;

// 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。 private volatile ThreadFactory threadFactory;

// 饱和策略,默认情况下是AbortPolicy。 private volatile RejectedExecutionHandler handler;

// 线程池的工作线程空闲后,保持存活的时间。如果任务很多,并且每个任务执行的时间比较短, // 可以调大时间,提高线程的利用率。 private volatile long keepAliveTime;

// 用于保存等待执行的任务的阻塞队列 private final BlockingQueue workQueue;

// 存放工作线程的容器,必须获取到锁才能访问 private final HashSet workers = new HashSet();

// ctl的拆包和包装 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }

  1. :::info
  2. ctl状态控制属性,高3位表示线程池的运行状态(runState),剩下的29位表示当前有效的线程数量(workerCount)线程池最大线程数是(1 << COUNT_BITS) - 1 = 536 870 911
  3. :::
  4. ```java
  5. @Native public static final int SIZE = 32;
  6. private static final int COUNT_BITS = Integer.SIZE - 3;
  7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;

线程池的运行状态runState
状态 解释
RUNNING 运行态,可处理新任务并执行队列中的任务
SHUTDOW 关闭态,不接受新任务,但处理队列中的任务
STOP 停止态,不接受新任务,不处理队列中任务,且打断运行中任务
TIDYING 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
TERMINATED 结束态,terminated() 方法已完成

2021-05-07-13-33-07-649700.png

RejectedExecutionHandler(拒绝策略)

  • AbortPolicy:直接抛出异常。
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

    核心内部类 Worker

    1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    2. // 正在执行任务的线程
    3. final Thread thread;
    4. // 线程创建时初始化的任务
    5. Runnable firstTask;
    6. // 完成任务计数器
    7. volatile long completedTasks;
    8. Worker(Runnable firstTask) {
    9. // 在runWorker方法运行之前禁止中断,要中断线程必须先获取worker内部的互斥锁
    10. setState(-1); // inhibit interrupts until runWorker
    11. this.firstTask = firstTask;
    12. this.thread = getThreadFactory().newThread(this);
    13. }
    14. /** delegates main run loop to outer runworker */
    15. // 直接委托给外部runworker方法
    16. public void run() {
    17. runWorker(this);
    18. }
    19. ...
    20. }

    Worker 类将执行任务的线程封装到了内部,在初始化Worker 的时候,会调用ThreadFactory初始化新线程;Worker 继承了AbstractQueuedSynchronizer,在内部实现了一个互斥锁,主要目的是控制工作线程的中断状态。
    线程的中断一般是由其他线程发起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法,它在调用过程中会去中断worker内部的工作线程,Work的互斥锁可以保证正在执行的任务不被打断。它是怎么保证的呢?在线程真正执行任务的时候,也就是runWorker方法被调用时,它会先获取到Work的锁,当在其他线程需要中断当前线程时也需要获取到work的互斥锁,否则不能中断。

    构造函数

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) {
    8. if (corePoolSize < 0 ||
    9. maximumPoolSize <= 0 ||
    10. maximumPoolSize < corePoolSize ||
    11. keepAliveTime < 0)
    12. throw new IllegalArgumentException();
    13. if (workQueue == null || threadFactory == null || handler == null)
    14. throw new NullPointerException();
    15. this.acc = System.getSecurityManager() == null ?
    16. null :
    17. AccessController.getContext();
    18. this.corePoolSize = corePoolSize;
    19. this.maximumPoolSize = maximumPoolSize;
    20. this.workQueue = workQueue;
    21. this.keepAliveTime = unit.toNanos(keepAliveTime);
    22. this.threadFactory = threadFactory;
    23. this.handler = handler;
    24. }

    通过构造函数可以发现,构造函数就是在对线程池核心属性进行赋值,下面来介绍一下这些核心属性:

  • corePoolSize:核心线程数

  • maximumPoolSize:线程池最大数量
  • keepAliveTime:线程池的工作线程空闲后,保持存活的时间。
  • unit:线程活动保持时间的单位。
  • workQueue:用于保存等待执行的任务的阻塞队列,具体可以参考JAVA并发容器-阻塞队列
  • threadFactory:用于设置创建线程的工厂
  • handler:饱和策略,默认情况下是AbortPolicy

    execute() 提交线程

    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. // 获取控制的值
    5. int c = ctl.get();
    6. // 判断工作线程数是否小于corePoolSize
    7. if (workerCountOf(c) < corePoolSize) {
    8. // 新创建核心线程
    9. if (addWorker(command, true))
    10. return;
    11. c = ctl.get();
    12. }
    13. // 工作线程数大于或等于corePoolSize
    14. // 判断线程池是否处于运行状态,如果是将任务command入队
    15. if (isRunning(c) && workQueue.offer(command)) {
    16. int recheck = ctl.get();
    17. // 再次检查线程池的运行状态,如果不在运行中,那么将任务从队列里面删除,并尝试结束线程池
    18. if (! isRunning(recheck) && remove(command))
    19. // 调用驱逐策略
    20. reject(command);
    21. // 检查活跃线程总数是否为0
    22. else if (workerCountOf(recheck) == 0)
    23. // 新创建非核心线程
    24. addWorker(null, false);
    25. }
    26. // 队列满了,新创建非核心线程
    27. else if (!addWorker(command, false))
    28. // 调用驱逐策略
    29. reject(command);
    30. }

    该方法是没有返回值的

    addWorker() 新创建线程

    1. private boolean addWorker(Runnable firstTask, boolean core) {
    2. retry:
    3. for (;;) {
    4. int c = ctl.get();
    5. int rs = runStateOf(c);
    6. // 仅在必要的时候检查队列是否为NULL
    7. // 检查队列是否处于非运行状态
    8. if (rs >= SHUTDOWN &&
    9. ! (rs == SHUTDOWN &&
    10. firstTask == null &&
    11. ! workQueue.isEmpty()))
    12. return false;
    13. for (;;) {
    14. // 获取活跃线程数
    15. int wc = workerCountOf(c);
    16. // 判断线程是否超过最大值,当队列满了则验证线程数是否大于maximumPoolSize,
    17. // 没有满则验证corePoolSize
    18. if (wc >= CAPACITY ||
    19. wc >= (core ? corePoolSize : maximumPoolSize))
    20. return false;
    21. // 增加活跃线程总数,否则重试
    22. if (compareAndIncrementWorkerCount(c))
    23. // 如果成功跳出外层循环
    24. break retry;
    25. c = ctl.get(); // Re-read ctl
    26. // 再次校验一下线程池运行状态
    27. if (runStateOf(c) != rs)
    28. continue retry;
    29. // else CAS failed due to workerCount change; retry inner loop
    30. }
    31. }
    32. // 工作线程是否启动
    33. boolean workerStarted = false;
    34. // 工作线程是否创建
    35. boolean workerAdded = false;
    36. Worker w = null;
    37. try {
    38. // 新创建线程
    39. w = new Worker(firstTask);
    40. // 获取新创建的线程
    41. final Thread t = w.thread;
    42. if (t != null) {
    43. // 创建线程要获得全局锁
    44. final ReentrantLock mainLock = this.mainLock;
    45. mainLock.lock();
    46. try {
    47. // Recheck while holding lock.
    48. // Back out on ThreadFactory failure or if
    49. // shut down before lock acquired.
    50. int rs = runStateOf(ctl.get());
    51. // 检查线程池的运行状态
    52. if (rs < SHUTDOWN ||
    53. (rs == SHUTDOWN && firstTask == null)) {
    54. // 检查线程的状态
    55. if (t.isAlive()) // precheck that t is startable
    56. throw new IllegalThreadStateException();
    57. // 将新建工作线程存放到容器
    58. workers.add(w);
    59. int s = workers.size();
    60. if (s > largestPoolSize) {
    61. // 跟踪线程池最大的工作线程总数
    62. largestPoolSize = s;
    63. }
    64. workerAdded = true;
    65. }
    66. } finally {
    67. mainLock.unlock();
    68. }
    69. // 启动工作线程
    70. if (workerAdded) {
    71. t.start();
    72. workerStarted = true;
    73. }
    74. }
    75. } finally {
    76. if (! workerStarted)
    77. // 启动新的工作线程失败,
    78. // 1\. 将工作线程移除workers容器
    79. // 2\. 还原工作线程总数(workerCount)
    80. // 3\. 尝试结束线程
    81. addWorkerFailed(w);
    82. }
    83. return workerStarted;
    84. }

    如果启动新线程失败那么addWorkerFailed()这个方法将做以下三件事:

  • 1、将工作线程移除workers容器

  • 2、还原工作线程总数(workerCount)
  • 3、尝试结束线程

    execute() 执行过程

    2021-05-07-13-33-09-727760.png
    1、如果当前运行的线程少于corePoolSize,即使有空闲线程也会创建新线程来执行任务,(注意,执行这一步骤 需要获取全局锁)。如果调用了线程池的restartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。
    2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
    3、如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执 行这一步骤需要获取全局锁)。
    4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

    线程任务的执行

    线程的正在执行是ThreadPoolExecutor.Worker#run()方法,但是这个方法直接委托给了外部的runWorker()方法,源码如下:

    1. // 直接委托给外部runworker方法
    2. public void run() {
    3. runWorker(this);
    4. }

    runWorker() 执行任务

    1. final void runWorker(Worker w) {
    2. // 当前Work中的工作线程
    3. Thread wt = Thread.currentThread();
    4. // 获取初始任务
    5. Runnable task = w.firstTask;
    6. // 初始任务置NULL(表示不是建线程)
    7. w.firstTask = null;
    8. // 修改锁的状态,使需发起中断的线程可以获取到锁(使工作线程可以响应中断)
    9. w.unlock(); // allow interrupts
    10. // 工作线程是否是异常结束
    11. boolean completedAbruptly = true;
    12. try {
    13. // 循环的从队列里面获取任务
    14. while (task != null || (task = getTask()) != null) {
    15. // 每次执行任务时需要获取到内置的互斥锁
    16. w.lock();
    17. // 1\. 当前工作线程不是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,需要中断当前工作线程
    18. // 2\. 当前工作线程是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,需要中断当前工作线程
    19. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
    20. && !wt.isInterrupted())
    21. // 中断线程,中断标志位设置成true
    22. wt.interrupt();
    23. try {
    24. // 执行任务前置方法,扩展用
    25. beforeExecute(wt, task);
    26. Throwable thrown = null;
    27. try {
    28. // 执行任务
    29. task.run();
    30. } catch (RuntimeException x) {
    31. thrown = x; throw x;
    32. } catch (Error x) {
    33. thrown = x; throw x;
    34. } catch (Throwable x) {
    35. thrown = x; throw new Error(x);
    36. } finally {
    37. // 执行任务后置方法,扩展用
    38. afterExecute(task, thrown);
    39. }
    40. } finally {
    41. // 任务NULL表示已经处理了
    42. task = null;
    43. w.completedTasks++;
    44. w.unlock();
    45. }
    46. }
    47. completedAbruptly = false;
    48. } finally {
    49. // 将工作线程从容器中剔除
    50. processWorkerExit(w, completedAbruptly);
    51. }
    52. }

    正在执行线程的方法,执行流程:
    1、获取到当前的工作线程
    2、获取初始化的线程任务
    3、修改锁的状态,使工作线程可以响应中断
    4、获取工作线程的锁(保证在任务执行过程中工作线程不被外部线程中断),如果获取到的任务是NULL,则结束当前工作线程
    5、判断先测试状态,看是否需要中断当前工作线程
    6、执行任务前置方法beforeExecute(wt, task);
    7、执行任务(执行提交到线程池的线程)task.run();
    8、执行任务后置方法afterExecute(task, thrown);,处理异常信息
    9、修改完成任务的总数
    10、解除当前工作线程的锁
    11、获取队列里面的任务,循环第4步
    12、将工作线程从容器中剔除
    wt.isInterrupted():获取中断状态,无副作用
    Thread.interrupted():获取中断状态,并将中断状态恢重置成false(不中断)
    beforeExecute(wt, task):执行任务前置方法,扩展用。如果这个方法在执行过程中抛出异常,那么会导致当前工作线程直接死亡而被回收,工作线程异常结束标记位completedAbruptly被设置成true,任务线程不能被执行
    task.run():执行任务
    afterExecute(task, thrown):执行任务后置方法,扩展用。这个方法可以收集到任务运行的异常信息,这个方法如果有异常抛出,也会导致当前工作线程直接死亡而被回收,工作线程异常结束标记位completedAbruptly被设置成true
    任务运行过程中的异常信息除了RuntimeException以外,其他全部封装成Error,然后被afterExecute方法收集
    terminated()这也是一个扩展方法,在线程池结束的时候调用

    getTask() 获取任务

    1. private Runnable getTask() {
    2. // 记录最后一次获取任务是不是超时了
    3. boolean timedOut = false; // Did the last poll() time out?
    4. for (;;) {
    5. int c = ctl.get();
    6. // 获取线程池状态
    7. int rs = runStateOf(c);
    8. // 线程池是停止状态或者状态是关闭并且队列为空
    9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    10. // 扣减工作线程总数
    11. decrementWorkerCount();
    12. return null;
    13. }
    14. // 获取工作线程总数
    15. int wc = workerCountOf(c);
    16. // 工作线程是否需要剔除
    17. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    18. if ((wc > maximumPoolSize || (timed && timedOut))
    19. && (wc > 1 || workQueue.isEmpty())) {
    20. // 扣减工作线程总数
    21. if (compareAndDecrementWorkerCount(c))
    22. // 剔除工作线程,当返回为NULL的时候,runWorker方法的while循环会结束
    23. return null;
    24. continue;
    25. }
    26. try {
    27. Runnable r = timed ?
    28. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    29. workQueue.take();
    30. if (r != null)
    31. return r;
    32. timedOut = true;
    33. } catch (InterruptedException retry) {
    34. timedOut = false;
    35. }
    36. }
    37. }

    getTask() 阻塞或定时获取任务。当该方法返回NULL时,当前工作线程会结束,最后被回收,下面是返回NULL的几种情况:
    1、当前工作线程总数wc大于maximumPoolSize最大工作线程总数。maximumPoolSize可能被setMaximumPoolSize方法改变。
    2、当线程池处于停止状态时。
    3、当线程池处于关闭状态且阻塞队列为空。
    4、当前工作线程超时等待任务,并且当前工作线程总数wc大于corePoolSize或者allowCoreThreadTimeOut=true允许核心线程超时被回收,默认是false。

    processWorkerExit() 工作线程结束

    1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
    2. // 判断是否是异常情况导致工作线程被回收
    3. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    4. // 如果是扣减工作线程总数,如果不是在getTask()方法就已经扣减了
    5. decrementWorkerCount();
    6. final ReentrantLock mainLock = this.mainLock;
    7. mainLock.lock();
    8. try {
    9. // 将当前工作线程完成任务的总数加到completedTaskCount标志位上
    10. completedTaskCount += w.completedTasks;
    11. // 剔除当前工作线程
    12. workers.remove(w);
    13. } finally {
    14. mainLock.unlock();
    15. }
    16. // 尝试结束线程池
    17. tryTerminate();
    18. // 判刑是否需要新实例化工程线程
    19. int c = ctl.get();
    20. if (runStateLessThan(c, STOP)) {
    21. if (!completedAbruptly) {
    22. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    23. if (min == 0 && ! workQueue.isEmpty())
    24. min = 1;
    25. if (workerCountOf(c) >= min)
    26. return; // replacement not needed
    27. }
    28. addWorker(null, false);
    29. }
    30. }

    剔除线程流程:
    1、判断是否是异常情况导致工作线程被回收,如果是workerCount--
    2、获取到全局锁
    3、将当前工作线程完成任务的总数加到completedTaskCount标志位上
    4、剔除工作线程
    5、解锁
    6、尝试结束线程池tryTerminate()
    7、判刑是否需要重新实例化工程线程放到workers容器

    结束线程池

    shutdown() 关闭线程池

    1. public void shutdown() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. // 检查权限
    6. checkShutdownAccess();
    7. // 设置线程池状态为关闭
    8. advanceRunState(SHUTDOWN);
    9. // 中断线程
    10. interruptIdleWorkers();
    11. // 扩展方法
    12. onShutdown(); // hook for ScheduledThreadPoolExecutor
    13. } finally {
    14. mainLock.unlock();
    15. }
    16. // 尝试结束线池
    17. tryTerminate();
    18. }

    1、通过遍历工作线程容器workers,然后逐个中断工作线程,如果无法响应中断的任务可能永远无法终止。
    2、shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

    shutdownNow() 关闭线程池

    1. public List<Runnable> shutdownNow() {
    2. List<Runnable> tasks;
    3. final ReentrantLock mainLock = this.mainLock;
    4. mainLock.lock();
    5. try {
    6. // 检查权限
    7. checkShutdownAccess();
    8. // 设置线程池状态为停止状态
    9. advanceRunState(STOP);
    10. // 中断线程
    11. interruptIdleWorkers();
    12. // 将所有任务移动到list容器
    13. tasks = drainQueue();
    14. } finally {
    15. mainLock.unlock();
    16. }
    17. // 尝试结束线池
    18. tryTerminate();
    19. // 返回所有未执行的任务
    20. return tasks;
    21. }

    1、通过遍历工作线程容器workers,然后逐个中断工作线程,如果无法响应中断的任务可能永远无法终止。
    2、shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

    tryTerminate() 尝试结束线程池

    1. final void tryTerminate() {
    2. for (;;) {
    3. int c = ctl.get();
    4. // 判断是否在运行中,如果是直接返回
    5. if (isRunning(c) ||
    6. // 判断是否进入整理状态,如果进入了直接返回
    7. runStateAtLeast(c, TIDYING) ||
    8. // 如果是状态是关闭并且队列非空,也直接返回(关闭状态需要等到队列里面的线程处理完)
    9. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    10. return;
    11. // 判断工作线程是否都关闭了
    12. if (workerCountOf(c) != 0) { // Eligible to terminate
    13. // 中断空闲线程
    14. interruptIdleWorkers(ONLY_ONE);
    15. return;
    16. }
    17. final ReentrantLock mainLock = this.mainLock;
    18. mainLock.lock();
    19. try {
    20. // 将状态替换成整理状态
    21. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    22. try {
    23. // 整理发放执行
    24. terminated();
    25. } finally {
    26. // 状态替换成结束状态
    27. ctl.set(ctlOf(TERMINATED, 0));
    28. termination.signalAll();
    29. }
    30. return;
    31. }
    32. } finally {
    33. mainLock.unlock();
    34. }
    35. // else retry on failed CAS
    36. }
    37. }

    结束线程池大致流程为:
    1、判断是否在运行中,如果是则不结束线程
    2、判断是否进入整理状态,如果是也不用执行后面内容了
    3、判断如果线程池是关闭状态并且队列非空,则不结束线程池(关闭状态需要等到队列里面的线程处理完)
    4、判断工作线程是否都关闭了,如果没有就发起中断工作线程的请求
    5、获取全局锁将线程池状态替换成整理状态
    6、调用terminated();扩展方法(这也是一个扩展方法,在线程池结束的时候调用)
    7、将线程池状态替换成结束状态
    8、解除全局锁

    注意:
  • 1、可以通过的shutdownshutdownNow方法来结束线程池。他们都是通过遍历工作线程容器,然后逐个中断工作线程,所以无法响应中断的任务 可能永远无法终止。

  • 2、shutdownshutdownNow的区别在于:shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;而 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
  • 3、只要调用了shutdownshutdownNow那么isShutdown方法就会返回true
  • 4、当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true

    线程池的监控

    通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecuteafterExecuteterminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。

    getTaskCount()

    1. public long getTaskCount() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. long n = completedTaskCount;
    6. for (Worker w : workers) {
    7. n += w.completedTasks;
    8. if (w.isLocked())
    9. ++n;
    10. }
    11. return n + workQueue.size();
    12. } finally {
    13. mainLock.unlock();
    14. }
    15. }
    获取线程池需要执行的任务数量。总数=已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)+正在执行的任务数(w.isLocked())+还未执行的任务数(workQueue.size())

    getCompletedTaskCount()

    1. public long getCompletedTaskCount() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. long n = completedTaskCount;
    6. for (Worker w : workers)
    7. n += w.completedTasks;
    8. return n;
    9. } finally {
    10. mainLock.unlock();
    11. }
    12. }
    获取线程池在运行过程中已完成的任务数量。总数=已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)

    getLargestPoolSize()

    1. public int getLargestPoolSize() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. return largestPoolSize;
    6. } finally {
    7. mainLock.unlock();
    8. }
    9. }
    获取线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。

    getPoolSize()

    1. public int getPoolSize() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. // Remove rare and surprising possibility of
    6. // isTerminated() && getPoolSize() > 0
    7. return runStateAtLeast(ctl.get(), TIDYING) ? 0
    8. : workers.size();
    9. } finally {
    10. mainLock.unlock();
    11. }
    12. }
    获取线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。

    getActiveCount()

    1. public int getActiveCount() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. int n = 0;
    6. for (Worker w : workers)
    7. if (w.isLocked())
    8. ++n;
    9. return n;
    10. } finally {
    11. mainLock.unlock();
    12. }
    13. }

获取活动的线程数。

合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。

  • 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能 执行。
  • 可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
  • 建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。无界队列在某些异常情况下可能会撑爆内存。

N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。