1、继承结构

image.png

2、线程池状态及线程数

线程池使用一个整数ctl表示线程池状态和线程数,高3位表示线程池的运行状态,除去高3位的低位表示当前线程池中拥有的线程数量

线程池状态 说明 状态转换 二进制
RUNNING 能够接收新任务,以及对已添加的任务进行处理 线程池的初始化状态是RUNNING -1左移29位 11100000 00000000 00000000 00000000
SHUTDOWN 不接收新任务,但能处理已添加的任务 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN 0 左移29位 00000000 00000000 00000000 00000000
STOP 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务 调用线程池的shutdownNow()接口时,线程池由(RUNNING 或者 SHUTDOWN ) -> STOP 1 左移29位 00100000 00000000 00000000 00000000
TIDYING 当所有的任务已终止,CTL记录线程数为0
tryTerminate方法中如果线程池状态大于等于STOP或者线程池状态等于SHUTDOWN 且任务队列为空,并且线程池线程数为0,则将线程池状态修改为TIDYING 2 左移29位 01000000 00000000 00000000 00000000
TERMINATED 线程池彻底终止 线程池状态设置为TIDYING后,会执行terminated钩子方法,然后将线程池状态设置为
TERMINATED
3 左移29位 01100000 00000000 00000000 00000000

关于TIDYING状态详细说明

线程池修改为TIDYING状态的两种情况:
1、执行shutdown()或者shutdownNow()方法的外部线程修改,shutdown()方法中断所有空闲线程,shutdownnow()中断所有线程,之后执行tryTerminate 方法,如果所有工作线程被唤醒后将线程池线程数减1,则线程池线程数为0,外部线程抢到锁,设置线程池状态为TIDYING
2、执行shutdown()或者shutdownNow()方法的外部线程在执行tryTerminate方法时,线程池线程数不为0,之后唤醒的空闲线程执行tryTerminate方法时抢锁,设置线程池状态为TIDYING


通过一些位运算能够计算出线程的线程数和状态,并且还提供了通过线程数和状态计算ctl的方法

  1. /**
  2. * 高3位表示线程池的运行状态
  3. * 除去高3位的低位: 表示当前线程池中拥有的线程数量
  4. */
  5. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  6. /**
  7. * 表示在ctl中,低COUNT_BITS位为存放线程数量的位
  8. */
  9. private static final int COUNT_BITS = Integer.SIZE - 3;
  10. /**
  11. * 用于计算线程池状态的mask
  12. * 00011111 11111111 11111111 11111111
  13. */
  14. private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
  15. /**
  16. * 11100000 00000000 00000000 00000000
  17. * 运行中
  18. */
  19. private static final int RUNNING = -1 << COUNT_BITS;
  20. /**
  21. * 00000000 00000000 00000000 00000000
  22. * 关闭 不接收新任务,但能处理已添加的任务*/
  23. private static final int SHUTDOWN = 0 << COUNT_BITS;
  24. /**
  25. * 00100000 00000000 00000000 00000000
  26. * 停止 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
  27. */
  28. private static final int STOP = 1 << COUNT_BITS;
  29. /**
  30. * 01000000 00000000 00000000 00000000
  31. * 所有的任务已终止
  32. */
  33. private static final int TIDYING = 2 << COUNT_BITS;
  34. /**
  35. * 01100000 00000000 00000000 00000000
  36. * 线程池彻底终止
  37. */
  38. private static final int TERMINATED = 3 << COUNT_BITS;
  39. /**
  40. * 获取线程池当前运行状态
  41. * COUNT_MASK : 00011111 11111111 11111111 11111111
  42. * ~COUNT_MASK : 11100000 00000000 00000000 00000000
  43. * @param c ctl
  44. * @return
  45. */
  46. private static int runStateOf(int c) { return c & ~COUNT_MASK; }
  47. /**
  48. * 获取线程池线程数量
  49. * COUNT_MASK : 00011111 11111111 11111111 11111111
  50. * @param c ctl
  51. * @return
  52. */
  53. private static int workerCountOf(int c) { return c & COUNT_MASK; }
  54. /**
  55. * 重置当前线程池的ctl值
  56. * rs 11100000 00000000 00000000 00000000 运行中
  57. * wc 00000000 00000000 00000000 00001010 10个线程
  58. * ctl 11100000 00000000 00000000 00001010
  59. * @param rs 线程池状态
  60. * @param wc 线程池线程数量
  61. * @return
  62. */
  63. private static int ctlOf(int rs, int wc) { return rs | wc; }
  64. /**
  65. * 比较当前线程池ctl的状态是否小于某个状态s
  66. * 注:任何情况下 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
  67. * @param c ctl
  68. * @param s 传入状态
  69. * @return
  70. */
  71. private static boolean runStateLessThan(int c, int s) {
  72. return c < s;
  73. }
  74. /**
  75. * 比较当前线程池ctl的状态是否大于等于某个状态s
  76. * @param c ctl
  77. * @param s 传入状态
  78. * @return
  79. */
  80. private static boolean runStateAtLeast(int c, int s) {
  81. return c >= s;
  82. }
  83. /**
  84. * 是否是RUNNING状态
  85. * @param c ctl
  86. * @param s 传入状态
  87. * @return
  88. */
  89. private static boolean isRunning(int c) {
  90. return c < SHUTDOWN;
  91. }
  92. /**
  93. * 使用CAS增加线程数量
  94. */
  95. private boolean compareAndIncrementWorkerCount(int expect) {
  96. return ctl.compareAndSet(expect, expect + 1);
  97. }
  98. /**
  99. * 使用CAS减少线程数量
  100. */
  101. private boolean compareAndDecrementWorkerCount(int expect) {
  102. return ctl.compareAndSet(expect, expect - 1);
  103. }
  104. /**
  105. * 将线程数量-1 一定成功 底层用while(true)一直重试
  106. */
  107. private void decrementWorkerCount() {
  108. ctl.addAndGet(-1);
  109. }

