线程池介绍

Java中的线程池

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。在Java中,线程池是JDK中提供的ThreadPoolExecutor类 和 ScheduledThreadPoolExecutor 类。
image.png
使用线程池有很多好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

    总体设计

    ThreadPoolExecutor 的运行机制如下图所示:
    image.png
    线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
    (1)直接申请线程执行该任务;
    (2)缓冲到队列中等待线程执行;
    (3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

    生命周期

    image.png
    image.png
    线程池状态转化图如下所示:
    image.png

    任务执行机制

    在任务执行机制之前,需要先了解线程池的7个核心参数,以 ThreadPoolExecutor 类为例:
    image.png
    其中,救急线程就是当核心线程全部不空闲并且任务缓存队列也已经达到容量上限,此时在有任务进来就会创建救急线程,其容量为maximumPoolSize-corePoolSize。
    image.png
    image.png
    对应的执行流程如下:
  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

image.png

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
image.png
图5 阻塞队列
使用不同的队列可以实现不一样的任务存取策略。
image.png

自定义线程池

自定义线程池模型

image.png
一个线程池,需要具有存放线程的池子、存放任务的任务队列、以及提交任务的线程,如上图所示,这样其实刚好对应生产者和消费者模式,也就是说,自定义的线程池也是生产者消费者模式的一种应用。

定义任务队列

任务队列可以使用阻塞队列来实现,下面是任务队列的代码:

注意: 关于阻塞队列和非阻塞队列,可以参考下面的文章:https://www.jianshu.com/p/94e3bc3e05ff

  1. @Slf4j(topic = "c.BlockingQueue")
  2. public class BlockingQueue<T> {
  3. /**
  4. * 任务队列
  5. */
  6. private final Deque<T> queue = new ArrayDeque<>();
  7. /**
  8. * 任务队列大小
  9. */
  10. private final int capcity;
  11. /**
  12. * 锁对象
  13. */
  14. private final Lock lock = new ReentrantLock();
  15. /**
  16. * 生产者等待条件变量
  17. */
  18. private final Condition putWaitSet = lock.newCondition();
  19. /**
  20. * 消费者等待条件变量
  21. */
  22. private final Condition takeWaitSet = lock.newCondition();
  23. public BlockingQueue(int queueCapcity) {
  24. this.capcity = queueCapcity;
  25. }
  26. public T take(long timeout, TimeUnit unit) throws InterruptedException {
  27. lock.lock();
  28. try {
  29. //转为纳秒
  30. long nanos = unit.toNanos(timeout);
  31. while (queue.isEmpty()) {
  32. //超时等待
  33. if (nanos <= 0) {
  34. return null;
  35. }
  36. // awaitNanos 返回的是剩余的等待时间
  37. nanos = takeWaitSet.awaitNanos(nanos);
  38. }
  39. T t = queue.removeFirst();
  40. putWaitSet.signal();
  41. return t;
  42. } finally {
  43. lock.unlock();
  44. }
  45. }
  46. public T take() throws InterruptedException {
  47. lock.lock();
  48. try {
  49. while (queue.isEmpty()) {
  50. takeWaitSet.await();
  51. }
  52. T t = queue.removeFirst();
  53. putWaitSet.signal();
  54. return t;
  55. } finally {
  56. lock.unlock();
  57. }
  58. }
  59. public void put(T t) throws InterruptedException {
  60. lock.lock();
  61. try {
  62. while (queue.size() == capcity) {
  63. log.debug("等待加入任务队列 {} ...", t);
  64. putWaitSet.await();
  65. }
  66. log.debug("加入任务队列 {}", t);
  67. queue.addLast(t);
  68. takeWaitSet.signal();
  69. } finally {
  70. lock.unlock();
  71. }
  72. }
  73. public boolean put(T task, long timeout, TimeUnit timeUnit) {
  74. lock.lock();
  75. try {
  76. long nanos = timeUnit.toNanos(timeout);
  77. while (queue.size() == capcity) {
  78. try {
  79. if (nanos <= 0) {
  80. return false;
  81. }
  82. log.debug("等待加入任务队列 {} ...", task);
  83. nanos = putWaitSet.awaitNanos(nanos);
  84. } catch (InterruptedException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. log.debug("加入任务队列 {}", task);
  89. queue.addLast(task);
  90. takeWaitSet.signal();
  91. return true;
  92. } finally {
  93. lock.unlock();
  94. }
  95. }
  96. public void tryPut(RejectPolicy<T> rejectPolicy, T task) throws InterruptedException {
  97. lock.lock();
  98. try {
  99. // 判断队列是否满
  100. if (queue.size() == capcity) {
  101. rejectPolicy.reject(this, task);
  102. } else { // 有空闲
  103. log.debug("加入任务队列 {}", task);
  104. queue.addLast(task);
  105. putWaitSet.signal();
  106. }
  107. } finally {
  108. lock.unlock();
  109. }
  110. }
  111. public int size() {
  112. lock.lock();
  113. try {
  114. return queue.size();
  115. } finally {
  116. lock.unlock();
  117. }
  118. }
  119. }

其中,RejectPolicy 是函数式接口,代表拒绝策略:

  1. @FunctionalInterface // 拒绝策略
  2. interface RejectPolicy<T> {
  3. void reject(BlockingQueue<T> queue, T task) throws InterruptedException;
  4. }

自定义线程池

  1. @Slf4j(topic = "c.ThreadPool")
  2. public class ThreadPool {
  3. //任务队列
  4. private final BlockingQueue<Runnable> taskQueue;
  5. // 线程集合
  6. private final HashSet<Worker> workers = new HashSet<>();
  7. // 核心线程数
  8. private final int coreSize;
  9. //超时等待
  10. private final long timeout;
  11. //时间单位
  12. private final TimeUnit timeUnit;
  13. //拒绝策略
  14. private final RejectPolicy rejectPolicy;
  15. public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int taskQueueCapcity, RejectPolicy rejectPolicy) {
  16. this.coreSize = coreSize;
  17. this.timeout = timeout;
  18. this.timeUnit = timeUnit;
  19. this.taskQueue = new BlockingQueue<>(taskQueueCapcity);
  20. this.rejectPolicy = rejectPolicy;
  21. }
  22. public void execute(Runnable task) throws InterruptedException {
  23. synchronized (workers) {
  24. if (workers.size() < coreSize) {
  25. Worker worker = new Worker(task);
  26. log.debug("新增 worker{}, {}", worker, task);
  27. workers.add(worker);
  28. worker.start();
  29. } else {
  30. // 1) 死等
  31. // 2) 带超时等待
  32. // 3) 让调用者放弃任务执行
  33. // 4) 让调用者抛出异常
  34. // 5) 让调用者自己执行任务
  35. taskQueue.tryPut(rejectPolicy, task);
  36. }
  37. }
  38. }
  39. class Worker extends Thread {
  40. private Runnable task;
  41. public Worker(Runnable task) {
  42. this.task = task;
  43. }
  44. @SneakyThrows
  45. @Override
  46. public void run() {
  47. //如果当前线程本来就携带任务就立刻执行,否则从任务队列获取任务
  48. while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
  49. try {
  50. log.debug("正在执行...{}", task);
  51. task.run();
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. } finally {
  55. task = null;
  56. }
  57. }
  58. synchronized (workers) {
  59. log.debug("worker 被移除{}", this);
  60. workers.remove(this);
  61. }
  62. }
  63. @Override
  64. public boolean equals(Object o) {
  65. if (this == o) {
  66. return true;
  67. }
  68. if (o == null || getClass() != o.getClass()) {
  69. return false;
  70. }
  71. Worker worker = (Worker) o;
  72. return Objects.equals(task, worker.task);
  73. }
  74. @Override
  75. public int hashCode() {
  76. return Objects.hash(task);
  77. }
  78. }
  79. public static void main(String[] args) throws InterruptedException {
  80. ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 5, ((queue, task) -> {
  81. //采用抛出异常的拒绝策略
  82. throw new RuntimeException("拒绝任务加入");
  83. }));
  84. for (int i = 0; i < 50; i++) {
  85. int j = i + 1;
  86. threadPool.execute(() -> {
  87. log.debug("正在执行第" + j + "个任务");
  88. });
  89. }
  90. }
  91. }

