概述

Executor框架是Java 5引入的一个并发编程框架,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,更易管理,效率更好(用线程池实现,节约开销)。

为什么要用线程池(thread pool)呢?因为构建一个新的线程涉及与操作系统的交互,会付出一定的代价,如果程序中创建了大量生命期很短的线程,则应该使用线程池。一个线程池中包含许多准备运行的空闲线程。将Runnable对象交给线程池,就会有一个线程调用run方法,当run方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。
另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。

重要API

Executor

  1. public interface Executor {
  2. /**
  3. * Executes the given command at some time in the future. The command
  4. * may execute in a new thread, in a pooled thread, or in the calling
  5. * thread, at the discretion of the {@code Executor} implementation.
  6. *
  7. * @param command the runnable task
  8. * @throws RejectedExecutionException if this task cannot be
  9. * accepted for execution
  10. * @throws NullPointerException if command is null
  11. */
  12. void execute(Runnable command);
  13. }

Executor接口中只定义了一个方法execute(),该方法接收一个Runnable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。在调用了execute()方法后,会将接收到的任务在一个线程上执行,这个线程可能是新创建的,可能是线程池中空闲的线程,可能是调用线程。

ExecutorService

**
ExecutorService是Java中对线程池定义的一个接口,即它就是一个线程池,它继承了Executor。在Executor框架中Executors提供了很多创建ExecutorService线程池的工厂方法。

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. boolean awaitTermination(long timeout, TimeUnit unit)
  7. throws InterruptedException;
  8. <T> Future<T> submit(Callable<T> task);
  9. <T> Future<T> submit(Runnable task, T result);
  10. Future<?> submit(Runnable task);
  11. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  12. throws InterruptedException;
  13. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  14. long timeout, TimeUnit unit)
  15. throws InterruptedException;
  16. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  17. throws InterruptedException, ExecutionException;
  18. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  19. long timeout, TimeUnit unit)
  20. throws InterruptedException, ExecutionException, TimeoutException;
  21. }

ExecutorService有一个shutdown()方法用于关闭线程池,结束线程运行。

execute()和submit()的区别:

  • execute()可以接收实现了Runnable接口的任务,submit()不仅可以接收Runnable,还可以接收Callable接口任务
  • execute()无返回值,submit()会返回一个Future对象,可用来查询该任务的状态

ScheduledExecutorService

ScheduledExecutorService是ExecutorService类的子类,该接口具有为
预定执行**或重复执行任务而设计的方法。它是一种允许使用线程池机制的java.util.Timer的泛化。Executors工厂类里newScheduledThreadPool()和newSingleThreadScheduledExecutor()返回的就是ScheduledExecutorService。

  1. public interface ScheduledExecutorService extends ExecutorService {
  2. //预定在指定的时间之后执行任务
  3. public ScheduledFuture<?> schedule(Runnable command,
  4. long delay, TimeUnit unit);
  5. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  6. long delay, TimeUnit unit);
  7. //预定在初始的延迟结束后,周期性地运行给定地任务,周期长度是period
  8. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  9. long initialDelay,
  10. long period,
  11. TimeUnit unit);
  12. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  13. long initialDelay,
  14. long delay,
  15. TimeUnit unit);
  16. }

Executors

Executors类同样在concurrent包下,它是一个工厂类,提供了很多创建线程池的工厂方法和一些实用方法。

  1. public class Executors {
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>());
  6. }
  7. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  8. return new DelegatedScheduledExecutorService
  9. (new ScheduledThreadPoolExecutor(1));
  10. }
  11. public static ExecutorService newCachedThreadPool() {
  12. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  13. 60L, TimeUnit.SECONDS,
  14. new SynchronousQueue<Runnable>());
  15. }
  16. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  17. return new ScheduledThreadPoolExecutor(corePoolSize);
  18. }
  19. }

Executors一共可以创建下面这四类线程池:

  • newCachedThreadPool
  • newFixedThreadPool
  • newScheduledThreadPool
  • newSingleThreadExecutor

newCachedThreadPool

  • 创建一个可缓存线程池,它会先查看线程池里有没有已创建的空闲线程,如果有,则reuse(),若无,则创建新线程。
  • 该线程池通常用于执行一些生存期很短的异步型任务,对于生存期短的异步任务,它是 Executor 的首选。
  • 能 reuse()的线程,必须是 timeout IDLE(空闲)内的池中线程,即空闲的线程。默认的空闲时间timeout是60s,超过这个时间该线程就会终止并被移出线程池