3、属性说明

  1. /**
  2. * 任务缓存队列 线程池中的线程达到 核心线程数时,提交的任务就会提交到缓存队列,缓存队列满了就会创建线程数到最大线程数
  3. * 一般为 :
  4. * LinkedBlockingQueue 无界队列
  5. * ArrayBlockingQueue 有界队列
  6. */
  7. private final BlockingQueue<Runnable> workQueue;
  8. /**
  9. * 线程池全局锁 增加减少线程时,修改线程池运行状态需要获取锁
  10. */
  11. private final ReentrantLock mainLock = new ReentrantLock();
  12. /**
  13. * 线程池中真正存放工作线程的位置
  14. */
  15. private final HashSet<Worker> workers = new HashSet<>();
  16. /**
  17. * 当外部线程调用awaitTermination()方法时,外部线程会等待当前线程池为Termination为止
  18. *
  19. * termination.await()会将调用线程阻塞
  20. * termination.signalAll()会将阻塞线程依次唤醒
  21. *
  22. * 实现原理: 将外部线程封装为WaitNode放入Condition队列中,外部线程会被park 阻塞
  23. * 当线程池处于Termination状态时,通过unpark将其唤醒
  24. */
  25. private final Condition termination = mainLock.newCondition();
  26. /**
  27. * 记录线程池生命周期内线程数最大值
  28. */
  29. private int largestPoolSize;
  30. /**
  31. * 记录线程池完成任务总数
  32. */
  33. private long completedTaskCount;
  34. /**
  35. * 创建线程使用线程工厂 自定义线程名
  36. */
  37. private volatile ThreadFactory threadFactory;
  38. /**
  39. * 拒绝策略 默认AbortPolicy
  40. * AbortPolicy:不处理,直接抛出异常。
  41. * CallerRunsPolicy:若线程池还没关闭,调用当前所在线程来运行任务,r.run()执行。
  42. * DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。
  43. * DiscardPolicy:不处理,丢弃掉,不抛出异常。
  44. */
  45. private volatile RejectedExecutionHandler handler;
  46. /**
  47. * 空闲线程存活时间
  48. * allowCoreThreadTimeOut==true时 核心线程数量内线程也会被回收
  49. * allowCoreThreadTimeOut==false时 会维持核心线程数量内线程存活
  50. */
  51. private volatile long keepAliveTime;
  52. /**
  53. * 控制核心线程数内的线程是否可以被回收
  54. * true 可以 false 不可以
  55. */
  56. private volatile boolean allowCoreThreadTimeOut;
  57. /**
  58. * 核心线程数量
  59. */
  60. private volatile int corePoolSize;
  61. /**
  62. * 最大线程数量
  63. */
  64. private volatile int maximumPoolSize;
  65. /**
  66. * 默认拒绝策略
  67. */
  68. private static final RejectedExecutionHandler defaultHandler =
  69. new AbortPolicy();