看一下运行结果:
image.png
拒绝策略确实生效了。

ThreadPoolExecutor类

使用Executors创建ThreadPoolExecutor对象

image.png
根据上面的7个核心参数,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池,下面介绍几种常用的:

newFixedThreadPool

image.png

newCachedThreadPool

image.png
image.png
SynchronousQueue作用演示如下:

  1. @Slf4j(topic = "c.SynchronousQueueTest")
  2. public class SynchronousQueueTest {
  3. public static void main(String[] args) throws InterruptedException {
  4. SynchronousQueue<Integer> integers = new SynchronousQueue<>();
  5. new Thread(() -> {
  6. try {
  7. log.debug("putting {} ", 1);
  8. integers.put(1);
  9. log.debug("{} putted...", 1);
  10. log.debug("putting...{} ", 2);
  11. integers.put(2);
  12. log.debug("{} putted...", 2);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }, "t1").start();
  17. Thread.sleep(1000);
  18. new Thread(() -> {
  19. try {
  20. log.debug("taking {}", 1);
  21. integers.take();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }, "t2").start();
  26. Thread.sleep(1000);
  27. new Thread(() -> {
  28. try {
  29. log.debug("taking {}", 2);
  30. integers.take();
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }, "t3").start();
  35. }
  36. }

image.png

newSingleThreadExecutor

image.png

提交任务

下面是提交任务的API:

  1. // 执行任务
  2. void execute(Runnable command);
  3. // 提交任务 task,用返回值 Future 获得任务执行结果
  4. <T> Future<T> submit(Callable<T> task);
  5. // 提交 tasks 中所有任务
  6. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  7. throws InterruptedException;
  8. // 提交 tasks 中所有任务,带超时时间
  9. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  10. long timeout, TimeUnit unit)
  11. throws InterruptedException;
  12. // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
  13. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  14. throws InterruptedException, ExecutionException;
  15. // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
  16. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  17. long timeout, TimeUnit unit)
  18. throws InterruptedException, ExecutionException, TimeoutException;

execute和submit方法使用如下:

  1. public class Test {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. ExecutorService pool = Executors.newFixedThreadPool(2);
  4. pool.execute(() -> {
  5. System.out.println("调用 execute 提交任务");
  6. });
  7. Future<Boolean> result = pool.submit(() -> {
  8. System.out.println("调用 submit 提交任务");
  9. return true;
  10. });
  11. System.out.println(result.get());
  12. }
  13. }