newFixedThreadPool

  • 创建一个定长线程池,可控制线程的最大并发数
  • 可以reuse()已有空闲线程,创建新线程时如果超过了最大数量,则会进入队列等待,直到有其他线程被移出池子
  • 从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
    • fixed 池线程数固定,空闲线程会一直被保留
    • cache 池线程数支持 0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60 秒 IDLE 。

newScheduledThreadPool

  • 创建一个调度型线程池,支持定时及周期性任务执行

newSingleThreadExecutor

  • 创建一个单线程化的线程池,即只有一个线程,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

一般来说,CachedTheadPool 在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的 Executor 的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用 FixedThreadPool。(该段话摘自《Thinking in Java》第四版)

CompletionService

CompletionService接口定义了一组任务管理接口:

  1. public interface CompletionService<V> {
  2. //提交任务
  3. Future<V> submit(Callable<V> task);
  4. Future<V> submit(Runnable task, V result);
  5. //获取任务结果
  6. Future<V> take() throws InterruptedException;
  7. //获取任务结果
  8. Future<V> poll();
  9. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  10. }

ExecutorCompletionService
**
ExecutorCompletionService类是CompletionService接口的实现,该类就是一个已完成任务的执行结果的服务,管理器。它内部管理一个阻塞队列,该队列里存的是已经完成的任务的Future对象,即执行结果对象。

  1. public class ExecutorCompletionService<V> implements CompletionService<V> {
  2. //提交一个任务给底层的执行器
  3. public Future<V> submit(Callable<V> task);
  4. public Future<V> submit(Runnable task, V result);
  5. //移除下一个已完成的结果,并返回该Future对象,如果没有任何已完成的结果可用则阻塞
  6. public Future<V> take() throws InterruptedException;
  7. //移除下一个已完成的结果,如果没有任何已完成结果可用则返回null
  8. public Future<V> poll();
  9. //同上,该方法将等待给定的时间
  10. public Future<V> poll(long timeout, TimeUnit unit)
  11. throws InterruptedException;
  12. }

在使用ExecutorCompletionService类之前,我们想要知道多个任务的执行结果时,我们可以使用List容器存储所有的submit的任务返回的Future对象。但这会造成一些不必要的问题。而使用ExecutorCompletionService服务类,则可以解决这些问题。

  1. /**
  2. * 第一张方法,使用List收集结果
  3. * 如果任务未完成,则会进入阻塞,那么后面已经完成的任务就无法获得结果,导致了不必要的等待
  4. * 即,已完成的任务可能得不到及时处理
  5. * @throws ExecutionException
  6. * @throws InterruptedException
  7. */
  8. public void case1() throws ExecutionException, InterruptedException {
  9. ExecutorService executorService = Executors.newCachedThreadPool();
  10. List<Future<String>> futures = new ArrayList<>();
  11. for(int i = 0; i < 5; i++){
  12. //收集结果
  13. futures.add(executorService.submit(new Callable<String>() {
  14. @Override
  15. public String call() throws Exception {
  16. Thread.sleep(5000);
  17. return Thread.currentThread().getName();
  18. }
  19. }));
  20. }
  21. //处理任务结果
  22. for(Future<String> future : futures){
  23. System.out.println(future.get());
  24. }
  25. //关闭线程池
  26. executorService.shutdown();
  27. }
  28. /**
  29. * 第二种方法,在处理结果时判断任务是否已完成
  30. * 若已完成,则处理结果,并移出List容器
  31. * 然后回到列表开头,重新遍历获取结果,直到所有任务执行完毕
  32. * 这解决了上面的已完成任务得不到及时处理的问题,但是多次循环遍历会降低性能
  33. * @throws ExecutionException
  34. * @throws InterruptedException
  35. */
  36. public void case2() throws ExecutionException, InterruptedException {
  37. ExecutorService executorService = Executors.newCachedThreadPool();
  38. List<Future<String>> futures = new ArrayList<>();
  39. for(int i = 0; i < 5; i++){
  40. //收集结果
  41. futures.add(executorService.submit(new Callable<String>() {
  42. @Override
  43. public String call() throws Exception {
  44. Thread.sleep(5000);
  45. return Thread.currentThread().getName();
  46. }
  47. }));
  48. }
  49. //处理任务结果
  50. for(int i = 0; i < futures.size() ;i++){
  51. if(futures.get(i).isDone()) {
  52. System.out.println(futures.get(i).get());
  53. futures.remove(futures.get(i));
  54. i--;
  55. }
  56. //回到列表开头,重新获得结果
  57. if(i == futures.size()-1) {
  58. i = -1;
  59. }
  60. }
  61. //关闭线程池
  62. executorService.shutdown();
  63. }
  64. /**
  65. * 第三种方法,使用ExecutorCompletionService管理异步任务
  66. *
  67. * @throws ExecutionException
  68. * @throws InterruptedException
  69. */
  70. public void case3() throws ExecutionException, InterruptedException {
  71. ExecutorService executorService = Executors.newCachedThreadPool();
  72. ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
  73. for(int i = 0; i < 5; i++){
  74. //收集结果
  75. completionService.submit(new Callable<String>() {
  76. @Override
  77. public String call() throws Exception {
  78. Thread.sleep(5000);
  79. return Thread.currentThread().getName();
  80. }
  81. });
  82. }
  83. //处理任务结果
  84. int completionTask = 0;
  85. while(completionTask < 5) {
  86. //如果完成队列中没有数据, 则阻塞; 否则返回队列中的数据
  87. Future<String> future = completionService.take();
  88. System.out.println("result: " + future.get());
  89. completionTask++;
  90. }
  91. //关闭线程池
  92. executorService.shutdown();
  93. }

