1、使用线程池的原因和好处

使用线程池的原因

  1. 反复创建线程开销大(线程的创建、销毁、切换,进程需要切换到内核态,完成后再切换回用户态,内核态和用户态切换的开销非常大)
  2. 过多的线程会占用太多内存

    使用线程池的好处

  3. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  4. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  5. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

2、线程池提交任务流程

image.png
1)当提交一个新任务到线程池时,线程池判断corePoolSize线程池是否都在执行任务,如果有空闲线程,则创建一个新的工作线程来执行任务,直到当前线程数等于corePoolSize;
2)如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
3)如果阻塞队列满了,那就创建新的线程执行当前任务,直到线程池中的线程数达到maxPoolSize,这时再有任务来,由饱和策略来处理提交的任务

3、线程池参数详解

image.png

3.1、corePoolSize和maxPoolSize

corePoolSize 核心线程数
maxPoolSize 最大线程数

3.2、keepAliveTime

非核心空闲线程存活时间
如果线程池当前线程数多于corePoolSize,如果线程的空闲时间大于keepAliveTime,那么线程会被中止

3.3、workQueue

工作队列,最常见的三种工作队列为
SynchronousQueue 直接交换,不存储线程
LinkedBlockingQueue 无界队列
ArrayBlockingQueue 有界队列

3.4、threadFactory

ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。

3.5、Handler 拒绝策略

RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常,查看源码,从jdk5开始RejectedExecutionHandler接口有四个实现类,jdk8亦是如此。

  • AbortPolicy:不处理,直接抛出异常。
  • CallerRunsPolicy:若线程池还没关闭,调用当前所在线程来运行任务,r.run()执行。
  • DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉,不抛出异常。

    4、增减线程的特点

1、通过设置corePollSize和maxPoolSize相同,就可以创建固定大小的线程池
2、线程池希望保持较小的线程数,只有在负载很大的情况下才增加它。
3、通过设置maxPoolSize为很大的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
4、只有在队列填满时才创建多于corePoolSize的线程,如果你使用的是无界队列(如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。

5、常见线程池

  1. //固定线程数线程池
  2. ExecutorService executorService1 = Executors.newFixedThreadPool(20);
  3. //可缓存线程池
  4. ExecutorService executorService2 = Executors.newCachedThreadPool();
  5. //定时任务线程池
  6. ScheduledExecutorService executorService3 = Executors.newScheduledThreadPool(20);
  7. //单线程线程池 FixedThreadPool的特例
  8. ExecutorService executorService4 = Executors.newSingleThreadExecutor();
  9. //单线程定时任务线程池 ScheduledThreadPool的特例
  10. ScheduledExecutorService executorService5 = Executors.newSingleThreadScheduledExecutor();

5.1、 FixedThreadPool

核心线程数和最大线程数相同,即固定线程数,队列为无界队列,可无限存放任务。
有OOM风险。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

5.2、CachedThreadPool

最大线程数为整型最大值,线程数几乎可以无限增加,当线程空闲时可以对线程进行回收。SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
可无限创建线程,有OOM风险。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

5.3、ScheduledThreadPool

支持定时或周期性执行任务的线程池

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

用法
1、延时执行线程
service.schedule(new Task(), 10, TimeUnit.SECONDS);
2、以固定的频率执行任务,周期为任务开始的时间为下一次循环的时间起点开始计时间
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
3、以固定的频率执行任务,周期为任务结束的时间为下一次循环的时间起点开始计时间
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);

5.4、SingleThreadExecutor

单线程线程池 FixedThreadPool的特例,核心线程和最大线程的数量都为1,为线程数为1的固定长度线程池

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

5.5、SingleThreadScheduledExecutor

支持定时或周期性执行任务的单线程线程池 ,ScheduledThreadPool的特例
核心线程数为1

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  2. return new DelegatedScheduledExecutorService
  3. (new ScheduledThreadPoolExecutor(1));
  4. }

6、线程池常用的阻塞队列

