Executor

Executor 框架是在 Java5 中引入的,通过该框架来控制线程的启动,执行,关闭,简化并发编程
Executor 基于生产者-消费者模式提交任务的线程相当于生产者,执行任务的线程相当于消费者。同时,Executor 的实现还提供了对任务执行的生命周期管理的支持。

Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作
线程池 - 图1

  • Executor
    接口。Eexcutor 框架的基础,将任务提交和任务执行解耦。
  • ExecutorService
    接口。继承自 Executor 接口,具备管理执行器和任务生命周期的方法,提交任务机制更完善。
  • ThreadPoolExecutor
    核心实现类。执行被提交的任务。
  • ScheduledThreadPoolExecutor
    实现类。支持定时或者延迟执行任务。
  • Executors
    提供了一系列静态工厂用于创建各种线程池。返回的线程池都实现了 ExecutorService 接口。
  • execute(Runnable) 和 submit(Runnable)
    • execute(Runnable) 接受一个 java.lang.Runnable 对象作为参数,并且以异步的方式去执行
    • submit(Runnable) 同样接受一个 java.lang.Runnable 对象作为参数,但是会返回一个 Future 对象。该Future 对象可以用于判断 Runnable 是否结束执行

Executor 框架由 3 大部分组成:

  • 任务
    每个任务都是用 Runnable/Callable 接口的实现类表示。
  • 任务执行
    任务执行的核心是采用 Executor 框架,核心类是 ThreadPoolExecutor。
  • 异步计算的结果
    异步任务需要返回结果,提交任务后需要返回 Future, FutureTask 实现。
    image.png

运行过程:

  • 第一步:主线程创建实现 Runnable 或者 Callable 接口的任务对象。
  • 第二步:把 Runnable 对象直接交给 ExecutorService 执行,EexcutorService.execute(Runnable) 或者 ,EexcutorService.submit(Runnable)
  • 第三步:主线程可以执行 FutureTask 的 get() 来等待任务执行完成。

线程池

线程池是指管理一组同构工作线程的资源池。

使用线程池的好处:

  • 线程复用
    可以重用线程池中的线程,减少因对象创建、销毁所带来的性能开销。
  • 可控制最大并发数
    提高系统资源利用率,同时避免过多的资源竞争,避免堵塞。
  • 线程管理
    能够对多线程进行简单的管理,使线程的使用简单、高效。
  • 提高响应性
    不会因为等待创建线程而延迟任务执行,从而提高了响应性。

线程池的创建

1. Executors

Executors 提供了一系列静态工厂用于创建各种线程池:

  • newFixedThreadPool
    创建一个固定长度的线程池。每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
    3. new LinkedBlockingQueue<Runnable>());
    4. }
  • newCachedThreadPool
    创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
    3. new SynchronousQueue<Runnable>());
    4. }
  • newSingleThreadPool
    创建单个工作线程来执行任务,如果某个线程异常,则会创建另一个线程来替代,能确保依照任务在队列中的顺序来串行执行。

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    4. }
  • newScheduledThreadPool
    创建一个固定长度的线程池。可以定时或者延迟执行任务。

    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2. return new ScheduledThreadPoolExecutor(corePoolSize);
    3. }
    4. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    5. public ScheduledThreadPoolExecutor(int corePoolSize) {
    6. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    7. new DelayedWorkQueue());
    8. }
    9. }

2. ThreadPoolExecutor

创建线程池主要是 ThreadPoolExecutor 类来完成。ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:

  1. ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