使用线程池

以下是一个简单的线程池使用例子:

  1. public static void main(String[] args) throws IOException, ClassNotFoundException {
  2. //通过Executors工厂类创建一个线程池
  3. ExecutorService executorService = Executors.newCachedThreadPool();
  4. //提交任务给线程池执行
  5. executorService.submit(()-> System.out.println("Thread 1"));
  6. executorService.execute(()-> System.out.println("Thread 2"));
  7. //关闭线程池
  8. executorService.shutdown();
  9. }

使用线程池的步骤:

  • 使用Executor框架进行线程池管理时,首先为程序,使用Executors工厂类按需创建一个线程池 ExecutorService。
  • 接着调用execute()或submit()方法向线程池提交任务(实现Runnable或Callable的实例),线程池就会调用线程执行任务。
  • 当用完一个线程池时,调用shutdown()方法,启动该池的关闭序列。被关闭的线程池不再接受新的任务,当所有任务完成后,线程池中的线程死亡。或者调用shutdownNow()方法,该池会取消尚未开始的所有任务并试图中断正在运行的线程。
  1. //可用这一对象来调用调用isDone,cancel,isCancelled,但是get方法在完成时只是简单地返回null
  2. Future<?> submit(Runnable task);
  3. //返回一个Future对象,get方法返回指定地result对象
  4. Future<T> submit(Runnable task, T result);
  5. //接收Callable接口任务
  6. Future<T> submit(Callable<T> task);

execute()和submit()的区别:

自定义线程池

Executors工厂类有很多创建线程池的工厂方法,但在阿里代码规范里使用Executors时,会提示“手动创建线程池,效果会更好哦”。

我们可用通过ThreadPoolExecutor类来自定义线程池,通过自己指定线程池的属性。

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. //任务执行前保存任务的队列,仅保存由 execute 方法提交的 Runnable 任务
  3. private final BlockingQueue<Runnable> workQueue;
  4. //线程池中的空闲线程所能持续的最长时间
  5. private volatile long keepAliveTime;
  6. //线程池中所保存的核心线程数,包括空闲线程
  7. private volatile int corePoolSize;
  8. //池中允许的最大线程数
  9. private volatile int maximumPoolSize;
  10. }
  11. //持续时间的单位
  12. Timeunit unit;

当线程池通过execute()执行任务时,按照以下顺序执行:

  1. 如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务
  2. 如果线程池中的线程数量大于等于corePoolSize,但缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue中,按照 FIFO 的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
  3. 如果线程池中的线程数量大于等于corePoolSize,且缓冲队列 workQueue 已满,但线程池中的线程数量小于 maximumPoolSize,则会创建新的线程来处理被添加的任务;
  4. 如果线程池中的线程数量等于了 maximumPoolSize(线程溢出),会使用RejectedExecutionHandler来处理线程溢出,它在处理线程溢出时有 4 种方式。