4、工作线程Worker

  1. /**
  2. * Worker使用了AQS的独占模式
  3. * 独占模式两个重要属性: state 和 ExclusiveOwnerThread
  4. * state:
  5. * state==0 未被占用
  6. * state>0 被占用
  7. * state<0 初始化状态 该状态不能抢锁
  8. * ExclusiveOwnerThread:表示独占锁的线程
  9. */
  10. private final class Worker
  11. extends AbstractQueuedSynchronizer
  12. implements Runnable
  13. {
  14. /**
  15. * This class will never be serialized, but we provide a
  16. * serialVersionUID to suppress a javac warning.
  17. */
  18. private static final long serialVersionUID = 6138294804551838833L;
  19. /** worker内部封装的工作线程*/
  20. final Thread thread;
  21. /**
  22. * 如果firstTask不为空 worker启动后优先执行firstTask,执行完firstTask后会到queue中获取任务
  23. */
  24. Runnable firstTask;
  25. /** 记录当前worker完成任务数量 */
  26. volatile long completedTasks;
  27. // TODO: switch to AbstractQueuedLongSynchronizer and move
  28. // completedTasks into the lock word.
  29. /**
  30. * Creates with given first task and thread from ThreadFactory.
  31. * @param firstTask the first task (null if none)
  32. */
  33. Worker(Runnable firstTask) {
  34. //设置AQS独占模式为初始状态 该状态不能抢锁
  35. setState(-1); // inhibit interrupts until runWorker
  36. this.firstTask = firstTask;
  37. //使用线程工厂创建线程 并以当前worker为runnable
  38. this.thread = getThreadFactory().newThread(this);
  39. }
  40. /** worker启动时会执行run */
  41. public void run() {
  42. runWorker(this);
  43. }
  44. // Lock methods
  45. //
  46. // The value 0 represents the unlocked state.
  47. // The value 1 represents the locked state.
  48. /**
  49. * 当前worker的独占锁是否被独占
  50. * @return
  51. */
  52. protected boolean isHeldExclusively() {
  53. return getState() != 0;
  54. }
  55. /**
  56. * 尝试去占用worker独占锁
  57. * @param unused
  58. * @return
  59. */
  60. protected boolean tryAcquire(int unused) {
  61. //使用CAS修改AQS中的state 抢占成功则设置ExclusiveOwnerThread为当前线程
  62. if (compareAndSetState(0, 1)) {
  63. setExclusiveOwnerThread(Thread.currentThread());
  64. return true;
  65. }
  66. return false;
  67. }
  68. /**
  69. * 释放锁 外部不会调用 AQS内部调用
  70. * @param unused
  71. * @return
  72. */
  73. protected boolean tryRelease(int unused) {
  74. setExclusiveOwnerThread(null);
  75. setState(0);
  76. return true;
  77. }
  78. /**
  79. * 加锁失败时会阻塞线程,直到获取锁
  80. */
  81. public void lock() { acquire(1); }
  82. /**
  83. * 尝试去加锁 不会阻塞
  84. * 如果当前锁是未持有状态,那么加锁成功后就会返回true 否则 直接返回false
  85. * @return
  86. */
  87. public boolean tryLock() { return tryAcquire(1); }
  88. /**
  89. * 一般情况下调用unlock要保证当前线程是持有锁的
  90. * 特殊情况,当worker state == -1时 调用unlock表示初始化state 设置state=0
  91. * 启动worker之前会调用unlock方法
  92. */
  93. public void unlock() { release(1); }
  94. /**
  95. * 返回当前worker的lock是否被占用
  96. * @return
  97. */
  98. public boolean isLocked() { return isHeldExclusively(); }
  99. void interruptIfStarted() {
  100. Thread t;
  101. /**
  102. *线程启动状态 且 线程不为空 且没有被中断 则执行中断
  103. */
  104. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  105. try {
  106. t.interrupt();
  107. } catch (SecurityException ignore) {
  108. }
  109. }
  110. }
  111. }

5、重要方法

5.1、execute

execute为任务提交入口方法,如果当前线程池线程数量 小于核心线程数,则会创建新的worker线程,并将本次任务作为firstTask执行。如果核心线程数达到最大值,则将任务提交到任务队列,如果任务队列已满则尝试创建非核心线程。其中还包含了多处对线程池状态的校验,避免外部线程调用shutdown或者shutdownnow方法更改线程池状态,添加任务至任务队列前后都做了线程池是否RUNNING状态校验。

只有当线程池处于RUNNING状态的时候才可以提交任务,否则执行拒绝策略 ,如果线程池处于RUNNING状态,并且线程数达到最大线程数,任务队列满了,也会执行拒绝策略。

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //判断当前线程池线程数量 是否小于 核心线程数
  6. if (workerCountOf(c) < corePoolSize) {
  7. /**
  8. * 创建新的worker,并将command作为firstTask
  9. * 参数1:任务对象
  10. * 参数2: true 创建核心线程 false 创建额外线程
  11. */
  12. if (addWorker(command, true)){
  13. //如果创建新的worker线程成功 直接返回,addWorker中创建的新worker线程会将本次任务作为firstTask执行
  14. return;
  15. }
  16. /**
  17. * 创建核心线程失败
  18. * 可能原因:
  19. * 1、存在并发,当前线程和其他线程同时执行addWorker,但是其他线程先创建成功,并且核心线程数达到设置的核心线程数量
  20. * 2、当前线程池状态发生改变 线程池为非RUNNING状态 addWorker一般会失败
  21. * 特殊情况: SHUTDOWN状态下,firstTask == null 并且queue不为空 可能创建成功
  22. *
  23. */
  24. c = ctl.get();
  25. }
  26. /**
  27. * 执行到此处可能的情况:
  28. * 1、核心线程数达到设置的核心线程最大数量
  29. * 2、addWorker失败
  30. */
  31. /**
  32. * isRunning(c) 线程池处于运行状态
  33. * workQueue.offer(command) 把任务添加到任务队列中
  34. * true 任务添加到任务队列成功
  35. * false 任务添加到任务队列失败 可能任务队列满了
  36. */
  37. if (isRunning(c) && workQueue.offer(command)) {
  38. //任务添加到任务队列成功 再次判断线程池状态
  39. int recheck = ctl.get();
  40. /**
  41. * 条件1:! isRunning(recheck)
  42. * true 线程池处于非RUNNING状态 任务提交到队列后线程池状态被外部线程修改(外部线程调用shutdown或者shutdownnow),需要删除提交任务
  43. * false:线程池处于RUNNING状态
  44. * 条件2:remove(command)
  45. * 前置条件: 线程池处于非RUNNING状态 需要删除提交任务
  46. * true 从队列中移除任务成功 任务提交到队列后,还未被处理
  47. * false 从队列中移除任务失败,任务提交到队列后,在外部线程调用shutdown或者shutdownnow之前就被线程池的工作线程处理了
  48. */
  49. if (! isRunning(recheck) && remove(command)){
  50. //提交任务到队列后 线程池状态为非RUNNING了,并且移除任务成功,使用拒绝策略
  51. reject(command);
  52. }
  53. /**
  54. * 执行到此处可能的情况:
  55. * 1、线程池处于RUNNING状态
  56. * 2、线程池处于非RUNNING状态 但是移除提交任务时失败
  57. *
  58. * workerCountOf(recheck) == 0 表明线程池线程数为0,此处为担保机制,避免线程池处于RUNNING状态但是无线程可用
  59. */
  60. else if (workerCountOf(recheck) == 0)
  61. addWorker(null, false);
  62. }
  63. /**
  64. * 执行到此处可能的情况:
  65. * 1、当前线程池为非RUNNING 此时addWorker(command, false)一定会返回false
  66. * 2、线程数达到配置最大核心线程数,任务队列满了,添加任务到任务队列失败,如果当前线程数没达到最大线程数,尝试创建新worker线程
  67. */
  68. else if (!addWorker(command, false)){
  69. /** 执行拒绝策略
  70. * 1、当前线程池为非RUNNING
  71. * 2、线程数达到配置最大核心线程数,任务队列满了,并且线程数达到最大线程数
  72. */
  73. reject(command);
  74. }
  75. }

