Java中的线程池类有两个,分别是:ThreadPoolExecutor和ScheduledThreadPoolExecutor,这两个类都继承自ExecutorService。利用这两个类,可以创建各种不同的Java线程池,为了方便我们创建线程池,Java API提供了Executors工厂类来帮助我们创建各种各样的线程池。下面我们分别介绍一下这三个类。
Java线程池ExecutorService继承树:

ThreadPoolExecutor概述
Java.util.concurrent.ThreadPoolExecutor类是ExecutorSerivce接口的具体实现,也是java中最常用的线程池类。
ThreadPoolExecutor使用线程池中的一个线程来执行给定的任务(Runnable或者Runnable)。ThreadPoolExecutor内部维持了一个线程池,可以执行给定的任务,下面是关于它的具体使用方法。
ThreadPoolExecutor内部的线程池包含不定数量的线程。池中线程的数量由下面的这些变量决定:
corePoolSizemaximumPoolSize
当一个任务委托给线程池执行,此时如果池线程中线程数少于corePoolSize,即使池中有空闲的线程,线程池中也会创建一个新的线程。
如果任务队列是满的,corePoolSize个线程或者更多的且少于maximumPoolSize的线程正在运行,也会创建一个新的线程来执行任务。
下面图释ThreadPoolExecutor这种原理:
ThreadPoolExecutor构造方法
ThreadPoolExecutor有多种构造函数。例如:

int corePoolSize = 5;int maxPoolSize = 10;long keepAliveTime = 5000;ExecutorService threadPoolExecutor =new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
除非你需要显示的给ThreadPoolExecutor指定这些参数,通常使用java.util.concurrent.Executor类中的工厂方法来创建实例。
ThreadPoolExecutor提供了四个构造方法:

我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
| 序号 | 名称 | 类型 | 含义 |
|---|---|---|---|
| 1 | corePoolSize | int | 核心线程池大小 |
| 2 | maximumPoolSize | int | 最大线程池大小 |
| 3 | keepAliveTime | long | 线程最大空闲时间 |
| 4 | unit | TimeUnit | 时间单位 |
| 5 | workQueue | BlockingQueue | 线程等待队列 |
| 6 | threadFactory | ThreadFactory | 线程创建工厂 |
| 7 | handler | RejectedExecutionHandler | 拒绝策略 |
- corePoolSize:线程池维护线程的最少数量
- maximumPoolSize:线程池维护线程的最大数量
- keepAliveTime: 线程池维护线程所允许的空闲时间
- unit: 线程池维护线程所允许的空闲时间的单位
- workQueue: 线程池所使用的缓冲队列
- handler: 线程池对拒绝任务的处理策略
线程数量控制
ThreadPoolExecutor线程池中的线程数量是可变的,其变化范围取决于下面两个变量:
corePoolSize:线程池维护线程的最少数量maximumPoolSize:线程池维护线程的最大数量
具体线程的分配方式是,当一个任务被添加到线程池:
- 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
- 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
- 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
- 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。
这样,线程池可以动态的调整池中的线程数。除了corePoolSize、maximumPoolSize两个变量外,
ThreadPoolExecutor构造方法还有几个参数:
- keepAliveTime: 线程池维护线程所允许的空闲时间
- unit: 线程池维护线程所允许的空闲时间的单位
- workQueue: 线程池所使用的缓冲队列
- handler: 线程池对拒绝任务的处理策略
unit
unit 可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
- NANOSECONDS
- MICROSECONDS
- MILLISECONDS
- SECONDS
workQueue
workQueue是一个BlockingQueue,默认是LinkedBlockingQueue<Runnable>
handler
handler 是线程池拒绝处理任务的方式,主要有四种类型:
ThreadPoolExecutor.AbortPolicy()(系统默认):抛出java.util.concurrent.RejectedExecutionException异常ThreadPoolExecutor.CallerRunsPolicy():当抛出RejectedExecutionException异常时,会调用rejectedExecution方法ThreadPoolExecutor.DiscardOldestPolicy():抛弃旧的任务ThreadPoolExecutor.DiscardPolicy():抛弃当前的任务
如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述。
拒绝策略RejectedExecutionHandler
RejectedExecutionHandler是一个接口:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,新的任务就会被拒绝,就会调用这个接口里的这个方法。
可以自己实现这个接口,实现对这些超出数量的任务的处理。
ThreadPoolExecutor自己已经提供了四个拒绝策略,分别是CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy
这四个拒绝策略其实一看实现方法就知道很简单。
AbortPolicy
ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();12
下面是他的实现:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
测试
先自定义一个Runnable,给每个线程起个名字,下面都用这个Runnable
static class MyThread implements Runnable {
String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:"+Thread.currentThread().getName() +" 执行:"+name +" run");
}
}
然后构造一个核心线程是1,最大线程数是2的线程池。拒绝策略是AbortPolicy
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<Runnable>(2),
new ThreadPoolExecutor.AbortPolicy());1234
for (int i = 0; i < 6; i++) {
System.out.println("添加第"+i+"个任务");
executor.execute(new MyThread("线程"+i));
Iterator iterator = executor.getQueue().iterator();
while (iterator.hasNext()){
MyThread thread = (MyThread) iterator.next();
System.out.println("列表:"+thread.name);
}
}
输出是:

