构造方法

  1. //五参数构造方法
  2. private static final RejectedExecutionHandler defaultHandler =
  3. new AbortPolicy();
  4. public ThreadPoolExecutor(int corePoolSize,
  5. int maximumPoolSize,
  6. long keepAliveTime,
  7. TimeUnit unit,
  8. BlockingQueue<Runnable> workQueue) {
  9. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  10. Executors.defaultThreadFactory(), defaultHandler);
  11. }
  12. //七参数构造方法
  13. public ThreadPoolExecutor(int corePoolSize,
  14. int maximumPoolSize,
  15. long keepAliveTime,
  16. TimeUnit unit,
  17. BlockingQueue<Runnable> workQueue,
  18. ThreadFactory threadFactory,
  19. RejectedExecutionHandler handler) {
  20. if (corePoolSize < 0 ||
  21. maximumPoolSize <= 0 ||
  22. maximumPoolSize < corePoolSize ||
  23. keepAliveTime < 0)
  24. throw new IllegalArgumentException();
  25. if (workQueue == null || threadFactory == null || handler == null)
  26. throw new NullPointerException();
  27. 。。。。。。
  28. }

通过五参数构造方法可以得知

  • Executors.defaultThreadFactory() 指定了默认的创建线程工程工厂。
  • defaultHandler 指定了默认的拒绝策略 AbortPolicy

另外,七个参数的分别含义为:

  • corePoolSize 核心线程池数量。
  • maximumPoolSize 最大线程池数量。
  • keepAliveTime 线程闲置的超时时间。超时后,非核心线程就会被销毁。如果设置了allowCoreThreadTimeOut(true) 核心线程闲置时间超过设置值后,也会被销毁。
  • unit 存活时间单位。
  • workQueue 线程池任务阻塞队列。
  • ThreadFactory threadFactory 创建线程的工厂。
  • RejectedExecutionHandler handler 当前队列已满,任务不能执行的时候的拒绝策略。 一共四种 CallerRunsPolicyAbortPolicyDiscardPolicyDiscardOldestPolicy

本次源码基于 android-29

execute

  1. //ThreadPoolExecutor.java
  2. public void execute(Runnable command) {
  3. if (command == null)
  4. throw new NullPointerException();
  5. int c = ctl.get();
  6. //首先判断线程池数量是否大于核心线程池数量
  7. if (workerCountOf(c) < corePoolSize) {
  8. //小于核心线程池数量,添加核心线程池。添加成功则 return 返回。
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. //线程池数量大于核心线程。
  14. //判断线程池当前状态是否处于运行状态,如果是,则添加到阻塞队列 workQueue 里面。
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. int recheck = ctl.get();
  17. //再次检查线程池状态,如果不是运行状态,则将 command 从 workQueue 中移除。
  18. if (!isRunning(recheck) && remove(command))
  19. //然后执行拒绝策略。
  20. reject(command);
  21. //如果是运行状态,但是线程池数量为 0,创建一个非核心线程。
  22. else if (workerCountOf(recheck) == 0)
  23. addWorker(null, false);
  24. }
  25. else if (!addWorker(command, false)) //如果上面添加到队列失败,队列任务爆满,则通过添加一个非核心线程去执行这个任务。
  26. //任务执行失败,执行拒绝策略。
  27. reject(command);
  28. }

二次检查线程池的原因:

在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将command加入workqueue是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command永远不会执行。

addWorker

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. //获取线程池状态
  6. int rs = runStateOf(c);
  7. // Check if queue empty only if necessary.
  8. //当线程池状态大于 SHUTDOWN 时,就不能继续执行新的任务。
  9. //当前状态为 SHUTDOWN 时,只有传入的任务为 null,并且队列不为空,才会继续执行任务。
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;
  15. //这里判断线程数是否达到了阈值。
  16. for (;;) {
  17. int wc = workerCountOf(c);
  18. //这里其实有一个隐形的线程数最大值 CAPACITY。
  19. //下面就是根据当前是否创建的是否是核心线程,来设定阈值来判断线程数是否超出。
  20. if (wc >= CAPACITY ||
  21. wc >= (core ? corePoolSize : maximumPoolSize))
  22. return false;
  23. //通过 CAS 来增加线程工作数量。成功就退出 retry 这个外部大循环。
  24. if (compareAndIncrementWorkerCount(c))
  25. break retry;
  26. c = ctl.get(); // Re-read ctl
  27. /*
  28. *上面 CAS 增加线程数没有成功,检测线程池状态是否发生了改变。
  29. *改变了:重新进行 retry 大循环。
  30. *未改变:继续内部循环,尝试 CAS 增加线程数。
  31. *CAS 增加线程数成功后,后续才能进行增加线程执行任务。
  32. **/
  33. if (runStateOf(c) != rs)
  34. continue retry;
  35. // else CAS failed due to workerCount change; retry inner loop
  36. }
  37. }
  38. boolean workerStarted = false;
  39. boolean workerAdded = false;
  40. Worker w = null;
  41. try {
  42. //创建一个 worker,并将要执行的任务传进去
  43. w = new Worker(firstTask);
  44. final Thread t = w.thread;
  45. if (t != null) {
  46. final ReentrantLock mainLock = this.mainLock;
  47. mainLock.lock();
  48. try {
  49. // Recheck while holding lock.
  50. // Back out on ThreadFactory failure or if
  51. // shut down before lock acquired.
  52. int rs = runStateOf(ctl.get());
  53. //再次检测线程池状态,继续执行下去的条件是:
  54. //1.线程池状态处于 RUNNING
  55. //2.或者处于SHUTDOWN 状态,但是阻塞队列不为空。
  56. if (rs < SHUTDOWN ||
  57. (rs == SHUTDOWN && firstTask == null)) {
  58. //检测线程状态
  59. if (t.isAlive()) // precheck that t is startable
  60. throw new IllegalThreadStateException();
  61. //将 worker 添加到集合中
  62. workers.add(w);
  63. int s = workers.size();
  64. //largestPoolSize 可以表示线程池达到的最大并发
  65. if (s > largestPoolSize)
  66. largestPoolSize = s;
  67. workerAdded = true;
  68. }
  69. } finally {
  70. mainLock.unlock();
  71. }
  72. if (workerAdded) {
  73. //执行线程。
  74. t.start();
  75. workerStarted = true;
  76. }
  77. }
  78. } finally {
  79. if (! workerStarted)
  80. //执行失败
  81. addWorkerFailed(w);
  82. }
  83. return workerStarted;
  84. }