5.2、addWorker

addWorker 添加线程池工作线程:

  • 线程池处于RUNNING状态

线程池核心线程数或者最大线程数未达到配置最大值,调用增加核心线程或者非核心线程。线程池核心线程数小于最大核心线程数时,提交线程池任务时调用addWorker
并给与初始任务,核心线程数达到配置值后,提交线程池任务时直接提交到任务队列,如果任务队列满了则调用addworker创建非核心线程并给与初始任务

  • 线程池处于SHUTDOWN状态

1、外部线程调用shutdown,中断空闲线程。执行退出逻辑processWorkerExit,如果此时任务队列还有数据,则需要调用**addWorker(null, false)**方法维护最少线程数(allowCoreThreadTimeOut配置为true即可以回收核心线程,此时任务队列有任务最少线程为1,allowCoreThreadTimeOut配置为false则最少线程数为配置最大核心线程数)
2、外部线程调用shutdown,此时任务队列有任务,任务执行过程中线程异常退出,此时需要调用**addWorker(null, false)**方法维持线程数。

image.png

  1. /**
  2. * 创建worker线程
  3. * @param firstTask 创建worker线程绑定的第一个任务
  4. * @param core 是否创建的核心线程
  5. * @return 创建worker是否成功
  6. * 创建worker失败情况:
  7. * 1、线程池状态大于SHUTDOWN,为STOP,TIDYING或者TERMINATED状态
  8. * 2、线程池状态为SHUTDOWN但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空
  9. * 3、线程池核心线程数或者最大线程数达到配置最大值
  10. * 4、开发人员实现的ThreadFactory创建线程为null
  11. * 创建worker成功情况:
  12. * 1、线程池状态为RUNNING状态且线程池核心线程数或者最大线程数未达到配置最大值
  13. * 2、线程池处于SHUTDOWN状态且任务队列不为空 firstTask为空,并且线程池核心线程数或者最大线程数未达到配置最大值
  14. */
  15. private boolean addWorker(Runnable firstTask, boolean core) {
  16. //外层for循环判断线程池状态
  17. retry:
  18. for (int c = ctl.get();;) {
  19. /**判断当前线程池状态是否还允许添加工作线程
  20. * 条件1:runStateAtLeast(c, SHUTDOWN)
  21. * true 线程池处于非RUNNING状态, 为SHUTDOWN,STOP,TIDYING或者TERMINATED状态
  22. * false 线程池处于RUNNING状态
  23. * 条件2:runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty())
  24. * 线程池处于STOP,TIDYING或者TERMINATED状态 或者 firstTask不为null(addworker可能不是execute调用) 或者 任务队列为空
  25. *
  26. *
  27. *为了排除这个情况: 线程池处于SHUTDOWN状态 但是队列任务没有处理完 这个时候还是允许添加worker,但是不允许再次提交task
  28. *
  29. * 返回false的情况:
  30. * 1、线程池状态为非RUNNING状态,线程池状态为STOP,TIDYING或者TERMINATED状态
  31. * 2、线程池状态为SHUTDOWN 但是firstTask不为空 或者 线程池状态为SHUTDOWN但是任务队列为空
  32. */
  33. if (runStateAtLeast(c, SHUTDOWN)
  34. && (runStateAtLeast(c, STOP)
  35. || firstTask != null
  36. || workQueue.isEmpty()))
  37. return false;
  38. //内层for循环CAS增加线程池线程数
  39. for (;;) {
  40. /**
  41. * 判断是否还能创建核心线程或者额外线程
  42. * core== true时
  43. * 判断当前线程数是否达到核心线程数
  44. * core== false时
  45. * 判断当前线程数是否达到最大线程数
  46. */
  47. if (workerCountOf(c)
  48. >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
  49. return false;
  50. /**
  51. * CAS增加线程数
  52. * true 线程数增加成功
  53. * false CAS冲突线程数增加失败
  54. * 1、其他线程也修改了线程数
  55. * 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
  56. */
  57. if (compareAndIncrementWorkerCount(c)){
  58. //CAS成功 直接跳出外层循环到下面的创建worker
  59. break retry;
  60. }
  61. c = ctl.get();
  62. /**
  63. * CAS失败 增加线程数失败,两种可能
  64. * 1、CAS并发,其他线程也修改了线程数导致CAS失败
  65. * 2、外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
  66. */
  67. /**
  68. * 判断线程池状态是否为非RUNNING状态
  69. * 验证第二种CAS失败的可能 外部线程可能调用过shutdown或者shutdownNow 导致线程池状态发生变化
  70. */
  71. if (runStateAtLeast(c, SHUTDOWN)){
  72. //回到RETRY的下一步循环 让其去验证线程池状态
  73. continue retry;
  74. }
  75. //CAS失败由其他线程修改了线程数导致CAS失败 内层for循环再次尝试CAS增加线程数
  76. }
  77. }
  78. /**
  79. * 执行到此处说明允许创建线程
  80. */
  81. //表示创建的worker是否启动
  82. boolean workerStarted = false;
  83. //表示创建的worker是否添加到线程池
  84. boolean workerAdded = false;
  85. Worker w = null;
  86. try {
  87. //创建worker
  88. w = new Worker(firstTask);
  89. final Thread t = w.thread;
  90. //防止ThreadFactory存在bug,ThreadFactory是开发人员自己实现的
  91. if (t != null) {
  92. final ReentrantLock mainLock = this.mainLock;
  93. //持有全局锁 可能会阻塞 加锁后其他线程是无法更改线程池状态的
  94. mainLock.lock();
  95. try {
  96. // Recheck while holding lock.
  97. // Back out on ThreadFactory failure or if
  98. // shut down before lock acquired.
  99. int c = ctl.get();
  100. /**
  101. *条件1:isRunning(c) 当前线程池是否处于RUNNING状态
  102. *
  103. * 条件2:runStateLessThan(c, STOP) && firstTask == null
  104. * true:SHUTDOWN状态 并且 firstTask为空 (前置条件为当前线程池为非RUNNING状态)
  105. *
  106. */
  107. if (isRunning(c) ||
  108. (runStateLessThan(c, STOP) && firstTask == null)) {
  109. //判断线程池线程是否是NEW状态 防止实现ThreadFactory的开发人员将线程启动了
  110. if (t.getState() != Thread.State.NEW)
  111. throw new IllegalThreadStateException();
  112. //把新建的worker放入线程池
  113. workers.add(w);
  114. //更改创建的worker是否添加到线程池的标识
  115. workerAdded = true;
  116. //记录线程池生命周期内线程数最大值
  117. int s = workers.size();
  118. if (s > largestPoolSize)
  119. largestPoolSize = s;
  120. }
  121. } finally {
  122. //解锁
  123. mainLock.unlock();
  124. }
  125. /**
  126. * workerAdded
  127. * true 添加线程成功
  128. * false 在lock之前 外部线程更改了线程池状态导致添加失败
  129. */
  130. if (workerAdded) {
  131. //将线程放入线程池后 启动线程
  132. t.start();
  133. workerStarted = true;
  134. }
  135. }
  136. } finally {
  137. /**
  138. * 如果添加线程失败 需要做清理工作
  139. * 1、减少增加的线程数
  140. * 2、将worker移出线程池线程集合
  141. * 3、
  142. */
  143. if (! workerStarted)
  144. addWorkerFailed(w);
  145. }
  146. return workerStarted;
  147. }

5.3、runWorker

任务线程执行任务的核心逻辑,如果firstTask不为空执行firstTask,否则去任务队列获取任务,如果获取任务成功则获取工作线程的全局锁执行任务,执行完毕后释放全局锁,
再次去任务队列阻塞获取任务。如果从阻塞队列获取的任务为null(多种原因,在getTask方法细讲) 或者任务执行过程中抛出异常,则执行线程退出逻辑。

  1. /**
  2. * 新建worker线程的时候会把Worker自身传进去,thread.start方法时,线程启动后会调用Worker的run方法,Worker的run方法中调用了本方法runWorker(this)
  3. * @param w
  4. */
  5. final void runWorker(Worker w) {
  6. //Worker对应的线程
  7. Thread wt = Thread.currentThread();
  8. //初始任务
  9. Runnable task = w.firstTask;
  10. w.firstTask = null;
  11. //当worker state == -1时 调用unlock表示初始化state 设置state=0 和 exclusiveOwnerThread == null
  12. w.unlock(); // allow interrupts
  13. //表示是否突然退出
  14. boolean completedAbruptly = true;
  15. try {
  16. /**
  17. * 条件1: task != null firstTask不为null则直接执行循环体
  18. *
  19. * 条件2: (task = getTask()) != null
  20. * getTask()是一个会阻塞的方法 如果getTask()返回null说明要执行退出逻辑
  21. * true 当前线程从任务队列中获取任务成功
  22. */
  23. while (task != null || (task = getTask()) != null) {
  24. /**
  25. * worker加锁 设置独占锁为当前线程
  26. * 当线程池shutdown时时通过独占锁判断当前worker的状态 根据独占锁是否空闲判断当前worker是否在工作 worker处理完后会释放锁
  27. */
  28. w.lock();
  29. /**
  30. * @TODO
  31. * 本质是强制刷新当前线程的中断标记位,因为可能上次执行task时,业务代码将当前线程的中断标记位设置为true 并且没有处理,此处强制刷新一下,不会影响到后面的task
  32. *
  33. * 1、 线程池处于STOP ,TIDYING或者TERMINATED状态并且线程没有设置中断状态,此时要给线程一个中断信号
  34. * 2、 第一次判断线程池状态处于RUNNING或者SHUTDOWN状态后,外部线程调用shutdown或者shutdownnow修改了线程池状态,并且当前线程被中断,清除中断标记位后
  35. *
  36. */
  37. if ((runStateAtLeast(ctl.get(), STOP) ||
  38. (Thread.interrupted() &&
  39. runStateAtLeast(ctl.get(), STOP))) &&
  40. !wt.isInterrupted())
  41. wt.interrupt();
  42. try {
  43. //执行前钩子方法 留给子类去实现
  44. beforeExecute(wt, task);
  45. try {
  46. //执行任务
  47. task.run();
  48. //执行后钩子方法 留给子类去实现
  49. afterExecute(task, null);
  50. } catch (Throwable ex) {
  51. afterExecute(task, ex);
  52. throw ex;
  53. }
  54. } finally {
  55. task = null;
  56. //更新当前worker完成任务数量
  57. w.completedTasks++;
  58. //worker处理完一个任务后释放独占锁 正常情况下会回到getTask()获取任务
  59. w.unlock();
  60. }
  61. }
  62. /**
  63. * completedAbruptly 表明正常退出
  64. * 代码执行到此处的情况
  65. * getTask()返回null时 当前线程要执行退出逻辑
  66. */
  67. completedAbruptly = false;
  68. } finally {
  69. // task.run()内部出现异常时 直接从 w.unlock()跳到该行
  70. processWorkerExit(w, completedAbruptly);
  71. }
  72. }

5.4、getTask

getTask是从任务队列阻塞获取任务,根据allowCoreThreadTimeOut配置和当前线程池线程情况决定阻塞获取任务是否设置超时时间,如果allowCoreThreadTimeOut配置为true,则核心线程数内线程也可以回收,如果allowCoreThreadTimeOut配置为false,可以回收非核心线程,如果当前线程可以回收,则设置阻塞获取任务的超时时间为线程池的最大空闲时间。超时后返回null,线程进入退出逻辑,被回收;当前线程无法回收则一直阻塞等待获取任务。

返回null的四种情况
1、 线程池处于STOP,TIDYING或者TERMINATED状态
2、线程池处于SHUTDOWN状态 并且 任务队列为空
3、线程池的线程数量大于最大线程数量限制
4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null

  1. /**
  2. * 获取任务队列的任务 返回null时 调用getTask()的线程会退出
  3. *什么情况下会返回null
  4. * 1、 线程池处于STOP,TIDYING或者TERMINATED状态
  5. * 2、线程池处于SHUTDOWN状态 并且 任务队列为空
  6. * 3、线程池的线程数量大于最大线程数量限制
  7. * 4、线程池中的线程数大于核心线程数 或者是 allowCoreThreadTimeOut配置为true 核心线程数内线程可以回收 时 获取任务超时返回null
  8. * @return
  9. */
  10. private Runnable getTask() {
  11. //当前线程获取任务是否超时
  12. boolean timedOut = false; // Did the last poll() time out?
  13. for (;;) {
  14. int c = ctl.get();
  15. /**
  16. * 条件1 runStateAtLeast(c, SHUTDOWN)
  17. * true: 线程池处于非RUNNING
  18. * 条件2 runStateAtLeast(c, STOP) || workQueue.isEmpty()
  19. * 线程池处于STOP,TIDYING或者TERMINATED状态 或者 任务队列为空
  20. *
  21. * 条件成立两种可能:
  22. * 1、线程池处于STOP,TIDYING或者TERMINATED状态
  23. * 2、线程池处于SHUTDOWN状态 并且 任务队列为空
  24. */
  25. if (runStateAtLeast(c, SHUTDOWN)
  26. && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
  27. //执行退出逻辑 使用CAS减少线程数
  28. decrementWorkerCount();
  29. return null;
  30. }
  31. /**
  32. * 执行到这里的情况
  33. * 1、线程池处于RUNNING状态
  34. * 2、线程池处于SHUTDOWN状态 但是任务队列不为空
  35. */
  36. int wc = workerCountOf(c);
  37. /**
  38. * timed ==true 当前线程获取task时是支持超时机制的 workQueue.poll设置超时时间 否则一直阻塞等待
  39. *
  40. * 条件1: allowCoreThreadTimeOut 控制核心线程数内的线程是否可以被回收 true可以false不可以
  41. * 条件2:wc > corePoolSize 当前线程数是否大于核心线程数
  42. * 前置条件:allowCoreThreadTimeOut配置为false 核心线程数内线程不可以回收
  43. */
  44. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  45. /**
  46. * 条件1为true表示线程可以被回收
  47. *
  48. * 条件1: wc > maximumPoolSize || (timed && timedOut)
  49. * 条件1.1 wc > maximumPoolSize
  50. * true 当前线程数大于最大线程数 外部线程可能调用setMaximumPoolSize方法将线程池的最大线程数设置的比初始化的小
  51. * 条件1.2 timed && timedOut
  52. * 当前线程获取task时是支持超时机制 并且当前线程上一次循环获取task已经超时
  53. *
  54. * 条件2:wc > 1 || workQueue.isEmpty()
  55. * 前置条件:当前线程可以被回收,但是还需要进一步判断
  56. * 条件 2.1 wc > 1
  57. * true 说明当前线程池还有其他线程 当前线程可以直接被回收 返回null
  58. * 条件 2.2 workQueue.isEmpty()
  59. * 前置条件:wc = 1 本线程为最后一个线程
  60. * true 任务队列已经没有任务 最后一个线程也可以被回收 返回null
  61. *
  62. *
  63. */
  64. if ((wc > maximumPoolSize || (timed && timedOut))
  65. && (wc > 1 || workQueue.isEmpty())) {
  66. /**
  67. * 将线程数-1
  68. * 当前CAS可能会失败,失败后再次循环上去,检查线程池状态,CAS减少线程数
  69. * 1、可能其他线程也退出回收
  70. * 2、线程池状态改变了
  71. */
  72. if (compareAndDecrementWorkerCount(c))
  73. return null;
  74. continue;
  75. }
  76. try {
  77. //获取任务逻辑 根据是否支持超时机制的选择poll方法
  78. Runnable r = timed ?
  79. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  80. workQueue.take();
  81. if (r != null)
  82. return r;
  83. //当前线程获取任务超时了 走循环 检测到超时 返回null
  84. timedOut = true;
  85. } catch (InterruptedException retry) {
  86. timedOut = false;
  87. }
  88. }
  89. }

5.5、processWorkerExit

空闲线程或者异常线程退出逻辑

  1. /**
  2. *
  3. * @param w 当前要退出的worker
  4. * @param completedAbruptly 是否突然退出
  5. */
  6. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  7. /**
  8. * 条件成立 表明当前worker是发生异常退出的,task任务执行过程中向上抛出了异常
  9. */
  10. if (completedAbruptly){
  11. //将线程数量-1
  12. decrementWorkerCount();
  13. }
  14. //获取全局锁
  15. final ReentrantLock mainLock = this.mainLock;
  16. mainLock.lock();
  17. try {
  18. //将当前线程完成的任务数累加到线程池完成的任务总数上
  19. completedTaskCount += w.completedTasks;
  20. //从线程池移除该线程
  21. workers.remove(w);
  22. } finally {
  23. mainLock.unlock();
  24. }
  25. tryTerminate();
  26. int c = ctl.get();
  27. //当前线程池状态为RUNNING或者SHUTDOWN
  28. if (runStateLessThan(c, STOP)) {
  29. if (!completedAbruptly) {
  30. //线程正常退出
  31. //线程池最低持有的线程数量 如果核心线程数内的线程是也可以被回收 则是 0 否则是核心线程数
  32. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  33. if (min == 0 && ! workQueue.isEmpty()){
  34. //如果最小线程数可以为0 但是任务队列还有任务 线程池至少要留一个线程
  35. min = 1;
  36. }
  37. /**
  38. * 如果线程池线程数拥有足够的线程 线程退出 否则新添加一个线程
  39. */
  40. if (workerCountOf(c) >= min)
  41. return;
  42. }
  43. /**
  44. * 执行该代码的情况
  45. * 1、当前线程在执行task时异常退出 此处新创建一个新worker加入线程池
  46. * 2、allowCoreThreadTimeOut配置为true,当前线程池状态为RUNNING或者SHUTDOWN,任务队列还有任务,线程池至少要留一个线程
  47. * 3、allowCoreThreadTimeOut配置为false,当前线程池状态为RUNNING或者SHUTDOWN,当前线程数量小于corePoolSize,需要维持核心线程数量
  48. */
  49. addWorker(null, false);
  50. }
  51. }

