第一部分:使用线程池的好处

线程池提供了一种限制和管理资源(包括执行一个任务)。每个线程池还维护一些基本统计信息,例如已完成任务的数量。
线程池的好处有:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行;
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    第二部分:Executor框架

    2.1 简介

    Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

    this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

2.2 Executor框架结构(主要由三大部分组成)

2.2.1 任务(Runnable/Callable)

执行任务需要实现的 Runnable 接口 或 Callable 接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2.2.2 任务的执行( Executor )

如下图所示,包括任务执行机制的核心接口 Executor,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

注意:通过查看 ScheduledThreadPoolExecutor 源代码我们发现 ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService,正如我们下面给出的类关系图显示的一样。

ThreadPoolExecutor 类描述:

  1. //AbstractExecutorService实现了ExecutorService接口
  2. public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 类描述:

  1. //ScheduledExecutorService继承ExecutorService接口
  2. public class ScheduledThreadPoolExecutor
  3. extends ThreadPoolExecutor
  4. implements ScheduledExecutorService

线程池详解 - 图1

2.2.3 异步计算的结果(Future)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。当我们把 Runnable 接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutorScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

2.3 Executor 框架的使用示意图

线程池详解 - 图2

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable/Callable 接口的 对象直接交给 ExecutorService 执行:ExecutorService.execute(Runnable command) 或者也可以把 Runnable/Callable 对象提交给 ExecutorService 执行:ExecutorService.submit(Runnable task)ExecutorService.submit(Callable <T> task)
  3. 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现 Future 接口的对象(刚刚也提到过了执行 execute() 方法和 submit() 方法的区别,submit() 会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  4. 最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。

    第三部分:⭕ThreadPoolExecutor类简单介绍

    3.1 ThreadPoolExecutor类分析

    ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法都是给定某些默认参数的构造方法,比如默认制定拒绝策略是什么)。
    1. /**
    2. * 用给定的初始参数创建一个新的ThreadPoolExecutor。
    3. */
    4. public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
    5. int maximumPoolSize,//线程池的最大线程数
    6. long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
    7. TimeUnit unit,//时间单位
    8. BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
    9. ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
    10. RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
    11. ) {
    12. if (corePoolSize < 0 ||
    13. maximumPoolSize <= 0 ||
    14. maximumPoolSize < corePoolSize ||
    15. keepAliveTime < 0)
    16. throw new IllegalArgumentException();
    17. if (workQueue == null || threadFactory == null || handler == null)
    18. throw new NullPointerException();
    19. this.corePoolSize = corePoolSize;
    20. this.maximumPoolSize = maximumPoolSize;
    21. this.workQueue = workQueue;
    22. this.keepAliveTime = unit.toNanos(keepAliveTime);
    23. this.threadFactory = threadFactory;
    24. this.handler = handler;
    25. }
    ThreadPoolExecutor 3 个最重要的参数:
  • corePoolSize:核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize:当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor 其他4个常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit:keepAliveTime 参数的时间单位。
  3. threadFactory:executor 创建新线程的时候会用到。
  4. handler:饱和策略。

线程池详解 - 图3ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义一些策略:

  • **ThreadPoolExecutor.AbortPolicy**:抛出 RejectedExecutionException来拒绝新任务的处理。
  • **ThreadPoolExecutor.CallerRunsPolicy**:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • **ThreadPoolExecutor.DiscardPolicy**:不处理新任务,直接丢弃掉。
  • **ThreadPoolExecutor.DiscardOldestPolicy**: 此策略将丢弃最早的未处理的任务请求。

    Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

3.2 推荐使用 ThreadPoolExecutor 构造函数创建线程池

在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。为什么呢?

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

另外《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor:允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool:允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM。

方式一:通过 ThreadPoolExecutor 构造函数实现(推荐)
线程池详解 - 图4
方式二:通过 Executor 框架的工具类 Executors 来实现
我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

