构造器七大参数

第 1 个参数:corePoolSize

表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

第 2 个参数:maximumPoolSize

表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

第 3 个参数:keepAliveTime

表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

第 4 个参数:unit

表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

第 5 个参数:workQueue

表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

第 6 个参数:threadFactory

表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。

第 7 个参数:RejectedExecutionHandler

表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列 workQueue 中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制。

拒绝策略

ThreadPoolExecutor 内部提供了一些拒绝策略

CallerRunsPolicy

把任务交给当前线程来执行,它直接调用了传入线程的 run 方法

源码

  1. /**
  2. * A handler for rejected tasks that runs the rejected task
  3. * directly in the calling thread of the {@code execute} method,
  4. * unless the executor has been shut down, in which case the task
  5. * is discarded.
  6. */
  7. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  8. /**
  9. * Creates a {@code CallerRunsPolicy}.
  10. */
  11. public CallerRunsPolicy() { }
  12. /**
  13. * Executes task r in the caller's thread, unless the executor
  14. * has been shut down, in which case the task is discarded.
  15. *
  16. * @param r the runnable task requested to be executed
  17. * @param e the executor attempting to execute this task
  18. */
  19. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  20. if (!e.isShutdown()) {
  21. r.run();
  22. }
  23. }
  24. }

测试