5.6、shutdown

shutdown会先将线程池设置为SHUTDOWN状态,然后中断所有的空闲工作线程

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. //获取全局锁
  4. mainLock.lock();
  5. try {
  6. //判断有没有权限
  7. checkShutdownAccess();
  8. //设置线程池状态为SHUTDOWN
  9. advanceRunState(SHUTDOWN);
  10. //中断空闲线程
  11. interruptIdleWorkers();
  12. //扩展使用
  13. onShutdown();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. }

中断空闲线程方法逻辑

  1. private void interruptIdleWorkers() {
  2. interruptIdleWorkers(false);
  3. }
  4. /**
  5. *
  6. * @param onlyOne true 只中断一个线程 false中断所有线程
  7. */
  8. private void interruptIdleWorkers(boolean onlyOne) {
  9. final ReentrantLock mainLock = this.mainLock;
  10. mainLock.lock();
  11. try {
  12. //循环所有worker
  13. for (Worker w : workers) {
  14. Thread t = w.thread;
  15. /**
  16. * 条件1: !t.isInterrupted()
  17. * true: 线程没有中断
  18. * 条件2:w.tryLock()
  19. * true:说明当前worker处于空闲状态,目前线程在queue.poll或者queue.take 阻塞中
  20. */
  21. if (!t.isInterrupted() && w.tryLock()) {
  22. try {
  23. //给空闲worker一个中断信号 处于queue阻塞的线程会被唤醒 return null,执行退出逻辑
  24. t.interrupt();
  25. } catch (SecurityException ignore) {
  26. } finally {
  27. //释放线程独占锁
  28. w.unlock();
  29. }
  30. }
  31. if (onlyOne)
  32. break;
  33. }
  34. } finally {
  35. //释放全局锁
  36. mainLock.unlock();
  37. }
  38. }