自定义线程池例子:

  1. //创建一个核心线程池为5,最大线程数量为10,超时时间50秒,使用有界阻塞队列
  2. ThreadPoolExecutor pool = new ThreadPoolExecutor(5,10,60,
  3. TimeUnit.SECONDS,
  4. new ArrayBlockingQueue<Runnable>(10));
  5. pool.execute(()-> System.out.println("Thread one"));
  6. pool.execute(()-> System.out.println("Thread two"));

线程池

关于线程池,ThreadpoolExecutor 是蛮重要的,所以它的七个参数有必要背一下:

  1. corePoolSoze 核心线程数
  2. maximumPoolSize 最大线程数
  3. keepAliveTime 生存时间 即线程超过了这个时间没有工作就会被回收
  4. TimeUnit 生存时间的单位
  5. workQueue 任务队列 用于存放未执行的任务
  6. threadFactory 线程工厂
  7. handler 拒绝策略 即线程池忙,而且任务队列满这种情况下就需要执行拒绝策略
    1. Abort 抛异常
    2. Discard 扔掉,不抛异常
    3. DiscardOldest 扔掉排队时间最久的
    4. CallerRuns 调用者处理服务

先来康康JDK提供的几个默认的线程池。

SingleThreadPool

  1. void single(){
  2. ExecutorService executorService = Executors.newSingleThreadExecutor();
  3. }

SingleThreadPool ,顾名思义,是单个线程的线程池,这里面的线程只有一个在执行。

  1. public class Executors {
  2. public static ExecutorService newSingleThreadExecutor() {
  3. return new FinalizableDelegatedExecutorService
  4. (new ThreadPoolExecutor(1, 1,
  5. 0L, TimeUnit.MILLISECONDS,
  6. new LinkedBlockingQueue<Runnable>()));
  7. }
  8. }

可以看到,SingleThreadPool 的核心线程数和最大线程数都是1,其用的任务队列是链表阻塞队列。因为该线程池只有一个线程,所以线程不需要回收,可以一直复用,所以过期时间设置为0.
也就是说,这个线程池可以一个个来执行你所提交的任务,且保证执行的顺序和你提交的顺序一致。而且任务队列使用的是 Linked 无边界的阻塞队列,因此任务也可以一直提交。

CachedThreadPool

  1. void cached(){
  2. ExecutorService executorService = Executors.newCachedThreadPool();
  3. }
  4. public class Executors {
  5. public static ExecutorService newCachedThreadPool() {
  6. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  7. 60L, TimeUnit.SECONDS,
  8. new SynchronousQueue<Runnable>());
  9. }
  10. }

CachedThreadPool,缓存线程池,很明显是用来做缓存的。
它的源码里定义的核心线程数是0,最大线程数却是 Integer.MAX_VALUE,非常非常大了。这个线程池里的线程只要60s不工作就会被回收。它的任务队列使用的是 SynchronousQueue,这是一种本质上不存储任何数据的队列,因为它的特点是每个插入操作必须等待另一个线程进行相应的删除操作,在这里简单说就是把你丢进去的任务一个一个慢慢运行起来,不急,相当于过安检。其次它的拒绝策略使用的是默认的拒绝策略,即抛异常。

从这里可以看到 CachedThreadPool 的特点其实就是来一个任务启动一个线程,当然前提是线程池里存在且还没达到60s回收时间的线程,否则我就使用现有的线程。这个线程池最大的缺点就是会开启特别多的线程,接近没有上限。

FixedThreadPool

  1. void fixed(){
  2. ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. }
  4. public class Executors {
  5. public static ExecutorService newFixedThreadPool(int nThreads) {
  6. return new ThreadPoolExecutor(nThreads, nThreads,
  7. 0L, TimeUnit.MILLISECONDS,
  8. new LinkedBlockingQueue<Runnable>());
  9. }
  10. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  11. return new ThreadPoolExecutor(nThreads, nThreads,
  12. 0L, TimeUnit.MILLISECONDS,
  13. new LinkedBlockingQueue<Runnable>(),
  14. threadFactory);
  15. }
  16. }

FixedThreadPool,在这里Fixed不是修复的意思,是固定的意思,即固定线程数的线程池。
该线程池需要传入一个参数,核心线程数和最大线程数都是固定维持传入的那个参数。该线程池的线程数量固定,因此和 Single 一样不需要回收,可以复用,过期时间设置为了0。然后该线程池也是使用的 LinkedBlockingQueue 无边界阻塞队列,用于存放任务。

