@[toc]
1. 概念
线程池指一个可容纳多个线程的容器,其中的线程可反复使用,节省了创建和销毁线程的开销。使用线程池的好处有:
- 降低资源消耗:通过重复的利用已创建的线程可以降低线程创建和销毁造成的资源消耗
- 提高响应速度:当任务达到时,任务无需自己创建线程就能立即执行
- 提高线程的可管理性:使用线程池可以实现对于线程的统一分配、监控和调优
线程池整个的执行逻辑如下:
文字化表述为:
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
当线程数达到
corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程如果队列选择了有界队列,那么任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize
数目的救急线程如果线程到达
maximumPoolSize
仍然有新任务这时会执行拒绝策略(RejectedExecutionHandler)拒绝策略共有四种实现:
- AbortPolicy: 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy: 让调用者运行任务
- DiscardPolicy: 放弃本次任务
- DiscardOldestPolicy :放弃队列中最早的任务,本任务取而代之
当然也可通过实现RejectExecutionhandler来自定义拒绝策略。
2. 线程池
2.1 定义线程池
Java中提供了多种创建线程池的方式,本质上都是返回一个ThreadPoolExecutor
对象,不同之处在于指定的参数不同。其中,ThreadPoolExecutor
的定义如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// defaultHandler:拒绝策略
// defaultThreadFactory:设置创建线程的工厂,用于对线程进行命名
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
其中方法参数列表中的参数含义如下:
corePoolSize
:核心线程的大小maximumPoolSize
:最大线程池的大小keepAliveTime
:空闲线程的存活时间BlockingQueue
:用来暂时保存任务的工作队列
其中BlockingQueue的实现选择有:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照FIFO的原则对任务进行排序
- LinkedBlockingQueue:基于链表的阻塞队列,同样按照FIFO对任务进行排序
- SynchronousQueue:一个不存储任何元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直阻塞
- PriorityBlockingQueue:具有优先级的阻塞队列
2.2 创建线程池
一般线程池的创建主要包括以下几类:
- newFixedThreadPool:最常用的一种创建线程池的方式,方法定义如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
其中核心线程数和最大线程数都设置为nThreads
来创建一个固定容量的线程池,另外workQueue使用了基于链表实现的LinkedBlockingQueue。它具有如下的特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- newCachedThreadPool:它的定义如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
根据定义可知,它具有如下特点:
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE
- 救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
- workQueue采用了 SynchronousQueue
- 适合任务数比较密集,但每个任务执行时间较短的情况
- newSingleThreadPool:定义如下
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
队列的核心线程数和最大线程数都是1,同时workQueue采用的也是LinkedBlockingQueue。它适用那些希望任务顺序执行的场景,任务执行完毕,这唯一的线程也不会被释放
- newScheduledThreadPool:适用于执行延时或者周期性任务,主要有如下的两种类型:
下面通过newFixedThreadPool创建一个容量为3的线程池:
public class ThreadPool {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(3);
Runnable r = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
};
es.submit(r);
es.submit(r);
es.submit(r);
//es.shutdown()
}
}
输出为:
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
NOTE:
Executors
为我们提供了如上几种构造线程池的便捷方法,对于服务器程序应该杜绝使用这些便捷方法,而是直接使用线程池ThreadPoolExecutor
的构造方法,避免无界队列可能导致的OOM以及线程个数限制不当导致的线程数耗尽等问题。
2.2 执行任务
向线程池提交任务有execute()
和submit()
两种方式,不同之处在于:
execute()
只用于提交那些不需要返回值的任务public void execute(Runnable command) {
e.execute(command);
}
submit()
用于提交那些需要返回值的任务,线程执行完毕后返回Future对象,通过Future的get()
可以获取到结果,获取的过程会阻塞当前的线程直到获取结束public Future<?> submit(Runnable task) {
return e.submit(task);
}
2.3 关闭线程池
关闭线程池主要是遍历线程池中的工作线程,然后逐个调用线程的interrupt()
来中断线程。主要包括如下两个方法:
shutdown()
:将线程池状态设置为SHUTDOWN,然后中断所有没有正在执行任务的线程,它可以保证正在执行的任务一定会执行完shutdownNow()
:将线程池状态置为STOP,并尝试停止所有正在执行或暂停任务的线程,最后返回等待执行任务的列表
3. 原理解析
3.1 原理
上面讲述了使用线程池的缘由,如何根据不同的业务场景创建不同类型的线程池,以及使用线程池来执行任务和最终关闭线程池。那么不同类型的线程池实现的底层依赖是什么呢?下面我们看一下它底层所依赖的Executor框架。Executor框架起到了一个相当于任务调度器的效果,它将分解后的任务映射到线程池中的线程上,而线程池中的线程和操作系统上的本地线程又是一一映射的,因此,任务通过调用线程池中的线程而执行,实际上的工作仍然由本地线程完成的。
整个Executor框架主要由任务的创建、任务的执行和计算结果的返回,示意图如下所示:
其中,主线程需要创建实现Runnable或Callable接口对象,创建的方法中包含任务的执行逻辑。创建完成后,主线程需要调用接口对象相应的方法,将其交给ExecutorService执行。如果任务任务执行会返回结果,那么将返回Future或是FutureTask对象,使用get()
就可以获取。此外,主线程还可以调用cancel()
来取消任务的执行。
Runnable不会返回结果,Callable可以返回结果。
Executors工具类可以将Runnable对象转换为Callable对象,对应的方法是:
public static Callable<Object> callable(Runnable task)
Executor的源码定义如下:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 其他代码
}
不同类型的线程池的创建都是返回ThreadPoolExecutor或ScheduledThreadPoolExecutor对象,不同之处在于传递的参数不同。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
那么不同的线程池如何是如何执行提交的任务呢?
3.2 execute执行过程
newFixedThreadPool用于创建固定线程数量的线程池,它的execute()
的源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数小于核心线程数,则创建新线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
方法的执行流程如下:
当主线程调用execute()
提交任务时,如果当前线程池中已有的线程数量小于设定的核心线程数,那么直接创建新的线程来执行任务;如果当前的的核心线程数已满,则无法创建新线程,将任务加入到工作队列中。当线程池中的线程执行完任务处于空闲状态时,它会从工作队列中获取阻塞的任务继续执行。
值得注意的是,LinkedBlockingQueue基于链表实现的无界阻塞队列,那么只要有任务进入到线程池,能执行就执行,不能执行就放入队列,永远不会拒绝新任务。
newSingleThreadExecutor只会创建单个线程,因此,只要有任务提交或是队列中有阻塞任务,那么永远只有一个线程执行。执行流程如下所示:
如果此时线程池中无运行的线程,那么将创建一个线程执行任务;否则,只能将任务添加到队列中,而不能再创建新的线程。当线程池中的线程执行完任务后,如果队列还有阻塞的任务,它同样会取任务继续执行。
newCachedThreadPool的corePoolSize初始化为0,maximumPoolSize为Integer.MAX_VALUE,而且工作队列为SynchronousQueue,它是没有容量的。这意味着如果线程池为空,当有任务需要执行时,它会不断的创建新线程。如果任务的提交速度高于线程池线程处理的速度,那么创建线程可能会耗尽CPU和内存资源。
由于SynchronousQueue它是一个不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能添加元素。因此,当主线程提交任务时,如果此时线程池中正好有线程执行poll操作,那么主线程提交任务的offer操作正好了线程的poll操作配对,线程可以执行所提交的任务。如果提交任务时线程池为空,或者没有空闲线程执行poll操作,那么配对失败,那么线程池将创建一个新的线程来执行任务。
线程在执行完当前任务后,会调用队列的poll(keepLiveTime,TimeUnit.xxx)
方法等待设定的时间。如果等待过程中没有新的任务提交,那么空闲线程将终止;如果有新任务提交,那么将继续执行新任务。
ScheduledThreadPoolExecutor主要用于在给定的延时之后或者定期执行任务,因此,它和前面的几种执行任务时有所不同。它的工作队列选择的是DelayQueue来满足延时的需求,DelayQueue也是一个无界队列。ScheduledThreadPoolExecutor的执行过程如下所示:
当主线程调用scheduleAtFixedRate()
或者scheduleWithFixedDelay()
方法时,它会向DelayQueue中添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask,线程池中的线程需要从队列中取出来执行。ScheduledFutureTask中3个重要的变量保证了延期执行或是定期执行的效果,如下所示:
time:任务被执行的具体时间
sequenceNumber:任务添加到ScheduledThreadPoolExecutor中的序列号
period:任务执行的间隔周期
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
4. 总结
线程池相比于单线程有很多的优势,但不能说使用线程池就一定比单线程好。另外,线程池也有多种不同的实现方式,用户使用时需要针对于具体的业务场景进行选择。