对应 Executors 工具类中的方法如图所示:线程池详解 - 图5

第四部分:⭕ThreadPoolExecutor使用示例

4.1 示例代码:Runnable+ThreadPoolExecutor

首先创建一个 Runnable 接口的实现类:

  1. public class MyRunnable implements Runnable{
  2. private String command;
  3. public MyRunnable(String s){
  4. this.command = s;
  5. }
  6. @Override
  7. public void run() {
  8. System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
  9. processCommand();
  10. System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
  11. }
  12. private void processCommand(){
  13. try {
  14. Thread.sleep(5000);
  15. }catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. }
  19. public String toString(){
  20. return this.command;
  21. }
  22. }

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池:

  1. public class ThreadPoolExecutorDemo {
  2. private static final int CORE_POOL_SIZE = 5;
  3. private static final int MAX_POOL_SIZE = 10;
  4. private static final int QUEUE_CAPACITY = 100;
  5. private static final long KEEP_ALIVE_TIME = 1L;
  6. public static void main(String[] args) {
  7. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  8. CORE_POOL_SIZE,
  9. MAX_POOL_SIZE,
  10. KEEP_ALIVE_TIME,
  11. TimeUnit.SECONDS,
  12. new ArrayBlockingQueue<>(QUEUE_CAPACITY),
  13. new ThreadPoolExecutor.CallerRunsPolicy()
  14. );
  15. for(int i = 0;i<10;i++){
  16. Runnable worker = new MyRunnable(""+i);
  17. executor.execute(worker);
  18. }
  19. executor.shutdown();
  20. while (!executor.isTerminated()){
  21. }
  22. System.out.println("Finished all threads.");
  23. }
  24. }

可以看到我们上面的代码指定了:

  1. corePoolSize:核心线程数为 5;
  2. maximumPoolSize:最大线程数 10;
  3. keepAliveTime:等待时间为 1L;
  4. unit:等待时间的单位为 TimeUnit.SECONDS;
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy。

输出:

  1. D:\developer_tools\Java\jdk1.8.0_291\bin\java.exe "-javaagent:D:\developer_tools\IntelliJ IDEA 2021.1.2\lib\idea_rt.jar=54180:D:\developer_tools\IntelliJ IDEA 2021.1.2\bin" -Dfile.encoding=UTF-8 -classpath D:\developer_tools\Java\jdk1.8.0_291\jre\lib\charsets.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\deploy.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\javaws.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\jce.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\jfr.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\jsse.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\management-agent.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\plugin.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\resources.jar;D:\developer_tools\Java\jdk1.8.0_291\jre\lib\rt.jar;D:\workspace_idea\XiaoZhaoLearn\target\classes;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\projectlombok\lombok\1.18.20\lombok-1.18.20.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk8\1.5.30\kotlin-stdlib-jdk8-1.5.30.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\jetbrains\kotlin\kotlin-stdlib\1.5.30\kotlin-stdlib-1.5.30.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\jetbrains\annotations\13.0\annotations-13.0.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\jetbrains\kotlin\kotlin-stdlib-common\1.5.30\kotlin-stdlib-common-1.5.30.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\jetbrains\kotlin\kotlin-stdlib-jdk7\1.5.30\kotlin-stdlib-jdk7-1.5.30.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\junit\junit\4.13.1\junit-4.13.1.jar;D:\developer_tools\Java\apache-maven-3.8.1\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar concurrent.threadpooltest.ThreadPoolExecutorDemo
  2. pool-1-thread-1 Start. Time = Tue Aug 31 15:03:39 CST 2021
  3. pool-1-thread-2 Start. Time = Tue Aug 31 15:03:39 CST 2021
  4. pool-1-thread-5 Start. Time = Tue Aug 31 15:03:39 CST 2021
  5. pool-1-thread-4 Start. Time = Tue Aug 31 15:03:39 CST 2021
  6. pool-1-thread-3 Start. Time = Tue Aug 31 15:03:39 CST 2021
  7. pool-1-thread-1 End. Time = Tue Aug 31 15:03:44 CST 2021
  8. pool-1-thread-2 End. Time = Tue Aug 31 15:03:44 CST 2021
  9. pool-1-thread-5 End. Time = Tue Aug 31 15:03:44 CST 2021
  10. pool-1-thread-3 End. Time = Tue Aug 31 15:03:44 CST 2021
  11. pool-1-thread-1 Start. Time = Tue Aug 31 15:03:44 CST 2021
  12. pool-1-thread-4 End. Time = Tue Aug 31 15:03:44 CST 2021
  13. pool-1-thread-3 Start. Time = Tue Aug 31 15:03:44 CST 2021
  14. pool-1-thread-5 Start. Time = Tue Aug 31 15:03:44 CST 2021
  15. pool-1-thread-2 Start. Time = Tue Aug 31 15:03:44 CST 2021
  16. pool-1-thread-4 Start. Time = Tue Aug 31 15:03:44 CST 2021
  17. pool-1-thread-5 End. Time = Tue Aug 31 15:03:49 CST 2021
  18. pool-1-thread-4 End. Time = Tue Aug 31 15:03:49 CST 2021
  19. pool-1-thread-1 End. Time = Tue Aug 31 15:03:49 CST 2021
  20. pool-1-thread-3 End. Time = Tue Aug 31 15:03:49 CST 2021
  21. pool-1-thread-2 End. Time = Tue Aug 31 15:03:49 CST 2021
  22. Finished all threads.
  23. Process finished with exit code 0