5.7、shutdownnow

shutdownnow会先将线程池设置为STOP状态,然后中断所有的工作线程,最后导出未处理任务

  1. public List<Runnable> shutdownNow() {
  2. //返回值
  3. List<Runnable> tasks;
  4. final ReentrantLock mainLock = this.mainLock;
  5. //获取全局锁
  6. mainLock.lock();
  7. try {
  8. checkShutdownAccess();
  9. //将线程池状态改为STOP
  10. advanceRunState(STOP);
  11. //中断线程池中所有线程
  12. interruptWorkers();
  13. //导出未处理的task
  14. tasks = drainQueue();
  15. } finally {
  16. mainLock.unlock();
  17. }
  18. tryTerminate();
  19. return tasks;
  20. }

中断所有线程逻辑

  1. private void interruptWorkers() {
  2. // assert mainLock.isHeldByCurrentThread();
  3. for (Worker w : workers)
  4. w.interruptIfStarted();
  5. }
  6. void interruptIfStarted() {
  7. Thread t;
  8. /**
  9. *线程启动状态 且 线程不为空 且没有被中断 则执行中断
  10. */
  11. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  12. try {
  13. t.interrupt();
  14. } catch (SecurityException ignore) {
  15. }
  16. }
  17. }

5.8、tryTerminate