addWorker 方法中
①首先检测了线程池状态,当线程池处于 RUNNING 或者 处于 SHUTDOWN 状态并且阻塞队列不为空时才继续进行下去。
②然后检测线程池中线程数量是否达到了阈值(阈值大小是根绝添加的线程是否是核心线程来决定的),达到了,返回 false。
③创建 Worker ,再次进行第①步的检查,符合条件,就将创建好的 Worker 添加到集合中。
④最后执行 Worker 中的 Thread。

下面看一下 Worker 内部是怎么工作的,是怎么创建线程的。最后再看一下如果执行失败,addWorkerFailed 内部逻辑是什么。

Worker

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  2. ...省略一些源码...
  3. /** Thread this worker is running in. Null if factory fails. */
  4. final Thread thread;
  5. /** Initial task to run. Possibly null. */
  6. Runnable firstTask;
  7. /** Per-thread task counter */
  8. volatile long completedTasks;
  9. /**
  10. * Creates with given first task and thread from ThreadFactory.
  11. * @param firstTask the first task (null if none)
  12. */
  13. Worker(Runnable firstTask) {
  14. setState(-1); // inhibit interrupts until runWorker
  15. this.firstTask = firstTask;
  16. //这里 getThreadFactory() 获取的 ThreadFactory 就是通过
  17. //ThreadPoolExecutor 构造传入的 ThreadFactory
  18. this.thread = getThreadFactory().newThread(this);
  19. }
  20. /** Delegates main run loop to outer runWorker */
  21. public void run() {
  22. runWorker(this);
  23. }
  24. ......
  25. }

可以看出 Worker 本身就实现了 Runnable 接口,并且通过 构造方法 来创建了一个 Thread ,并将自身作为参数传入了 Thread
所以就看 run 方法中的 runWorker(this) 方法。

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. //这里获取 task 来执行。
  9. while (task != null || (task = getTask()) != null) {
  10. w.lock();
  11. // If pool is stopping, ensure thread is interrupted;
  12. // if not, ensure thread is not interrupted. This
  13. // requires a recheck in second case to deal with
  14. // shutdownNow race while clearing interrupt
  15. //这里又进行了线程池状态的判断
  16. //如果线程池状态不小于 STOP,就是处于 STOP,TIDYING,TERMINATED 这三个时,将线程中断
  17. if ((runStateAtLeast(ctl.get(), STOP) ||
  18. (Thread.interrupted() &&
  19. runStateAtLeast(ctl.get(), STOP))) &&
  20. !wt.isInterrupted())
  21. wt.interrupt();
  22. try {
  23. //子类可以重写,执行任务之前做一些操作
  24. beforeExecute(wt, task);
  25. Throwable thrown = null;
  26. try {
  27. //这里是最终执行任务的地方。
  28. task.run();
  29. } catch (RuntimeException x) {
  30. thrown = x; throw x;
  31. } catch (Error x) {
  32. thrown = x; throw x;
  33. } catch (Throwable x) {
  34. thrown = x; throw new Error(x);
  35. } finally {
  36. //子类可以重写,执行任务之后做一些操作
  37. afterExecute(task, thrown);
  38. }
  39. } finally {
  40. task = null;
  41. //增加任务执行数
  42. w.completedTasks++;
  43. w.unlock();
  44. }
  45. }
  46. completedAbruptly = false;
  47. } finally {
  48. //线程池没有队列,并且已经超时。关闭线程。
  49. processWorkerExit(w, completedAbruptly);
  50. }
  51. }

