使用线程池的好处

  • 降低资源消耗。不用重复创建、销毁线程
  • 提高响应速度。不需要创建线程,直接使用线程池中的线程,使用完放到线程池中,不需要销毁
  • 提高线程可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源还会降低系统稳定性,使用线程池可以统一进行分配、调优和监控

ThreadPoolExecutor

image.pngimage.png

  • ThreadPoolExecutor
    1. Executor接口是Excutor框架的基础,它将任务的提交和任务的执行分离开了
    2. ExecutorService接口继承了Executor接口,在Executor基础上又提供了submit()、shutdown()等方法的扩展,是真正意义上的线程池接口
    3. AbstractExecutorService抽象类实现了ExecutorService中大部分方法
    4. ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
  • ScheduledThreadPoolExecutor
    1. ScheduledExecutorService接口继承了ExecutorService接口,提供了带“周期执行”功能的ExecutorService
    2. ScheduledThreadPoolExecutor是ScheduledExecutorService的实现类,可以设置给定时间后执行任务,或者周期性执行任务,相比Timer更加灵活,功能更强大


线程池各个参数的意义

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. // ……
  9. }

corePoolSize

核心线程数,保留在池中的线程数(即使它们处于空闲状态),除非设置了allowCoreThreadTimeOut

  • 每当提交一个任务时,线程池就创建一个新的线程执行任务,直到当前线程数达到corePoolSize
  • 如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行
  • 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize

keepAliveTime

当线程数大于核心线程数时,这是多余的空闲线程将在终止之前等待新任务的最长时间

  • 线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用

unit

keepAliveTime参数的时间单位

workQueue

用于在执行任务之前保留任务的队列。 此队列将仅保存execute方法提交的Runnable任务

用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列

使用无界队列作为工作队列会对线程池带来如下影响:

  1. 当线程池的线程数达到corePoolSize后,新的任务会一直放到阻塞队列中,maximumPoolSize就成了一个无效参数
  2. keepAliveTime和unit同样也变成无效参数
  3. 使用无界队列最大的问题是可能会耗尽系统资源,有界队列有助于防止资源耗尽,同时即使使用有界队列也需要注意控制队列的大小在一个合适的范围

所以,常用的阻塞队列一般是ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue以及PriorityBlockingQueue

threadFactory

执行程序创建新线程时要使用的工厂,通过自定义的线程工厂可以给每个新建的线程做一些设置,比如设置特殊的线程名、设置线程为守护线程等等

RejectedExecutionHandler

线程池的拒绝策略:
阻塞队列中任务数量达到可以存储的最大任务数,线程池中的线程数量达到最大线程数,再提交任务进来,就需要按照这个指定的拒绝策略来处理这个任务

线程池提供了4种拒绝策略

  1. AbortPolicy:直接抛出异常,默认策略
  2. CallerRunsPolicy:用调用者所在的线程来执行任务
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
  4. DiscardPolicy:直接丢弃任务

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务

线程池扩展

ThreadPoolExecutor类预留了两个任务执行前后的处理方法、一个关闭线程池时调用的方法

  1. // 任务执行前执行
  2. protected void beforeExecute(Thread t, Runnable r) { }
  3. // 任务执行后执行
  4. protected void afterExecute(Runnable r, Throwable t) { }
  5. // 退出线程池调用
  6. protected void terminated() { }

线程池工作原理

概述

image.png

  1. 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁 )
  2. 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue
  3. 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务
  4. 如果创建新线程使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler中的rejectedExecution()方法

任务提交

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
  • submit()方法用于提交需要返回值的任务
    • 线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功
      • 可以通过future的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,
      • 而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完

关闭线程池

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止

但是它们存在一定的区别:

  • shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
  • 而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true
当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true

至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法

配置线程池

Runtime.getRuntime().availableProcessors():当前设备CPU核数

计算密集型

  • 任务需要大量运算,过程中不会阻塞,需要一直消耗CPU资源
  • 只有在真正多核CPU上才能通过多线程得到加速
  • 尽可能少的配置线程数

配置:CPU核数+1

IO密集型

  • 任务需要大量IO,即任务执行过程会发生阻塞
  • 单线程中执行IO密集型任务会导致大量的CPU运算能力浪费在阻塞等待上面
  • 使用多线程可以大大加速程序运行,主要利用了CPU被阻塞的时间

配置

  • 2*CPU核数
  • CPU核数/(1-阻塞系数) 阻塞系数通常在0.8~0.9

对于 IO 型的任务的最佳线程数,有个公式可以计算 :Nthreads = NCPU * UCPU * (1 + W/C)
其中:

  • NCPU 是处理器的核的数目
  • UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
  • W/C 是等待时间与计算时间的比率