参数说明:

  • corePoolSize:线程池的核心线程数。
    • 在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。除非调用了prestartAllCoreThreads()方法或prestartCoreThread()方法,在任务没有到来之前就预创建corePoolSize个线程或一个线程。
    • 在创建线程池后,默认情况下,线程池中的线程数为0,当有任务到来时线程池就会创建一个线程去执行任务
    • 当线程池中的线程数目达到corePoolSize后,就会把后续到达的任务放到缓存任务队列当中。
    • 核心线程在allowCoreThreadTimeout被设置为true时会超时并被回收,默认情况下不会被回收)
  • maximumPoolSize:线程池的最大线程数。
  • workQueue:任务等待队列。用于保存等待执行任务的阻塞队列。
  • keepAliveTime:当线程数大于核心线程数时,多余空闲线程存活的最大时间。
    • (即在核心线程数和队列满了之后的线程的存活时间)
    • 当线程空闲时间达到keepAliveTime,该线程会退回,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会陆续退出直到线程数量为0
    • 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
  • unit:时间单位
  • threadFactory:用于创建新线程,Executors.defaultThreadFactory()
  • handler:线程池的饱和策略
    • AbortPolicy:直接抛出异常,默认策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务
    • DiscardOldestPolicy:丢弃队列中最靠前的任务,并执行当前的任务
    • DiscardPolicy:直接丢弃任务
    • 实现 RejectedExecutionHandler 接口的自定义 handler
  • allowCoreThreadTimeOut :线程池中的一个重要属性,默认false
    • false:核心线程即使在空闲时也保持存活状态
    • true:核心线程使用keepAliveTime超时等待工作

3. 创建线程池的正确姿势

Executors 创建线程池会发生 OOM(Out Of Memory)

Java中的 BlockingQueue 主要有两种:

  • ArrayBlockingQueue
    用数组实现的有界阻塞队列,必须设置容量。
  • LinkedBlockingQueue
    一个用链表实现的有界阻塞队列,容量可以选择进行设置,如果不设置的话,将是一个无边界的阻塞队列,最大长度为 Integer.MAX_VALUE

看 Executors.newFixedThreadPool 源码:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

LinkedBlockingQueue 并没有设置容量,LinkedBlockingQueue 就是一个无边界队列,对于一个无边界队列来说,是可不断的向队列中加入任务,这种情况下就有可能因为任务过多而导致内存溢出问题。

创建线程池的正确姿势

避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给 BlockQueue 指定容量即可。

  1. private static ExecutorService executor =
  2. new ThreadPoolExecutor(10, 10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));

线程池的工作过程

  1. 线程池刚创建时,里面没有一个线程,任务队列作为参数传入。此时,即使任务队列中有任务,线程池也不会马上执行他们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    • 如果正在运行的线程数 < corePoolSize,那么创建或启动工作线程执行任务
    • 如果正在运行的线程数 >= corePoolSize,那么将任务放入任务队列
    • 如果队列满了,并且正在运行的线程数 < maximumPoolSize,那么将创建非核心线程执行任务
    • 如果队列满了,并且正在运行的线程数 >= maximumPoolSize,那么线程池会抛出异常 RejectExecutionExcaption
  3. 当一个线程完成任务后,在从队列中取出一个任务来执行
  4. 如果一个线程空闲超过一定时间(keepAliveTime),线程池会判断,如果正在运行的线程数 > corePoolSize,则回收该线程。线程池在任务执行完后,线程数会维持在 corePoolSize 的大小。

线程池的状态

  • RUNNING:能接受新提交的任务,并且也能够处理阻塞队列中的任务
  • SHUTDOWN:不再接受新提交的任务,但可以处理存量任务
  • STOP:不再接受新提交的任务,也不处理存量任务
  • TIDYING:所有的任务都已终止
  • TERMINATED:terminated() 方法执行完后进入该状态

状态转换图:
image.png

shutdown() 和 shutdownNow() 这两个方法的原理都是遍历线程池中所有的线程,然后依次中断线程。 shutdown() 和 shutdownNow() 还是有不一样的地方:

  • shutdownNow() 首先将线程池的状态设置为 STOP ,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表
  • shutdown() 只是将线程池的状态设置为 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程

可以看出 shutdown() 方法会将正在执行的任务继续执行完,而 shutdownNow() 会直接中断正在执行的任务

设置线程池大小

要想合理配置线程池的大小,首先分析任务的特性,可以从以下几个角度分析:

1. 任务的性质

  • CPU 密集型任务配置尽可能少的线程数量,如配置 (N cpu) + 1 个线程的线程池。
  • IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如 2 * (N cpu) +1。
  • 混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。

2. 任务的优先级

优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

3. 任务的执行时间

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

4. 任务的依赖性

依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。

注意:

  • 可通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数
  • 最佳线程数 = (线程等待时间/线程 CPU 时间 + 1) (N cpu)
  • 阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。