1. 线程池介绍

1.1 为什么使用线程池

  1. 如果不适用线程池来使用线程会有什么坏处?
  • 反复创建线程的开销大
  • 过多线程会占用太多内存
  1. 使用线程池的好处
  • 加快响应速度,不用反复创建和销毁线程
  • 可以合理利用CPU和内存,可以通过线程池来掌控
  • 统一资源管理,当任务多了,我们就可以利用线程池来统一操作。

1.2 适用线程池的场合

  • 服务器接收请求量大,我们可以通过线程池来复用线程,减少线程的创建和销毁开销。
  • 实际开发中我们推荐线程都用线程池来管理。

2. 线程池参数详解

2.1 线程池的参数列表

参数名称 类型 解释
corePoolSize int 核心线程数
maxPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务储存队列
threadFactory ThreadFactory 线程池创建新线程的工厂类
Handler RejectedExecutionHandler 任务执行的拒绝策略
  • corePoolSize:线程池在完成初始化后,默认线程池没有任何线程,当任务到来,就会创建新的线程去执行任务,直到corePoolSize满。
  • maxPoolSize:线程池在一定情况下回再核心线程的基础上,额外增加一些线程,线程数的上线就是maxPoolSize

2.2 corePoolSize与maxPoolSize控制添加线程的过程:

  1. 线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会去创建一个新线程来运行任务
  2. 如果线程数量等于或者大于了corePoolSize,但是少于maxPoolSize,就放入任务队列(BlockingQueue)中。
  3. 如果队列满了,并且线程小于maxPoolSize,则创建新线程来运行任务
  4. 如果队列满了,并且线程数量到了maxPoolSize,就执行拒绝策略

并发编程之线程池的使用 - 图1

  • 线程池增减的特点
    • 如果corePoolSize和maxPoolSize设置为相同,则线程池大小就为固定值
    • 线程池希望保持较少的线程数量,只有队列满的情况下才会去增加线程
    • 如果maxPoolSize设置为很高,如:Integer.MAX_VALUE,由于几乎达不到这个值,则可以允许线程容纳任意数量的并发任务
    • 因为只有队列满才会创建高于corePoolSize的线程,所以如果使用无界队列比如LinkedBlockingQueue,那线程数就不会超过corePoolSize

2.2 KeepAliveTime 保持存活时间

  • 如果线程池当前线程数量多于corePoolSize,则多出来的线程当空闲时间超过了KeepAliveTime的时候,就会被终止。
  • 如果不通过allowCoreThreadTimeOut(boolean)设置allowCoreThreadTimeOut参数,corePoolSize的线程,是不会被终止的。

2.3 threadFactory 线程工厂

  • 新的线程是由ThreadFactory创建。
  • Executors默认使用Executors.defaultThreadFactory()创建,切创建出来的线程都在一个线程组,线程优先级都为Thread.NORM_PRIORITY也就是5,不是守护线程。
  • 如果自己指定ThreadFactory,则可以指定线程名、线程组、优先级、守护线程等参数

2.4 workQueue 工作队列

  • 常见的队列:
    • 直接交换:SynchronousQueue,它内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。
    • 无界队列:LinkedBlockingQueue,因为它是链表结构,所以它不会被塞满。设置成这种队列,maxPoolSize就会是失效了,但是如果任务数量猛增容易造成OOM
    • 有界队列:ArrayBlockingQueue,它是有最大值的一个队列,如果队列满了且maxPoolSize大于corePoolSize就会去创建新的线程来执行任务。

3. 线程池创建实例

