Java并发编程系列34 | 深入理解线程池(下)

Java并发编程系列34 | 深入理解线程池(下)

原创 何适 java进阶架构师
来自专辑
进阶架构师 | 并发编程专题
★★★建议星标我们★★★
公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为星标”!这样才不会错过每日进阶架构文章呀。

Java并发编程系列34 | 深入理解线程池(下) - 图1
Java并发编程系列34 | 深入理解线程池(下) - 图2
2020年Java原创面试题库连载中
【000期】Java最全面试题库思维导图
【001期】JavaSE面试题(一):面向对象
【002期】JavaSE面试题(二):基本数据类型与访问修饰符
【003期】JavaSE面试题(三):JavaSE语法(1)
【004期】JavaSE面试题(四):JavaSE语法(3)
【005期】JavaSE面试题(五):String类
【006期】JavaSE面试题(六):泛型
【007期】JavaSE面试题(七):异常
【008期】JavaSE面试题(八):集合之List
【009期】JavaSE面试题(九):集合之Set
【010期】JavaSE面试题(十):集合之Map
【011期】JavaSE面试题(十一):多线程(1)
【012期】JavaSE面试题(十二):多线程(2)
【013期】JavaSE面试题(十三):多线程(3)
【014期】JavaSE面试题(十四):基本IO流
【015期】JavaSE面试题(十五):网络IO流
【016期】JavaSE面试题(十六):反射
【017期】JavaSE面试题(十七):JVM之内存模型
【018期】JavaSE面试题(十八):JVM之垃圾回收
【020期】JavaSE系列面试题汇总(共18篇)
【019期】JavaWeb面试题(一):JDBC
【021期】JavaWeb面试题(二):HTTP协议
【022期】JavaWeb面试题(三):Cookie和Session
【023期】JavaWeb面试题(四):JSP
【024期】JavaWeb面试题(五):Filter和Listener
【025期】Java工具面试题(一):版本控制工具
【026期】Java工具面试题(二):项目管理工具
【027期】Java设计模式面试题
【028期】JavaWeb系列面试题汇总(共10篇)
【029期】JavaEE面试题(一)Web应用服务器
【030期】JavaEE面试题(二)SpringMVC
【031期】JavaEE面试题(三)Spring(1)
【032期】JavaEE面试题(四)Spring(2)

【033期】JaveEE面试题(五)MyBatis
【034期】JavaEE面试题(六)Hibernate
【035期】JavaEE面试题(七)SpringBoot(1)
更多内容,点击上面蓝字查看Java并发编程系列34 | 深入理解线程池(下) - 图3