尝试将线程池设置为TERMINATED
调用对象可能为两种:
1、外部线程调用shutdown或者shutdownnow ,调用tryTerminate
2、空闲工作线程退出或者异常工作线程退出,调用tryTerminate

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. /**
  5. * 条件1: isRunning(c)
  6. * 线程池处于RUNNING状态
  7. * 条件2: runStateAtLeast(c, TIDYING)
  8. * 线程池状态大于等于TIDYING,说明有其他线程在执行TIDYING -> TERMINATED状态了
  9. * 条件3:runStateLessThan(c, STOP) && ! workQueue.isEmpty()
  10. * 线程池处于SHUTDOWN(此处执行前提条件就是 线程池不处于RUNNING状态) 且任务队列不为空
  11. */
  12. if (isRunning(c) ||
  13. runStateAtLeast(c, TIDYING) ||
  14. (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
  15. return;
  16. /**
  17. *
  18. * 执行到此处的情况:
  19. * 1、线程池状态大于等于STOP
  20. * 2、线程池状态等于SHUTDOWN 且任务队列为空
  21. */
  22. //当前线程池线程数大于0
  23. if (workerCountOf(c) != 0) { // Eligible to terminate
  24. //中断一个空闲线程
  25. /**
  26. * 1、中断的空闲线程在getTask中 pool|task 阻塞,唤醒后,返回null
  27. * 2、在runWorker方法中执行退出逻辑,退出逻辑中会调用tryTerminate方法 唤醒下一个空闲线程
  28. * 3、因为线程池状态为(线程池状态大于等于STOP | 线程池状态等于SHUTDOWN 且任务队列为空)最终调用addWorker失败,最终空闲线程都会在这儿退出
  29. * 非空闲线程执行执行task抛出异常也会调用 tryTerminate,最终退出
  30. */
  31. interruptIdleWorkers(ONLY_ONE);
  32. return;
  33. }
  34. /**
  35. * 执行到这里说明当前线程为线程池最后一个线程
  36. * (1) 线程池处于STOP,TIDYING或者TERMINATED状态 (2) 线程池处于SHUTDOWN状态 并且 任务队列为空
  37. * 满足这两种状态时 getTask被唤醒后会减少线程池线程数 所以上面最后一个线程执行workerCountOf(c) != 0时通过了
  38. */
  39. final ReentrantLock mainLock = this.mainLock;
  40. mainLock.lock();
  41. try {
  42. //设置线程池状态为TIDYING
  43. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  44. try {
  45. //调用钩子方法
  46. terminated();
  47. } finally {
  48. //设置线程池状态为TERMINATED
  49. ctl.set(ctlOf(TERMINATED, 0));
  50. //唤醒调用awaitTermination()方法的线程
  51. termination.signalAll();
  52. }
  53. return;
  54. }
  55. } finally {
  56. mainLock.unlock();
  57. }
  58. // else retry on failed CAS
  59. }
  60. }