1. 线程池

线程池是一种多线程处理形式,是线程的集合,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后再次将线程返回线程池中成为空闲状态,等待执行下一个任务。

线程池状态

  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态
  • 调用线程池的shutdownNow()方法,可以切换到STOP状态

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务
  • 线程池中执行的任务为空,进入TIDYING状态

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0
  • terminated()执行完毕,进入TERMINATED状态

TERMINATED

  • 该状态表示线程池彻底终止

image.png
image.png

线程池优点

  • 统一分配、管理和监控线程
  • 可以复用池中的线程,避免频繁创建和销毁,降低资源的消耗,提高响应速度
  • 控制最大并发数

    ThreadPoolExecutor常见方法

    image.png
    execute是Executor中声明的方法,适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了。

submit是ExecutorService中声明的方法,适用于需要关注返回值的场景。底层还是调用execute()方法,它利用了Future来获取任务执行结果。

shutdownNow:对正在执行的任务全部发出Thread.interrupt()终止所有线程,对还未开始执行的任务全部取消,并且返回还没开始的任务列表。

shutdown:当我们调用shutdown后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务。

awaitTermination:判断线程池中是否有继续运行的线程。waitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

2. 常见线程池及使用场景

newFixedThreadPool 固定数量线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • 核心线程数和最大线程数大小一样
  • keepAliveTime为0, 意味着一旦有多余的空闲线程,就会被立即停
  • 阻塞队列是LinkedBlockingQueue(队列容量Integer.MAX_VALUE
  • 实际线程数量将永远维持着在nthreads maximumPoolSize和keepAliveTime将无效

场景:用于已知并发压力的情况下,对线程数做限制,适用于处理CPU密集型的任务。

newSingleThreadExecutor 单个线程线程池

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 核心线程数和最大线程数大小一样且都是1
  • keepAliveTime为0
  • 阻塞队列是LinkedBlockingQueue

场景:适用串行执行任务,每个任务必须按顺序执行。

newCachedThreadPool 可缓存线程池

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 核心线程数为0,且最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60秒
  • 当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能添加新线程来执行

场景:用于并发执行大量短期的小任务。

newScheduledThreadPool 定时线程池

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

场景:适用于需要多个后台线程执行周期任务的场景。

newWorkStealingPool 抢占式线程池

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
  • Java8中新增的线程池,实际上是ForkJoinPool
  • 采用fork join算法。将一个任务可以分解为多个小任务 ( 任务不能十分简单

    不建议使用 Executors静态工厂构建线程池

    不允许使用Executors静态工厂构建线程池,要通过ThreadPoolExecutor这样的方式,会更加明确线程池的运行规则,规避资源耗尽的风险。
    1)FixedThreadPool 和 SingleThreadPool
    允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
    2)CachedThreadPool 和 ScheduledThreadPool
    允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    3. 线程池参数

    image.png

    corePoolSize

    核心线程数量,会一直存在除非allowCoreThreadTimeOut设置为true。默认情况下线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法。
    CPU密集是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
    IO密集型是该任务需要大量的IO,即大量的阻塞。
    对于 CPU 密集型的计算场景,理论上线程的数量 =CPU 核数+1;
    对于 I/O 密集型计算场景,最佳线程数 =CPU 核数 [ 1 +(I/O 耗时 / CPU 耗时)],经验值=**2 CPU 的核数 + 1**

1)提交到线程池中的任务,IO耗时占比是90%,计算耗时占比10%,忽略提交到线程池中的任务数量,在4C8G的机器上,理想情况下线程池中创建多少个线程是最优的(40)。
2)有一类cpu密集性的任务,没有IO操作,日常的时候只有1个任务,流量高峰会有50个任务,4C8G的机器上,使用的线程池,如何设置corePoolSize, maxPoolSize以及BlockingQueue的大小。

  • 提交第一个任务,创建出core,1个线程
  • 提交第二个到第47个任务时,这些任务进入到队列中,此时队列已满
  • 提交第48个任务到第50个任务时,创建出max,此时一共有4个线程
  • 4个线程同时将队列里的46个任务消费完
  • 一段时间后,max - core数量的线程销毁,即销毁3个线程,还剩下一个线程

    maximumPoolSize

    线程池允许的最大线程池数量,表示在线程池中最多能创建多少个线程。

    keepAliveTime、unit

    keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下当线程数量超过corePoolSize,keepAliveTime才会起作用。即空闲线程的最大超时时间,直到线程池中的线程数不超过corePoolSize。 值为0表示,线程不会被回收。
    unit:超时时间的单位。

    //TimeUnit类中有7种静态属性
    TimeUnit.DAYS; //天
    TimeUnit.HOURS; //小时
    TimeUnit.MINUTES; //分钟
    TimeUnit.SECONDS; //秒
    TimeUnit.MILLISECONDS; //毫秒
    TimeUnit.MICROSECONDS; //微妙
    TimeUnit.NANOSECONDS; //纳秒
    

    workQueue

    工作队列,保存未执行的Runnable 任务。
    **BlockingQueue

  • 一个基于数组的有界阻塞队列

  • 按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue

  • 一个基于链表结构的阻塞队列
  • 此队列按FIFO (先进先出) 排序元素,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE
  • 吞吐量通常要高ArrayBlockingQueue
  • 静态工厂方法Executors.newFixedThreadPool()使用了这个队列

DelayQueue

  • 任务定时周期的延迟执行的队列
  • 根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序
  • newScheduledThreadPool线程池使用了这个队列

SynchronousQueue

  • 一个不存储元素的阻塞队列
  • 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
  • 吞吐量通常要高于LinkedBlockingQueue
  • 静态工厂方法Executors.newCachedThreadPool使用了这个队列
  • 使用SynchronoousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略