Java并发编程系列34 | 深入理解线程池(下) - 图4
本文是深入理解线程池下篇:

  1. 线程池介绍
  2. Executor框架接口
  3. 线程池状态
  4. 线程池参数
  5. 线程池创建
  6. 执行过程
  7. 关闭线程池
  8. 其他问题
  • 任务拒绝策略
  • 线程池中的线程初始化
  • 线程池容量的动态调整
  • 线程池的监控

    6. 执行过程

    这一节详细分析任务提交到线程池之后,线程池是如何处理和执行任务的。

    6.1 execute()方法

    execute()方法执行过程如下:
  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务,即使有空闲线程,也要创建一个新线程
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。Java并发编程系列34 | 深入理解线程池(下) - 图5

    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. int c = ctl.get();
    5. /*
    6. * 当前核心线程数小于corePoolSize,则新建一个线程放入线程池中。
    7. * 注意这里不管核心线程有没有空闲,都会创建线程
    8. */
    9. if (workerCountOf(c) < corePoolSize) {
    10. // 创建线程,并执行command。addWorker方法后面详细讲解。
    11. if (addWorker(command, true))
    12. return;
    13. c = ctl.get();// 如果添加失败,则重新获取ctl值
    14. }
    15. // 当前核心线程数大于等于corePoolSize,将任务添加到队列
    16. if (isRunning(c) && workQueue.offer(command)) {
    17. /*
    18. * 再次检查线程池的运行状态
    19. * 如果不是运行状态,将command从workQueue中移除,使用拒绝策略处理command
    20. */
    21. int recheck = ctl.get();
    22. if (!isRunning(recheck) && remove(command))
    23. reject(command);
    24. // 如果有效线程数为0,创建一个线程
    25. else if (workerCountOf(recheck) == 0)
    26. addWorker(null, false);
    27. }
    28. /*
    29. * 当前核心线程数大于等于corePoolSize,且workQueue队列添加任务失败,尝试创建maximumPoolSize中的线程来执行任务
    30. */
    31. else if (!addWorker(command, false))
    32. reject(command);
    33. }

    6.2 addWorker方法

    addWorkerr(Runnable firstTask, boolean core)方法的主要工作是在线程池中创建一个新的线程并执行:

  5. 增加线程数量ctl;

  6. 创建Worker对象来执行任务,每一个Worker对象都会创建一个线程;
  7. worker添加成功后,启动这个worker中的线程
  • 参数firstTask:这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行;
  • 参数core:true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限

    1. /**
    2. * 在线程池中创建一个新的线程并执行
    3. * @param firstTask 这个新创建的线程需要第一个执行的任务;firstTask==null,表示创建线程,到workQueue中取任务执行
    4. * @param core true代表使用corePoolSize作为创建线程的界限;false代表使用maximumPoolSize作为界限
    5. * @return
    6. */
    7. private boolean addWorker(Runnable firstTask, boolean core) {
    8. // 1. 增加线程数量ctl
    9. retry:
    10. for (;;) {
    11. int c = ctl.get();
    12. int rs = runStateOf(c);// 获取运行状态
    13. /*
    14. * 不能创建线程的几种情况:
    15. * 1. 线程池已关闭且rs == SHUTDOWN,不允许提交任务,且中断正在执行的任务
    16. * 2. 线程池已关闭且firstTask!=null,
    17. * 3. 线程池已关闭且workQueue为空
    18. */
    19. if (rs >= SHUTDOWN &&
    20. ! (rs == SHUTDOWN &&
    21. firstTask == null &&
    22. ! workQueue.isEmpty()))
    23. return false;
    24. for (;;) {
    25. int wc = workerCountOf(c);// 获取线程数
    26. // 判断线程数上限
    27. if (wc >= CAPACITY ||
    28. wc >= (core ? corePoolSize : maximumPoolSize))
    29. return false;
    30. // 尝试增加workerCount,如果成功,则跳出外层for循环
    31. if (compareAndIncrementWorkerCount(c))
    32. break retry;
    33. // CAS失败,循环尝试
    34. c = ctl.get();
    35. if (runStateOf(c) != rs)
    36. continue retry;
    37. }
    38. }
    39. boolean workerStarted = false;
    40. boolean workerAdded = false;
    41. Worker w = null;
    42. try {
    43. /*
    44. * 2. 创建Worker对象来执行任务,每一个Worker对象都会创建一个线程
    45. * Worker类下文详细讲解
    46. */
    47. w = new Worker(firstTask);
    48. final Thread t = w.thread;
    49. if (t != null) {
    50. final ReentrantLock mainLock = this.mainLock;
    51. mainLock.lock();
    52. try {
    53. int rs = runStateOf(ctl.get());
    54. /*
    55. * 判断状态:
    56. * 小于 SHUTTDOWN 那就是 RUNNING,最正常的情况
    57. * 等于 SHUTDOWN,不接受新的任务但是会继续执行等待队列中的任务,所以要求firstTask == null
    58. */
    59. if (rs < SHUTDOWN ||
    60. (rs == SHUTDOWN && firstTask == null)) {
    61. if (t.isAlive()) // precheck that t is startable
    62. throw new IllegalThreadStateException();
    63. workers.add(w);// 添加worker
    64. int s = workers.size();
    65. // largestPoolSize记录着线程池中出现过的最大线程数量
    66. if (s > largestPoolSize)
    67. largestPoolSize = s;
    68. workerAdded = true;
    69. }
    70. } finally {
    71. mainLock.unlock();
    72. }
    73. // 3. worker添加成功,启动这个worker中的线程
    74. if (workerAdded) {
    75. t.start();
    76. workerStarted = true;
    77. }
    78. }
    79. } finally {
    80. if (! workerStarted)
    81. addWorkerFailed(w);
    82. }
    83. return workerStarted;
    84. }

    6.3 Worker类

    线程池中的每一个线程被封装成一个Worker对象,线程池维护的其实就是一组Worker对象。

    1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    2. // 线程被封装成Worker
    3. final Thread thread;
    4. /*
    5. * 在创建线程的时候,如果同时指定的需要执行的第一个任务。
    6. * 可以为 null,线程自己到任务队列中取任务执行
    7. */
    8. Runnable firstTask;
    9. // 线程完成的任务数
    10. volatile long completedTasks;
    11. // Worker 只有这一个构造方法
    12. Worker(Runnable firstTask) {
    13. setState(-1); // inhibit interrupts until runWorker
    14. this.firstTask = firstTask;
    15. this.thread = getThreadFactory().newThread(this);// 调用 ThreadFactory 来创建一个新的线程
    16. }
    17. /**
    18. * worker工作,调用外部类的 runWorker 方法,循环等待队列中获取任务并执行,下文详细介绍
    19. */
    20. public void run() {
    21. runWorker(this);
    22. }
    23. // ... 其他几个方法用AQS及锁的操作,不关注
    24. }

    6.4 runWorker方法

    循环从等待队列中获取任务并执行:

  1. 获取到新任务就执行;
  2. 获取不到就阻塞等待新任务;
  3. 队列中没有任务或空闲线程超时,销毁线程。

    1. /**
    2. * 循环从等待队列中获取任务并执行
    3. */
    4. final void runWorker(Worker w) {
    5. Thread wt = Thread.currentThread();
    6. Runnable task = w.firstTask;
    7. w.firstTask = null;
    8. w.unlock(); // allow interrupts
    9. boolean completedAbruptly = true;
    10. try {
    11. /*
    12. * 循环调用 getTask() 获取任务,getTask()下文详细讲解
    13. * 获取到任务就执行,
    14. * 获取不到就阻塞等待新任务,
    15. * 返回null任务就销毁当前线程
    16. */
    17. while (task != null || (task = getTask()) != null) {
    18. w.lock();
    19. // 如果线程池状态大于等于 STOP,中断
    20. if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(
    21. ctl.get(), STOP))) && !wt.isInterrupted())
    22. wt.interrupt();
    23. try {
    24. beforeExecute(wt, task);// 钩子方法,留给需要的子类实现
    25. Throwable thrown = null;
    26. try {
    27. task.run();// 真正执行任务,执行execute()中传入任务的run方法
    28. } catch (RuntimeException x) {
    29. thrown = x;
    30. throw x;
    31. } catch (Error x) {
    32. thrown = x;
    33. throw x;
    34. } catch (Throwable x) {
    35. thrown = x;
    36. throw new Error(x);
    37. } finally {
    38. afterExecute(task, thrown);// 钩子方法,留给需要的子类实现
    39. }
    40. } finally {
    41. task = null;
    42. w.completedTasks++;
    43. w.unlock();
    44. }
    45. }
    46. completedAbruptly = false;
    47. } finally {
    48. // 如果到这里,需要销毁线程:
    49. // 1. getTask 返回 null退出while循环,队列中没有任务或空闲线程超时
    50. // 2. 任务执行过程中发生了异常
    51. processWorkerExit(w, completedAbruptly);
    52. }
    53. }

    6.5 getTask方法

    获取workQueue中的任务

  4. 正常情况,直接workQueue.take()获取到任务返回;

  5. workQueue中没有任务,当前线程阻塞直到获取到任务;
  6. getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null:
  • 状态为SHUTDOWN && workQueue.isEmpty(),任务队列没有任务,且即将关闭线程池,销毁当前线程
  • 状态 >= STOP,关闭线程池,销毁当前线程
  • 当前线程数超过最大maximumPoolSize,销毁当前线程
  • 空闲线程超时keepAliveTime,需要销毁线程
    1. /**
    2. * 获取workQueue中的任务
    3. * 1. 正常情况,直接workQueue.take()获取到任务返回;
    4. * 2. workQueue中没有任务,当前线程阻塞直达获取到任务;
    5. * 3. getTask()返回 null, runWorker()方法会销毁当前线程,如下情况返回null:
    6. * 状态为SHUTDOWN && workQueue.isEmpty()
    7. * 状态 >= STOP
    8. * 当前线程数 wc > maximumPoolSize
    9. * 空闲线程超时keepAliveTime
    10. */
    11. private Runnable getTask() {
    12. boolean timedOut = false; // Did the last poll() time out?
    13. for (;;) {
    14. int c = ctl.get();
    15. int rs = runStateOf(c);
    16. /*
    17. * 两种返回null的情况:
    18. * 1. rs == SHUTDOWN && workQueue.isEmpty()
    19. * 2. rs >= STOP
    20. */
    21. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    22. decrementWorkerCount();// CAS 操作,减少工作线程数
    23. return null;
    24. }
    25. int wc = workerCountOf(c);
    26. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    27. /*
    28. * 两种返回null的情况:
    29. * 1. 当前线程数 wc > maximumPoolSize,return null
    30. * 2. 空闲线程超时,return null
    31. */
    32. if ((wc > maximumPoolSize || (timed && timedOut))
    33. && (wc > 1 || workQueue.isEmpty())) {
    34. if (compareAndDecrementWorkerCount(c))
    35. return null;
    36. continue;
    37. }
    38. /*
    39. * 到 workQueue 中获取任务并返回
    40. */
    41. try {
    42. Runnable r = timed ?
    43. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    44. workQueue.take();
    45. if (r != null)
    46. return r;
    47. timedOut = true;
    48. } catch (InterruptedException retry) {
    49. timedOut = false;
    50. }
    51. }
    52. }

    6.6 总结

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
  5. 线程池中的线程执行完当前任务后,会循环到任务队列中取任务继续执行;线程获取队列中任务时会阻塞,直到获取到任务返回;