4.2 线程池原理分析

我们通过代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。
为了搞懂线程池的原理,我们需要首先分析一下 execute方法。在 4.1 节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要:

  1. // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. private static int workerCountOf(int c) {
  4. return c & CAPACITY;
  5. }
  6. //任务队列
  7. private final BlockingQueue<Runnable> workQueue;
  8. public void execute(Runnable command) {
  9. // 如果任务为null,则抛出异常。
  10. if (command == null)
  11. throw new NullPointerException();
  12. // ctl 中保存的线程池当前的一些状态信息
  13. int c = ctl.get();
  14. // 下面会涉及到 3 步 操作
  15. // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
  16. // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  17. if (workerCountOf(c) < corePoolSize) {
  18. if (addWorker(command, true))
  19. return;
  20. c = ctl.get();
  21. }
  22. // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
  23. // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
  24. if (isRunning(c) && workQueue.offer(command)) {
  25. int recheck = ctl.get();
  26. // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
  27. if (!isRunning(recheck) && remove(command))
  28. reject(command);
  29. // 如果当前线程池为空就新创建一个线程并执行。
  30. else if (workerCountOf(recheck) == 0)
  31. addWorker(null, false);
  32. }
  33. //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  34. //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
  35. else if (!addWorker(command, false))
  36. reject(command);
  37. }

