前面例子里在主线程里起异步线程的方法都是直接new Thread,把线程任务(实现了Runnable接口的类的实例)给Thread类初始化,这种方式起异步线程做任务起的是“野线程”,工程中是不可能允许这么初始化线程做任务的,实际工程中都是起一个线程池,把任务递交给线程池来完成。为什么用线程池而不是直接new Thread呢?原因如下:

  • 直接new Thread初始化线程,不方便管理线程;
  • 当并发任务很多时,对每个任务都new Thread初始化线程会占用内存,且新建和销毁一个线程是很消耗资源的,有时初始化和销毁一个线程的时间比线程执行任务的时间都长。

公司一般会自己封装一些初始化线程池的工厂类方法,这些方法实际都是在java提供的线程池的工厂类方法的基础上包了一层,需要注意的是,在新建线程池时,要根据任务的情况配置线程池的参数,因为初始化一个线程池也是要消耗资源的。

1、ThreadPoolExecutor类

1.1 ThreadPoolExecutor类图

ThreadPoolExecutor类位于java.uitl.concurrent.ThreadPoolExecutor,是java线程池里最核心的一个类,学习线程池可以从这个类入手,该类的类图如下:
ThreadPoolExecutor - 图1 从类图可以看到,ThreadPoolExecutor类是继承自抽象类AbstractExecutorService,AbstractExecutorService类里已经实现了一些接口的方法,比如submit等;AbstractExecutorService抽象类实现了ExecutorService接口,需要说明的是一般初始化一个线程池的时候都是用ExecutorService接口做向上转型;ExecutorService接口继承的是Executor接口,Executor接口里仅有一个方法execute,接收一个Runnable类型的参数。

1.2 ThreadPoolExecutor类说明

ThreadPoolExecutor类是初始化一个线程池的最基本的类,无论是java里线程池创建的工厂类(比如newSingleThreadExecutor类)还是每个公司自己封装的线程池创建工厂方法,底层都是用的ThreadPoolExecutor类,下面具体介绍一下ThreadPoolExecutor类里的成员和方法。

1.2.1 构造函数

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. .....
  3. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  4. BlockingQueue<Runnable> workQueue);
  5. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
  7. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  8. BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
  9. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  10. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  11. ...
  12. }

ThreadPoolExecutor类有四种构造函数,前三个构造函数都是通过最后一个构造函数初始化的,可以看到初始化ThreadPoolExecutor类是有些公共的参数,下面介绍一下重点的几个参数。
corePoolSize
核心线程的数目。线程池在被初始化后,默认情况下是没有线程的,而是等到有任务submit给线程池后,线程池才会创建线程执行任务。当线程池中的线程数目超过corePoolSize后,线程池会把后面submit上来的任务放到阻塞队列里。
maximumPoolSize
线程池最大线程数。线程池里最多可以创建的线程数,上面提到当线程池中的线程数目超过corePoolSize后,线程池会把后面submit上来的任务放到阻塞队列里,但当阻塞队列也满了的时候,线程池会为后续submit的任务再创建线程,直到线程池数达到maximumPoolSize后,线程池就不再接受新submit上来的任务了,此时线程池会执行拒绝策略。
keepAliveTime
线程池中的线程没有执行任务时保持多长时间会终止。当线程池中的线程数目超过corePoolSize时,如果一个线程空闲的时间超过keepAliveTime,线程会终止直到线程池里的线程数目不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(false)方法,即使线程池中的线程数目没有达到corePoolSize,线程空闲的时间超过keepAliveTime也会被终止,直到线程池中的线程数目为0。
BlockingQueue workQueue
存放任务的阻塞队列。用来存放等待执行的任务,分为三种:ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。

  • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
  • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  • synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