FixedThreadPool LinkedBlockingQueue
CachedThreadPool SynchronousQueue
ScheduledThreadPool DelayedWorkQueue
SingleThreadExecutor LinkedBlockingQueue
SingleThreadScheduledExecutor DelayedWorkQueue
  • LinkedBlockingQueue

无界队列

  • SynchronousQueue

SynchronousQueue队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递

  • DelayedWorkQueue

延迟队列可以把任务按时间进行排序,无界队列

7、为什么不自动创建线程池

自动创建的线程池都存在OOM的风险,我们应该根据需求选择适合自己的线程数量,更可以在必要的时候拒绝新任务的提交,避免资源耗尽的风险。

FixedThreadPool LinkedBlockingQueue为无界队列
CachedThreadPool 可无限创建线程
ScheduledThreadPool DelayedWorkQueue为无界队列
SingleThreadExecutor LinkedBlockingQueue为无界队列
SingleThreadScheduledExecutor DelayedWorkQueue为无界队列

8、线程池中合适的线程数是多少

区分是cpu密集型任务还是IO密集型任务,推测大致的最优线程数,然后通过压测

CPU 密集型任务
CPU 密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。

耗时 IO 型任务
耗时 IO 型,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

  1. 线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。

太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。

结论

线程的平均工作时间所占比例越高,就需要越少的线程;
线程的平均等待时间所占比例越高,就需要越多的线程;
针对不同的程序,进行对应的实际测试就可以得到最合适的选择。

9、停止线程池

shutdown()

调用 shutdown() 方法之后线程池并不是立刻就被关闭,因为这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭。但这并不代表 shutdown() 操作是没有任何效果的,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务

isShutdown()

返回 true 或者 false 来判断线程池是否已经开始了关闭工作,也就是是否执行了 shutdown 或者 shutdownNow 方法。true 并不代表线程池此时已经彻底关闭了,这仅仅代表线程池开始了关闭的流程

isTerminated()

检测线程池是否真正停止。不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕。

awaitTermination()

阻塞当前线程,若线程池在指定时间内完全停止即任务全部执行完毕,返回true
若线程池在指定时间内未完全停止,返回false
等待期间线程被中断,方法会抛出 InterruptedException 异常

shutdownNow()

