线程池的构造:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//异常:核心线程数小于0 或者 最大线程数小于0 或者最大线程数小于核心线程数 或者 存活时间小于0
if (corePoolSize < 0 ||maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
//阻塞队列为null 或者 创建线程的工厂为null 或者拒绝策略为null
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:池中所保存的线程数,包括空闲线程。
the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
idle:
allowCoreThreadTimeOut:【public void allowCoreThreadTimeOut(boolean value)】
如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
maximumPoolSize:池中允许的最大线程数。
the maximum number of threads to allow in the pool
keepAliveTime:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
when the number of threads is greater than the core, this is the maximum time that excess idle threads
will wait for new tasks before terminating
excess :
terminating:
(使)停止,结束,终止;到达终点站
unit:keepAliveTime 参数的时间单位。
the time unit for the {@code keepAliveTime} argument
workQueue:执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
the queue to use for holding tasks before they are executed. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.
threadFactory:执行程序创建新线程时使用的工厂。
the factory to use when the executor creates a new thread
handler:由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
the handler to use when execution is blocked because the thread bounds and queue capacities are reached
handler:
处理程序。驯兽员;(尤指)驯犬员;搬运工;操作者;组织者;顾问
bounds:
capacities :
容量;容积;容纳能力;领悟(或理解、办事)能力;职位;职责
线程池的执行:
核心和最大池大小:
ThreadPoolExecutor 将根据 corePoolSize(参见 getCorePoolSize()
))和 maximumPoolSize(参见 getMaximumPoolSize()
))设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable)
) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)
) 和 setMaximumPoolSize(int)
) 进行动态更改。
按需构造
默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread()
) 或 prestartAllCoreThreads()
) 对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。
/**
*启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策
* 略。如果已启动所有核心线程,此方法将返回 false。
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
}
/**
*启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程 * 策略。返回:已启动的线程数
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
创建新线程
使用 ThreadFactory
创建新线程。如果没有另外说明,则在同一个 ThreadGroup
中一律使用 Executors.defaultThreadFactory()
) 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
//返回用于创建新线程的默认线程工厂。
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
//此工厂创建同一 ThreadGroup 中 Executor 使用的所有新线程。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
//如果有 SecurityManager,则它使用 System.getSecurityManager() 组来调用此 // defaultThreadFactory 方法,其他情况则使用线程组。
//每个新线程都作为非守护程序而创建,并且具有设置为 Thread.NORM_PRIORITY 中较小者的优先级以及线 // 程组中允许的最大优先级。新线程具有可通过 pool-N-thread-M 的 Thread.getName() 来访问的名 //称,其中 N 是此工厂的序列号,M 是此工厂所创建线程的序列号。
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
保持活动时间
如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见 getKeepAliveTime(java.util.concurrent.TimeUnit)
))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit)
) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS
的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean)
) 方法也可将此超时策略应用于核心线程。
排队
所有 BlockingQueue
都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
- 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
- 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
- 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
排队有三种通用策略:
- 直接提交。工作队列的默认选项是
SynchronousQueue
,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。 - 无界队列。使用无界队列(例如,不具有预定义容量的
LinkedBlockingQueue
)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。 - 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如
ArrayBlockingQueue
)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
SynchronousQueue:
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。
LinkedBlockingQueue:
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE
。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
此类及其迭代器实现 Collection
和 Iterator
接口的所有可选 方法。
ArrayBlockingQueue:
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
被拒绝的任务
当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable)
) 中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandler
的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
) 方法。下面提供了四种预定义的处理程序策略:
在默认的
ThreadPoolExecutor.AbortPolicy
中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
。public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException.
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
在
ThreadPoolExecutor.CallerRunsPolicy
中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果 //执行程序已关闭,则会丢弃该任务。
if (!e.isShutdown()) {
r.run();
}
}
}
在
ThreadPoolExecutor.DiscardPolicy
中,不能执行的任务将被删除。public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
//用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//获取并忽略下一个任务,否则如果该任务立即可用,执行程序将执行该任务,然后再试图重新 执行任务 //r;如果执行程序已关闭,则会丢弃任务 r。
if (!e.isShutdown()) {
e.getQueue().poll();//获取并移除此队列的头,如果此队列为空,则返回 null
e.execute(r);
}
}
}
定义和使用其他种类的 RejectedExecutionHandler
类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。
钩子 (hook) 方法
此类提供 protected 可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable)
) 和 afterExecute(java.lang.Runnable, java.lang.Throwable)
) 方法,这两种方法分别在执行每个任务之前和之后调用。它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。此外,还可以重写方法 terminated()
) 来执行 Executor 完全终止后需要完成的所有特殊处理。
如果钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。
队列维护
方法 getQueue()
) 允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。remove(java.lang.Runnable)
) 和 purge()
) 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。
getQueue
返回此执行程序使用的任务队列。对任务队列的访问主要用于调试和监控。此队列可能正处于活动使用状态中。获取任务队列不妨碍已加入队列的任务的执行。
remove
从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge()
) 方法可用于移除那些已被取消的 Future。
purge
尝试从工作队列移除所有已取消的 Future
任务。此方法可用作存储回收操作,它对功能没有任何影响。取消的任务不会再次执行,但是它们可能在工作队列中累积,直到 worker 线程主动将其移除。调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
终止
程序 AND 不再引用的池没有剩余线程会自动 shutdown。如果希望确保回收取消引用的池(即使用户忘记调用 shutdown()
)),则必须安排未使用的线程最终终止:设置适当保持活动时间,使用 0 核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)
)。
执行流程
如何确定线程的数量
测试案例
package com.yuancheng.boot.thread;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.*;
public class ThreadPoolExecutorDemo {
public static void main(String[] args) throws Exception{
ThreadPoolExecutorDemo t = new ThreadPoolExecutorDemo();
// t.testThreadPool();
// t.testThreadPool2();
// t.testThreadPool3();
// t.testThreadPool4();
// t.testThreadPool5();
// t.testThreadPool6();
t.testThreadPool7();
}
//立刻关闭
public void testThreadPool7() throws Exception{
ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
System.out.println("--------有任务进入拒绝队列了");
}
});
for (int i = 0; i < 15; i++) {
int n = i;
threadPoolExecutor.submit(()->{
try{
System.out.println("开始执行:"+n);
Thread.sleep(3000L);
System.out.println("执行结束:"+n);
}catch (Exception e ){
System.out.println("异常:"+n);
}
});
System.out.println("任务提交成功:"+i);
}
Thread.sleep(1000L);
List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
threadPoolExecutor.submit(()->{
System.out.println("追加一个任务");
});
System.out.println("最终的没有执行的任务有:"+shutdownNow.size());
}
public void testThreadPool6() throws Exception{
ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
System.out.println("--------有任务进入拒绝队列了");
}
});
for (int i = 0; i < 15; i++) {
int n = i;
threadPoolExecutor.submit(()->{
try{
System.out.println("开始执行:"+n);
Thread.sleep(3000L);
System.out.println("执行结束:"+n);
}catch (Exception e ){
System.out.println("异常:"+n);
}
});
System.out.println("任务提交成功:"+i);
}
Thread.sleep(1000L);
threadPoolExecutor.shutdown();
threadPoolExecutor.submit(()->{
System.out.println("追加一个任务");
});
}
//定时任务执行 周期性任务
public void testThreadPool5()throws Exception{
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
//等待上个任务执行完毕后,立刻执行
threadPoolExecutor.scheduleAtFixedRate(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务一被执行,时间是:"+System.currentTimeMillis());
},2,1,TimeUnit.SECONDS);
//等待上个任务执行完毕后,再延迟1秒
threadPoolExecutor.scheduleWithFixedDelay(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务二被执行,时间是:"+System.currentTimeMillis());
},2,1,TimeUnit.SECONDS);
System.out.println("定时任务,提交成功,时间是:"+System.currentTimeMillis()+";当前线程池的数量为:"+threadPoolExecutor.getPoolSize());
}
//定时任务执行 DelayedWorkQueue 延时队列
public void testThreadPool4()throws Exception{
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
threadPoolExecutor.schedule(()->{
System.out.println("任务被执行,现在的时间是"+System.currentTimeMillis());
},3,TimeUnit.SECONDS);
System.out.println("定时任务,提交成功,时间是:"+System.currentTimeMillis()+";当前线程池的数量为:"+threadPoolExecutor.getPoolSize());
}
public void testThreadPool3()throws Exception{
ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<>());
testCommon(threadPoolExecutor);
Thread.sleep(60000L);
System.out.println("60s后再看线程池中的数据量"+threadPoolExecutor.getPoolSize());
}
public void testThreadPool2()throws Exception{
ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
System.out.println("--------有任务进入拒绝队列了");
}
});
testCommon(threadPoolExecutor);
}
public void testThreadPool()throws Exception{
ThreadPoolExecutor threadPoolExecutor = new
ThreadPoolExecutor(5,10,5, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
testCommon(threadPoolExecutor);
}
public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception{
// 提交15个执行时间需要超过3秒的任务,看线程池的情况
for (int i = 0; i <15 ; i++) {
int n = i;
threadPoolExecutor.submit(()->{
try{
System.out.println("开始执行:"+n);
Thread.sleep(3000L);
System.out.println("执行结束:"+n);
}catch (Exception e ){
e.printStackTrace();
}
});
System.out.println("任务提交成功:"+i);
}
Thread.sleep(500L);
System.out.println("当前线程池的线程数量为:"+threadPoolExecutor.getPoolSize());
System.out.println("当前线程池等待的数量为:"+threadPoolExecutor.getQueue().size());
Thread.sleep(15000L);
System.out.println("当前线程池的线程数量为:"+threadPoolExecutor.getPoolSize());
System.out.println("当前线程池等待的数量为:"+threadPoolExecutor.getQueue().size());
}
}