前面例子里在主线程里起异步线程的方法都是直接new Thread,把线程任务(实现了Runnable接口的类的实例)给Thread类初始化,这种方式起异步线程做任务起的是“野线程”,工程中是不可能允许这么初始化线程做任务的,实际工程中都是起一个线程池,把任务递交给线程池来完成。为什么用线程池而不是直接new Thread呢?原因如下:
- 直接new Thread初始化线程,不方便管理线程;
- 当并发任务很多时,对每个任务都new Thread初始化线程会占用内存,且新建和销毁一个线程是很消耗资源的,有时初始化和销毁一个线程的时间比线程执行任务的时间都长。
公司一般会自己封装一些初始化线程池的工厂类方法,这些方法实际都是在java提供的线程池的工厂类方法的基础上包了一层,需要注意的是,在新建线程池时,要根据任务的情况配置线程池的参数,因为初始化一个线程池也是要消耗资源的。
1、ThreadPoolExecutor类
1.1 ThreadPoolExecutor类图
ThreadPoolExecutor类位于java.uitl.concurrent.ThreadPoolExecutor,是java线程池里最核心的一个类,学习线程池可以从这个类入手,该类的类图如下: 从类图可以看到,ThreadPoolExecutor类是继承自抽象类AbstractExecutorService,AbstractExecutorService类里已经实现了一些接口的方法,比如submit等;AbstractExecutorService抽象类实现了ExecutorService接口,需要说明的是一般初始化一个线程池的时候都是用ExecutorService接口做向上转型;ExecutorService接口继承的是Executor接口,Executor接口里仅有一个方法execute,接收一个Runnable类型的参数。
1.2 ThreadPoolExecutor类说明
ThreadPoolExecutor类是初始化一个线程池的最基本的类,无论是java里线程池创建的工厂类(比如newSingleThreadExecutor类)还是每个公司自己封装的线程池创建工厂方法,底层都是用的ThreadPoolExecutor类,下面具体介绍一下ThreadPoolExecutor类里的成员和方法。
1.2.1 构造函数
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
ThreadPoolExecutor类有四种构造函数,前三个构造函数都是通过最后一个构造函数初始化的,可以看到初始化ThreadPoolExecutor类是有些公共的参数,下面介绍一下重点的几个参数。
corePoolSize
核心线程的数目。线程池在被初始化后,默认情况下是没有线程的,而是等到有任务submit给线程池后,线程池才会创建线程执行任务。当线程池中的线程数目超过corePoolSize后,线程池会把后面submit上来的任务放到阻塞队列里。
maximumPoolSize
线程池最大线程数。线程池里最多可以创建的线程数,上面提到当线程池中的线程数目超过corePoolSize后,线程池会把后面submit上来的任务放到阻塞队列里,但当阻塞队列也满了的时候,线程池会为后续submit的任务再创建线程,直到线程池数达到maximumPoolSize后,线程池就不再接受新submit上来的任务了,此时线程池会执行拒绝策略。
keepAliveTime
线程池中的线程没有执行任务时保持多长时间会终止。当线程池中的线程数目超过corePoolSize时,如果一个线程空闲的时间超过keepAliveTime,线程会终止直到线程池里的线程数目不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(false)方法,即使线程池中的线程数目没有达到corePoolSize,线程空闲的时间超过keepAliveTime也会被终止,直到线程池中的线程数目为0。
BlockingQueue
存放任务的阻塞队列。用来存放等待执行的任务,分为三种:ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
一般使用后两个,阻塞队列的选取对线程池的影响很大。
handler
拒绝任务的策略。拒绝任务的场景在maximumPoolSize中已经介绍,具体的handler策略有以下四种:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
1.2.2 线程池的状态
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池有5种状态:Running、ShutDown、Stop、Tidying、Terminated,转换图如图所示(图摘自链接2):
(1)RUNNING
线程池处于RUNNING状态,可以接收新的任务,并且可以对已经接受了的任务进行处理。线程池一旦被创建,状态就是RUNNING,且线程池中的线程数目为0。
(2)SHUTDOWN
线程池处于SHUTDOWN状态,不能接收新的任务,但可以处理已经接收了的任务。执行线程池的实例方法shutdown()可以使线程池由RUNNING状态转变为SHUTDOWN状态。
(3)STOP
线程池处于STOP状态,不能接收新的任务,不能处理已经接收了的任务,并且会中断正在执行的任务。执行线程池的实例方法shutdownNow()可以使线程池由RUNNING/SHUTDOWN状态转变为STOP状态。
(4)TIDYING
当所有任务已终止,ctl记录的线程任务数量为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重写terminated()函数来实现。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
(5)TERMINATED
线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。1.2.3 ThreadPoolExecutor类的成员
这里挑部分成员变量说明其含义:
// 任务缓存队列,用来存放等待执行的任务 private final BlockingQueue<Runnable> workQueue; // 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁 private final ReentrantLock mainLock = new ReentrantLock(); // 用来存放工作集 private final HashSet<Worker> workers = new HashSet<Worker>(); // 线程存活时间 private volatile long keepAliveTime; //是否允许为核心线程设置存活时间 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int corePoolSize; // 线程池最大能容忍的线程数 private volatile int maximumPoolSize; // 线程池中当前的线程数 private volatile int poolSize; // 任务拒绝策略 private volatile RejectedExecutionHandler handler; // 线程工厂,用来创建线程 private volatile ThreadFactory threadFactory; // 用来记录线程池中曾经出现过的最大线程数 private int largestPoolSize; // 用来记录已经执行完毕的任务个数 private long completedTaskCount;
1.2.4 ThreadPoolExecutor类的方法
ThreadPoolExecutor类里比较重要的两个方法是execute方法和submit方法,这两个方法都是线程池执行具体任务的。submit方法底层还是execute方法,只不过submit方法向线程池递交的任务是Future或者FutureTask类型,这一点在下一章讲。有关这两个方法的源码可以参考链接1。
2、线程池执行任务的过程
2.1 线程池执行任务的流程
当线程池中的线程数目小于corePoolSize时,每来一个任务,线程池会创建一个线程去执行这个任务;
- 当线程池中的线程数目大于corePoolSize且小于maxPoolSize时,线程池会尝试将任务添加到阻塞队列中,如果添加成功,该任务会等待空闲线程将其从阻塞队列中取出执行;如果失败(一般是阻塞队列已满),线程池会创建一个线程去执行该任务;
- 当线程池中的线程数大于maxPoolSize时,会采取任务拒绝策略,分四种,前面已经介绍;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
2.2 线程池相关概念解释
(1)阻塞队列
前面讲ThreadPoolExecutor类的构造函数里介绍了一部分,这里主要讲一下阻塞队列的概念。阻塞队列本质上还是个队列,是一种可以在多线程环境下使用,并且支持阻塞等待的队列,与一般队列的区别在于:
- 多线程环境支持,多个线程可以同时访问的安全队列;
- 支持生成和消费等待,即当队列为空时,消费者线程阻塞直到队列不为空;当队满了时,生产者线程阻塞知道队列有空间了;
阻塞队列是基于ReentrantLock实现的,BlockingQueue是一个接口,它的实现类有三个:ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
ArrayBlockingQueue
基于数组的先进先出队列,此队列创建时必须指定大小。ArrayBlockingQueue在内部维护了一个定长数组,以便缓存队列中的数据对象。并没有实现读写分离,也就意味着生产和消费不能完全并行。是一个有界队列。
LinkedBlockingQueue
基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE。LinkedBlockingQueue是基于链表的阻塞队列,在内部维护了一个数据缓冲队列(由一个链表构成),实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。是一个无界队列,
synchronousQueue
这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
(2)线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:设置核心池大小;
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小;
3、几种常见的线程池
java的Executors类提供了一组工厂方法用于创建常用的 ExecutorService和ScheduledExecutorService,返回ExecutorService接口的工厂方法有FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor等,返回ScheduledExecutorService接口的工厂方法有newSingleThreadScheduledExecutor和newScheduledThreadPool等。3.1 返回ExecutorService接口的工厂方法
比较常用的有以下三个工厂方法:newFixedThreadPool,newCachedThreadPool 和 newSingleThreadExecutor,这三个方法都是通过new一个定制化的ThreadPoolExecutor对象并返回。3.1.1 newFixedThreadPool
该方法在Executors类中的源码如下:
从源码可以看出,newFixedThreadPool方法里初始化的ThreadPoolExecutor类,corePoolSize和maxPoolSize的数目是一样的,因此KeepAlive时间设置不会生效,阻塞队列选取的是无界的LinkedBlockingQueue。public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newFixedThreadPool创建的线程池,当线程池里的线程数小于corePoolSize时,线程池会为任务创建线程执行,当大于等于corePoolSize时,线程池会把递交上来的任务放入阻塞队列中,由于是无界的LinkedBlockingQueue,因此后续的任务都会存入队列中。
由于后续任务都会存入无界的阻塞队列中,不会出现队列满了的情况,因此newFixedThreadPool创建的线程池会保证递交上来的任务都被执行,不会启动拒绝策略;缺点是队列数量没有限制,在任务无限多这种极端情况下会造成内存问题。3.1.2 newSingleThreadPool
该方法在Executors类中的源码如下:
从源码可以看出,corePoolSize和maxPoolSize都被设置成了1,这就意味着newSingleThreadExecutor创建的线程池,同一时刻仅有一个工作线程,当线程因为处理异常等原因终止的时候,ThreadPoolExecutor会自动创建一个新的线程继续进行工作。阻塞队列采用的是无界的LinkedBlockingQueue。public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
SingleThreadExecutor 适用于在逻辑上需要单线程处理任务的场景,即上传的任务要保证按顺序串行执行,同时无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝;缺点和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题。3.1.3 newCachedThreadPool
该方法在Executors类中的源码如下:
从源码可以看出,corePoolSize设置为0,maxPoolSize设置为无限大(Integer.MAX_VALUE,但实际上不可能存在这么多线程),KeepAlive设置为60s,意味着线程池中的空闲线程在停留超过60s后会被销毁,阻塞队列采用SynchronousQueue,虽然它是无界的,但它不会保存任务,也可以把它看成是容量为0的队列。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。
4、线程池的拒绝策略
当任务过多且线程池的任务队列已满时,此时就会执行线程池的拒绝策略,线程池的拒绝策略默认有以下 4 种:
- AbortPolicy:拒绝新的任务并丢弃,并抛出异常;
- DiscardPolicy:拒绝新的任务并丢弃,不会发出通知也不会抛异常;
- DiscardOldestPolicy:丢弃队列中开头的任务,即存活时间最长的任务,腾出空间处理新来的任务;
- CallerRunsPolicy:会把这个任务交于提交线程池任务的线程去执行,即谁提交任务谁负责执行。
默认的拒绝策略为 AbortPolicy 中止策略。
如何自定义线程池拒绝策略?
- 将自定义的线程拒绝策略的类实现RejectedExecutionHandler接口,并将拒绝的逻辑在rejectedExecution方法里实现;
- 创建线程池时,通过线程池的set方法将自定义线程池拒绝策略的类给线程池的RejectedExecutionHandler成员属性。
5、线程池使用示例
创建两个线程池,一个是ThreadPoolExecutor类,另一个是Executors类的工厂方法,运行线程任务采用了execute方法和submit方法。
运行结果如下:import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) { // 用ThreadPoolExecutor类初始化线程池 ExecutorService threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); RunnableTask runnableTask1 = new RunnableTask("Jerry"); RunnableTask runnableTask2 = new RunnableTask("Cissie"); // execute方法执行Runnable类型的任务 threadPoolExecutor.execute(runnableTask1); threadPoolExecutor.execute(runnableTask2); // 关闭线程池 threadPoolExecutor.shutdown(); // 用Executors类的工场方法初始化线程池 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 初始化Callable类型的任务 MyCallableTask myCallableTask1 = new MyCallableTask(100); MyCallableTask myCallableTask2 = new MyCallableTask(200); Future<Integer> future1 = fixedThreadPool.submit(myCallableTask1); Future<Integer> future2 = fixedThreadPool.submit(myCallableTask2); // 关闭线程池 fixedThreadPool.shutdown(); // 主线程获取异步任务的执行结果 System.out.println("主线程获取异步任务的执行结果"); try{ System.out.println("Future1的结果: " + future1.get()); System.out.println("Future2的结果: " + future2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } static class RunnableTask implements Runnable { String val; public RunnableTask(String val) { this.val = val; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "执行Runnable任务,打印值:" + this.val); } } static class MyCallableTask implements Callable<Integer> { int sum; public MyCallableTask(int sum) { this.sum = sum; } @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName() + "执行Callable任务"); for (int i = 0; i < 100; ++i) { sum += i; } return sum; } } }
pool-1-thread-2执行Runnable任务,打印值:Cissie pool-1-thread-1执行Runnable任务,打印值:Jerry 主线程获取异步任务的执行结果 pool-2-thread-1执行Callable任务 pool-2-thread-2执行Callable任务 Future1的结果: 5050 Future2的结果: 5150
6、如何配置线程池的参数
见链接,没咋仔细看。参考
Java并发编程:线程池的使用
线程池的五种状态
Java并发编程:线程池的使用
线程池之ScheduledThreadPoolExecutor