构造器七大参数
第 1 个参数:corePoolSize
表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。
第 2 个参数:maximumPoolSize
表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。
第 3 个参数:keepAliveTime
表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。
第 4 个参数:unit
表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。
第 5 个参数:workQueue
表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。
第 6 个参数:threadFactory
表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。
第 7 个参数:RejectedExecutionHandler
表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。
拒绝策略
ThreadPoolExecutor 内部提供了一些拒绝策略
CallerRunsPolicy
把任务交给当前线程来执行,它直接调用了传入线程的 run 方法
源码
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
测试
先创建一个自定义的线程类,下面其它拒绝策略测试时都会用到
public class MyThread implements Runnable {private String name;public MyThread(String name) {this.name = name;}@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程:" + Thread.currentThread().getName() + " 任务:" + name);}}
测试代码:
public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 6; i++) {System.out.println("创建任务:" + i);executor.execute(new MyThread(i + ""));}}
执行结果:
创建任务:0创建任务:1创建任务:2创建任务:3创建任务:4线程:pool-1-thread-1 任务:0线程:main 任务:4线程:pool-1-thread-2 任务:3创建任务:5线程:pool-1-thread-2 任务:2线程:pool-1-thread-1 任务:1线程:pool-1-thread-2 任务:5
执行结果解析:
- 任务 0 直接被线程池中的线程执行
 - 任务 1 和 2 进入队列
 - 此时队列满了,线程池会创建一个新的线程来执行任务 3
 - 任务 4 进入拒绝策略,所以执行任务 4 的线程是当前线程 main
 - 因为任务 4 是 main 线程执行的,所以等任务 4 执行完成之后才创建任务 5,任务 5 会进入队列
 等待任务 1 和 2 完成后,任务 5 也执行完成,整个流程结束
AbortPolicy
源码
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
测试
测试代码:
public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 6; i++) {System.out.println("创建任务:" + i);executor.execute(new MyThread(i + ""));}}
执行结果:
创建任务:0创建任务:1创建任务:2创建任务:3创建任务:4Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.demo.MyThread@1f32e575 rejected from java.util.concurrent.ThreadPoolExecutor@279f2327[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at com.demo.Demo.main(Demo.java:23)线程:pool-1-thread-1 任务:0线程:pool-1-thread-2 任务:3线程:pool-1-thread-2 任务:1线程:pool-1-thread-1 任务:2
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
 - 此时队列满了,线程池会创建一个新的线程来执行任务 3
 任务 4 进入拒绝策略,该拒绝策略抛出异常,由于 for 里面没有捕获异常,所以中止了
DiscardPolicy
什么都不做,忽略此任务,不推荐使用
源码:/*** A handler for rejected tasks that silently discards the* rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
测试
测试代码:
public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());for (int i = 0; i < 6; i++) {System.out.println("创建任务:" + i);executor.execute(new MyThread(i + ""));}}
执行结果:
创建任务:0创建任务:1创建任务:2创建任务:3创建任务:4创建任务:5线程:pool-1-thread-1 任务:0线程:pool-1-thread-2 任务:3线程:pool-1-thread-2 任务:2线程:pool-1-thread-1 任务:1
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
 - 此时队列满了,线程池会创建一个新的线程来执行任务 3
 任务 4、5 进入拒绝策略,该拒绝策略什么都不做,直接忽略了
DiscardOldestPolicy
源码
/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 把最早的任务抛弃e.getQueue().poll();// 执行当前任务e.execute(r);}}}
测试
测试代码:
public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());for (int i = 0; i < 6; i++) {System.out.println("创建任务:" + i);executor.execute(new MyThread(i + ""));}}
执行结果:
创建任务:0创建任务:1创建任务:2创建任务:3创建任务:4创建任务:5线程:pool-1-thread-1 任务:0线程:pool-1-thread-2 任务:3线程:pool-1-thread-1 任务:4线程:pool-1-thread-2 任务:5
执行结果解析:
任务 0 直接被线程池中的线程执行
- 任务 1 和 2 进入队列
 - 此时队列满了,线程池会创建一个新的线程来执行任务 3
 任务 4、5 进入拒绝策略,该拒绝把最早加入队列的任务 1、2 丢弃了
自定义拒绝策略
模仿内置的拒绝策略,实现 RejectedExecutionHandler 接口的 rejectedExecution 即可。
public class MyPolicy implements RejectedExecutionHandler {public MyPolicy() { }@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RejectedExecutionException("自定义拒绝策略 " + r.toString());}}
ThreadPoolExecutor扩展
继承 ThreadPoolExecutor 类,重写 beforeExecute 与 afterExecute 方法,实现计算线程的执行时间:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {// 保存线程执行开始时间private final ThreadLocal<Long> localTime = new ThreadLocal<>();public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);}/*** 开始执行之前* @param t 线程* @param r 任务*/@Overrideprotected void beforeExecute(Thread t, Runnable r) {// 开始时间 (单位:纳秒)Long sTime = System.nanoTime();localTime.set(sTime);System.out.printf("%s | before | time=%s%n", t.getName(), sTime);super.beforeExecute(t, r);}/*** 执行完成之后* @param r 任务* @param t 抛出的异常*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {// 结束时间 (单位:纳秒)Long eTime = System.nanoTime();// 执行总时间Long totalTime = eTime - localTime.get();System.out.printf("%s | after | time=%s | 耗时:%s 毫秒%n",Thread.currentThread().getName(), eTime, (totalTime / 1000000.0));super.afterExecute(r, t);}}
使用自定义扩展的线程池: ```java ThreadPoolExecutor executor = new MyThreadPoolExecutor(4, 8, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new MyPolicy());
for (int i = 0; i < 5; i++) { executor.execute(() -> { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } }); }
输出:
pool-1-thread-2 | before | time=134686402749177 pool-1-thread-4 | before | time=134686403241020 pool-1-thread-1 | before | time=134686402714875 pool-1-thread-3 | before | time=134686402850272 pool-1-thread-2 | after | time=134686616057068 | 耗时:213.307891 毫秒 pool-1-thread-2 | before | time=134686619274973 pool-1-thread-4 | after | time=134686668299389 | 耗时:265.058369 毫秒 pool-1-thread-2 | after | time=134686688080508 | 耗时:68.805535 毫秒 pool-1-thread-1 | after | time=134686859037104 | 耗时:456.322229 毫秒 pool-1-thread-3 | after | time=134687249156210 | 耗时:846.305938 毫秒
<a name="TmB2V"></a>## 异常捕获使用线程池的时候用execute提交任务后,主线程无法捕获子线程的异常,这个时候需要一些手段来实现。<a name="bShmb"></a>### 自定义的线程工厂使用自定义的线程工厂,声明异常处理Handler,实现异常捕获:```java// 自定义线程工厂ThreadFactory factory = (Runnable r) -> {Thread t = new Thread(r);// 增加异常处理t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {System.out.println("exceptionHandler捕捉到异常:" + e.getMessage());});return t;};ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,100,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2),factory, // 创建线程池的时候使用自定义的线程工厂new ThreadPoolExecutor.CallerRunsPolicy());