等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查看

其他一些原则:

  1. 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行
  2. 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行
  3. 依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU
  4. 建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千

    假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里 如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题

预定义线程池(Executors)

FixedThreadPool

创建固定线程数的线程池。

  • 适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景
  • 适用于负载比较重的服务器,需要限制线程数进而控制资源消耗

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }
    6. // 设置自定义线程工厂
    7. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    8. return new ThreadPoolExecutor(nThreads, nThreads,
    9. 0L, TimeUnit.MILLISECONDS,
    10. new LinkedBlockingQueue<Runnable>(),
    11. threadFactory);
    12. }
  • 核心线程数、最大线程数都被设置为指定的参数nThreads

  • 这里keepAliveTime设置为0,意味着多余的空闲线程会被立即终止
  • 使用有界队列LinkedBlockingQueue作为线程池的工作队列
    • 队列的容量为Integer.MAX_VALUE

SingleThreadExecutor

创建单个线程的线程池

  • 适用于需要保证各个任务顺序执行,并且在任意时间点不会有多个活动线程的应用场景

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }
    7. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    8. return new FinalizableDelegatedExecutorService
    9. (new ThreadPoolExecutor(1, 1,
    10. 0L, TimeUnit.MILLISECONDS,
    11. new LinkedBlockingQueue<Runnable>(),
    12. threadFactory));
    13. }
  • 核心线程数、最大线程数都被设置为1

  • keepAliveTime设置为0
  • 工作队列使用的是LinkedBlockingQueue
    • 队列的容量为Integer.MAX_VALUE

CachedThreadPool

会根据需要创建新线程的线程池

  • 大小无界,适用于执行很多短期异步任务的小程序,或者负载较轻的服务器

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }
    6. static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    8. 60L, TimeUnit.SECONDS,
    9. new SynchronousQueue<Runnable>(),
    10. threadFactory);
    11. }
  • 核心线程数为0

  • 最大线程数为Integer.MAX_VALUE
    • 意味着如果主线程提交任务速度>线程处理任务速度,将会不断创建新线程,可能导致耗尽CPU和内存资源
  • keepAliveTime设置为60,意味着空闲线程等待新任务的最长时间是60s
  • 工作队列使用没有容量的SynchronousQueue

WorkStealingPool

利用所有运行的处理器数量来创建一个工作窃取的线程池,使用forkJoin实现

  1. public static ExecutorService newWorkStealingPool(int parallelism) {
  2. return new ForkJoinPool
  3. (parallelism,
  4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  5. null, true);
  6. }
  7. public static ExecutorService newWorkStealingPool() {
  8. return new ForkJoinPool
  9. (Runtime.getRuntime().availableProcessors(),
  10. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  11. null, true);
  12. }

ScheduledThreadPoolExecutor

可以安排命令在给定的延迟后运行或定期执行的线程池

ScheduledThreadPoolExecutor的几个重要方法

  1. // 向定时任务线程池提交一个延时Runnable任务(仅执行一次)
  2. public ScheduledFuture<?> schedule(Runnable command,
  3. long delay,
  4. TimeUnit unit) {
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. RunnableScheduledFuture<?> t = decorateTask(command,
  8. new ScheduledFutureTask<Void>(command, null,
  9. triggerTime(delay, unit)));
  10. delayedExecute(t);
  11. return t;
  12. }
  13. // 向定时任务线程池提交一个延时的Callable任务(仅执行一次)
  14. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  15. long delay,
  16. TimeUnit unit) {
  17. if (callable == null || unit == null)
  18. throw new NullPointerException();
  19. RunnableScheduledFuture<V> t = decorateTask(callable,
  20. new ScheduledFutureTask<V>(callable,
  21. triggerTime(delay, unit)));
  22. delayedExecute(t);
  23. return t;
  24. }
  25. // 向定时任务线程池提交一个固定时间间隔执行的任务
  26. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  27. long initialDelay,
  28. long period,
  29. TimeUnit unit) {
  30. if (command == null || unit == null)
  31. throw new NullPointerException();
  32. if (period <= 0)
  33. throw new IllegalArgumentException();
  34. ScheduledFutureTask<Void> sft =
  35. new ScheduledFutureTask<Void>(command,
  36. null,
  37. triggerTime(initialDelay, unit),
  38. unit.toNanos(period));
  39. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  40. sft.outerTask = t;
  41. delayedExecute(t);
  42. return t;
  43. }
  44. // 向定时任务线程池提交一个固定延时间隔执行的任务
  45. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  46. long initialDelay,
  47. long delay,
  48. TimeUnit unit) {
  49. if (command == null || unit == null)
  50. throw new NullPointerException();
  51. if (delay <= 0)
  52. throw new IllegalArgumentException();
  53. ScheduledFutureTask<Void> sft =
  54. new ScheduledFutureTask<Void>(command,
  55. null,
  56. triggerTime(initialDelay, unit),
  57. unit.toNanos(-delay));
  58. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  59. sft.outerTask = t;
  60. delayedExecute(t);
  61. return t;
  62. }
  • 固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从理论上讲是确定的,当然执行任务的时间不能超过执行周期

    • 执行任务的时间超过执行周期,下一次执行时间 = 任务完成开始 + 执行周期
  • 固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时间是不确定的,也就导致了每次任务的执行周期存在一定的波动

    • 执行任务的时间超过延时时间,下一次执行时间 = 本次任务开始 + 延时时间,很可能本次任务未完成就开始进行下一次任务了

