前言
本文主要内容
- 介绍线程池的参数
- 介绍Java并发库提供的线程池
- 介绍线程池如何实现 Thread-Per-Message 模式
- 介绍线程池如何实现 Work Thread 模式
状态转换
参数
概览
看一下 ThreadPoolExecutor 构造器的参数
- corePoolSize:线程池中一直处于 RUNNING 的线程数
- maximumPoolSize:同时处于 RUNNIING 状态的最大线程数
- keepAliveTime:超出corePoolSize数量的活跃线程等待新任务的时间
- unit:时间单位
- workQueue:阻塞队列,存储任务
- threadFactory:创建线程的工厂
- handler:拒绝策略
TimeUnit
时间单位,是一个枚举类
工作队列
工作队列使用BlockingQueue,这只是一个接口
看一看它的实现类都有哪些
看起来很多,但是并不复杂,部分特征体现在名字上。按照是否有界进行分类
有界
ArrayBlockingQueue
- 使用数组实现
- 构造时,必须传入容量大小
- 使用 ReentrantLock 保证线程安全
使用 Condition notEmpty 和 Condition notFull 来进行通信,实现阻塞
LinkedBlockingQueue
使用链表实现
- 使用 int capacity 来限制容量,若不设置,则为 Integer.MAX_VALUE
- 使用 ReentrantLock 保证线程安全
使用 Condition notEmpty 和 Condition notFull 来进行通信,实现阻塞
无界
LinkedBlockingDeque
使用链表实现
- 双端队列,前后都可以添加和移出
- 使用 ReentrantLock 保证线程安全
- 使用 Condition notEmpty 和 Condition notFull 来进行通信,实现阻塞
SynchronousQueue
并不维护队列,put时必须有线程进行poll,否则阻塞,poll 时也必须有线程进行put,否则阻塞。
LinkedTransferQueue
LinkedBlockingQueue 与 SynchronousQueue 的结合,即可以维持队列,又可以将信息直接传给等待的线程,而不存入队列中。put时,如果有等待的线程,则直接交给等待的线程,如果没有,则放入队列尾
PriorityBlockingQueue
- 使用数组实现
- 可以设置初始容量,在添加时,当容量不足会使用 tryGrow 方法进行扩容(扩大约 50%)
- 支持按照优先级出队(细节先省略,之后另开篇并发容器再聊)
- 使用 ReentrantLock 保证线程安全
- 使用 Condition notEmpty 和 Condition notFull 来进行通信,实现阻塞
DelayQueue
在 PriorityBlockingQueue 的基础上增加延迟出队
拒绝策略
RejectedExecutionHandler为拒绝策略接口,只定义了一个方法
当线程池不再接受任务时,调用 rejectedExecution 处理后续传递来的任务,其实现类有
默认使用 AbortPolicy
CallerRunsPolicy
如果线程池不是 shutdown 状态,则在当前线程执行任务,否则直接丢弃
AbortPolicy(默认)
抛出 RejectedExecutionException 异常,注明是哪个任务,来自哪个线程池
DiscardPolicy
DiscardOldestPolicy
线程工厂
创建线程,可以在这里给创建的线程添加一些附加属性,或者进行记录。比如说根据线程进行的工作,给线程进行命名,方便排查错误
这里就来看下默认使用的DefaultThreadFactory,如果不传递线程工厂的话,就使用这个工厂类
DefaultThreadFactory
代码如下
/**
* The default thread factory.
*/
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);//记录线程池数量,给线程池编号
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);//记录线程数量,给池中线程编号
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();//获取安全管理器,设置线程权限,避免访问敏感资源
group = (s != null) ? s.getThreadGroup() ://如果未设置安全管理器,则使用当前线程的线程组
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);//设置线程组 线程名字 线程堆栈大小
if (t.isDaemon())
t.setDaemon(false);//设置为非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)//设置为默认优先级 5
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
主要完成了以下工作
- 对线程进行命名:pool-线程池编号-thread-线程编号
- 设置线程组
- 如果设置了SecurityManager(关于SecurityManager可以看下面的参考),则使用SecurityManager的线程组
- 如果未设置,则使用当前线程的线程组
- 设置为非守护线程
- 设置线程优先级为默认优先级 5
Java并发库提供的线程池
newCachedThreadPool()
其实提供的线程池就是将以上的所介绍的参数实现进行组装,看看源码就知道功能了。看下源码
主要关注两个方面,一是最大线程数为 Integer.MAX_VALUE,当任务过多时,有资源耗尽的风险;二是阻塞队列使用 SynchronousQueue,也就是不维护队列,任务到来就直接传递给处理的线程。
这里也可以选择另外一个方法,指定创建线程的工厂
newFixedThreadPool(int nThreads)
这个就相当于使用 Work Thread 模式了,只有核心线程。阻塞队列使用的是 LinkedBlockingQueue,虽然这是个有界队列,但是该方法没有暴露设置阻塞队列大小的参数,默认是 Integer.MAX_VALUE,因此当任务数量过多时,也有资源耗尽的危险
newSingleThreadExecutor()
保证线程池中有一个活跃线程,但是使用了 LinkedBlockingQueue 做阻塞队列,且无法设置阻塞队列大小,因此也存在资源耗尽的风险
newSingleThreadScheduledExecutor()
newWorkStealingPool(int parallelism)
线程池实现 Work Thread 模式的方法
我觉得线程池是 Thread-Per-Message 模式与 Work Thread 模式的结合,也可以说是 Work Thread 模式的优化版,解决了它无法灵活应对任务数突然增多的问题。core 线程使用 Work Thread 模式,来复用线程;maximumPoolSize - corePoolSize 的部分线程数就是使用了 Thread-Per-Message模式,来应对突增的任务数,并在随着任务数的递减而逐渐销毁,减少资源消耗,同时 maximumPoolSize 也限制了线程占用的最大资源,避免服务器因创建大量线程而宕机。
使用延迟加载初始化线程池
ThreadPoolExecutor并不是在创建实例的时候,就创建了 corePoolSize 数量的线程,而是使用延迟加载,当有任务到达时,如果当前线程数量小于 corePoolSize 则创建核心线程,去进行处理
通过不停获取任务来保证核心线程一直运行
线程池的内部类 Worker 实现了 Runnable ,来看下它的 run 方法。这里是跳转到了 runWorker
在 runWorker 中,实现线程不退出的方法如下。通过不停获取任务来来保证线程不退出
线程池实现 Thread-Per-Message 模式的方法
任务数超过核心线程数时创建非核心线程
以下是 ThreadPoolExecutor 中 execute 方法 的部分代码
在这个方法中,先判断当前任务数是否小于核心数,如果小于则创建核心线程去执行该任务
如果大于核心线程数,则将任务添加到阻塞队列中,然后创建非核心线程去执行
利用任务获取时间来保证非核心线程按时退出
非核心线程当执行完任务后,会随着任务量的减少,逐渐退出,这个功能是通过使用获取任务时间来实现的。相关代码在 getTask() 方法中
getTask 中的主体逻辑在这个死循环中
在 getTask() 方法中通过 timed 来记录是否需要设定获取任务时间。如果当前活跃线程数 > 核心线程数量,则设置获取任务时间
之后,需要在规定时间内获取到任务,如果超时,timeOut 会设置为 true
之后在下次循环中,CAS 减少活跃线程数记录,然后返回 null,也就是标志着获取任务失败
获取任务失败,退出循环,清理退出线程