这里可以看出是有一个循环进行不断的取出任务来进行执行的。本次 task 执行完后,又通过 getTask() 取出任务来执行。从这里也可以看到线程复用的一面

getTask

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. //这里又进行了线程池状态的判断
  8. //如果线程池处于 STOP 直接返回 null,
  9. //或者处于SHUTDOWN 但是阻塞队列workQueue 为空,也会返回 null
  10. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  11. decrementWorkerCount();
  12. return null;
  13. }
  14. int wc = workerCountOf(c);
  15. // Are workers subject to culling?
  16. //该线程是否要检测超时退出
  17. //allowCoreThreadTimeOut 是代表核心线程是不是要超时退出,或者线程数超过核心线程数。
  18. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  19. //同时满足下列两个条件时,返回 null ,上一层函数就退出循环。(runWorker 退出循环,线程结束。)
  20. //①线程数超过最大数,或者当前线程允许超时并且已经超时
  21. //②线程数大于 1,或者队列为空。(这个条件代表最后一个线程必须队列为空时才能退出)
  22. if ((wc > maximumPoolSize || (timed && timedOut))
  23. && (wc > 1 || workQueue.isEmpty())) {
  24. //CAS 操作减少线程数
  25. if (compareAndDecrementWorkerCount(c))
  26. return null;
  27. continue;
  28. }
  29. try {
  30. //如果满足超时退出条件,就通过 poll 方法最多等待阻塞 keepAliveTime 时间来获取 Task
  31. //否则就通过 take 方法一直阻塞,直到有任务返回。
  32. //这里会响应中断,一旦线程池关闭,就打断阻塞状态。
  33. Runnable r = timed ?
  34. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  35. workQueue.take();
  36. if (r != null)
  37. return r;
  38. //超时,下次循环用来判断是否要返回 null,结束线程。
  39. timedOut = true;
  40. } catch (InterruptedException retry) {
  41. timedOut = false;
  42. }
  43. }
  44. }
  1. getTask 中又一次检测了线程池状态。并检测线程池数量是否达到阈值。
  2. 根据是否要满足超时退出的条件来选择是 阻塞等待一段时间通过 poll 取任务,还是一直阻塞通过 take 来取任务。

    processWorkerExit

    这里我们看如何结束线程

    1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
    2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    3. decrementWorkerCount();
    4. final ReentrantLock mainLock = this.mainLock;
    5. mainLock.lock();
    6. try {
    7. //记录执行的任务数
    8. completedTaskCount += w.completedTasks;
    9. //将 worker 移除
    10. workers.remove(w);
    11. } finally {
    12. mainLock.unlock();
    13. }
    14. //线程池状态改变,尝试中止线程池
    15. tryTerminate();
    16. int c = ctl.get();
    17. //如果线程池状态是 RUNNING 或者 SHUTDOWN
    18. if (runStateLessThan(c, STOP)) {
    19. if (!completedAbruptly) {
    20. //线程池中最小的数量取决于核心线程是否允许超时退出
    21. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    22. //如果队列还有任务,要至少留一个线程
    23. if (min == 0 && ! workQueue.isEmpty())
    24. min = 1;
    25. if (workerCountOf(c) >= min)
    26. return; // replacement not needed
    27. }
    28. //线程数量小于最小数,尝试添加线程执行任务
    29. addWorker(null, false);
    30. }
    31. }

    在 addWorker 中如果添加 Worker 失败,则执行 addWorkerFailed 方法

    1. private void addWorkerFailed(Worker w) {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. if (w != null)
    6. //移除 worker
    7. workers.remove(w);
    8. //通过 CAS 减少 worker 数
    9. decrementWorkerCount();
    10. tryTerminate();
    11. } finally {
    12. mainLock.unlock();
    13. }
    14. }

    总结

    image.png
    image.png

问答

线程池如何实现

总结就是这个问题的答案

非核心线程延迟死亡,如何实现

通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡

核心线程为什么不死

通过阻塞队列take(),让线程一直等待,直到获取到任务

如何释放核心线程

将allowCoreThreadTimeOut设置为true。

非核心线程能成为核心线程吗

线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。

Runnable在线程池里如何执行

线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。

线程数如何做选择

这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。

常见的不同类型的线程池的功效如何做到

常见的线程池有:
CachedThreadPool:适合异步任务多,但周期短的场景
FixedThreadPool: 适合有一定异步任务,周期较长的场景,能达到有效的并发状态
SingleThreadExecutor: 适合任务串行的场景
ScheduledThreadPool: 适合周期性执行任务的场景
对于如何选择线程池就要看具体的场景,其中的差异通过构造参数可以到达效果,通过之前的分析,就能知道参数的具体作用以及为什么能达到效果。取FixedThreadPool来看,抛砖引玉。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

nThreads个数量核心线程持续并发任务,没有非核心线程,如果没有任务,则通过take()阻塞等待,不允许核心线程死亡。并且阻塞队列为LinkedBlockingQueue,容量为Integer.MAX_VALUE,可以视为无界队列,更难走到拒绝添加线程逻辑。

参考文章

你了解线程池吗
11、线程池之getTask
第十二章 线程池原理