ScheduledThreadPoolExecutor

包含若干线程的定时任务线程池

  • 需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景

    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2. return new ScheduledThreadPoolExecutor(corePoolSize);
    3. }
    4. public static ScheduledExecutorService newScheduledThreadPool(
    5. int corePoolSize, ThreadFactory threadFactory) {
    6. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    7. }

SingleThreadScheduledExecutor

只包含一个线程的定时任务线程池

  • 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景

    1. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    2. return new DelegatedScheduledExecutorService
    3. (new ScheduledThreadPoolExecutor(1));
    4. }
    5. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    6. return new DelegatedScheduledExecutorService
    7. (new ScheduledThreadPoolExecutor(1, threadFactory));
    8. }
  • 底层也是通过ScheduledThreadPoolExecutor实现的,只不过核心线程数设置为1

CompletionService

image.png

成员

可以视为Executor和BlockingQueue的结合体

  • CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获取任务执行的结果
  • Executor执行计算任务、BlockingQueue保存任务执行结果
    1. public class ExecutorCompletionService<V> implements CompletionService<V> {
    2. // 执行任务
    3. private final Executor executor;
    4. // 完成将任务封装成FutureTask的工作
    5. private final AbstractExecutorService aes;
    6. // 保存任务执行结果
    7. private final BlockingQueue<Future<V>> completionQueue;
    8. }

创建

创建一个BlockingQueue(默认使用的基于链表的LinkedBlockingQueue,也可以自定义),该BlockingQueue的作用是保存Executor执行的结果

  1. public ExecutorCompletionService(Executor executor) {
  2. if (executor == null)
  3. throw new NullPointerException();
  4. this.executor = executor;
  5. this.aes = (executor instanceof AbstractExecutorService) ?
  6. (AbstractExecutorService) executor : null;
  7. this.completionQueue = new LinkedBlockingQueue<Future<V>>();
  8. }
  9. public ExecutorCompletionService(Executor executor,
  10. BlockingQueue<Future<V>> completionQueue) {
  11. if (executor == null || completionQueue == null)
  12. throw new NullPointerException();
  13. this.executor = executor;
  14. this.aes = (executor instanceof AbstractExecutorService) ?
  15. (AbstractExecutorService) executor : null;
  16. this.completionQueue = completionQueue;
  17. }

使用

提交

当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法(done方法在任务完成后才会被执行到),之后把Executor执行的计算结果放入BlockingQueue中

  1. private class QueueingFuture extends FutureTask<Void> {
  2. // 构造QueueingFuture等待被执行,后续由线程池executor触发执行
  3. QueueingFuture(RunnableFuture<V> task) {
  4. super(task, null);
  5. this.task = task;
  6. }
  7. // 任务完成后执行这个方法
  8. protected void done() { completionQueue.add(task); }
  9. private final Future<V> task;
  10. }
  11. public Future<V> submit(Callable<V> task) {
  12. if (task == null) throw new NullPointerException();
  13. RunnableFuture<V> f = newTaskFor(task);
  14. executor.execute(new QueueingFuture(f));
  15. return f;
  16. }
  17. public Future<V> submit(Runnable task, V result) {
  18. if (task == null) throw new NullPointerException();
  19. RunnableFuture<V> f = newTaskFor(task, result);
  20. // 先构造QueueingFuture,这个过程中会将f放到completionQueue中;再执行任务
  21. executor.execute(new QueueingFuture(f));
  22. return f;
  23. }
  24. private RunnableFuture<V> newTaskFor(Runnable task, V result) {
  25. if (aes == null)
  26. return new FutureTask<V>(task, result);
  27. else
  28. return aes.newTaskFor(task, result);
  29. }

与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中

take

  1. public Future<V> take() throws InterruptedException {
  2. return completionQueue.take();
  3. }

take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间

使用方式

  1. 自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了

  2. 使用CompletionService来维护处理不同线程的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序