Java中的线程池类有两个,分别是:ThreadPoolExecutorScheduledThreadPoolExecutor,这两个类都继承自ExecutorService。利用这两个类,可以创建各种不同的Java线程池,为了方便我们创建线程池,Java API提供了Executors工厂类来帮助我们创建各种各样的线程池。下面我们分别介绍一下这三个类。

Java线程池ExecutorService继承树:

线程池的创建(二) - 图1

ThreadPoolExecutor概述

Java.util.concurrent.ThreadPoolExecutor类是ExecutorSerivce接口的具体实现,也是java中最常用的线程池类。

ThreadPoolExecutor使用线程池中的一个线程来执行给定的任务(Runnable或者Runnable)。ThreadPoolExecutor内部维持了一个线程池,可以执行给定的任务,下面是关于它的具体使用方法。

ThreadPoolExecutor内部的线程池包含不定数量的线程。池中线程的数量由下面的这些变量决定:

  • corePoolSize
  • maximumPoolSize

当一个任务委托给线程池执行,此时如果池线程中线程数少于corePoolSize,即使池中有空闲的线程,线程池中也会创建一个新的线程。

如果任务队列是满的,corePoolSize个线程或者更多的且少于maximumPoolSize的线程正在运行,也会创建一个新的线程来执行任务。

下面图释ThreadPoolExecutor这种原理:
线程池的创建(二) - 图2

ThreadPoolExecutor构造方法

ThreadPoolExecutor有多种构造函数。例如:

线程池的创建(二) - 图3

  1. int corePoolSize = 5;
  2. int maxPoolSize = 10;
  3. long keepAliveTime = 5000;
  4. ExecutorService threadPoolExecutor =
  5. new ThreadPoolExecutor(
  6. corePoolSize,
  7. maxPoolSize,
  8. keepAliveTime,
  9. TimeUnit.MILLISECONDS,
  10. new LinkedBlockingQueue<Runnable>()
  11. );

除非你需要显示的给ThreadPoolExecutor指定这些参数,通常使用java.util.concurrent.Executor类中的工厂方法来创建实例。

ThreadPoolExecutor提供了四个构造方法:

image.png

我们以最后一个构造方法(参数最多的那个),对其参数进行解释:

 public ThreadPoolExecutor(int corePoolSize, // 1
                              int maximumPoolSize,  // 2
                              long keepAliveTime,  // 3
                              TimeUnit unit,  // 4
                              BlockingQueue<Runnable> workQueue, // 5
                              ThreadFactory threadFactory,  // 6
                              RejectedExecutionHandler handler ) { //7
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
序号 名称 类型 含义
1 corePoolSize int 核心线程池大小
2 maximumPoolSize int 最大线程池大小
3 keepAliveTime long 线程最大空闲时间
4 unit TimeUnit 时间单位
5 workQueue BlockingQueue 线程等待队列
6 threadFactory ThreadFactory 线程创建工厂
7 handler RejectedExecutionHandler 拒绝策略
- corePoolSize:线程池维护线程的最少数量
- maximumPoolSize:线程池维护线程的最大数量
- keepAliveTime: 线程池维护线程所允许的空闲时间
- unit: 线程池维护线程所允许的空闲时间的单位
- workQueue: 线程池所使用的缓冲队列
- handler: 线程池对拒绝任务的处理策略

线程数量控制

ThreadPoolExecutor线程池中的线程数量是可变的,其变化范围取决于下面两个变量:

  1. corePoolSize:线程池维护线程的最少数量
  2. maximumPoolSize:线程池维护线程的最大数量

具体线程的分配方式是,当一个任务被添加到线程池:

  1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
  2. 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
  3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
  4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
  5. 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。

这样,线程池可以动态的调整池中的线程数。除了corePoolSizemaximumPoolSize两个变量外,
ThreadPoolExecutor构造方法还有几个参数:

- keepAliveTime: 线程池维护线程所允许的空闲时间
- unit: 线程池维护线程所允许的空闲时间的单位
- workQueue: 线程池所使用的缓冲队列
- handler: 线程池对拒绝任务的处理策略

unit

unit 可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:

- NANOSECONDS
- MICROSECONDS
- MILLISECONDS
- SECONDS

workQueue

workQueue是一个BlockingQueue,默认是LinkedBlockingQueue<Runnable>

handler

handler 是线程池拒绝处理任务的方式,主要有四种类型:

  1. ThreadPoolExecutor.AbortPolicy()(系统默认):抛出java.util.concurrent.RejectedExecutionException异常
  2. ThreadPoolExecutor.CallerRunsPolicy():当抛出RejectedExecutionException异常时,会调用rejectedExecution方法
  3. ThreadPoolExecutor.DiscardOldestPolicy():抛弃旧的任务
  4. ThreadPoolExecutor.DiscardPolicy():抛弃当前的任务

如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述

拒绝策略RejectedExecutionHandler

RejectedExecutionHandler是一个接口:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

里面只有一个方法。当要创建的线程数量大于线程池的最大线程数的时候,新的任务就会被拒绝,就会调用这个接口里的这个方法。

可以自己实现这个接口,实现对这些超出数量的任务的处理。

ThreadPoolExecutor自己已经提供了四个拒绝策略,分别是CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy

这四个拒绝策略其实一看实现方法就知道很简单。

AbortPolicy

ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();12

下面是他的实现:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。

测试

先自定义一个Runnable,给每个线程起个名字,下面都用这个Runnable

static class MyThread implements Runnable {
        String name;
        public MyThread(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程:"+Thread.currentThread().getName() +" 执行:"+name +"  run");
        }
    }

然后构造一个核心线程是1,最大线程数是2的线程池。拒绝策略是AbortPolicy

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, 
        TimeUnit.MICROSECONDS, 
        new LinkedBlockingDeque<Runnable>(2), 
        new ThreadPoolExecutor.AbortPolicy());1234
