Executor
Executor 框架是在 Java5 中引入的,通过该框架来控制线程的启动,执行,关闭,简化并发编程。
Executor 基于生产者-消费者模式,提交任务的线程相当于生产者,执行任务的线程相当于消费者。同时,Executor 的实现还提供了对任务执行的生命周期管理的支持。
Executor 管理多个异步任务的执行,而无需程序员显式地管理线程的生命周期。这里的异步是指多个任务的执行互不干扰,不需要进行同步操作。
- Executor
接口。Eexcutor 框架的基础,将任务提交和任务执行解耦。 - ExecutorService
接口。继承自 Executor 接口,具备管理执行器和任务生命周期的方法,提交任务机制更完善。 - ThreadPoolExecutor
核心实现类。执行被提交的任务。 - ScheduledThreadPoolExecutor
实现类。支持定时或者延迟执行任务。 - Executors
提供了一系列静态工厂用于创建各种线程池。返回的线程池都实现了 ExecutorService 接口。 - execute(Runnable) 和 submit(Runnable)
- execute(Runnable) 接受一个 java.lang.Runnable 对象作为参数,并且以异步的方式去执行
- submit(Runnable) 同样接受一个 java.lang.Runnable 对象作为参数,但是会返回一个 Future 对象。该Future 对象可以用于判断 Runnable 是否结束执行。
Executor 框架由 3 大部分组成:
- 任务
每个任务都是用 Runnable/Callable 接口的实现类表示。 - 任务执行
任务执行的核心是采用 Executor 框架,核心类是 ThreadPoolExecutor。 - 异步计算的结果
异步任务需要返回结果,提交任务后需要返回 Future, FutureTask 实现。
运行过程:
- 第一步:主线程创建实现 Runnable 或者 Callable 接口的任务对象。
- 第二步:把 Runnable 对象直接交给 ExecutorService 执行,EexcutorService.execute(Runnable) 或者 ,EexcutorService.submit(Runnable)
- 第三步:主线程可以执行 FutureTask 的 get() 来等待任务执行完成。
线程池
线程池是指管理一组同构工作线程的资源池。
使用线程池的好处:
- 线程复用
可以重用线程池中的线程,减少因对象创建、销毁所带来的性能开销。 - 可控制最大并发数
提高系统资源利用率,同时避免过多的资源竞争,避免堵塞。 - 线程管理
能够对多线程进行简单的管理,使线程的使用简单、高效。 - 提高响应性
不会因为等待创建线程而延迟任务执行,从而提高了响应性。
线程池的创建
1. Executors
Executors 提供了一系列静态工厂用于创建各种线程池:
newFixedThreadPool
创建一个固定长度的线程池。每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadPool
创建单个工作线程来执行任务,如果某个线程异常,则会创建另一个线程来替代,能确保依照任务在队列中的顺序来串行执行。public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
创建一个固定长度的线程池。可以定时或者延迟执行任务。public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}
2. ThreadPoolExecutor
创建线程池主要是 ThreadPoolExecutor 类来完成。ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明:
- corePoolSize:线程池的核心线程数。
- 在创建线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。除非调用了prestartAllCoreThreads()方法或prestartCoreThread()方法,在任务没有到来之前就预创建corePoolSize个线程或一个线程。
- 在创建线程池后,默认情况下,线程池中的线程数为0,当有任务到来时线程池就会创建一个线程去执行任务
- 当线程池中的线程数目达到corePoolSize后,就会把后续到达的任务放到缓存任务队列当中。
- 核心线程在allowCoreThreadTimeout被设置为true时会超时并被回收,默认情况下不会被回收)
- maximumPoolSize:线程池的最大线程数。
- workQueue:任务等待队列。用于保存等待执行任务的阻塞队列。
- keepAliveTime:当线程数大于核心线程数时,多余空闲线程存活的最大时间。
- (即在核心线程数和队列满了之后的线程的存活时间)
- 当线程空闲时间达到keepAliveTime,该线程会退回,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会陆续退出直到线程数量为0
- 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率
- unit:时间单位
- threadFactory:用于创建新线程,
Executors.defaultThreadFactory()
- handler:线程池的饱和策略
- AbortPolicy:直接抛出异常,默认策略
- CallerRunsPolicy:用调用者所在的线程来执行任务
- DiscardOldestPolicy:丢弃队列中最靠前的任务,并执行当前的任务
- DiscardPolicy:直接丢弃任务
- 实现 RejectedExecutionHandler 接口的自定义 handler
- allowCoreThreadTimeOut :线程池中的一个重要属性,默认false
- false:核心线程即使在空闲时也保持存活状态
- true:核心线程使用keepAliveTime超时等待工作
3. 创建线程池的正确姿势
Executors 创建线程池会发生 OOM(Out Of Memory)
Java中的 BlockingQueue 主要有两种:
- ArrayBlockingQueue
用数组实现的有界阻塞队列,必须设置容量。 - LinkedBlockingQueue
一个用链表实现的有界阻塞队列,容量可以选择进行设置,如果不设置的话,将是一个无边界的阻塞队列,最大长度为 Integer.MAX_VALUE。
看 Executors.newFixedThreadPool 源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
LinkedBlockingQueue 并没有设置容量,LinkedBlockingQueue 就是一个无边界队列,对于一个无边界队列来说,是可不断的向队列中加入任务,这种情况下就有可能因为任务过多而导致内存溢出问题。
创建线程池的正确姿势
避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给 BlockQueue 指定容量即可。
private static ExecutorService executor =
new ThreadPoolExecutor(10, 10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));
线程池的工作过程
- 线程池刚创建时,里面没有一个线程,任务队列作为参数传入。此时,即使任务队列中有任务,线程池也不会马上执行他们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数 < corePoolSize,那么创建或启动工作线程执行任务
- 如果正在运行的线程数 >= corePoolSize,那么将任务放入任务队列
- 如果队列满了,并且正在运行的线程数 < maximumPoolSize,那么将创建非核心线程执行任务
- 如果队列满了,并且正在运行的线程数 >= maximumPoolSize,那么线程池会抛出异常
RejectExecutionExcaption
- 当一个线程完成任务后,在从队列中取出一个任务来执行
- 如果一个线程空闲超过一定时间(keepAliveTime),线程池会判断,如果正在运行的线程数 > corePoolSize,则回收该线程。线程池在任务执行完后,线程数会维持在 corePoolSize 的大小。
线程池的状态
- RUNNING:能接受新提交的任务,并且也能够处理阻塞队列中的任务
- SHUTDOWN:不再接受新提交的任务,但可以处理存量任务
- STOP:不再接受新提交的任务,也不处理存量任务
- TIDYING:所有的任务都已终止
- TERMINATED:terminated() 方法执行完后进入该状态
状态转换图:
shutdown() 和 shutdownNow() 这两个方法的原理都是遍历线程池中所有的线程,然后依次中断线程。 shutdown() 和 shutdownNow() 还是有不一样的地方:
- shutdownNow() 首先将线程池的状态设置为 STOP ,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表
- shutdown() 只是将线程池的状态设置为 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程
可以看出 shutdown() 方法会将正在执行的任务继续执行完,而 shutdownNow() 会直接中断正在执行的任务。
设置线程池大小
要想合理配置线程池的大小,首先分析任务的特性,可以从以下几个角度分析:
1. 任务的性质
- CPU 密集型任务配置尽可能少的线程数量,如配置 (N cpu) + 1 个线程的线程池。
- IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如 2 * (N cpu) +1。
- 混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
2. 任务的优先级
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
3. 任务的执行时间
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
4. 任务的依赖性
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。
注意:
- 可通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数。
- 最佳线程数 = (线程等待时间/线程 CPU 时间 + 1) (N cpu)
- 阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。