构造器七大参数
第 1 个参数:corePoolSize
表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。
第 2 个参数:maximumPoolSize
表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。
第 3 个参数:keepAliveTime
表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。
第 4 个参数:unit
表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。
第 5 个参数:workQueue
表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。
第 6 个参数:threadFactory
表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。
第 7 个参数:RejectedExecutionHandler
表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。
拒绝策略
ThreadPoolExecutor
内部提供了一些拒绝策略
CallerRunsPolicy
把任务交给当前线程来执行,它直接调用了传入线程的 run 方法
源码
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
测试
先创建一个自定义的线程类,下面其它拒绝策略测试时都会用到
public class MyThread implements Runnable {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:" + Thread.currentThread().getName() + " 任务:" + name);
}
}
测试代码:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 6; i++) {
System.out.println("创建任务:" + i);
executor.execute(new MyThread(i + ""));
}
}
执行结果:
创建任务:0
创建任务:1
创建任务:2
创建任务:3
创建任务:4
线程:pool-1-thread-1 任务:0
线程:main 任务:4
线程:pool-1-thread-2 任务:3
创建任务:5
线程:pool-1-thread-2 任务:2
线程:pool-1-thread-1 任务:1
线程:pool-1-thread-2 任务:5
执行结果解析:
- 任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
- 此时队列满了,线程池会创建一个新的线程来执行任务 3
- 任务 4 进入拒绝策略,所以执行任务 4 的线程是当前线程 main
- 因为任务 4 是 main 线程执行的,所以等任务 4 执行完成之后才创建任务 5,任务 5 会进入队列
等待任务 1 和 2 完成后,任务 5 也执行完成,整个流程结束
AbortPolicy
源码
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
测试
测试代码:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 6; i++) {
System.out.println("创建任务:" + i);
executor.execute(new MyThread(i + ""));
}
}
执行结果:
创建任务:0
创建任务:1
创建任务:2
创建任务:3
创建任务:4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.demo.MyThread@1f32e575 rejected from java.util.concurrent.ThreadPoolExecutor@279f2327[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.demo.Demo.main(Demo.java:23)
线程:pool-1-thread-1 任务:0
线程:pool-1-thread-2 任务:3
线程:pool-1-thread-2 任务:1
线程:pool-1-thread-1 任务:2
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
- 此时队列满了,线程池会创建一个新的线程来执行任务 3
任务 4 进入拒绝策略,该拒绝策略抛出异常,由于 for 里面没有捕获异常,所以中止了
DiscardPolicy
什么都不做,忽略此任务,不推荐使用
源码:/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
测试
测试代码:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 6; i++) {
System.out.println("创建任务:" + i);
executor.execute(new MyThread(i + ""));
}
}
执行结果:
创建任务:0
创建任务:1
创建任务:2
创建任务:3
创建任务:4
创建任务:5
线程:pool-1-thread-1 任务:0
线程:pool-1-thread-2 任务:3
线程:pool-1-thread-2 任务:2
线程:pool-1-thread-1 任务:1
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
- 此时队列满了,线程池会创建一个新的线程来执行任务 3
任务 4、5 进入拒绝策略,该拒绝策略什么都不做,直接忽略了
DiscardOldestPolicy
源码
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 把最早的任务抛弃
e.getQueue().poll();
// 执行当前任务
e.execute(r);
}
}
}
测试
测试代码:
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 6; i++) {
System.out.println("创建任务:" + i);
executor.execute(new MyThread(i + ""));
}
}
执行结果:
创建任务:0
创建任务:1
创建任务:2
创建任务:3
创建任务:4
创建任务:5
线程:pool-1-thread-1 任务:0
线程:pool-1-thread-2 任务:3
线程:pool-1-thread-1 任务:4
线程:pool-1-thread-2 任务:5
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
- 此时队列满了,线程池会创建一个新的线程来执行任务 3
任务 4、5 进入拒绝策略,该拒绝把最早加入队列的任务 1、2 丢弃了
自定义拒绝策略
模仿内置的拒绝策略,实现 RejectedExecutionHandler 接口的 rejectedExecution 即可。
public class MyPolicy implements RejectedExecutionHandler {
public MyPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RejectedExecutionException("自定义拒绝策略 " + r.toString());
}
}
ThreadPoolExecutor扩展
继承 ThreadPoolExecutor 类,重写 beforeExecute 与 afterExecute 方法,实现计算线程的执行时间:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
// 保存线程执行开始时间
private final ThreadLocal<Long> localTime = new ThreadLocal<>();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
/**
* 开始执行之前
* @param t 线程
* @param r 任务
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 开始时间 (单位:纳秒)
Long sTime = System.nanoTime();
localTime.set(sTime);
System.out.printf("%s | before | time=%s%n", t.getName(), sTime);
super.beforeExecute(t, r);
}
/**
* 执行完成之后
* @param r 任务
* @param t 抛出的异常
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 结束时间 (单位:纳秒)
Long eTime = System.nanoTime();
// 执行总时间
Long totalTime = eTime - localTime.get();
System.out.printf("%s | after | time=%s | 耗时:%s 毫秒%n",
Thread.currentThread().getName(), eTime, (totalTime / 1000000.0));
super.afterExecute(r, t);
}
}
使用自定义扩展的线程池: ```java ThreadPoolExecutor executor = new MyThreadPoolExecutor(4, 8, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new MyPolicy());
for (int i = 0; i < 5; i++) { executor.execute(() -> { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } }); }
输出:
pool-1-thread-2 | before | time=134686402749177 pool-1-thread-4 | before | time=134686403241020 pool-1-thread-1 | before | time=134686402714875 pool-1-thread-3 | before | time=134686402850272 pool-1-thread-2 | after | time=134686616057068 | 耗时:213.307891 毫秒 pool-1-thread-2 | before | time=134686619274973 pool-1-thread-4 | after | time=134686668299389 | 耗时:265.058369 毫秒 pool-1-thread-2 | after | time=134686688080508 | 耗时:68.805535 毫秒 pool-1-thread-1 | after | time=134686859037104 | 耗时:456.322229 毫秒 pool-1-thread-3 | after | time=134687249156210 | 耗时:846.305938 毫秒
<a name="TmB2V"></a>
## 异常捕获
使用线程池的时候用execute提交任务后,主线程无法捕获子线程的异常,这个时候需要一些手段来实现。
<a name="bShmb"></a>
### 自定义的线程工厂
使用自定义的线程工厂,声明异常处理Handler,实现异常捕获:
```java
// 自定义线程工厂
ThreadFactory factory = (Runnable r) -> {
Thread t = new Thread(r);
// 增加异常处理
t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
System.out.println("exceptionHandler捕捉到异常:" + e.getMessage());
});
return t;
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
2,
100,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
factory, // 创建线程池的时候使用自定义的线程工厂
new ThreadPoolExecutor.CallerRunsPolicy()
);