听说阿里不建议用 LinkedBlockingQueue

使用该线程池的一个好处是可以并行的处理任务,当然别的也不是不行。一个小例子就是给一堆数据做判断,是不是质数,然后将质数放到另外一个指定的位置。那么使用线程池并行计算就可以将任务切分开来并行处理,可以提高计算效率。

并行和并发: 并行是多个CPU可以同时进行处理,并发是多个任务同时过来。并行是并发的子集。

Cached vs Fixed

这两个线程池的一个区别就是线程的数量大小,一个是几乎无限大一个是固定。那明明有更大的不用我为什么要用固定那么大的?小盘友你是否有很多问号?
其实这就涉及到一个资源问题,什么时候需要精准控制你有多少个线程数呢?多数情况下是你需要预估并发量的时候,为了防止线程池中的数量过多,需要手动控制线程个数量。为什么要防止线程池中数量过多呢?因为当线程数量过多的时候,最终它们会竞争同样的稀缺的处理器和内存资源,浪费大量的时间在上下文切换上,反之,如果线程的数目过少,处理器的一些核就没有办法得到充分利用。因此,线程池的大小究竟多大?这是个学问。而且我也不会!(理直气壮!)

Brian Goetz 和合著者们建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

Nt = Nc Uc ( 1 + W / C ) Nt:线程数量 Nc:处理器核的数量 (可以通过 Runtime.getRuntime().availableProcessors() 得到) Uc:期望的CPU利用率 (应该在0到1之间) W/C:等待时间与计算时间的比率

——《Java并发编程实战》

如果你这个任务不确定它的数量是否平稳,但是要保证任务来的时候有人做,那么可以用Cache,当然你要保证这个任务不会堆积。那Fixed的话就是这个任务比较平稳,可以大概估算一个值。

又听说阿里都不用,要自己估算 草

ScheduledPool

  1. void scheduled(){
  2. ExecutorService executorService = Executors.newScheduledThreadPool(10);
  3. }
  4. public class Executors {
  5. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  6. return new ScheduledThreadPoolExecutor(corePoolSize);
  7. }
  8. public ScheduledThreadPoolExecutor(int corePoolSize) {
  9. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  10. new DelayedWorkQueue());
  11. }
  12. public ThreadPoolExecutor(int corePoolSize,
  13. int maximumPoolSize,
  14. long keepAliveTime,
  15. TimeUnit unit,
  16. BlockingQueue<Runnable> workQueue) {
  17. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  18. Executors.defaultThreadFactory(), defaultHandler);
  19. }
  20. }

ScheduleThreadPool (这个单词真难拼),定时任务线程池,专门用来执行定时任务的。
在源码里创建这个线程池的方法包了好几层,最终还是创建的是 ThreadPoolExecutor,即他本质还是 ThreadPoolExecutor。其核心线程池由用户自己定义,而最大线程数非常大,也不作回收,任务队列使用的是 DelayedWorkQueue,该任务队列需要当数据对象的延时时间达到时才能插入到队列进行存储,也就是实现定时执行任务的核心

它有个好用的方法是 scheduleAtFixedRate() ,即间隔多长时间在一个固定的频率上执行一次这个任务,第一个参数是(Delay)第一个任务执行之前需要往后面推多长时间;第二个参数(period)间隔多长时间,第三个参数是时间单位

一个阿里面试题:假如提高一个闹钟服务,订阅这个服务的人特别多,10亿人,就意味着在每天早上七点钟的时候会有10亿并发量涌向你这的服务器,你怎么优化?

ThreadPoolExecutor

一些属性

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. //control 高三位表示线程池状态 低29位表示线程数量
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. //COUNT_BIT 为29
  5. private static final int COUNT_BITS = Integer.SIZE - 3;
  6. //线程池郧西最大线程数 2*29-1
  7. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  8. // runState is stored in the high-order bits
  9. //线程池的五种状态
  10. //正常运行的
  11. private static final int RUNNING = -1 << COUNT_BITS;
  12. //调用了shutdown 方法,进入shutdown 状态
  13. private static final int SHUTDOWN = 0 << COUNT_BITS;
  14. //调用了 shutdownnow 方法,马上停止
  15. private static final int STOP = 1 << COUNT_BITS;
  16. //调用了shutdown 然后线程也执行完了,正在整理的这个过程
  17. private static final int TIDYING = 2 << COUNT_BITS;
  18. //整个线程全部结束
  19. private static final int TERMINATED = 3 << COUNT_BITS;
  20. // Packing and unpacking ctl
  21. private static int runStateOf(int c) { return c & ~CAPACITY; }
  22. private static int workerCountOf(int c) { return c & CAPACITY; }
  23. private static int ctlOf(int rs, int wc) { return rs | wc; }
  24. private final BlockingQueue<Runnable> workQueue;
  25. private final ReentrantLock mainLock = new ReentrantLock();
  26. private final Condition termination = mainLock.newCondition();
  27. private final HashSet<Worker> workers = new HashSet<Worker>();
  28. }