3.1 使用Executors创建线程池

  1. FixedThreadPool
    1. /**
    2. * FixedThreadPool
    3. * @author yiren
    4. */
    5. public class FixedThreadPool {
    6. public static void main(String[] args) {
    7. ExecutorService executorService = Executors.newFixedThreadPool(4);
    8. for (int i = 0; i < 1000; i++) {
    9. executorService.execute(() -> {
    10. try {
    11. TimeUnit.MILLISECONDS.sleep(500);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. System.out.println(Thread.currentThread().getName());
    16. });
    17. }
    18. }
    19. }
  1. ...
  2. pool-1-thread-4
  3. pool-1-thread-2
  4. pool-1-thread-4
  5. pool-1-thread-1
  6. pool-1-thread-3
  7. pool-1-thread-2
  8. pool-1-thread-1
  9. pool-1-thread-4
  10. pool-1-thread-3
  11. ...
  • 我们可以看到控制台,始终只有4个线程来回使用
  • 我们看下源码

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }
  • 里面实际创建的就是ThreadPoolExecutor然后参数的coremax设置成了相同值,并且工作队列是无界队列。所以不会创建超过corePoolSize的线程数量。

  • 如果队列过长,会造成OOM

    1. /**
    2. * FixedThreadPool OOM
    3. * -Xmx8m -Xms8m
    4. * @author yiren
    5. */
    6. public class FixedThreadPoolOom {
    7. public static void main(String[] args) {
    8. ExecutorService executorService = Executors.newFixedThreadPool(1);
    9. Runnable runnable = () -> {
    10. try {
    11. Thread.sleep(Integer.MAX_VALUE);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. };
    16. for (int i = 0; i < Integer.MAX_VALUE; i++) {
    17. executorService.execute(runnable);
    18. }
    19. }
    20. }
  1. Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
  2. at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
  3. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
  4. at com.imyiren.concurrency.threadpool.FixedThreadPoolOom.main(FixedThreadPoolOom.java:23)
  1. SingleThreadExecutor
    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }
  • 可见方法内,coremax都为1,是单线程线程池,且它的阻塞队列也是无界队列链表。等同于newFixedThreadPool(1),这里就不演示了。
  1. CachedThreadPool
    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }
  • CachedThreadPool的core为0,maxInteger.MAX_VALUE,并且工作队列为直接交换队列,所以来多少线程就创建多少线程,并且这个线程任务使用结束过后,不会立马终止,会等待60s,做一个缓存处理,提高利用率,过时不使用就会自己销毁。由于maxInteger.MAX_VALUE线程数量特别多也容易OOM
  1. ScheduledThreadPool
  • 按照时间周期执行任务

    1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    2. return new ScheduledThreadPoolExecutor(corePoolSize);
    3. }
    4. public ScheduledThreadPoolExecutor(int corePoolSize) {
    5. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    6. new DelayedWorkQueue());
    7. }
    8. public ThreadPoolExecutor(int corePoolSize,
    9. int maximumPoolSize,
    10. long keepAliveTime,
    11. TimeUnit unit,
    12. BlockingQueue<Runnable> workQueue) {
    13. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    14. Executors.defaultThreadFactory(), defaultHandler);
    15. }
  • 由上面源码我们可以看到,newScheduledThreadPool只需要指定核心线程数,创建了一个ThreadPoolExecutor的子类ScheduledThreadPoolExecutor,并且它的工作队列是一个延迟队列。

  • 我们看一下如何使用

    1. /**
    2. * ScheduledThreadPool
    3. * @author yiren
    4. */
    5. public class ScheduledThreadPool {
    6. public static void main(String[] args) {
    7. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    8. System.out.println(LocalDateTime.now());
    9. scheduledExecutorService.schedule(() -> System.out.println(LocalDateTime.now() + " "+ Thread.currentThread().getName() + " delay 5s"), 5, TimeUnit.SECONDS);
    10. scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(LocalDateTime.now() + " "+ Thread.currentThread().getName()), 1, 3, TimeUnit.SECONDS);
    11. }
    12. }
  1. 2020-02-16T18:35:21.598
  2. 2020-02-16T18:35:22.606 pool-1-thread-1
  3. 2020-02-16T18:35:25.609 pool-1-thread-1
  4. 2020-02-16T18:35:26.605 pool-1-thread-2 delay 5s
  5. 2020-02-16T18:35:28.614 pool-1-thread-1
  6. 2020-02-16T18:35:31.608 pool-1-thread-1
  7. 2020-02-16T18:35:34.610 pool-1-thread-1
  • 第一种用法即:schedule(Runnable, long, TimeUnit);指定任务的Runnable,延迟多久执行,延迟时间的单位。
  • 第二种用法即:scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);指定任务的Runnable,初始执行延迟的时间initialDelay,随后每隔period执行一次,并指定时间单位。
  1. workStealingPool
  1. public static ExecutorService newWorkStealingPool() {
  2. return new ForkJoinPool
  3. (Runtime.getRuntime().availableProcessors(),
  4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  5. null, true);
  6. }
  7. public ForkJoinPool(int parallelism,
  8. ForkJoinWorkerThreadFactory factory,
  9. UncaughtExceptionHandler handler,
  10. boolean asyncMode) {
  11. this(checkParallelism(parallelism),
  12. checkFactory(factory),
  13. handler,
  14. asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  15. "ForkJoinPool-" + nextPoolId() + "-worker-");
  16. checkPermission();
  17. }
  • JDK1.8加入的新线程池,我们可以看到他并不是使用的ThreadPoolExecutor,而是新的线程池类ForkJoinPool,它能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
  • ForkJoinPool它是一个并行的线程池,参数中传入的是一个线程并发的数量,这里和之前就有很明显的区别,前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题。这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作。

