前言

线程池: 简单理解,它就是一个管理线程的池子。

线程池的优点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    线程池的总结

    工作流程

    图解线程池流程
    先来一张通俗易懂的:
    20210218191247309.png
    文字理解:工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。
    将其翻译成线程池理解:
    20210218191944638.png

    关键参数

    线程池关键参数的理解(在图中可以看到):

  • corePoolSize(必需):核心线程数。即池中一直保持存活的线程数,即使这些线程处于空闲。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。

  • maximumPoolSize(必需):池中允许的最大线程数。当核心线程全部繁忙且任务队列打满之后,线程池会临时追加线程,直到总线程数达到maximumPoolSize这个上限。
  • keepAliveTime(必需):线程空闲超时时间。当非核心线程处于空闲状态的时间超过这个时间后,该线程将被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
  • unit(必需):keepAliveTime参数的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)
  • workQueue(必需):任务队列,采用阻塞队列实现。当核心线程全部繁忙时,后续由execute方法提交的Runnable将存放在任务队列中,等待被线程处理。
  • threadFactory(可选):线程工厂。指定线程池创建线程的方式。
  • handler(可选):拒绝策略。当线程池中线程数达到maximumPoolSize且workQueue打满时,后续提交的任务将被拒绝,handler可以指定用什么方式拒绝任务。

图解参数之间的关系:
640.webp

工作原理

看完工作流程后再来看下线程池工作原理(图解):
20210214224304426.png

拒绝策略

  1. AbortPolicy(抛出一个异常,默认的)
  2. DiscardPolicy(直接丢弃任务)
  3. DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  4. CallerRunsPolicy(交给线程池调用所在的线程进行处理)

    任务队列

    ArrayBlockingQueue

    ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

    LinkedBlockingQueue

    LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列

    DelayQueue

    DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

    PriorityBlockingQueue

    PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

    SynchronousQueue

    SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

创建方式

创建线程池的方式:

  1. 通过ThreadPoolExecutor构造函数实现(推荐)
  2. 通过 Executor 框架的工具类 Executors 来实现,总共有三种类型(方法内部也是调用了ThreadPoolExecutor的构造方法)
  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

    ThreadPoolExecutor

    《阿里巴巴 Java 开发手册》“并发处理”章节中,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。(具体原因可参照线程池的优点)
    并且强制要求,不允许使用 Executors 去快捷创建线程池,而是通过 ThreadPoolExecutor 构造函数的方式,除了避免 OOM 的原因外,这样的处理方式可以让我们更加明确线程池的运行规则,规避资源耗尽的风险。
    也就是说使用有界队列,控制线程创建数量。因为实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。我们应该显示地给我们的线程池命名,有助于后期快速定位问题。

    Executors 返回线程池对象的弊端: FixedThreadPoolSingleThreadExecutor:允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM(java.lang.OutOfMemoryError:内存用完了)。 CachedThreadPoolScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