这里列举了 ThreadPoolExecutor 的一些属性,其中下面就是线程池的五种状态,需要注意的就是那个原子类 ctl。
ctl 意思是 control,Integer是32位的,它采用了高三位来记录线程池的状态,低二十九位来记录线程池的线程数量。那为什么要这么做呢,我也不知道。首先线程池和状态和线程数量都是需要记录的,且需要保证同步,如果采用两个变量,那么就需要维护两个值的线程同步,而现在他放在一个值里,只需要维护一个值,效率会高一点。

然后下面三个方法看官方的注解可以看出来,这个是用来操作计算 ctl 这个值的。

  • runStateOf 获取线程池状态,通过按位与操作,将低29位全部变为0
  • workerCountOf 获取线程池worker数量,通过按位与操作,将高3位全部变为0
  • ctlOf 根据线程池状态和线程池worker数量,生成ctl值

然后就是可以看到,ReentrantLock,ThreadPoolExecutor 底层采用了 ReentrantLock 来进行同步管理。另外使用了 BlockingQueue 阻塞队列作为并发容器,存储任务。然后主要的工作线程则是使用 HashSet 来进行存储。

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. //基本的属性校验
  9. if (corePoolSize < 0 ||
  10. maximumPoolSize <= 0 ||
  11. maximumPoolSize < corePoolSize ||
  12. keepAliveTime < 0)
  13. throw new IllegalArgumentException();
  14. //空指针校验
  15. if (workQueue == null || threadFactory == null || handler == null)
  16. throw new NullPointerException();
  17. this.acc = System.getSecurityManager() == null ?
  18. null :
  19. AccessController.getContext();
  20. this.corePoolSize = corePoolSize;
  21. this.maximumPoolSize = maximumPoolSize;
  22. this.workQueue = workQueue;
  23. this.keepAliveTime = unit.toNanos(keepAliveTime);
  24. this.threadFactory = threadFactory;
  25. this.handler = handler;
  26. }
  27. private static final RejectedExecutionHandler defaultHandler =
  28. new AbortPolicy();

从上面的构造器中也可以看到,我们之前所说的这个类的七大属性。
其中若 ThreadFactory 没有指定,则使用默认的 Executors.defaultThreadFactory() ,它是个 DefaultThreadFactory 。默认的拒绝策略是 Abort 抛异常。

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. /** Initial task to run. Possibly null. */
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. }

还有一点就是,线程池里一个任务对应一个内部类 Worker 对象,里面存储的是要运行的任务和运行的线程环境。以及定义了一些加锁的方法等等,就不具体展开了。

有意思的是,这个 Worker 类实现了 Runnable 接口,因此他本身就可以放在线程里运行。其次他继承了 AQS 类,且实现了一些锁的方法,因此它本身也是一个锁。所以后面用到它的时候,都是直接用它本身的锁,而没有必要另外创建锁。

execute()

这个 execute 方法还算简单。

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. //worker数量比核心线程数少,直接创建核心worker执行任务
  6. if (workerCountOf(c) < corePoolSize) {
  7. if (addWorker(command, true))
  8. return;
  9. c = ctl.get();
  10. }
  11. //worker数量超过核心线程数,任务直接进入队列
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. //线程池不是RUNNING状态,说明执行过shutdown,需要拒绝任务
  15. if (! isRunning(recheck) && remove(command))
  16. reject(command);
  17. //核心线程数允许为0,所以这里需要判断0值
  18. else if (workerCountOf(recheck) == 0)
  19. addWorker(null, false);
  20. }
  21. //线程池不是运行状态,或任务进入队列失败,则尝试创建worker执行任务
  22. else if (!addWorker(command, false))
  23. reject(command);
  24. }
  1. 首先通过 ctl 获取到当前工作线程数量,如果小于核心线程数,则调用 addWorker() 方法,将提交的任务直接执行。addWorker 的第二个参数是是否是核心线程,因此这里是直接添加进核心线程。
  2. 如果工作线程数不小于核心线程数,判断线程池是否正常运行。如果正常运行,尝试将任务添加到任务队列中去。
    1. 添加成功之后,这里又进行了一次recheck,如果线程池不是正常运行则删除原来的任务,执行拒绝策略。
    2. 如果二次检测正常,检测工作线程数量是否是0,如果是则添加个null
  3. 如果工作线程没有正常运行或者添加到任务队列失败了,或者是上面的核心线程数满了,然后尝试将任务作为非核心线程执行,如果不成功则执行拒绝策略

