1、线程池的优势

(1)降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
(2)提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
(3)方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
(4)提供更强大的功能,延时定时线程池。

2、Executors工具类

Executors类提供了4种不同的线程池:
(1)newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO,
LIFO, 优先级)执行。
(2)newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
(3)newScheduledThreadPool
创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
(4)newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线
程。
注意实际开发过程中,不允许使用Executors类创建线程池;
image.png

3、线程池ThreadPoolExecutor**

  1. public ThreadPoolExecutor(int corePoolSize, // 核心线程数
  2. int maximumPoolSize,// 线程总数最大值
  3. long keepAliveTime,// 非核心线程闲置超长时间
  4. TimeUnit unit,// 超时单位
  5. BlockingQueue<Runnable> workQueue,// 阻塞队列
  6. ThreadFactory threadFactory,// 线程工厂
  7. RejectedExecutionHandler handler)// 拒绝策略
  • int corePoolSize => 该线程池中核心线程数最大值

    核心线程: 线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程 核心线程默认情况下会一直存活在线程池中,即使这个核心线程闲置状态d。 如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉

  • int maximumPoolSize

    该线程池中线程总数最大值 线程总数 = 核心线程数 + 非核心线程数。

  • long keepAliveTime

    该线程池中非核心线程闲置超时时长 一个非核心线程,如果闲置状态的时长超过这个参数所设定的时长,就会被销毁掉 如果设置allowCoreThreadTimeOut = true,则会作用于核心线程

  • TimeUnit unit

    keepAliveTime的单位,TimeUnit是一个枚举类型,其包括:

    1. NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    2. MICROSECONDS : 1微秒 = 1毫秒 / 1000
    3. MILLISECONDS : 1毫秒 = 1秒 /1000
    4. SECONDS : 秒
    5. MINUTES : 分
    6. HOURS : 小时
    7. DAYS : 天
  • BlockingQueue workQueue

    该线程池中的任务队列:维护着等待执行的Runnable对象 当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务 常用的workQueue类型:

    1. SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
    2. LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
    3. ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
    4. DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
  • ThreadFactory threadFactory

    用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,

  • RejectedExecutionHandler handler

    线程池的拒绝策略:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

拒绝策略 拒绝行为
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务
ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

4、线程池流程

线程池 - 图2

5、最大线程数的定义(maximumPoolSize)

java.lang.**Runtime.getRuntime()**.availableProcessors()方法:
作用:获取可用Java虚拟机的处理器数量。
**CPU密集型任务**
顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程上下文切换,比较理想方案是:
线程数 = CPU核数+1,也可以设置成CPU核数2,但还要看JDK的版本以及CPU配置(服务器的CPU有超线程)。
一般设置CPU
2即可。
**IO密集型任务**
我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。那么这个线程池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于IO密集型应用:
线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))
混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

6、自定义线程池

代码演示:

  1. ThreadPoolExecutor threadPool = null;
  2. try {
  3. threadPool = new ThreadPoolExecutor(
  4. 2,
  5. Runtime.getRuntime().availableProcessors(),
  6. 20L,
  7. TimeUnit.HOURS,
  8. new LinkedBlockingQueue<>(3),
  9. Executors.defaultThreadFactory(),
  10. new ThreadPoolExecutor.AbortPolicy()
  11. );
  12. for (int i = 1; i <= 12; i++) {
  13. final int j = i;
  14. threadPool.execute(() -> {
  15. System.out.println(Thread.currentThread().getName() + "\t" + j);
  16. });
  17. }
  18. } finally {
  19. if (threadPool != null) {
  20. threadPool.shutdown();
  21. }
  22. }

7、优雅的关闭线程池

ExecutorService关于关闭主要有如下几个方法:

  • shutdown:在线程池队列中的提交的任务会执行,无法提交新的任务,注意调用这个方法,线程池不会等待(wait)在执行的任务执行完成,可以使用awaitTermination实现这个目的。这里需要注意的是:在执行的任务因为是异步线程执行的,任务还是会继续执行,只是说线程池不会阻塞等待任务执行完成
  • List<Runnable> shutdownNow():试图关闭正在执行的任务,不会执行已经提交到队列但是还没有执行的任务,返回等待执行的任务列表,同时此方法不会等待那些正在执行的任务执行完,等待执行的任务会从线程池队列移除。
  • isShutdown:线程池是否关闭
  • isTerminated:判断线程池关闭后所有的任务是否都执行完了,注意这个方法只有在 shutdown或shutdownNow方法调用后才有效
  • awaitTermination:阻塞,直到一下任务情况出现:
  • shutdown:调用后所有任务执行完成
  • 超时
  • 当前线程中断

正确关闭线程池

  • 首先任务要能够响应中断
  • 关闭流程:
    • 调用shutdown禁止提交新的任务
    • 调用 awaitTermination等待任务执行完成
    • 调用shutdownNow强制关闭那些执行任务过长(可能无法正常停止)的任务
      1. // 创建线程池对象
      2. ThreadPoolExecutor threadPool = null;
      3. try {
      4. threadPool = new ThreadPoolExecutor(
      5. 2,
      6. Runtime.getRuntime().availableProcessors(),
      7. 10L,
      8. TimeUnit.SECONDS,
      9. new LinkedBlockingQueue<>(2),
      10. Executors.defaultThreadFactory(),
      11. new ThreadPoolExecutor.CallerRunsPolicy()
      12. );
      13. // 执行线程任务
      14. threadPool.execute(() -> {
      15. for (int i = 0; i < 10; i++) {
      16. final int j = i;
      17. System.out.println(Thread.currentThread().getName() + "\t" + j);
      18. }
      19. });
      20. } finally {
      21. // 释放资源
      22. threadPool.shutdown();
      23. try {
      24. if (threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
      25. threadPool.shutdownNow();
      26. }
      27. } catch (InterruptedException e) {
      28. threadPool.shutdownNow();
      29. }
      30. }
      异步操作关闭线程池
      1. // 创建线程池对象
      2. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
      3. 2,
      4. Runtime.getRuntime().availableProcessors(),
      5. 10L,
      6. TimeUnit.SECONDS,
      7. new LinkedBlockingQueue<>(2),
      8. Executors.defaultThreadFactory(),
      9. new ThreadPoolExecutor.CallerRunsPolicy()
      10. );
      11. // 执行线程任务
      12. threadPool.execute(() -> {
      13. for (int i = 0; i < 10; i++) {
      14. final int j = i;
      15. System.out.println(Thread.currentThread().getName() + "\t" + j);
      16. }
      17. });
      18. // 异步方式关闭线程池
      19. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      20. threadPool.shutdown();
      21. try {
      22. if (!threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
      23. threadPool.shutdownNow();
      24. }
      25. } catch (InterruptedException e) {
      26. threadPool.shutdownNow();
      27. }
      28. }));