线程池

“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控
什么时候使用线程池?

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大

    线程池优势

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能。

  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    线程的实现方式

    Runnable,Thread,Callable

    1. // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
    2. public interface Runnable {
    3. // run方法就是它所有的内容,就是实际执行的任务
    4. public abstract void run();
    5. }
    6. //Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
    7. public interface Callable<V> {
    8. // 相对于run方法的带有返回值的call方法
    9. V call() throws Exception;
    10. }

    Executor框架

    Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
    image.png
    从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为
    1,execute(Runnable command):履行Ruannable类型的任务。
    2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象。
    3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,。
    4,shutdownNow():停止所有正在履行的任务并封闭办事。
    5,isTerminated():测试是否所有任务都履行完毕了。
    6,isShutdown():测试是否该ExecutorService已被关闭。
    线程池重点属性
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE-3;
    private static final int CAPACITY = (1 << COUNT_BITS) -1;
    ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
    ctl相关方法
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  • runStateOf:获取运行状态;

  • workerCountOf:获取活动线程数;
  • ctlOf:获取运行状态和活动线程数的值。

线程池存在5种状态

RUNNING = ‐1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行 处理。
(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处 于RUNNING状态,并且线程池中的任务数为0!
2、SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中 断正在处理的任务。 (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的 任务为空时,就会由STOP -> TIDYING。 5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING - > TERMINATED。
进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功。

image.png
线程池的具体实现
1.ThreadPoolExecutor 默认线程池
2.ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

线程池的创建

public ThreadPoolExecutor(int corePoolSize,
     int maximumPoolSize,
     long keepAliveTime,
     TimeUnit unit,
     BlockingQueue<Runnable> workQueue,
     ThreadFactory threadFactory,
     RejectedExecutionHandler handler)

参数解释
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供 了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到 另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory()来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

任务提交

1、public void execute() //提交任务无返回值
2、public Future<?> submit() //任务执行完成后有返回值

线程池监控

public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

线程池原理

image.png

源码分析

execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
    * clt记录着runState和workerCount
    */
    int c = ctl.get();
    /*
    * workerCountOf方法取出低29位的值,表示当前活动的线程数;
    * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
    * 并把任务添加到该线程中。
    */
    if (workerCountOf(c) < corePoolSize) {
        /*
        * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize
        来判断还是maximumPoolSize来判断;
        * 如果为true,根据corePoolSize来判断;
        * 如果为false,则根据maximumPoolSize来判断
        */
        if (addWorker(command, true))
            return;
        /*
        * 如果添加失败,则重新获取ctl值
        */
        c = ctl.get();
    }
    /*
    * 如果当前线程池是运行状态并且任务添加到队列成功
    */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取ctl值
        int recheck = ctl.get();
        // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
        // 这时需要移除该command
        // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
        * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
        * 这里传入的参数表示:
        * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
        * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
        * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
        */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
    * 如果执行到这里,有两种情况:
    * 1. 线程池已经不是RUNNING状态;
    * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
    * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
    * 如果失败则拒绝该任务
    */
    else if (!addWorker(command, false))
        reject(command);
}

简单来说,在执行execute()方法时如果状态一直是RUNNING时的执行过程如下:
1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添 加到该阻塞队列中;
3. 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新 提交的任务;
4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是 为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
image.png

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
2. 如果正在执行任务,则不应该中断线程;
3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是否是空闲状态;
5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务
runWorker方法的执行过程:
1. while循环不断地通过getTask()方法获取任务;
2. getTask()方法从阻塞队列中取任务;
3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
4. 调用task.run()执行任务;
5. 如果task为null则跳出循环,执行processWorkerExit()方法;
6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

getTask方法

用来从阻塞队列中取任务

整个线程的执行流程

image.png

问题一:当使用线程池提交新任务时,异常如何处理?

1.在任务代码中try/catch捕获异常
2.通过Future对象的get方法接收抛出的异常,再处理
3.为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常
4.重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用

问题二:为什么使用线程池,说说线程池各个参数的作用

线程池有两个主要作用:
1、不同请求之间重复利用线程,无需频繁的创建和销毁线程,降低系统开销
2、控制线程数量上限,避免创建过多的线程耗尽进程内存空间,同时减少线程上下文切换次数。
线程池核心参数:
corePoolSize: 线程池核心线程数最大值
maximumPoolSize: 线程池最大线程数大小
keepAliveTime: 线程池中非核心线程空闲的存活时间大小
unit: 线程空闲存活时间单位
workQueue: 存放任务的阻塞队列
threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
handler: 线程池的拒绝策略事件,主要有四种类型。