通过上面我们了解一一下各个线程的特点。以及内部的参数。使用Executors往往不容易契合我们的业务需求。在阿里的Java代码约定中,明确指出了如下:(取自IDEA提示):

  • 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下:
    1)FixedThreadPool和SingleThreadPool:
      允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
    2)CachedThreadPool:
      允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

3.2 使用ThreadPoolExecutor创建线程池

  1. 如果使用ThreadPoolExecutor,我们如何设置线程数是一个问题
    • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右
    • 耗时IO型(读写数据库、文件、网络流等):最佳线程数一般大于CPU核心数的很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:
    • 线程数=CPU核心数 * ( 1 + 平均等待时间/平均工作时间 )
    • 更加精准的方式是需要做压测
  2. 使用ThreadPoolExecutor创建线程池的推荐方法(取自阿里Java插件)
    1. // Positive example 1:
    2. //org.apache.commons.lang3.concurrent.BasicThreadFactory
    3. ScheduledExecutorService executorService =
    4. new ScheduledThreadPoolExecutor(1,
    5. new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
  1. //Positive example 2:
  2. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
  3. .setNameFormat("demo-pool-%d").build();
  4. //Common Thread Pool
  5. ExecutorService pool = new ThreadPoolExecutor(5, 200,
  6. 0L, TimeUnit.MILLISECONDS,
  7. new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  8. pool.execute(()-> System.out.println(Thread.currentThread().getName()));
  9. pool.shutdown();//gracefully shutdown
  1. Positive example 3:
  2. <bean id="userThreadPool"
  3. class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  4. <property name="corePoolSize" value="10" />
  5. <property name="maxPoolSize" value="100" />
  6. <property name="queueCapacity" value="2000" />
  7. <property name="threadFactory" value= threadFactory />
  8. <property name="rejectedExecutionHandler">
  9. <ref local="rejectedExecutionHandler" />
  10. </property>
  11. </bean>
  12. //in code
  13. userThreadPool.execute(thread);

4. 停止线程池的方法

4.1 shutdown()

  • 这个方法执行了过后,会通知线程池停止,但是不会立即停止,线程池会执行完当前正在执行的任务以及队列里面的任务过后才会停止。
  • 在这个期间,不会接受新的任务,如果提交新任务就会报错

4.2 isShutdown()

  • 如果我们不知道线程是否进入了shutdown(),我们可以通过调用isShutdown()来判断,注意这个isShutdown()是判断是否调用了shutdown()方法,而不是指完全停止了。

4.3 isTerminated()

  • 那我们要判断这个线程池是否完全停止了呢?isTerminated()

4.4 awaitTermination(timeout, TimeUnit);

  • 这个方法用来阻塞等待一个时间后,查看是否完全停止

4.5 案例:

  1. /**
  2. * @author yiren
  3. */
  4. public class ThreadPoolShutdown {
  5. public static void main(String[] args) throws InterruptedException {
  6. ExecutorService executorService = Executors.newFixedThreadPool(5);
  7. Runnable runnable = () -> {
  8. try {
  9. Thread.sleep(500);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. System.out.println(Thread.currentThread().getName());
  14. };
  15. for (int i = 0; i < 50; i++) {
  16. executorService.execute(runnable);
  17. }
  18. Thread.sleep(1500);
  19. executorService.shutdown();
  20. System.out.println("======> shutdown!");
  21. try {
  22. executorService.execute(() -> {
  23. System.out.println("new Task-1");
  24. });
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. boolean isTerminated = executorService.awaitTermination(3, TimeUnit.SECONDS);
  29. System.out.println("executorService.awaitTermination(3, TimeUnit.SECONDS) = " + isTerminated);
  30. if (executorService.isShutdown()) {
  31. System.out.println("线程已经进入了关闭阶段,无法提交");
  32. } else {
  33. executorService.execute(()->{
  34. System.out.println("new Task-2");
  35. });
  36. }
  37. Thread.sleep(10000);
  38. System.out.println("executorService.isTerminated() = " + executorService.isTerminated());
  39. }
  40. }
  1. pool-1-thread-2
  2. ......
  3. pool-1-thread-3
  4. ======> shutdown
  5. 线程已经进入了关闭阶段,无法提交
  6. pool-1-thread-1
  7. ......
  8. pool-1-thread-2
  9. java.util.concurrent.RejectedExecutionException: Task com.imyiren.concurrency.threadpool.ThreadPoolShutdown?Lambda$2/1072408673@5b480cf9 rejected from java.util.concurrent.ThreadPoolExecutor@6f496d9f[Shutting down, pool size = 5, active threads = 5, queued tasks = 35, completed tasks = 10]
  10. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  11. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  12. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  13. at com.imyiren.concurrency.threadpool.ThreadPoolShutdown.main(ThreadPoolShutdown.java:28)
  14. pool-1-thread-1
  15. ......
  16. pool-1-thread-2
  17. pool-1-thread-3
  18. executorService.isTerminated() = true

4.6 shutdownNow()

  • 通过中断信号来停止所有的线程,并发工作队列里面的线程任务以Runnable列表形式返回
  • 案例

    1. /**
    2. * @author yiren
    3. */
    4. public class ThreadPoolShutdownNow {
    5. public static void main(String[] args) throws InterruptedException {
    6. ExecutorService executorService = Executors.newFixedThreadPool(5);
    7. Runnable runnable = () -> {
    8. try {
    9. Thread.sleep(500);
    10. System.out.println(Thread.currentThread().getName());
    11. } catch (InterruptedException e) {
    12. System.out.println(Thread.currentThread().getName() + " Interrupted");
    13. }
    14. };
    15. for (int i = 0; i < 50; i++) {
    16. executorService.execute(runnable);
    17. }
    18. Thread.sleep(1500);
    19. List<Runnable> runnableList = executorService.shutdownNow();
    20. System.out.println("shutdownNow!");
    21. System.out.println("runnableList.size() = " + runnableList.size());
    22. }
    23. }
  1. pool-1-thread-2
  2. pool-1-thread-5
  3. pool-1-thread-3
  4. pool-1-thread-4
  5. pool-1-thread-1
  6. pool-1-thread-2
  7. pool-1-thread-1
  8. pool-1-thread-4
  9. pool-1-thread-5
  10. pool-1-thread-3
  11. pool-1-thread-1 Interrupted
  12. pool-1-thread-4 Interrupted
  13. pool-1-thread-2 Interrupted
  14. shutdownNow!
  15. pool-1-thread-3 Interrupted
  16. runnableList.size() = 35
  17. pool-1-thread-5 Interrupted
  18. Process finished with exit code 0

5. 如何拒绝线程任务

5.1 拒绝时机

  1. 当Executor关闭时,提交新任务会被拒绝
  2. 当Executor的最大线程和工作队列,他们使用有限大小的并且已经达到最大值时

5.2 拒绝策略

  1. AbortPolicy 抛出异常
  2. DiscardPolicy 默默丢弃,你无法得到通知
  3. DiscardOldestPolicy 默默丢弃最老的
  4. CallerRunsPolicy 让提交任务的线程执行

6. 使用周期函数定制线程

6.1 可暂停线程池

  1. /**
  2. * 演示任务执行前后的周期执行任务
  3. *
  4. * @author yiren
  5. */
  6. public class CanPauseThreadPool extends ThreadPoolExecutor {
  7. private boolean isPaused;
  8. private final Lock lock = new ReentrantLock();
  9. private Condition unPaused = lock.newCondition();
  10. public CanPauseThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
  11. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  12. }
  13. private void pause() {
  14. lock.lock();
  15. try {
  16. isPaused = true;
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. @Override
  22. protected void beforeExecute(Thread t, Runnable r) {
  23. super.beforeExecute(t, r);
  24. lock.lock();
  25. try {
  26. while (isPaused) {
  27. unPaused.await();
  28. }
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }finally {
  32. lock.unlock();
  33. }
  34. }
  35. public void resume() {
  36. lock.lock();
  37. try {
  38. isPaused = false;
  39. unPaused.signalAll();
  40. }finally {
  41. lock.unlock();
  42. }
  43. }
  44. public static void main(String[] args) throws InterruptedException {
  45. CanPauseThreadPool canPauseThreadPool = new CanPauseThreadPool(4, 10, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024));
  46. for (int i = 0; i < 100; i++) {
  47. canPauseThreadPool.execute(()->{
  48. try {
  49. Thread.sleep(100);
  50. System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now());
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. });
  55. }
  56. TimeUnit.SECONDS.sleep(1);
  57. // 开始暂停
  58. canPauseThreadPool.pause();
  59. System.out.println("====> 开始暂停 " + LocalDateTime.now());
  60. TimeUnit.SECONDS.sleep(2);
  61. System.out.println("====> 恢复线程池 " + LocalDateTime.now());
  62. canPauseThreadPool.resume();
  63. }
  64. }
  1. pool-1-thread-1 2020-02-16T19:58:56.685
  2. pool-1-thread-2 2020-02-16T19:58:56.685
  3. pool-1-thread-3 2020-02-16T19:58:56.787
  4. pool-1-thread-1 2020-02-16T19:58:56.790
  5. pool-1-thread-2 2020-02-16T19:58:56.790
  6. pool-1-thread-4 2020-02-16T19:58:56.790
  7. pool-1-thread-3 2020-02-16T19:58:56.892
  8. pool-1-thread-1 2020-02-16T19:58:56.894
  9. pool-1-thread-4 2020-02-16T19:58:56.894
  10. pool-1-thread-2 2020-02-16T19:58:56.894
  11. ====> 开始暂停 2020-02-16T19:58:56.950
  12. pool-1-thread-3 2020-02-16T19:58:56.996
  13. pool-1-thread-1 2020-02-16T19:58:56.997
  14. pool-1-thread-2 2020-02-16T19:58:56.997
  15. pool-1-thread-4 2020-02-16T19:58:56.997
  16. ====> 恢复线程池 2020-02-16T19:58:58.955
  17. pool-1-thread-1 2020-02-16T19:58:59.056
  18. pool-1-thread-3 2020-02-16T19:58:59.056
  19. pool-1-thread-4 2020-02-16T19:58:59.056
  20. pool-1-thread-2 2020-02-16T19:58:59.056
  21. pool-1-thread-1 2020-02-16T19:58:59.157
  22. pool-1-thread-2 2020-02-16T19:58:59.157
  23. pool-1-thread-4 2020-02-16T19:58:59.157
  • 由以上案例我们可以知道,我们可以通过重写线程池的周期函数,来在线程池执行任务前停止任务的执行。

6.2 afterExecute

  • 除以上的beforeExecute ,线程池还提供了afterExecute

7. 线程池简单分析

7.1 关系图

并发编程之线程池的使用 - 图2

我们可以通过IDEA的diagrams工具选中这几个接口和类显示如上关系图。

  • Executor只有一个执行任务的方法

    1. public interface Executor {
    2. void execute(Runnable command);
    3. }
  • ExecutorService 继承自Executor,包含了一些管理方法

    1. public interface ExecutorService extends Executor {
    2. void shutdown();
    3. List<Runnable> shutdownNow();
    4. boolean isTerminated();
    5. boolean awaitTermination(long timeout, TimeUnit unit)
    6. throws InterruptedException;
    7. <T> Future<T> submit(Callable<T> task);
    8. <T> Future<T> submit(Runnable task, T result);
    9. Future<?> submit(Runnable task);
    10. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    11. throws InterruptedException;
    12. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    13. long timeout, TimeUnit unit)
    14. throws InterruptedException;
    15. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    16. throws InterruptedException, ExecutionException;
    17. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    18. long timeout, TimeUnit unit)
    19. throws InterruptedException, ExecutionException, TimeoutException;
    20. }
  • AbstractExecutorServiceThreadPoolExecutor就是具体实现了

  • Executors 则是一个工具类,用来辅助创建线程等。

7.2 一个线程池的组成

主要由以下几个组件组成:

  1. 线程池管理器
  2. 工作线程
  3. 任务队列
  4. 任务接口

7.3 线程池实现任务的复用

  • 首先我们看一下execute(Runnable) 方法

    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. int c = ctl.get();
    5. /*1.获取当前正在运行线程数是否小于核心线程池,
    6. 是则新创建一个线程执行任务,否则将任务放到任务队列中*/
    7. if (workerCountOf(c) < corePoolSize) {
    8. if (addWorker(command, true))
    9. return;
    10. c = ctl.get();
    11. }
    12. // 添加到队列
    13. /*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,
    14. 所以此时将线程放到任务队列中*/
    15. //线程池是否处于运行状态,且是否任务插入任务队列成功
    16. if (isRunning(c) && workQueue.offer(command)) {
    17. int recheck = ctl.get();
    18. //线程池是否处于运行状态,如果不是则使刚刚的任务出队
    19. if (! isRunning(recheck) && remove(command))
    20. reject(command);
    21. else if (workerCountOf(recheck) == 0)
    22. addWorker(null, false);
    23. }
    24. // 执行拒绝策略
    25. else if (!addWorker(command, false))
    26. reject(command);
    27. }
  • 我们任务进去了过了,线程会封装成一个Worker,线程复用的时候,就是使用固定线程的run方法来不断去判断队列里面是否有任务,有的话就拿出来执行。主要就是Worker里面的runWorker方法:

    1. final void runWorker(Worker w) {
    2. Thread wt = Thread.currentThread();
    3. Runnable task = w.firstTask;
    4. w.firstTask = null;
    5. w.unlock(); // allow interrupts
    6. boolean completedAbruptly = true;
    7. try {
    8. while (task != null || (task = getTask()) != null) {
    9. w.lock();
    10. // If pool is stopping, ensure thread is interrupted;
    11. // if not, ensure thread is not interrupted. This
    12. // requires a recheck in second case to deal with
    13. // shutdownNow race while clearing interrupt
    14. if ((runStateAtLeast(ctl.get(), STOP) ||
    15. (Thread.interrupted() &&
    16. runStateAtLeast(ctl.get(), STOP))) &&
    17. !wt.isInterrupted())
    18. wt.interrupt();
    19. try {
    20. beforeExecute(wt, task);
    21. Throwable thrown = null;
    22. try {
    23. task.run();
    24. } catch (RuntimeException x) {
    25. thrown = x; throw x;
    26. } catch (Error x) {
    27. thrown = x; throw x;
    28. } catch (Throwable x) {
    29. thrown = x; throw new Error(x);
    30. } finally {
    31. afterExecute(task, thrown);
    32. }
    33. } finally {
    34. task = null;
    35. w.completedTasks++;
    36. w.unlock();
    37. }
    38. }
    39. completedAbruptly = false;
    40. } finally {
    41. processWorkerExit(w, completedAbruptly);
    42. }
    43. }
  • 我们可以看到 取出task,然后调用run方法。里面还涵盖了上面的周期函数before和after

8. 线程池状态

  • RUNNING:可以接受新任务并处理排队任务

    private static final int RUNNING = -1 << COUNT_BITS; //111,00000000000000000000000000000

  • SHUTDOWN:不接受新任务,但是可以处理排队任务

    private static final int SHUTDOWN = 0 << COUNT_BITS; // 000,00000000000000000000000000000

  • STOP:不接受新任务,也不处理排队任务,并且中断正在执行的任务

    private static final int STOP = 1 << COUNT_BITS; // 001,00000000000000000000000000000

  • TIDYING:所有线程都已经终止,workerCount为零时,线程会转换到TIDYING状态,并调用terminate()方法

    private static final int TIDYING = 2 << COUNT_BITS; // 010,00000000000000000000000000000

  • TERMINATED:termiante()运行完成

    private static final int TERMINATED = 3 << COUNT_BITS;// 011,00000000000000000000000000000