image.png

注意: 任务执行完毕后程序依然没有正常停止的原因是线程池中的核心线程不会在执行完任务和立即被销毁,而是再等待任务的执行。相反的,如果是救急线程,如果在60s内还没有任务执行,就会销毁救急线程了。

正确处理异常

先看一段代码:

  1. @Slf4j(topic = "c.ExceptionTest")
  2. public class ExceptionTest {
  3. public static void main(String[] args) {
  4. ExecutorService pool = Executors.newFixedThreadPool(1);
  5. pool.submit(() -> {
  6. log.debug("task1");
  7. int i = 1 / 0;
  8. });
  9. }
  10. }

测试结果:
image.png
可以看到,如果我们不主动处理异常,是不会打印堆栈信息的。因此在线程池中应该正确的处理任务中可能出现的异常,通常有下面两种做法:

主动捕捉异常

  1. @Slf4j(topic = "c.ExceptionTest")
  2. public class ExceptionTest {
  3. public static void main(String[] args) {
  4. ExecutorService pool = Executors.newFixedThreadPool(1);
  5. pool.submit(() -> {
  6. try {
  7. log.debug("task1");
  8. int i = 1 / 0;
  9. } catch (Exception e) {
  10. log.error("error:", e);
  11. }
  12. });
  13. }
  14. }

image.png

使用 Future

如果执行任务过程出现了异常,如果任务结果使用了 Future ,那么调用它的 get 方法的时候就会返回异常信息,如:

  1. @Slf4j(topic = "c.ExceptionTest")
  2. public class ExceptionTest {
  3. public static void main(String[] args) {
  4. ExecutorService pool = Executors.newFixedThreadPool(1);
  5. Future<Boolean> f = pool.submit(() -> {
  6. log.debug("task1");
  7. int i = 1 / 0;
  8. return true;
  9. });
  10. log.debug("result:{}", f.get())
  11. }
  12. }

image.png

ScheduledThreadPoolExecutor类

背景介绍