Java并发编程系列34 | 深入理解线程池(下) - 图6Java并发编程系列34 | 深入理解线程池(下) - 图7

7. 关闭线程池

关闭线程池使用shutdown方法或shutdownNow方法,最终目的是将线程池状态设置成TERMINATED。

7.1 shutdown方法

shutdown方法过程:

  1. 将线程池切换到SHUTDOWN状态;
  2. 调用interruptIdleWorkers方法请求中断所有空闲的worker;
  3. 调用tryTerminate尝试结束线程池。

    1. public void shutdown() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. checkShutdownAccess();// 安全策略判断
    6. advanceRunState(SHUTDOWN);// CAS设置线程池状态为SHUTDOWN
    7. interruptIdleWorkers();// 中断空闲线程
    8. onShutdown(); // 钩子方法,用于ScheduledThreadPoolExecutor
    9. } finally {
    10. mainLock.unlock();
    11. }
    12. // 尝试结束线程池,下文详细讲解
    13. tryTerminate();
    14. }

    7.2 tryTerminate方法

    结束线程池,最终将线程池状态设置为TERMINATED。

    1. /**
    2. * 结束线程池,最终将线程池状态设置为TERMINATED
    3. */
    4. final void tryTerminate() {
    5. for (;;) {
    6. int c = ctl.get();
    7. /*
    8. * 当前线程池的状态为以下几种情况时,直接返回:
    9. * 1. RUNNING,因为还在运行中,不能停止;
    10. * 2. TIDYING或TERMINATED,已经关闭了;
    11. * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
    12. */
    13. if (isRunning(c) ||
    14. runStateAtLeast(c, TIDYING) ||
    15. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    16. return;
    17. // 如果线程数量不为0,则中断一个空闲的工作线程,并返回
    18. if (workerCountOf(c) != 0) { // Eligible to terminate
    19. interruptIdleWorkers(ONLY_ONE);
    20. return;
    21. }
    22. final ReentrantLock mainLock = this.mainLock;
    23. mainLock.lock();
    24. try {
    25. // 尝试设置状态为TIDYING
    26. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    27. try {
    28. terminated();// 钩子方法,留给子类实现
    29. } finally {
    30. // 设置状态为TERMINATED
    31. ctl.set(ctlOf(TERMINATED, 0));
    32. termination.signalAll();
    33. }
    34. return;
    35. }
    36. } finally {
    37. mainLock.unlock();
    38. }
    39. // else retry on failed CAS
    40. }
    41. }

    7.3 shutdownNow方法

    shutdownNow方法过程:

  4. 将线程池切换到STOP状态;

  5. 中断所有工作线程,无论是否空闲;
  6. 取出阻塞队列中没有被执行的任务并返回;
  7. 调用tryTerminate尝试结束线程池。
    1. public List<Runnable> shutdownNow() {
    2. List<Runnable> tasks;
    3. final ReentrantLock mainLock = this.mainLock;
    4. mainLock.lock();
    5. try {
    6. checkShutdownAccess();// 安全策略判断
    7. advanceRunState(STOP);// CAS设置线程池状态为STOP
    8. interruptWorkers();// 中断所有工作线程,无论是否空闲
    9. tasks = drainQueue();// 取出阻塞队列中没有被执行的任务并返回
    10. } finally {
    11. mainLock.unlock();
    12. }
    13. tryTerminate();// 结束线程池,最终将线程池状态设置为TERMINATED
    14. return tasks;
    15. }

    shutdown方法 VS shutdownNow方法

    • shutdown方法设置线程池状态为SHUTDOWN,SHUTDOWN状态不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
    • shutdownNow方法设置线程池状态为STOP,STOP状态不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。