这里有个二次检测,因为执行 workQueue.offer(command) 的时候线程池的状态可能会改变了,为了保证同步于是进行了二次检测。这个实现和单例的模式的 double check 一样。
后面判断工作线程是否是0的逻辑就是如果里面没有线程了,那我线程池正常运行就添加非核心线程。

addWorker()

addWorker() 方法,就是主要的添加线程执行任务的方法,第一个参数是需要执行的任务,第二个参数是判断是否为核心线程。

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. //一个死循环
  4. for (;;) {
  5. //获取目前的线程池状态
  6. int c = ctl.get();
  7. int rs = runStateOf(c);
  8. //判断线程是否是正常运行,如果不是正常运行就判空
  9. // Check if queue empty only if necessary.
  10. if (rs >= SHUTDOWN &&
  11. ! (rs == SHUTDOWN &&
  12. firstTask == null &&
  13. ! workQueue.isEmpty()))
  14. return false;
  15. //第二个死循环
  16. for (;;) {
  17. //获取当前工作线程数
  18. int wc = workerCountOf(c);
  19. //判断是否大于最大容量或者 判断如果是核心线程,是否大于核心线程数
  20. //如果是非核心线程,是否大于最大线程数
  21. if (wc >= CAPACITY ||
  22. wc >= (core ? corePoolSize : maximumPoolSize))
  23. return false;
  24. //使用 CAS 去增加线程数,如果成功,break最外层死循环
  25. if (compareAndIncrementWorkerCount(c))
  26. break retry;
  27. //如 CAS 增加线程数不成功,在读取一次ctl
  28. c = ctl.get(); // Re-read ctl
  29. //如果线程状态不一致了,最外层循环再来一次
  30. if (runStateOf(c) != rs)
  31. continue retry;
  32. // else CAS failed due to workerCount change; retry inner loop
  33. }
  34. }
  35. boolean workerStarted = false;
  36. boolean workerAdded = false;
  37. Worker w = null;
  38. try {
  39. //新建一个worker,把任务放进去。这里的thread线程就是从 ThreadFactory 的 newThread()
  40. //方法获取的,传入的是this,即当前线程
  41. w = new Worker(firstTask);
  42. final Thread t = w.thread;
  43. //如果线程不为null
  44. if (t != null) {
  45. //获取锁,同步加锁操作
  46. final ReentrantLock mainLock = this.mainLock;
  47. mainLock.lock();
  48. try {
  49. // Recheck while holding lock.
  50. // Back out on ThreadFactory failure or if
  51. // shut down before lock acquired.
  52. //二次检测线程池状态
  53. int rs = runStateOf(ctl.get());
  54. if (rs < SHUTDOWN ||
  55. (rs == SHUTDOWN && firstTask == null)) {
  56. //判断该线程是否正在运行,如果是就抛出异常
  57. if (t.isAlive()) // precheck that t is startable
  58. throw new IllegalThreadStateException();
  59. //否则将改任务加入到工作线程数组里
  60. workers.add(w);
  61. //获取加入后的工作线程数,如果大于最大线程数,则,置为当前的线程数量
  62. int s = workers.size();
  63. if (s > largestPoolSize)
  64. largestPoolSize = s;
  65. //返回添加成功
  66. workerAdded = true;
  67. }
  68. } finally {
  69. //释放锁
  70. mainLock.unlock();
  71. }
  72. //判断是否添加成功,是就执行
  73. if (workerAdded) {
  74. t.start();
  75. workerStarted = true;
  76. }
  77. }
  78. } finally {
  79. //如果发现启动失败,执行failed
  80. if (! workerStarted)
  81. addWorkerFailed(w);
  82. }
  83. return workerStarted;
  84. }