在执行 shutdownNow 方法之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,我们可以根据返回的任务 List 来进行一些补救的操作,例如记录在案并在后期重试。

  1. public class ShutDownThreadPool {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = new ThreadPoolExecutor(10, 10,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. IntStream.range(0, 1000).forEach(e -> executorService.execute(new ShutDownTask()));
  7. try {
  8. Thread.sleep(1000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println("线程池状态:" + executorService.isShutdown());
  13. executorService.shutdown();
  14. System.out.println("线程池已关闭");
  15. System.out.println("线程池状态:" + executorService.isShutdown());
  16. System.out.println("线程池是否完全中止:" + executorService.isTerminated());
  17. //当前线程会阻塞在该位置,如果只指定时间内线程池真正停止,也就是所有任务全部结束了,就返回true否则返回false
  18. boolean b = false;
  19. try {
  20. b = executorService.awaitTermination(1, TimeUnit.SECONDS);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println("awaitTermination结果:" + b);
  25. System.out.println("线程池状态:" + executorService.isShutdown());
  26. System.out.println("线程池是否完全中止:" + executorService.isTerminated());
  27. List<Runnable> runnables = executorService.shutdownNow();
  28. System.out.println("还有" + runnables.size() + "个未执行任务");
  29. executorService.execute(new ShutDownTask());
  30. }
  31. public static class ShutDownTask implements Runnable{
  32. @Override
  33. public void run() {
  34. try {
  35. Thread.sleep(100);
  36. System.out.println("当前线程名:" + Thread.currentThread().getName());
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }
  42. }

10、线程池钩子方法

beforeExecute,afterExecute

  1. package com.imooc.thread_demo.threadpool;
  2. import java.util.concurrent.*;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. import java.util.stream.IntStream;
  6. /**
  7. * @Author: zhangjx
  8. * @Date: 2020/10/10 17:23
  9. * @Description:
  10. */
  11. public class PauseableThreadPool extends ThreadPoolExecutor {
  12. private boolean isPaused;
  13. private final ReentrantLock lock = new ReentrantLock();
  14. private Condition unPaused = lock.newCondition();
  15. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
  16. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  17. }
  18. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
  19. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
  20. }
  21. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
  22. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
  23. }
  24. public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  25. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
  26. }
  27. @Override
  28. protected void beforeExecute(Thread t, Runnable r) {
  29. super.beforeExecute(t, r);
  30. lock.lock();
  31. try {
  32. while(isPaused){
  33. unPaused.await();
  34. }
  35. }catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }finally {
  38. lock.unlock();
  39. }
  40. }
  41. @Override
  42. protected void afterExecute(Runnable r, Throwable t) {
  43. super.afterExecute(r, t);
  44. }
  45. private void pause(){
  46. lock.lock();
  47. try {
  48. isPaused = true;
  49. }finally {
  50. lock.unlock();
  51. }
  52. }
  53. private void resume(){
  54. lock.lock();
  55. try {
  56. isPaused = false;
  57. unPaused.signalAll();
  58. }finally {
  59. lock.unlock();
  60. }
  61. }
  62. public static void main(String[] args) throws InterruptedException {
  63. PauseableThreadPool executorService = new PauseableThreadPool(10, 10,
  64. 0L, TimeUnit.MILLISECONDS,
  65. new LinkedBlockingQueue<Runnable>());
  66. Runnable runnable = new Runnable() {
  67. @Override
  68. public void run() {
  69. System.out.println("线程 " + Thread.currentThread().getName() + " 正在执行");
  70. try {
  71. Thread.sleep(10);
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. };
  77. IntStream.range(0, 1000).forEach(e -> executorService.execute(runnable));
  78. Thread.sleep(100);
  79. executorService.pause();
  80. System.out.println("线程池已停止");
  81. Thread.sleep(5000);
  82. executorService.resume();
  83. System.out.println("线程池已恢复");
  84. }
  85. }

11、线程池的实现原理

线程池的组成部分

  • 线程池管理器
  • 工作线程
  • 任务队列
  • 任务接口(Task)

线程类关系

image.png

线程池如何做到线程复用

相同线程执行不同任务

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. if (addWorker(command, true))
  7. return;
  8. c = ctl.get();
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {
  11. int recheck = ctl.get();
  12. if (! isRunning(recheck) && remove(command))
  13. reject(command);
  14. else if (workerCountOf(recheck) == 0)
  15. addWorker(null, false);
  16. }
  17. else if (!addWorker(command, false))
  18. reject(command);
  19. }

addWorker 方法的主要作用是在线程池中创建一个线程并执行第一个参数传入的任务,第二个参数如果传入 true 代表增加线程时判断当前线程是否少于 corePoolSize,小于则增加新线程,大于等于则不增加;同理,如果传入 false 代表增加线程时判断当前线程是否少于 maxPoolSize,小于则增加新线程,大于等于则不增加

ThreadPoolExecutor内部类Worker类runWorker方法

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

可以看出,在 execute 方法中,多次调用 addWorker 方法把任务传入,addWorker 方法会添加并启动一个 Worker,这里的 Worker 可以理解为是对 Thread 的包装,Worker 内部有一个 Thread 对象,它正是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWorker 方法中。
可以看出,实现线程复用的逻辑主要在一个不停循环的 while 循环体中。

  1. 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
  2. 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。

在这里,我们找到了最终的实现,通过取 Worker 的 firstTask 或者 getTask方法从 workQueue 中取出了新任务,并直接调用 Runnable 的 run 方法来执行任务,也就是如之前所说的,每个线程都始终在一个大循环中,反复获取任务,然后执行任务,从而实现了线程的复用。

12、线程池的状态和注意点

线程池状态

RUNNING :接受新任务并处理排队任务
SHUTDOWN:不接受新任务但是处理排队任务
STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务
TIDYING:中文为整洁,当所有任务都中止了。workerCount为0时,线程会转换到TIDING状态,并将运行terminate()钩子方法
TERMINATED:terminate()运行完成

注意点

避免任务堆积
避免线程过度增加
排查线程泄露