示例:一个简单的for遍历线程

  1. import java.util.Date;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class App
  6. {
  7. //corePoolSize: 定义核心线程数为 3
  8. private static final Integer CORE_POOL_SIZE = 3;
  9. //maximumPoolSize:定义最大线程数为 9
  10. private static final Integer MAX_POOL_SIZE = 9;
  11. //workQueue:定义任务队列为 ArrayBlockingQueue,并且容量为 90
  12. private static final Integer QUEUE_CAPACITY = 90;
  13. //keepAliveTime: 定义等待时间为 1
  14. private static final Long KEEP_ALIVE_TIME = 1L;
  15. public static void main(String[] args) {
  16. //通过ThreadPoolExecutor构造函数自定义参数创建
  17. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  18. CORE_POOL_SIZE,
  19. MAX_POOL_SIZE,
  20. KEEP_ALIVE_TIME,
  21. TimeUnit.SECONDS,//unit: 等待时间的单位为 TimeUnit.SECONDS
  22. new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), //workQueue:定义任务队列为 ArrayBlockingQueue
  23. new ThreadPoolExecutor.CallerRunsPolicy());//handler:定义拒绝策略为 CallerRunsPolicy
  24. // 模拟向线程池提交(6个线程)任务
  25. for (int i = 0; i < 6; i++) {
  26. //执行线程;这里直接使用的Runnable匿名对象;也可以自行创建Runnable或Callable的实现类
  27. executor.execute(new Runnable() {
  28. @Override
  29. public void run() {
  30. Long start = System.currentTimeMillis(); //获取线程开始执行时间
  31. //这里随便写了一个遍历功能的线程
  32. for (int numValue = 0; numValue <= 2; numValue++) {
  33. System.out.println(Thread.currentThread().getName() + ":" + numValue + " " + new Date(System.currentTimeMillis()));
  34. try {
  35. Thread.sleep(500);
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. System.out.println(Thread.currentThread().getName() + ":当前线程任务结束,总共花费时间:" + (System.currentTimeMillis() - start) / 1000 + "秒");
  41. }
  42. }
  43. );
  44. }
  45. //终止线程池
  46. executor.shutdown(); //设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
  47. // executor.shutdownNow(); //(慎用)设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
  48. while (!executor.isTerminated()) {
  49. //使用isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,必须在shutdown()方法关闭线程池之后才能使用,
  50. //否则isTerminated()永远为false,线程将一直阻塞在该判断的地方,导致程序最终崩溃。
  51. }
  52. System.out.println("所有线程都已结束");
  53. }
  54. }

打印结果:

  1. pool-1-thread-2:0 Thu Sep 30 10:42:01 CST 2021
  2. pool-1-thread-1:0 Thu Sep 30 10:42:01 CST 2021
  3. pool-1-thread-3:0 Thu Sep 30 10:42:01 CST 2021
  4. pool-1-thread-2:1 Thu Sep 30 10:42:02 CST 2021
  5. pool-1-thread-1:1 Thu Sep 30 10:42:02 CST 2021
  6. pool-1-thread-3:1 Thu Sep 30 10:42:02 CST 2021
  7. pool-1-thread-2:2 Thu Sep 30 10:42:02 CST 2021
  8. pool-1-thread-1:2 Thu Sep 30 10:42:02 CST 2021
  9. pool-1-thread-3:2 Thu Sep 30 10:42:02 CST 2021
  10. pool-1-thread-3:当前线程任务结束,总共花费时间:1
  11. pool-1-thread-3:0 Thu Sep 30 10:42:03 CST 2021
  12. pool-1-thread-1:当前线程任务结束,总共花费时间:1
  13. pool-1-thread-1:0 Thu Sep 30 10:42:03 CST 2021
  14. pool-1-thread-2:当前线程任务结束,总共花费时间:1
  15. pool-1-thread-2:0 Thu Sep 30 10:42:03 CST 2021
  16. pool-1-thread-3:1 Thu Sep 30 10:42:03 CST 2021
  17. pool-1-thread-2:1 Thu Sep 30 10:42:03 CST 2021
  18. pool-1-thread-1:1 Thu Sep 30 10:42:03 CST 2021
  19. pool-1-thread-3:2 Thu Sep 30 10:42:04 CST 2021
  20. pool-1-thread-1:2 Thu Sep 30 10:42:04 CST 2021
  21. pool-1-thread-2:2 Thu Sep 30 10:42:04 CST 2021
  22. pool-1-thread-2:当前线程任务结束,总共花费时间:1
  23. pool-1-thread-1:当前线程任务结束,总共花费时间:1
  24. pool-1-thread-3:当前线程任务结束,总共花费时间:1
  25. 所有线程都已结束

打印结果分析:
该示例中总共向线程池提交了6个线程任务,但是该线程池的核心线程只有3个,所以根据示例中定义的拒绝策略和任务队列,剩下的3个线程在任务队列中,根据打印结果可以看到,当前面的3个线程分别执行完毕时,空出的核心线程会被剩下的3个线程任务拿到,然后继续执行剩下的线程任务