for (int i = 0; i < 6; i++) {
    System.out.println("添加第"+i+"个任务");
    executor.execute(new MyThread("线程"+i));
    Iterator iterator = executor.getQueue().iterator();
    while (iterator.hasNext()){
        MyThread thread = (MyThread) iterator.next();
        System.out.println("列表:"+thread.name);
    }
}

输出是:

线程池的创建(二) - 图5

分析一下过程。

  1. 添加第一个任务时,直接执行,任务列表为空。
  2. 添加第二个任务时,因为采用的LinkedBlockingDeque,,并且核心线程正在执行任务,所以会将第二个任务放在队列中,队列中有 线程2.
  3. 添加第三个任务时,也一样会放在队列中,队列中有 线程2,线程3.
  4. 添加第四个任务时,因为核心任务还在运行,而且任务队列已经满了,所以胡直接创建新线程执行第四个任务,。这时线程池中一共就有两个线程在运行了,达到了最大线程数。任务队列中还是有线程2, 线程3.
  5. 添加第五个任务时,再也没有地方能存放和执行这个任务了,就会被线程池拒绝添加,执行拒绝策略的rejectedExecution方法,这里就是执行AbortPolicy的rejectedExecution方法直接抛出异常。
  6. 最终,只有四个线程能完成运行。后面的都被拒绝了。

CallerRunsPolicy

CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。

下面说他的实现:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

也很简单,直接run。

测试

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.AbortPolicy());1234

按上面的运行,输出

线程池的创建(二) - 图6

注意在添加第五个任务,任务5 的时候,同样被线程池拒绝了,因此执行了CallerRunsPolicy的rejectedExecution方法,这个方法直接执行任务的run方法。因此可以看到任务5是在main线程中执行的。

从中也可以看出,因为第五个任务在主线程中运行,所以主线程就被阻塞了,以至于当第五个任务执行完,添加第六个任务时,前面两个任务已经执行完了,有了空闲线程,因此线程6又可以添加到线程池中执行了。

这个策略的缺点就是可能会阻塞主线程。

DiscardPolicy

这个策略的处理就更简单了,看一下实现就明白了:

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

这个东西什么都没干。

因此采用这个拒绝策略,会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。

测试

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.DiscardPolicy());

输出:

线程池的创建(二) - 图7

可以看到 后面添加的任务5和6根本不会执行,什么反应都没有,直接丢弃。

DiscardOldestPolicy

DiscardOldestPolicy策略的作用是,当任务呗拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

在rejectedExecution先从任务队列总弹出最先加入的任务,空出一个位置,然后再次执行execute方法把任务加入队列。

测试

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new ThreadPoolExecutor.DiscardOldestPolicy());1234

输出是:

线程池的创建(二) - 图8

可以看到,

  1. 在添加第五个任务时,会被线程池拒绝。这时任务队列中有 任务2,任务3
  2. 这时,拒绝策略会让任务队列中最先加入的任务弹出,也就是任务2.
  3. 然后把被拒绝的任务5添加人任务队列,这时任务队列中就成了 任务3,任务5.
  4. 添加第六个任务时会因为同样的过程,将队列中的任务3抛弃,把任务6加进去,任务队列中就成了 任务5,任务6
  5. 因此,最终能被执行的任务只有1,4,5,6. 任务2和任务3倍抛弃了,不会执行。

自定义拒绝策略

通过看前面的系统提供的四种拒绝策略可以看出,拒绝策略的实现都非常简单。自己写亦一样

比如现在想让被拒绝的任务在一个新的线程中执行,可以这样写:

static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        new Thread(r,"新线程"+new Random().nextInt(10)).start();
    }
}

然后正常使用:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 30,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<Runnable>(2),
        new MyRejectedExecutionHandler());1234

输出:

线程池的创建(二) - 图9

发现被拒绝的任务5和任务6都在新线程中执行了。

死循环懵逼.gif

自定义线程池

知道了各个参数的作用后,我们开始构造符合我们期待的线程池。

int  corePoolSize  =    5;
int  maxPoolSize   =   10;
long keepAliveTime = 5000;

ExecutorService threadPoolExecutor =
    new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
            );

以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:

public class ThreadTest {

    public static void main(String[] args) throws InterruptedException, IOException {
        int corePoolSize = 2;
        int maximumPoolSize = 4;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ThreadFactory threadFactory = new NameTreadFactory();
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
          corePoolSize, 
          maximumPoolSize, 
          keepAliveTime, 
          unit,
          workQueue, 
          threadFactory, 
          handler);
        executor.prestartAllCoreThreads(); // 预启动所有核心线程

        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }

        System.in.read(); //阻塞主线程
    }

    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        }
    }

    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            doLog(r, e);
        }

        private void doLog(Runnable r, ThreadPoolExecutor e) {
            // 可做日志记录等
            System.err.println( r.toString() + " rejected");
//          System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
        }
    }

    static class MyTask implements Runnable {
        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(this.toString() + " is running!");
                Thread.sleep(3000); //让任务执行慢点
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "MyTask [name=" + name + "]";
        }
    }
}

输出结果如下:

image.png

其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。