流程图如下:
线程池详解 - 图6
addWorker 这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回的就是false。

  1. // 全局锁,并发操作必备
  2. private final ReentrantLock mainLock = new ReentrantLock();
  3. // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
  4. private int largestPoolSize;
  5. // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
  6. private final HashSet<Worker> workers = new HashSet<>();
  7. //获取线程池状态
  8. private static int runStateOf(int c) { return c & ~CAPACITY; }
  9. //判断线程池的状态是否为 Running
  10. private static boolean isRunning(int c) {
  11. return c < SHUTDOWN;
  12. }
  13. /**
  14. * 添加新的工作线程到线程池
  15. * @param firstTask 要执行
  16. * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
  17. * @return 添加成功就返回true否则返回false
  18. */
  19. private boolean addWorker(Runnable firstTask, boolean core) {
  20. retry:
  21. for (;;) {
  22. //这两句用来获取线程池的状态
  23. int c = ctl.get();
  24. int rs = runStateOf(c);
  25. // Check if queue empty only if necessary.
  26. if (rs >= SHUTDOWN &&
  27. ! (rs == SHUTDOWN &&
  28. firstTask == null &&
  29. ! workQueue.isEmpty()))
  30. return false;
  31. for (;;) {
  32. //获取线程池中线程的数量
  33. int wc = workerCountOf(c);
  34. // core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
  35. if (wc >= CAPACITY ||
  36. wc >= (core ? corePoolSize : maximumPoolSize))
  37. return false;
  38. //原子操作将workcount的数量加1
  39. if (compareAndIncrementWorkerCount(c))
  40. break retry;
  41. // 如果线程的状态改变了就再次执行上述操作
  42. c = ctl.get();
  43. if (runStateOf(c) != rs)
  44. continue retry;
  45. // else CAS failed due to workerCount change; retry inner loop
  46. }
  47. }
  48. // 标记工作线程是否启动成功
  49. boolean workerStarted = false;
  50. // 标记工作线程是否创建成功
  51. boolean workerAdded = false;
  52. Worker w = null;
  53. try {
  54. w = new Worker(firstTask);
  55. final Thread t = w.thread;
  56. if (t != null) {
  57. // 加锁
  58. final ReentrantLock mainLock = this.mainLock;
  59. mainLock.lock();
  60. try {
  61. //获取线程池状态
  62. int rs = runStateOf(ctl.get());
  63. //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
  64. //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
  65. // firstTask == null证明只新建线程而不执行任务
  66. if (rs < SHUTDOWN ||
  67. (rs == SHUTDOWN && firstTask == null)) {
  68. if (t.isAlive()) // precheck that t is startable
  69. throw new IllegalThreadStateException();
  70. workers.add(w);
  71. //更新当前工作线程的最大容量
  72. int s = workers.size();
  73. if (s > largestPoolSize)
  74. largestPoolSize = s;
  75. // 工作线程是否启动成功
  76. workerAdded = true;
  77. }
  78. } finally {
  79. // 释放锁
  80. mainLock.unlock();
  81. }
  82. //// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
  83. if (workerAdded) {
  84. t.start();
  85. /// 标记线程启动成功
  86. workerStarted = true;
  87. }
  88. }
  89. } finally {
  90. // 线程启动失败,需要从工作线程中移除对应的Worker
  91. if (! workerStarted)
  92. addWorkerFailed(w);
  93. }
  94. return workerStarted;
  95. }

4.3 常见对比

4.3.1 Runnable vs Callable

Runnable 自 Java 1.0 以来一直存在,但 Callable 仅在 Java 1.5 中引入,目的就是为了来处理 Runnable 不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。
工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。
Runnable.java

  1. @FunctionalInterface
  2. public interface Runnable {
  3. /**
  4. * 被线程执行,没有返回值也无法抛出异常
  5. */
  6. public abstract void run();
  7. }

Callable.java

  1. @FunctionalInterface
  2. public interface Callable<V> {
  3. /**
  4. * 计算结果,或在无法这样做时抛出异常。
  5. * @return 计算得出的结果
  6. * @throws 如果无法计算结果,则抛出异常
  7. */
  8. V call() throws Exception;
  9. }

4.3.2 execute() vs submit()

  1. **execute()**方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. **submit()**方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

AbstractExecutorService 接口中的一个 submit() 方法为例子来看看源代码:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象:

  1. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  2. return new FutureTask<T>(runnable, value);
  3. }

execute()方法:

  1. public void execute(Runnable command) {
  2. ...
  3. }