Executors

FixedThreadPool

创建可重用固定线程数的线程池。

  • FixedThreadPool只有核心线程,且数量固定,没有非核心线程。
  • keepAliveTime设置为0L,代表多余的线程会被立即终止;即没有非空闲时间;因为不会产生多余的线程,所以keepAliveTime是无效的参数。
  • 任务队列为无界的阻塞队列LinkedBlockingQueue(容量默认为Integer.MAX_VALUE)。

20190424125302141.png

  1. 提交任务
  2. 如果线程数少于核心线程,创建核心线程执行任务
  3. 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  4. 如果线程执行完任务,去阻塞队列取任务,继续执行。

示例:

  1. ExecutorService executor = Executors.newFixedThreadPool(10);
  2. for (int i = 0; i < Integer.MAX_VALUE; i++) {
  3. executor.execute(()->{
  4. try {
  5. Thread.sleep(1000);
  6. } catch (InterruptedException e) {
  7. //do nothing
  8. }
  9. });

使用场景:
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
缺点:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)

    CachedThreadPool

    一个根据需要创建线程的线程池。
    核心线程数为0
    最大线程数为Integer.MAX_VALUE
    阻塞队列是SynchronousQueue
    非核心线程空闲存活时间为60秒
    当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

    1. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>(),
    5. threadFactory);
    6. }

    20190424131202663.png

  5. 提交任务

  6. 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
  7. 判断是否有空闲线程,如果有,就去取出任务执行。
  8. 如果没有空闲线程,就新建一个线程执行。
  9. 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则被销毁。

使用场景:
因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。


缺点:
CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

SingleThreadExecutor

只有一个线程的线程池。

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0
    1. ExecutorService executor = Executors.newSingleThreadExecutor();
    2. for (int i = 0; i < 5; i++) {
    3. executor.execute(() -> {
    4. System.out.println(Thread.currentThread().getName()+"正在执行");
    5. });
    6. }
    20190424133324115.png
  1. 提交任务
  2. 线程池是否有一条线程在,如果没有,新建线程执行任务
  3. 如果有,讲任务加到阻塞队列
  4. 当前的唯一线程,从队列取任务,执行完一个,再继续取,如此循环。。。。

使用场景:
适用于串行执行任务的场景,一个任务一个任务地执行。
缺点:
SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的任务队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的任务队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM,

常用方法

提交

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
AbstractExecutorService接口中的一个 submit 方法为例:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) thrownew NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }

上面方法调用的 newTaskFor()方法返回了一个 FutureTask 对象。

  1. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  2. return new FutureTask<T>(runnable, value);
  3. }


对比execute()方法:

  1. public void execute(Runnable command) {
  2. ...
  3. }

关闭

shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
shutdownNow() :关闭线程池,线程的状态变为STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List;该方法要慎用,容易造成不可控的后果

判断

isShutDown 当调用 shutdown()方法后,返回为 true,否则为false。
isTerminated 当调用shutdown() 方法后,并且所有提交的任务完成后返回为 true,否则为false。

状态切换

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
  • 调用线程池的shutdownNow()方法,可以切换到STOP状态;

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • 线程池中执行的任务为空,进入TIDYING状态;

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0。
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

  • 该状态表示线程池彻底终止

    配置线程数

    CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

参考文章:
https://mp.weixin.qq.com/s?__biz=Mzg2OTA0Njk0OA==&mid=2247485808&idx=1&sn=1013253533d73450cef673aee13267ab&chksm=cea246bbf9d5cfad1c21316340a0ef1609a7457fea4113a1f8d69e8c91e7d9cd6285f5ee1490&token=510053261&lang=zh_CN&scene=21#wechat_redirect
https://juejin.cn/post/6844903889678893063
https://blog.csdn.net/mu_wind/article/details/113806680