8. 其他问题

一些不需要长篇大论介绍的知识点,这里简单说下。

8.1 任务拒绝策略

构造线程时传入的 RejectedExecutionHandler 类型参数 handler 就是拒绝策略。
RejectedExecutionHandler只有一个钩子方法,执行拒绝策略。

  1. public interface RejectedExecutionHandler {
  2. void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
  3. }

ThreadPoolExecutor 中有四个已经定义好的RejectedExecutionHandler实现类可供我们直接使用。(我们也可以实现自己的策略)

  1. /**
  2. * 由提交任务的线程自己来执行这个任务
  3. */
  4. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  5. public CallerRunsPolicy() { }
  6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  7. if (!e.isShutdown()) {
  8. r.run();
  9. }
  10. }
  11. }
  12. /**
  13. * 默认的策略:接抛出 RejectedExecutionException 异常
  14. */
  15. public static class AbortPolicy implements RejectedExecutionHandler {
  16. public AbortPolicy() { }
  17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  18. throw new RejectedExecutionException("Task " + r.toString() +
  19. " rejected from " +
  20. e.toString());
  21. }
  22. }
  23. /**
  24. * 不做任何处理,直接忽略掉这个任务
  25. */
  26. public static class DiscardPolicy implements RejectedExecutionHandler {
  27. public DiscardPolicy() { }
  28. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  29. }
  30. }
  31. /**
  32. * 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
  33. */
  34. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  35. public DiscardOldestPolicy() { }
  36. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  37. if (!e.isShutdown()) {
  38. e.getQueue().poll();
  39. e.execute(r);
  40. }
  41. }
  42. }

8.2 线程池的监控

我们可以通过线程池提供的参数和方法对线程池进行监控:

Java并发编程系列34 | 深入理解线程池(下) - 图8

之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。

  • 《java面试宝典5.0》(初中级)
  • 《350道Java面试题:整理自100+公司》(中高级)
  • 《资深java面试宝典-视频版》(资深)
  • 《Java[BAT]面试必备》(资深)

分别适用于初中级,中高级资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
Java并发编程系列34 | 深入理解线程池(下) - 图9
获取方式:点“在看”,V信关注上述Java最全面试题库号并回复 【面试】即可领取,更多精彩陆续奉上。
Java并发编程系列34 | 深入理解线程池(下) - 图10
看到这里,证明有所收获
必须点个在看支持呀,喵