在『任务调度线程池』(即 ScheduledThreadPoolExecutor类) 功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。 Timer(已经过时了)演示如下:

  1. @Slf4j(topic = "c.ScheduledTest")
  2. public class TimerTest {
  3. public static void main(String[] args) {
  4. Timer timer = new Timer();
  5. TimerTask task1 = new TimerTask() {
  6. @Override
  7. public void run() {
  8. log.debug("task 1");
  9. try {
  10. Thread.sleep(2000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. };
  16. TimerTask task2 = new TimerTask() {
  17. @Override
  18. public void run() {
  19. log.debug("task 2");
  20. }
  21. };
  22. // 使用 timer 添加两个任务,希望它们都在 1s 后执行
  23. // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
  24. timer.schedule(task1, 1000);
  25. timer.schedule(task2, 1000);
  26. }
  27. }

image.png
可以间隔2s执行,task 2需要等待task 1执行完毕,可见确实是串行执行的。然后对比任务调度线程池:

  1. @Slf4j(topic = "c.ScheduledTest")
  2. public class ScheduledTest {
  3. public static void main(String[] args) {
  4. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  5. // 添加两个任务,希望它们都在 1s 后执行
  6. log.debug("start...");
  7. executor.schedule(() -> {
  8. log.debug("task 1");
  9. try {
  10. Thread.sleep(2000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }, 1000, TimeUnit.MILLISECONDS);
  15. executor.schedule(() -> {
  16. log.debug("task 0");
  17. }, 1000, TimeUnit.MILLISECONDS);
  18. }
  19. }

说明: 第一个参数是任务对象,第二个参数是多少秒后执行,第三个参数是时间单位

image.png
当然,这两个任务能够并行执行时因为“newScheduledThreadPool(2)”,如果说线程池中只有一个线程,还是会串行执行的。

scheduleAtFixedRate方法

  1. @Slf4j(topic = "c.ScheduledTest")
  2. public class ScheduledTest {
  3. public static void main(String[] args) {
  4. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  5. log.debug("start...");
  6. pool.scheduleAtFixedRate(() -> {
  7. log.debug("running...");
  8. }, 1, 1, TimeUnit.SECONDS);
  9. }
  10. }

image.png
由运行结果不难看出,scheduleAtFixedRate的第一个参数是任务对象、第二个参数是时间n后执行任务,第三个参数是间隔时间n反复执行任务,第四个参数则是n的时间单位。但是,我们看一种情况,如果线程执行任务的时间超过了循环执行的间隔时间:

  1. @Slf4j(topic = "c.ScheduledTest")
  2. public class ScheduledTest {
  3. public static void main(String[] args) {
  4. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  5. log.debug("start...");
  6. pool.scheduleAtFixedRate(() -> {
  7. log.debug("running...");
  8. try {
  9. Thread.sleep(2000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }, 1, 1, TimeUnit.SECONDS);
  14. }
  15. }

image.png
可以看到这样会被撑到执行完这个任务的时间。

scheduleWithFixedDelay方法

  1. @Slf4j(topic = "c.ScheduledTest")
  2. public class ScheduledTest {
  3. public static void main(String[] args) {
  4. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  5. log.debug("start...");
  6. pool.scheduleWithFixedDelay(() -> {
  7. log.debug("running...");
  8. try {
  9. Thread.sleep(2000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }, 1, 1, TimeUnit.SECONDS);
  14. }
  15. }

image.png
不难发现,同 scheduleAtFixedRate 方法不同的是,第三个参数表示“上一个任务结束 —> 延时 n —> 下一个任务开始”

应用之定时任务

让一段程序在每周周四的18点被执行,代码如下所示:

  1. @Slf4j(topic = "c.ExampleTest")
  2. public class ExampleTest {
  3. public static void main(String[] args) {
  4. // 获得当前时间
  5. LocalDateTime now = LocalDateTime.now();
  6. // 获取本周四 18:00:00.000
  7. LocalDateTime thursday =
  8. now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
  9. // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
  10. if (now.compareTo(thursday) >= 0) {
  11. thursday = thursday.plusWeeks(1);
  12. }
  13. // 计算时间差,即延时执行时间
  14. long initialDelay = Duration.between(now, thursday).toMillis();
  15. // 计算间隔时间,即 1 周的毫秒值
  16. long oneWeek = 7 * 24 * 3600 * 1000;
  17. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  18. log.debug("start...");
  19. executor.scheduleAtFixedRate(() -> {
  20. log.debug("执行定时任务");
  21. }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
  22. }
  23. }

除了上面两个线程池,JDK1.7之后还提供了 ForkJoinPool 线程池,这个线程池是基于分治的思想设计的,它是适用于能够进行任务拆分的 cpu 密集型运算 。ForkJoinPool 线程池的参考文章:https://www.cnblogs.com/myseries/p/12582271.html

线程池在业务中的实践

快速响应用户请求

描述:用户发起的实时请求,服务追求响应时间。比如说用户要查看一个商品的信息,那么我们需要将商品维度的一系列信息如商品的价格、优惠、库存、图片等等聚合起来,展示给用户。

分析:从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
image.png
并行执行任务提升任务响应速度

快速处理批量任务

描述:离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表。

分析:这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时的完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
image.png
并行执行任务提升批量任务执行速度

参考文章: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html