PriorityBlockingQueue

  • 具有优先级的无限阻塞队列
  • 按优先级对元素进行排序,元素的优先级是通过自然顺序或Comparator来定义的

    threadFactory

    创建线程的工厂类。 JDK提供了线程工厂的默认实现DefaultThreadFactory,但还是建议自定义实现,这样可以自定义线程创建的过程,例如线程分组、自定义线程名称等。

    handler

    拒绝策略, 当任务队列存满并且线程池个数达到maximunPoolSize后采取的策略。ThreadPoolExecutor中已经包含四种处理策略。

  • AbortPolicy策略:默认拒绝策略,该策略会直接抛出 RejectedExecutionException 异常

  • CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中运行当前的被丢弃的任务
  • DiscardPolicy策略:丢弃无法处理的任务, 不抛异常
  • DiscardOleddestPolicy策略: 丢弃最老的一个请求,即把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

    除了JDK默认提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义时直接实现RejectedExecutionHandler接口即可。 ```java public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } /**

       *在调用任务的线程中执行
       */
    

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

      if (!e.isShutdown()) {
          //在调用者线程者线程中,运行当前被丢弃的任务。
          //显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
          r.run();
      }
    

    } }

public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

}

/**
 * 队列满了,丢掉任务,不会抛出异常,其实就是什么也不做
 * 该策略陌陌的丢弃无法处理的任务,不进行任何处理
 */

public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

/**
 * 该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
 */

public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { }

/**
     * 将队列的首部任务直接丢弃,然后执行当前的任务
     */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

}

<a name="ybpvc"></a>
### 工作原理

```java
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • 1)execute方法提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务, 即使此时线程池中存在空闲线程。

image.png

  • 2)如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。

image.png

  • 3)当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。

image.png

  • 4)如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。

image.png

  • 5)当线程池中的线程执行完任务空闲时,会尝试从workQueue中取头结点任务执行。

image.png

  • 6) 当线程池中线程数超过corePoolSize,并且未配置allowCoreThreadTimeOut=true,空闲时间超过keepAliveTime的线程会被销毁,保持线程池中线程数为corePoolSize。 ;当设置allowCoreThreadTimeOut=true时,任何空闲时间超过keepAliveTime的线程都会被销毁。

image.png

4.底层原理

变量说明

image.png
ctl用于表示线程池的状态和线程数,在ThreadPoolExecutor中使用32位二进制数来表示线程池的状态和线程池中线程数量,其中前3位表示线程池状态,后29位表示线程池中线程数。

image.png
RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED分别表示线程池的不同状态,转算后的结果如下。
image.png
image.png

细节说明

submit()、execute()提交线程任务

  1. 如果当前核心线程数量没达到最大值corePoolSize,创建新线程来执行此任务
  2. 如果当前核心线程到达最大,向阻塞队列添加任务
  3. 如果核心线程已满,阻塞队列已满,尝试开启非核心线程来执行任务
  4. 如果线程池不处于RUNNABLE状态,或者处于饱和状态,执行任务拒绝策略

image.png
image.png

addWorker方法

第一部分是检查线程池的状态,如果不满足条件会直接返回false,然后进入死循环等待成功增加线程,如果增加线程成功,那么就可以进入第二部分,真正新增线程,执行任务。
将线程和任务封装到了Worker中,然后将Worker添加到HashSet集合中,添加成功后通过线程对象的start方法启动线程执行任务。

  1. 线程池处于 RUNNBALE 或者处于 SHUTDOWN 并在阻塞队列里还有任务时,需要添加新线程。自旋确保 CAS 成功,然后添加新线程
  2. 线程存于Worker,线程池存有Worker信息,就能访问线程
  3. 线程启动失败,则移除Worker,销毁线程

image.png

image.png

内部类Worker

Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口。
image.png

runWorker方法执行任务

线程首个任务为firstTask,之后通过getTask()就从阻塞队列里任务。 使用循环通过getTask方法不断从阻塞队列中获取任务执行,如果任务不为空则执行任务,这里实现了线程的复用,不断的获取任务执行,不用重新创建线程;队列中获取的任务为null,则将Worker从HashSet集合中清除,注意这个清除就是空闲线程的回收。
线程池提供了beforeExecute()和afterExecute()通知子类任务执行前后的回调,让子类有时机能执行自己的事情。
image.png

getTask方法

线程池里的线程从阻塞队列里拿任务,如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程也要在等到keepAliveTime时间后才会释放。如果当前仅有核心线程存在,如果允许释放核心线程的话,也就和非核线程的处理方式一样,反之,则通过take()一直阻塞直到拿到任务,这也就是线程池里的核心线程为什么不死的原因。
image.png

processWorkerExit方法释放线程

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 到这里说明线程中断,先通过decrementWorkerCount()减少线程数值
        // 否则,说明是线程没有从阻塞队列获取到线程
        if (completedAbruptly) 
                decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount记录线程池总共完成的任务
            // w.completedTasks则是线程完成的任务数
            completedTaskCount += w.completedTasks;
            // 移除Worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 线程池状态改变,尝试中止线程池
        tryTerminate();

        int c = ctl.get();
        // 检查线程池状态,线程池处于RUNNABLE或者SHUTDOWN则进入
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // 线程池最小数量,取决于是否能释放核心线程
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果任务队列还有线程,最起码都要有一个线程来处理任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; 
            }
            // 因为线程中断,可能导致没有线程来执行阻塞队列里的任务
            // 因此尝试创建线程去执行任务
            addWorker(null, false);
        }
    }

5.线程池一定比使用单线程高效?

Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:多线程带来线程上下文切换开销,单线程就没有这种开销。

总结

线程池原理总结思维导图.png

image.png

参考

线程池工作原理和实现原理