分析一下过程。
- 添加第一个任务时,直接执行,任务列表为空。
- 添加第二个任务时,因为采用的LinkedBlockingDeque,,并且核心线程正在执行任务,所以会将第二个任务放在队列中,队列中有 线程2.
- 添加第三个任务时,也一样会放在队列中,队列中有 线程2,线程3.
- 添加第四个任务时,因为核心任务还在运行,而且任务队列已经满了,所以胡直接创建新线程执行第四个任务,。这时线程池中一共就有两个线程在运行了,达到了最大线程数。任务队列中还是有线程2, 线程3.
- 添加第五个任务时,再也没有地方能存放和执行这个任务了,就会被线程池拒绝添加,执行拒绝策略的rejectedExecution方法,这里就是执行AbortPolicy的rejectedExecution方法直接抛出异常。
- 最终,只有四个线程能完成运行。后面的都被拒绝了。
CallerRunsPolicy
CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
下面说他的实现:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
也很简单,直接run。
测试
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(2),
new ThreadPoolExecutor.AbortPolicy());1234
按上面的运行,输出

注意在添加第五个任务,任务5 的时候,同样被线程池拒绝了,因此执行了CallerRunsPolicy的rejectedExecution方法,这个方法直接执行任务的run方法。因此可以看到任务5是在main线程中执行的。
从中也可以看出,因为第五个任务在主线程中运行,所以主线程就被阻塞了,以至于当第五个任务执行完,添加第六个任务时,前面两个任务已经执行完了,有了空闲线程,因此线程6又可以添加到线程池中执行了。
这个策略的缺点就是可能会阻塞主线程。
DiscardPolicy
这个策略的处理就更简单了,看一下实现就明白了:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
这个东西什么都没干。
因此采用这个拒绝策略,会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。
测试
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(2),
new ThreadPoolExecutor.DiscardPolicy());
输出:

可以看到 后面添加的任务5和6根本不会执行,什么反应都没有,直接丢弃。
DiscardOldestPolicy
DiscardOldestPolicy策略的作用是,当任务呗拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
在rejectedExecution先从任务队列总弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列。
测试
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());1234
输出是:

可以看到,
- 在添加第五个任务时,会被线程池拒绝。这时任务队列中有 任务2,任务3
- 这时,拒绝策略会让任务队列中最先加入的任务弹出,也就是任务2.
- 然后把被拒绝的任务5添加人任务队列,这时任务队列中就成了 任务3,任务5.
- 添加第六个任务时会因为同样的过程,将队列中的任务3抛弃,把任务6加进去,任务队列中就成了 任务5,任务6
- 因此,最终能被执行的任务只有1,4,5,6. 任务2和任务3倍抛弃了,不会执行。
自定义拒绝策略
通过看前面的系统提供的四种拒绝策略可以看出,拒绝策略的实现都非常简单。自己写亦一样
比如现在想让被拒绝的任务在一个新的线程中执行,可以这样写:
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
new Thread(r,"新线程"+new Random().nextInt(10)).start();
}
}
然后正常使用:
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(2),
new MyRejectedExecutionHandler());1234
输出:

发现被拒绝的任务5和任务6都在新线程中执行了。

自定义线程池
知道了各个参数的作用后,我们开始构造符合我们期待的线程池。
int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
public class ThreadTest {
public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
System.in.read(); //阻塞主线程
}
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}
输出结果如下:

其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