一般使用后两个,阻塞队列的选取对线程池的影响很大。
handler
拒绝任务的策略。拒绝任务的场景在maximumPoolSize中已经介绍,具体的handler策略有以下四种:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    1.2.2 线程池的状态

    // runState is stored in the high-order bits
      private static final int RUNNING    = -1 << COUNT_BITS;
      private static final int SHUTDOWN   =  0 << COUNT_BITS;
      private static final int STOP       =  1 << COUNT_BITS;
      private static final int TIDYING    =  2 << COUNT_BITS;
      private static final int TERMINATED =  3 << COUNT_BITS;
    

    线程池有5种状态:Running、ShutDown、Stop、Tidying、Terminated,转换图如图所示(图摘自链接2):
    ThreadPoolExecutor - 图2
    (1)RUNNING
    线程池处于RUNNING状态,可以接收新的任务,并且可以对已经接受了的任务进行处理。线程池一旦被创建,状态就是RUNNING,且线程池中的线程数目为0。
    (2)SHUTDOWN
    线程池处于SHUTDOWN状态,不能接收新的任务,但可以处理已经接收了的任务。执行线程池的实例方法shutdown()可以使线程池由RUNNING状态转变为SHUTDOWN状态。
    (3)STOP
    线程池处于STOP状态,不能接收新的任务,不能处理已经接收了的任务,并且会中断正在执行的任务。执行线程池的实例方法shutdownNow()可以使线程池由RUNNING/SHUTDOWN状态转变为STOP状态。
    (4)TIDYING
    当所有任务已终止,ctl记录的线程任务数量为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重写terminated()函数来实现。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
    (5)TERMINATED
    线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    1.2.3 ThreadPoolExecutor类的成员

    这里挑部分成员变量说明其含义:

    // 任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;          
    // 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 用来存放工作集
    private final HashSet<Worker> workers = new HashSet<Worker>();  
    // 线程存活时间 
    private volatile long keepAliveTime;     
    //是否允许为核心线程设置存活时间 
    private volatile boolean allowCoreThreadTimeOut;  
    // 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) 
    private volatile int corePoolSize;   
    // 线程池最大能容忍的线程数  
    private volatile int maximumPoolSize;   
    // 线程池中当前的线程数
    private volatile int poolSize;       
    // 任务拒绝策略
    private volatile RejectedExecutionHandler handler; 
    // 线程工厂,用来创建线程
    private volatile ThreadFactory threadFactory;   
    // 用来记录线程池中曾经出现过的最大线程数
    private int largestPoolSize;   
    // 用来记录已经执行完毕的任务个数
    private long completedTaskCount;
    

    1.2.4 ThreadPoolExecutor类的方法

    ThreadPoolExecutor类里比较重要的两个方法是execute方法和submit方法,这两个方法都是线程池执行具体任务的。submit方法底层还是execute方法,只不过submit方法向线程池递交的任务是Future或者FutureTask类型,这一点在下一章讲。有关这两个方法的源码可以参考链接1。

    2、线程池执行任务的过程

    2.1 线程池执行任务的流程

  • 当线程池中的线程数目小于corePoolSize时,每来一个任务,线程池会创建一个线程去执行这个任务;

  • 当线程池中的线程数目大于corePoolSize且小于maxPoolSize时,线程池会尝试将任务添加到阻塞队列中,如果添加成功,该任务会等待空闲线程将其从阻塞队列中取出执行;如果失败(一般是阻塞队列已满),线程池会创建一个线程去执行该任务;
  • 当线程池中的线程数大于maxPoolSize时,会采取任务拒绝策略,分四种,前面已经介绍;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

流程图如图所示,图摘自链接3。
ThreadPoolExecutor - 图3

2.2 线程池相关概念解释

(1)阻塞队列
前面讲ThreadPoolExecutor类的构造函数里介绍了一部分,这里主要讲一下阻塞队列的概念。阻塞队列本质上还是个队列,是一种可以在多线程环境下使用,并且支持阻塞等待的队列,与一般队列的区别在于:

  • 多线程环境支持,多个线程可以同时访问的安全队列;
  • 支持生成和消费等待,即当队列为空时,消费者线程阻塞直到队列不为空;当队满了时,生产者线程阻塞知道队列有空间了;

阻塞队列是基于ReentrantLock实现的,BlockingQueue是一个接口,它的实现类有三个:ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
ArrayBlockingQueue
基于数组的先进先出队列,此队列创建时必须指定大小。ArrayBlockingQueue在内部维护了一个定长数组,以便缓存队列中的数据对象。并没有实现读写分离,也就意味着生产和消费不能完全并行。是一个有界队列。
LinkedBlockingQueue
基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE。LinkedBlockingQueue是基于链表的阻塞队列,在内部维护了一个数据缓冲队列(由一个链表构成),实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。是一个无界队列,
synchronousQueue
这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
(2)线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小;
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小;

    3、几种常见的线程池

    java的Executors类提供了一组工厂方法用于创建常用的 ExecutorService和ScheduledExecutorService,返回ExecutorService接口的工厂方法有FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor等,返回ScheduledExecutorService接口的工厂方法有newSingleThreadScheduledExecutor和newScheduledThreadPool等。

    3.1 返回ExecutorService接口的工厂方法

    比较常用的有以下三个工厂方法:newFixedThreadPool,newCachedThreadPool 和 newSingleThreadExecutor,这三个方法都是通过new一个定制化的ThreadPoolExecutor对象并返回。

    3.1.1 newFixedThreadPool

    该方法在Executors类中的源码如下:
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory);
      }
    
    从源码可以看出,newFixedThreadPool方法里初始化的ThreadPoolExecutor类,corePoolSize和maxPoolSize的数目是一样的,因此KeepAlive时间设置不会生效,阻塞队列选取的是无界的LinkedBlockingQueue。
    newFixedThreadPool创建的线程池,当线程池里的线程数小于corePoolSize时,线程池会为任务创建线程执行,当大于等于corePoolSize时,线程池会把递交上来的任务放入阻塞队列中,由于是无界的LinkedBlockingQueue,因此后续的任务都会存入队列中。
    由于后续任务都会存入无界的阻塞队列中,不会出现队列满了的情况,因此newFixedThreadPool创建的线程池会保证递交上来的任务都被执行,不会启动拒绝策略;缺点是队列数量没有限制,在任务无限多这种极端情况下会造成内存问题。

    3.1.2 newSingleThreadPool

    该方法在Executors类中的源码如下:
    public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
    
    从源码可以看出,corePoolSize和maxPoolSize都被设置成了1,这就意味着newSingleThreadExecutor创建的线程池,同一时刻仅有一个工作线程,当线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。阻塞队列采用的是无界的LinkedBlockingQueue。
    SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,即上传的任务要保证按顺序串行执行,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。

    3.1.3 newCachedThreadPool

    该方法在Executors类中的源码如下:
    public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
    }
    
    从源码可以看出,corePoolSize设置为0,maxPoolSize设置为无限大(Integer.MAX_VALUE,但实际上不可能存在这么多线程),KeepAlive设置为60s,意味着线程池中的空闲线程在停留超过60s后会被销毁,阻塞队列采用SynchronousQueue,虽然它是无界的,但它不会保存任务,也可以把它看成是容量为0的队列。
    CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