先创建一个自定义的线程类,下面其它拒绝策略测试时都会用到

  1. public class MyThread implements Runnable {
  2. private String name;
  3. public MyThread(String name) {
  4. this.name = name;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. Thread.sleep(1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. System.out.println("线程:" + Thread.currentThread().getName() + " 任务:" + name);
  14. }
  15. }

测试代码:

  1. public static void main(String[] args) {
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
  3. TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
  4. for (int i = 0; i < 6; i++) {
  5. System.out.println("创建任务:" + i);
  6. executor.execute(new MyThread(i + ""));
  7. }
  8. }

执行结果:

  1. 创建任务:0
  2. 创建任务:1
  3. 创建任务:2
  4. 创建任务:3
  5. 创建任务:4
  6. 线程:pool-1-thread-1 任务:0
  7. 线程:main 任务:4
  8. 线程:pool-1-thread-2 任务:3
  9. 创建任务:5
  10. 线程:pool-1-thread-2 任务:2
  11. 线程:pool-1-thread-1 任务:1
  12. 线程:pool-1-thread-2 任务:5

执行结果解析:

  • 任务 0 直接被线程池中的线程执行
  • 任务 1 和 2 进入队列
  • 此时队列满了,线程池会创建一个新的线程来执行任务 3
  • 任务 4 进入拒绝策略,所以执行任务 4 的线程是当前线程 main
  • 因为任务 4 是 main 线程执行的,所以等任务 4 执行完成之后才创建任务 5,任务 5 会进入队列
  • 等待任务 1 和 2 完成后,任务 5 也执行完成,整个流程结束

    AbortPolicy

    终止策略,抛出异常并终止执行,它是默认的拒绝策略

    源码

    1. /**
    2. * A handler for rejected tasks that throws a
    3. * {@code RejectedExecutionException}.
    4. */
    5. public static class AbortPolicy implements RejectedExecutionHandler {
    6. /**
    7. * Creates an {@code AbortPolicy}.
    8. */
    9. public AbortPolicy() { }
    10. /**
    11. * Always throws RejectedExecutionException.
    12. *
    13. * @param r the runnable task requested to be executed
    14. * @param e the executor attempting to execute this task
    15. * @throws RejectedExecutionException always
    16. */
    17. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    18. throw new RejectedExecutionException("Task " + r.toString() +
    19. " rejected from " +
    20. e.toString());
    21. }
    22. }

    测试

    测试代码:

    1. public static void main(String[] args) {
    2. ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
    3. TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.AbortPolicy());
    4. for (int i = 0; i < 6; i++) {
    5. System.out.println("创建任务:" + i);
    6. executor.execute(new MyThread(i + ""));
    7. }
    8. }

    执行结果:

    1. 创建任务:0
    2. 创建任务:1
    3. 创建任务:2
    4. 创建任务:3
    5. 创建任务:4
    6. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.demo.MyThread@1f32e575 rejected from java.util.concurrent.ThreadPoolExecutor@279f2327[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    7. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    8. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    9. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    10. at com.demo.Demo.main(Demo.java:23)
    11. 线程:pool-1-thread-1 任务:0
    12. 线程:pool-1-thread-2 任务:3
    13. 线程:pool-1-thread-2 任务:1
    14. 线程:pool-1-thread-1 任务:2

    执行结果解析:

  • 任务 0 直接被线程池中的线程执行

  • 任务 1 和 2 进入队列
  • 此时队列满了,线程池会创建一个新的线程来执行任务 3
  • 任务 4 进入拒绝策略,该拒绝策略抛出异常,由于 for 里面没有捕获异常,所以中止了

    DiscardPolicy

    什么都不做,忽略此任务,不推荐使用
    源码:

    1. /**
    2. * A handler for rejected tasks that silently discards the
    3. * rejected task.
    4. */
    5. public static class DiscardPolicy implements RejectedExecutionHandler {
    6. /**
    7. * Creates a {@code DiscardPolicy}.
    8. */
    9. public DiscardPolicy() { }
    10. /**
    11. * Does nothing, which has the effect of discarding task r.
    12. *
    13. * @param r the runnable task requested to be executed
    14. * @param e the executor attempting to execute this task
    15. */
    16. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    17. }
    18. }

    测试

    测试代码:

    1. public static void main(String[] args) {
    2. ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
    3. TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardPolicy());
    4. for (int i = 0; i < 6; i++) {
    5. System.out.println("创建任务:" + i);
    6. executor.execute(new MyThread(i + ""));
    7. }
    8. }

    执行结果:

    1. 创建任务:0
    2. 创建任务:1
    3. 创建任务:2
    4. 创建任务:3
    5. 创建任务:4
    6. 创建任务:5
    7. 线程:pool-1-thread-1 任务:0
    8. 线程:pool-1-thread-2 任务:3
    9. 线程:pool-1-thread-2 任务:2
    10. 线程:pool-1-thread-1 任务:1

    执行结果解析:

  • 任务 0 直接被线程池中的线程执行

  • 任务 1 和 2 进入队列
  • 此时队列满了,线程池会创建一个新的线程来执行任务 3
  • 任务 4、5 进入拒绝策略,该拒绝策略什么都不做,直接忽略了

    DiscardOldestPolicy

    丢弃最早加入的任务,把当前任务加入添加进去

    源码

    1. /**
    2. * A handler for rejected tasks that discards the oldest unhandled
    3. * request and then retries {@code execute}, unless the executor
    4. * is shut down, in which case the task is discarded.
    5. */
    6. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    7. /**
    8. * Creates a {@code DiscardOldestPolicy} for the given executor.
    9. */
    10. public DiscardOldestPolicy() { }
    11. /**
    12. * Obtains and ignores the next task that the executor
    13. * would otherwise execute, if one is immediately available,
    14. * and then retries execution of task r, unless the executor
    15. * is shut down, in which case task r is instead discarded.
    16. *
    17. * @param r the runnable task requested to be executed
    18. * @param e the executor attempting to execute this task
    19. */
    20. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    21. if (!e.isShutdown()) {
    22. // 把最早的任务抛弃
    23. e.getQueue().poll();
    24. // 执行当前任务
    25. e.execute(r);
    26. }
    27. }
    28. }

    测试

    测试代码:

    1. public static void main(String[] args) {
    2. ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 100,
    3. TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new ThreadPoolExecutor.DiscardOldestPolicy());
    4. for (int i = 0; i < 6; i++) {
    5. System.out.println("创建任务:" + i);
    6. executor.execute(new MyThread(i + ""));
    7. }
    8. }

    执行结果:

    1. 创建任务:0
    2. 创建任务:1
    3. 创建任务:2
    4. 创建任务:3
    5. 创建任务:4
    6. 创建任务:5
    7. 线程:pool-1-thread-1 任务:0
    8. 线程:pool-1-thread-2 任务:3
    9. 线程:pool-1-thread-1 任务:4
    10. 线程:pool-1-thread-2 任务:5

    执行结果解析:

  • 任务 0 直接被线程池中的线程执行

  • 任务 1 和 2 进入队列
  • 此时队列满了,线程池会创建一个新的线程来执行任务 3
  • 任务 4、5 进入拒绝策略,该拒绝把最早加入队列的任务 1、2 丢弃了

    自定义拒绝策略

    模仿内置的拒绝策略,实现 RejectedExecutionHandler 接口的 rejectedExecution 即可。

    1. public class MyPolicy implements RejectedExecutionHandler {
    2. public MyPolicy() { }
    3. @Override
    4. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    5. throw new RejectedExecutionException("自定义拒绝策略 " + r.toString());
    6. }
    7. }

    ThreadPoolExecutor扩展

    继承 ThreadPoolExecutor 类,重写 beforeExecute 与 afterExecute 方法,实现计算线程的执行时间:

    1. public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    2. // 保存线程执行开始时间
    3. private final ThreadLocal<Long> localTime = new ThreadLocal<>();
    4. public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    5. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    6. }
    7. /**
    8. * 开始执行之前
    9. * @param t 线程
    10. * @param r 任务
    11. */
    12. @Override
    13. protected void beforeExecute(Thread t, Runnable r) {
    14. // 开始时间 (单位:纳秒)
    15. Long sTime = System.nanoTime();
    16. localTime.set(sTime);
    17. System.out.printf("%s | before | time=%s%n", t.getName(), sTime);
    18. super.beforeExecute(t, r);
    19. }
    20. /**
    21. * 执行完成之后
    22. * @param r 任务
    23. * @param t 抛出的异常
    24. */
    25. @Override
    26. protected void afterExecute(Runnable r, Throwable t) {
    27. // 结束时间 (单位:纳秒)
    28. Long eTime = System.nanoTime();
    29. // 执行总时间
    30. Long totalTime = eTime - localTime.get();
    31. System.out.printf("%s | after | time=%s | 耗时:%s 毫秒%n",
    32. Thread.currentThread().getName(), eTime, (totalTime / 1000000.0));
    33. super.afterExecute(r, t);
    34. }
    35. }

    使用自定义扩展的线程池: ```java ThreadPoolExecutor executor = new MyThreadPoolExecutor(4, 8, 10,

    1. TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new MyPolicy());

for (int i = 0; i < 5; i++) { executor.execute(() -> { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } }); }

  1. 输出:

pool-1-thread-2 | before | time=134686402749177 pool-1-thread-4 | before | time=134686403241020 pool-1-thread-1 | before | time=134686402714875 pool-1-thread-3 | before | time=134686402850272 pool-1-thread-2 | after | time=134686616057068 | 耗时:213.307891 毫秒 pool-1-thread-2 | before | time=134686619274973 pool-1-thread-4 | after | time=134686668299389 | 耗时:265.058369 毫秒 pool-1-thread-2 | after | time=134686688080508 | 耗时:68.805535 毫秒 pool-1-thread-1 | after | time=134686859037104 | 耗时:456.322229 毫秒 pool-1-thread-3 | after | time=134687249156210 | 耗时:846.305938 毫秒

  1. <a name="TmB2V"></a>
  2. ## 异常捕获
  3. 使用线程池的时候用execute提交任务后,主线程无法捕获子线程的异常,这个时候需要一些手段来实现。
  4. <a name="bShmb"></a>
  5. ### 自定义的线程工厂
  6. 使用自定义的线程工厂,声明异常处理Handler,实现异常捕获:
  7. ```java
  8. // 自定义线程工厂
  9. ThreadFactory factory = (Runnable r) -> {
  10. Thread t = new Thread(r);
  11. // 增加异常处理
  12. t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
  13. System.out.println("exceptionHandler捕捉到异常:" + e.getMessage());
  14. });
  15. return t;
  16. };
  17. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  18. 2,
  19. 2,
  20. 100,
  21. TimeUnit.SECONDS,
  22. new LinkedBlockingQueue<>(2),
  23. factory, // 创建线程池的时候使用自定义的线程工厂
  24. new ThreadPoolExecutor.CallerRunsPolicy()
  25. );