4.3.3 shutdown() vs shutdownNow()

  • **shutdown()**:关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • **shutdownNow()**:关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

    4.3.4 isTerminated() vs isShutdown()

  • **isShutDown()**:当调用 shutdown() 方法后返回为 true。

  • **isTerminated()**:当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true。

    第五部分:几种常见线程池详解

    5.1 FixedThreadPool

    5.1.1 介绍

    FixedThreadPool 被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:
    1. /**
    2. * 创建一个可重用固定数量线程的线程池
    3. */
    4. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    5. return new ThreadPoolExecutor(nThreads, nThreads,
    6. 0L, TimeUnit.MILLISECONDS,
    7. new LinkedBlockingQueue<Runnable>(),
    8. threadFactory);
    9. }
    另外还有一个 FixedThreadPool 的实现方法,和上面的类似:
    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }
    从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

    5.1.2 执行任务过程介绍

    FixedThreadPool 的 execute() 方法运行示意图:
    线程池详解 - 图7
    上图说明:
  1. 如果当前运行的线程数小于 corePoolSize,如果再来新任务的话,就创建新的线程来执行任务;
  2. 当前运行的线程数等于 corePoolSize 后,如果再来新任务的话,会将任务加入 LinkedBlockingQueue;
  3. 线程池中的线程执行完手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;

    5.1.3 为什么不推荐使用 FixedThreadPool?

    FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  4. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;

  5. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
  6. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  7. 运行中的 FixedThreadPool (未执行 shutdown()shutdownNow()) 不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

    5.2 SingleThreadExecutor 详解

    5.2.1 介绍

    SingleThreadExecutor 是只有一个线程的线程池。SingleThreadExecutor 的实现:

    1. /**
    2. *返回只有一个线程的线程池
    3. */
    4. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    5. return new FinalizableDelegatedExecutorService
    6. (new ThreadPoolExecutor(1, 1,
    7. 0L, TimeUnit.MILLISECONDS,
    8. new LinkedBlockingQueue<Runnable>(),
    9. threadFactory));
    10. }
    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }

    从上面源代码可以看出新创建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被设置为 1,其他参数和 FixedThreadPool 相同。

    5.2.2 执行任务过程介绍

    SingleThreadExecutor 的运行示意图(该图片来源:《Java 并发编程的艺术》):
    线程池详解 - 图8
    上图说明:

  8. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;

  9. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue;
  10. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行;

    5.2.3 为什么不推荐使用SingleThreadExecutor?

    SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM。

    5.3 CachedThreadPool 详解

    5.3.1 介绍

    CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool 的实现:

    1. /**
    2. * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
    3. */
    4. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    5. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    6. 60L, TimeUnit.SECONDS,
    7. new SynchronousQueue<Runnable>(),
    8. threadFactory);
    9. }
    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }

    CachedThreadPool 的corePoolSize 被设置为空(0),maximumPoolSize 被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

    5.3.2 执行任务过程介绍

    CachedThreadPool 的 execute()方法的执行示意图(该图片来源:《Java 并发编程的艺术》): 线程池详解 - 图9
    上图说明:

  11. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;

  12. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成。

    5.3.3 为什么不推荐使用 CachedThreadPool?

    CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM。

    第六部分:ScheduledThreadPoolExecutor详解

    ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。

    6.1 简介

    ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。
    ScheduledThreadPoolExecutor 和 Timer 的比较:
  • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
  • 在TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机(即计划任务将不再运行)。ScheduledThreadExecutor 不仅捕获运行时异常,还允许在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

    备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。

6.2 运行机制

线程池详解 - 图10
ScheduledThreadPoolExecutor 的执行主要分为两大部分:

  1. 当调用ScheduledThreadPoolExecutor的 scheduleAtFixedRate() 方法或者scheduleWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask
  2. 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下修改:

  • 使用 DelayQueue 作为任务队列;
  • 获取任务的方不同
  • 执行周期任务后,增加了额外的处理

    6.3 ScheduledThreadPoolExecutor执行周期任务的步骤

    线程池详解 - 图11
  1. 线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask的 time 大于等于当前系统的时间;
  2. 线程 1 执行这个 ScheduledFutureTask;
  3. 线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
  4. 线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

    第七部分:线程池大小确定

    线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。线程数量过多的影响和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。

    上下文切换: 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。 Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的。 CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。