三种线程池的对比关系如下,表格总结摘自链接3。
ThreadPoolExecutor - 图4

4、线程池的拒绝策略

当任务过多且线程池的任务队列已满时,此时就会执行线程池的拒绝策略,线程池的拒绝策略默认有以下 4 种:

  1. AbortPolicy:拒绝新的任务并丢弃,并抛出异常;
  2. DiscardPolicy:拒绝新的任务并丢弃,不会发出通知也不会抛异常;
  3. DiscardOldestPolicy:丢弃队列中开头的任务,即存活时间最长的任务,腾出空间处理新来的任务;
  4. CallerRunsPolicy:会把这个任务交于提交线程池任务的线程去执行,即谁提交任务谁负责执行。

默认的拒绝策略为 AbortPolicy 中止策略。

如何自定义线程池拒绝策略?

  1. 将自定义的线程拒绝策略的类实现RejectedExecutionHandler接口,并将拒绝的逻辑在rejectedExecution方法里实现;
  2. 创建线程池时,通过线程池的set方法将自定义线程池拒绝策略的类给线程池的RejectedExecutionHandler成员属性。

    5、线程池使用示例

    创建两个线程池,一个是ThreadPoolExecutor类,另一个是Executors类的工厂方法,运行线程任务采用了execute方法和submit方法。
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    public class Main
    {
     public static void main(String[] args)
     {
         // 用ThreadPoolExecutor类初始化线程池
         ExecutorService threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
             new LinkedBlockingDeque<>());
         RunnableTask runnableTask1 = new RunnableTask("Jerry");
         RunnableTask runnableTask2 = new RunnableTask("Cissie");
         // execute方法执行Runnable类型的任务
         threadPoolExecutor.execute(runnableTask1);
         threadPoolExecutor.execute(runnableTask2);
         // 关闭线程池
         threadPoolExecutor.shutdown();
         // 用Executors类的工场方法初始化线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
         // 初始化Callable类型的任务
         MyCallableTask myCallableTask1 = new MyCallableTask(100);
         MyCallableTask myCallableTask2 = new MyCallableTask(200);
         Future<Integer> future1 = fixedThreadPool.submit(myCallableTask1);
         Future<Integer> future2 = fixedThreadPool.submit(myCallableTask2);
         // 关闭线程池
         fixedThreadPool.shutdown();
         // 主线程获取异步任务的执行结果
         System.out.println("主线程获取异步任务的执行结果");
         try{
             System.out.println("Future1的结果: " + future1.get());
             System.out.println("Future2的结果: " + future2.get());
         }
         catch (InterruptedException e)
         {
             e.printStackTrace();
         }
         catch (ExecutionException e)
         {
             e.printStackTrace();
         }
     }
     static class RunnableTask implements Runnable
     {
         String val;
         public RunnableTask(String val)
         {
             this.val = val;
         }
         @Override public void run()
         {
             System.out.println(Thread.currentThread().getName() + "执行Runnable任务,打印值:" + this.val);
         }
     }
     static class MyCallableTask implements Callable<Integer>
     {
         int sum;
         public MyCallableTask(int sum)
         {
             this.sum = sum;
         }
         @Override public Integer call() throws Exception
         {
             System.out.println(Thread.currentThread().getName() + "执行Callable任务");
             for (int i = 0; i < 100; ++i)
             {
                 sum += i;
             }
             return sum;
         }
     }
    }
    
    运行结果如下:
    pool-1-thread-2执行Runnable任务,打印值:Cissie
    pool-1-thread-1执行Runnable任务,打印值:Jerry
    主线程获取异步任务的执行结果
    pool-2-thread-1执行Callable任务
    pool-2-thread-2执行Callable任务
    Future1的结果: 5050
    Future2的结果: 5150
    

    6、如何配置线程池的参数

    见链接,没咋仔细看。

    参考

    Java并发编程:线程池的使用
    线程池的五种状态
    Java并发编程:线程池的使用
    线程池之ScheduledThreadPoolExecutor