这个 addWorker() 方法的逻辑大概是这样的:

  1. 上来先一个死循环,判断当前线程池的状态,如果是 SHUTDWON或者线程池为空,传入的任务为空的情况下,直接返回false
  2. 接着第二个死循环,判断当前工作线程是否大于最大容量,或者是否大于核心线程数或最大线程数,如果是,也直接返回false
  3. 如果上面判断通过了,采用 CAS 进行给工作线程数加1。如果成功,退出两层死循环,继续往下做;否则,如果线程池的状态变了,从最外层循环再做一次;没变就继续第二层循环。
  4. 接下来就是真正的添加工作线程的操作,用任务和当前线程创建一个 Worker,然后加锁同步操作。
  5. 二次检测,判断当前线程池状态。然后判断当前线程是否正在运行,即是否调用了start,如果是则不再添加任务,抛出异常,否则,把当前worker放入工作线程容器。
  6. 释放锁
  7. 判断是否添加成功,如果添加成功就执行该任务,方法结束,否则,调用 addWorkerFailed()

runWorker()

  1. final void runWorker(Worker w) {
  2. //获取当前线程,获取任务,然后把worker里的任务置空
  3. Thread wt = Thread.currentThread();
  4. Runnable task = w.firstTask;
  5. w.firstTask = null;
  6. w.unlock(); // allow interrupts
  7. boolean completedAbruptly = true;
  8. try {
  9. //循环,加锁
  10. while (task != null || (task = getTask()) != null) {
  11. w.lock();
  12. // If pool is stopping, ensure thread is interrupted;
  13. // if not, ensure thread is not interrupted. This
  14. // requires a recheck in second case to deal with
  15. // shutdownNow race while clearing interrupt
  16. //如果当前线程STOP了且没有被打断,就执行打断
  17. //如果当前线程没有STOP,则尝试中断之后二次判断是否中断了且没有被打断,如没有则打断
  18. if ((runStateAtLeast(ctl.get(), STOP) ||
  19. (Thread.interrupted() &&
  20. runStateAtLeast(ctl.get(), STOP))) &&
  21. !wt.isInterrupted())
  22. wt.interrupt();
  23. try {
  24. //这个方法在该类里是空实现
  25. beforeExecute(wt, task);
  26. Throwable thrown = null;
  27. try {
  28. //执行该任务
  29. task.run();
  30. } catch (RuntimeException x) {
  31. thrown = x; throw x;
  32. } catch (Error x) {
  33. thrown = x; throw x;
  34. } catch (Throwable x) {
  35. thrown = x; throw new Error(x);
  36. } finally {
  37. //执行之后的方法,也是空实现
  38. afterExecute(task, thrown);
  39. }
  40. } finally {
  41. //任务执行完毕,置空,完成任务数++,释放锁
  42. task = null;
  43. w.completedTasks++;
  44. w.unlock();
  45. }
  46. }
  47. completedAbruptly = false;
  48. } finally {
  49. processWorkerExit(w, completedAbruptly);
  50. }
  51. }

该方法就是真正执行Worker的方法,在调用了Worker的 Run() 方法后就会执行这个方法,其步骤也很简单。
主要有趣的几点就是,一个是加锁机制。前面提到过 Worker 内部定义了 ReentrantLock 和一些锁方法来提供加锁。为什么要用他内部的锁不自己定义呢,因为他是个 Worker,顾名思义是个工人,本身就会有很多线程去访问这个对线把自己的任务丢给他让他执行,所有他干脆就以自己当作一个锁,使用自己的lock() 就好了,没必要另外定义。

为什么他本身也是一个锁呢?因为他继承了AQS

另外Worker实现了Runnable接口,所以他可以在线程里运行,本身也是一把锁,可以做同步。

回到上面的执行方法,那个while块是一个自旋,判断当任务不为null的时候就执行,任务为null了,这个 Worker就去任务队列拿任务执行。那为什么要while呢?if不行嘛?因为阻塞队列的特性,如果队列为空时,当前线程就会被阻塞等待,和消费者生产者模型一样,因此他需要while循环来保证被唤醒之后可以再次判断,保证确实有任务可以获取。因为上面的 unlock() 方法就是保证该线程可以在外部被中断,和唤醒一样。

此外,里面又有两个空实现方法 beforeExecute() 和 afterExecute() 。想起了模板方法设计模式,嗯。