类图
Executor
基础接口
将任务提交和任务执行进行解耦
基于生产者消费者模型, 提交任务的执行是生产者, 执行任务的线程是消费者
ExecutorService
扩展了Executor, 添加了生命周期的管理方法和用于任务提交的便利方法
状态: 运行-关闭-终止
JUC 线程池
Java线程池,是典型的池化思想的产物
线程池作用
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
在JVM中默认一个线程需要使用256k~1M(取决于32位还是64位操作系统)的内存。
CPU调度列入其中,这边不将线程调度列入是因为睡眠中的线程不会被调度(OS控制),如果不是睡眠中的线程那么是一定需要被调度的。
不用线程池
如果一个线程的时间非常长,就没必要用线程池了(不是不能作长时间操作,而是不宜。),况且我们还不能控制线程池中线程的开始、挂起、和中止。
Executor线程池构造4种线程池
Executors提供的工厂方法
Java通过Executors(jdk1.5并发包 java.util.current)提供四种静态方法创建线程池
Executors工厂类中提供的 四种线程池 其实也只是ThreadPoolExecutor的核心构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。
手动创建线程池效果更好
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
1.△newSingleThreadExecutor 单个线程
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
单例类线程池只有一个线程,无边界队列,适合cpu密集的运算
// 源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2.△ newCachedThreadPool 可缓存线程
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
// 源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3.△ newFixedThreadPool 固定线程
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
4.△newScheduledThreadPool 定时线程
创建一个定长线程池,支持定时及周期性任务执行。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
ThreadFactory namedFactory = new ThreadFactoryBuilder()
.setNameFormat("chat-pool-%d").build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5, namedFactory);
HelloThread helloThread = new HelloThread();
ScheduledFuture<?> scheduledFuture1 = executor.scheduleAtFixedRate(helloThread, 1, 1, TimeUnit.SECONDS);
// 调用
try {
TimeUnit.SECONDS.sleep(1);
scheduledFuture1.cancel(true);
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
核心线程池 ThreadPoolExecutor
执行策略
当新的线程请求进来时,会先判断核心线程数
===核心线程数**corePoolSize如果未满则直接新建线程并执行,执行完将其放回线程池;
===核心线程数如果已满
======再检查任务队列workQueue,如果没满就将当前线程请求加入缓冲队列 ,等待空闲线程分配;
======队列已满, 再检查线程池当前存在的线程数是否已达到规定的最大线程maximumPoolSize,如果没有达到就创建线程执行;
============如果达到就执行对应的拒绝策略**。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
根据源码可以发现整个线程池大致分为 3 个部分,
- 是创建 worker 线程
execute(Runnable command)
- 添加任务到 workQueue;
addWorker(Runnable firstTask, boolean core)
- worker 线程执行具体任务
线程数配置
1.线程池的默认值
- corePoolSize=1
核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,
当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 - queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
线程池最大线程数,它表示在线程池中最多能创建多少个线程; - keepAliveTime=60s 空闲线程允许的最大的存活时间
- allowCoreThreadTimeout=false
- unit: 存活时间的单位
- workQueue: 阻塞任务队列
- threadFactory: 线程工厂用来创建线程,一般设置线程名称
- rejectedExcutionHandler=AbortPolicy() 拒绝策略,针对当队列满了时新来任务的处理方式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2. 项目中如何配置线程数量?
额外知识: CPU密集型(CPU-bound) CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。 IO密集型(I/O bound) IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。 计算密集型程序适合C语言多线程,I/O密集型适合脚本语言开发的多线程。
如果是CPU密集型应用,则线程池大小设置为CPU核数+1
如果是IO密集型应用,则线程池大小设置为CPU核数*2+1
更多请参考: https://www.jianshu.com/p/f30ee2346f9f
3. 阻塞队列
1、[有界] ArrayBlockingQueue:由数组结构组成的阻塞队列
表现稳定,添加和删除使用同一个锁,通常性能不如后者
2、[有界] LinkedBlockingQueue:由链表组成的有界(但大小默认值为Integer.MAX_Value)
添加和删除两把锁是分开的,所以竞争会小一些
3、{无界} PriorityBlockingQueue:支持优先级排序的无界阻塞队列
上面三个队列他们也是存在共性的
put take 操作都是阻塞的
offer poll 操作不是阻塞的,offer 队列满了会返回false不会阻塞,poll 队列为空时会返回null不会阻塞
补充一点,并不是在所有场景下,非阻塞都是好的,阻塞代表着不占用CPU,在有些场景也是需要阻塞的,put take 存在必有其存在的必然性
4、{无界} DelayQueue:使用优先级队列实现的延迟无界阻塞队列
5、SynchronizedQueue:不存储元素的阻塞队列,也即单个元素的队列
6、{无界} LinkedTransferQueue:由链表结构组成的无界阻塞队列
7、LinkedBlockingDeque:由链表结构组成的双向阻塞队列
现在也来说一说无界队列的共同点
- put 操作永远都不会阻塞,空间限制来源于系统资源的限制
- 底层都使用 CAS 无锁编程
4. 拒绝策略解析
当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
jdk默认提供了四种拒绝策略:
CallerRunsPolicy - 则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy 默认 - 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy - 直接丢弃
DiscardOldestPolicy - 当触发拒绝策略,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
线程池方法
1.execute() 与 submit() 的区别?
execute() 参数 Runnable ;submit() 参数 (Runnable) 或 (Runnable 和 结果 T) 或 (Callable)
execute() 没有返回值;而 submit() 有返回值
submit() 的返回值 Future 调用get方法时,可以捕获处理异常
设计一个线程池
1)线程池管理类[单例设计模式]
主要用于实现创建线程和添加线程处理的新任务,执行任务以及如何回收已经执行完任务的线程。
增加任务的方法(addTask)、批量增加任务的方法(batchAddTask)、得到实例的方法(getInstanse)以及执行任务的方法(execute)
//增加任务的方法
public void addTask(Task task){
lock.lock();
taskQueue.add(task);
taskQueue.notifyAll();
lock.unlock();
}
//执行任务的方法
public void execute(){
for(int i=0; i<workThreads.length; i++){
WorkThread wt = workThreads[i];
if(wt.isWaiting()){
new Thread(wt).start();
break;
}
}
2)工作线程类
线程池中的线程,也实现了Runnable接口,它主要用于处理任务队列中的任务。
在run()方法里面它首先判断任务队列里面是否有任务,如果没有就等待新任务加入,如果有任务就从任务队列中取出任务并执行,在线程池中可以设定工作线程的个数
while(isRunning){
lock.lock();
while(taskQueue.isEmpty()){
try{
taskQueue.wait();
}catch(Exception e){
}
}
task = (Task)taskQueue.remove(0);
try{
new Thread(task).start();
}catch(Exception e){
e.printStackTrace();
}
lock.unlock();
}
}
3)任务类
定义任务的各种属性,以及要执行的操作
public class Task implements Runnable{
public void run(){
}
}
4)任务队列 Map、Set以及List
按先来先服务的顺序用于存放新加入的任务,以便让工作线程来执行
在文中通过Collections.synchronizedList将其转换为一个线程安全的类。任务队列定义如下所示:
private List<Task> taskQueue = Collections.synchronizedList(new LinkedList<Task>());
ForkJoinPool
这边推荐大家使用 newWorkStealingPool,也就是ForkJoinPool。采取了工作窃取的模式。 后续会跟大家一起聊聊 ForkJoinPool。
https://www.toutiao.com/i6735249778763891204/
线程池监控
运行时状态实时查看
用户基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数,