前言
线程池: 简单理解,它就是一个管理线程的池子。
线程池的优点:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的总结
工作流程
图解线程池流程
先来一张通俗易懂的:
文字理解:工厂中有固定的一批工人,称为正式工人,工厂接收的订单由这些工人去完成。当订单增加,正式工人已经忙不过来了,工厂会将生产原料暂时堆积在仓库中,等有空闲的工人时再处理(因为工人空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度)。仓库堆积满了后,订单还在增加怎么办?工厂只能临时扩招一批工人来应对生产高峰,而这批工人高峰结束后是要清退的,所以称为临时工。当时临时工也以招满后(受限于工位限制,临时工数量有上限),后面的订单只能忍痛拒绝了。
将其翻译成线程池理解:
关键参数
线程池关键参数的理解(在图中可以看到):
corePoolSize(必需):核心线程数。即池中一直保持存活的线程数,即使这些线程处于空闲。但是将allowCoreThreadTimeOut参数设置为true后,核心线程处于空闲一段时间以上,也会被回收。
- maximumPoolSize(必需):池中允许的最大线程数。当核心线程全部繁忙且任务队列打满之后,线程池会临时追加线程,直到总线程数达到maximumPoolSize这个上限。
- keepAliveTime(必需):线程空闲超时时间。当非核心线程处于空闲状态的时间超过这个时间后,该线程将被回收。将allowCoreThreadTimeOut参数设置为true后,核心线程也会被回收。
- unit(必需):keepAliveTime参数的时间单位。有:TimeUnit.DAYS(天)、TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.NANOSECONDS(纳秒)
- workQueue(必需):任务队列,采用阻塞队列实现。当核心线程全部繁忙时,后续由execute方法提交的Runnable将存放在任务队列中,等待被线程处理。
- threadFactory(可选):线程工厂。指定线程池创建线程的方式。
- handler(可选):拒绝策略。当线程池中线程数达到maximumPoolSize且workQueue打满时,后续提交的任务将被拒绝,handler可以指定用什么方式拒绝任务。
工作原理
拒绝策略
- AbortPolicy(抛出一个异常,默认的)
- DiscardPolicy(直接丢弃任务)
- DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
- CallerRunsPolicy(交给线程池调用所在的线程进行处理)
任务队列
ArrayBlockingQueue
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。LinkedBlockingQueue
LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列DelayQueue
DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。PriorityBlockingQueue
PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;SynchronousQueue
SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
创建方式
创建线程池的方式:
- 通过ThreadPoolExecutor构造函数实现(推荐)
- 通过 Executor 框架的工具类 Executors 来实现,总共有三种类型(方法内部也是调用了ThreadPoolExecutor的构造方法)
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
ThreadPoolExecutor
《阿里巴巴 Java 开发手册》“并发处理”章节中,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。(具体原因可参照线程池的优点)
并且强制要求,不允许使用Executors
去快捷创建线程池,而是通过ThreadPoolExecutor
构造函数的方式,除了避免 OOM 的原因外,这样的处理方式可以让我们更加明确线程池的运行规则,规避资源耗尽的风险。
也就是说使用有界队列,控制线程创建数量。因为实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。我们应该显示地给我们的线程池命名,有助于后期快速定位问题。Executors 返回线程池对象的弊端:
FixedThreadPool
和SingleThreadExecutor
:允许请求的队列长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM(java.lang.OutOfMemoryError:内存用完了)。CachedThreadPool
和ScheduledThreadPool
:允许创建的线程数量为Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
示例:一个简单的for遍历线程
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class App
{
//corePoolSize: 定义核心线程数为 3
private static final Integer CORE_POOL_SIZE = 3;
//maximumPoolSize:定义最大线程数为 9
private static final Integer MAX_POOL_SIZE = 9;
//workQueue:定义任务队列为 ArrayBlockingQueue,并且容量为 90
private static final Integer QUEUE_CAPACITY = 90;
//keepAliveTime: 定义等待时间为 1
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,//unit: 等待时间的单位为 TimeUnit.SECONDS
new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY), //workQueue:定义任务队列为 ArrayBlockingQueue
new ThreadPoolExecutor.CallerRunsPolicy());//handler:定义拒绝策略为 CallerRunsPolicy
// 模拟向线程池提交(6个线程)任务
for (int i = 0; i < 6; i++) {
//执行线程;这里直接使用的Runnable匿名对象;也可以自行创建Runnable或Callable的实现类
executor.execute(new Runnable() {
@Override
public void run() {
Long start = System.currentTimeMillis(); //获取线程开始执行时间
//这里随便写了一个遍历功能的线程
for (int numValue = 0; numValue <= 2; numValue++) {
System.out.println(Thread.currentThread().getName() + ":" + numValue + " " + new Date(System.currentTimeMillis()));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":当前线程任务结束,总共花费时间:" + (System.currentTimeMillis() - start) / 1000 + "秒");
}
}
);
}
//终止线程池
executor.shutdown(); //设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
// executor.shutdownNow(); //(慎用)设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,该方法要慎用,容易造成不可控的后果
while (!executor.isTerminated()) {
//使用isTerminated()函数判断线程池中的所有线程是否执行完毕时候,不能直接使用该函数,必须在shutdown()方法关闭线程池之后才能使用,
//否则isTerminated()永远为false,线程将一直阻塞在该判断的地方,导致程序最终崩溃。
}
System.out.println("所有线程都已结束");
}
}
打印结果:
pool-1-thread-2:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-1:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-3:0 Thu Sep 30 10:42:01 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:02 CST 2021
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-2:0 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-2:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-1:1 Thu Sep 30 10:42:03 CST 2021
pool-1-thread-3:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-1:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:2 Thu Sep 30 10:42:04 CST 2021
pool-1-thread-2:当前线程任务结束,总共花费时间:1秒
pool-1-thread-1:当前线程任务结束,总共花费时间:1秒
pool-1-thread-3:当前线程任务结束,总共花费时间:1秒
所有线程都已结束
打印结果分析:
该示例中总共向线程池提交了6个线程任务,但是该线程池的核心线程只有3个,所以根据示例中定义的拒绝策略和任务队列,剩下的3个线程在任务队列中,根据打印结果可以看到,当前面的3个线程分别执行完毕时,空出的核心线程会被剩下的3个线程任务拿到,然后继续执行剩下的线程任务
Executors
FixedThreadPool
创建可重用固定线程数的线程池。
- FixedThreadPool只有核心线程,且数量固定,没有非核心线程。
- keepAliveTime设置为0L,代表多余的线程会被立即终止;即没有非空闲时间;因为不会产生多余的线程,所以keepAliveTime是无效的参数。
- 任务队列为无界的阻塞队列LinkedBlockingQueue(容量默认为Integer.MAX_VALUE)。
- 提交任务
- 如果线程数少于核心线程,创建核心线程执行任务
- 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
- 如果线程执行完任务,去阻塞队列取任务,继续执行。
示例:
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//do nothing
}
});
使用场景:
适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
缺点:
- 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
- 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
- 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)
CachedThreadPool
一个根据需要创建线程的线程池。
核心线程数为0
最大线程数为Integer.MAX_VALUE
阻塞队列是SynchronousQueue
非核心线程空闲存活时间为60秒
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
提交任务
- 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
- 判断是否有空闲线程,如果有,就去取出任务执行。
- 如果没有空闲线程,就新建一个线程执行。
- 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则被销毁。
使用场景:
因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。
缺点:
CachedThreadPool允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
SingleThreadExecutor
只有一个线程的线程池。
- 核心线程数为1
- 最大线程数也为1
- 阻塞队列是LinkedBlockingQueue
- keepAliveTime为0
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在执行");
});
}
- 提交任务
- 线程池是否有一条线程在,如果没有,新建线程执行任务
- 如果有,讲任务加到阻塞队列
- 当前的唯一线程,从队列取任务,执行完一个,再继续取,如此循环。。。。
使用场景:
适用于串行执行任务的场景,一个任务一个任务地执行。
缺点:
SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的任务队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor 使用无界队列作为线程池的任务队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点就是可能会导致 OOM,
常用方法
提交
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
submit()
方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的get()
方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
以AbstractExecutorService
接口中的一个 submit 方法为例:
public Future<?> submit(Runnable task) {
if (task == null) thrownew NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
上面方法调用的 newTaskFor()
方法返回了一个 FutureTask 对象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
对比execute()
方法:
public void execute(Runnable command) {
...
}
关闭
shutdown()
:关闭线程池,线程池的状态变为 SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。
shutdownNow()
:关闭线程池,线程的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List;该方法要慎用,容易造成不可控的后果
判断
isShutDown
当调用 shutdown()
方法后,返回为 true,否则为false。
isTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true,否则为false。
状态切换
RUNNING
- 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
- 调用线程池的shutdownNow()方法,可以切换到STOP状态;
SHUTDOWN
- 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;
STOP
- 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- 线程池中执行的任务为空,进入TIDYING状态;
TIDYING
- 该状态表明所有的任务已经运行终止,记录的任务数量为0。
- terminated()执行完毕,进入TERMINATED状态
TERMINATED
- 该状态表示线程池彻底终止
配置线程数
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
参考文章:
https://mp.weixin.qq.com/s?__biz=Mzg2OTA0Njk0OA==&mid=2247485808&idx=1&sn=1013253533d73450cef673aee13267ab&chksm=cea246bbf9d5cfad1c21316340a0ef1609a7457fea4113a1f8d69e8c91e7d9cd6285f5ee1490&token=510053261&lang=zh_CN&scene=21#wechat_redirect
https://juejin.cn/post/6844903889678893063
https://blog.csdn.net/mu_wind/article/details/113806680