• Java表面提供了多种创建线程的方式:继承Thread类,实现Runnable接口(无返回值,也不抛异常),实现Callable接口(带返回值和异常的异步任务),也可通过FutureTask和线程池来创建线程。
  • 但这几个方式实际上最终都是new一个Thread类,所以Java实际上只有一种线程创建方式。
  • run()定义了该线程实际需要执行的代码,start()则会启动线程执行run()里边的代码。

    1. 为什么要引入线程池

    实际上,线程是个重量级的东西:
  1. cpu处理线程的速度非常快,但创建、销毁、切换线程的速度比较慢,当线程的执行速度大于任务切换速度时,线程太多反而是累赘,特别在Linux系统中,由于存在内核态和用户态,这个问题尤为严重(原因请自行查看“协程”相关知识)。
  2. Java的线程内存占用很高,通常一个线程被创建出来,即使声明也不做都会占用约128k内存,可以想象,高并发情况下直接使用线程很容易造成OOM错误。
  3. CPU同一时刻能执行的线程=物理核心数,这是由物理因素决定的,一个4核cpu同一时刻只能并行4个任务,所以线程数量太多是没有意义的。
  4. 通常Java Web服务都是典型地IO密集型计算,每一个请求的执行速度很快,但请求量巨大,性能瓶颈主要在IO等待上,所以给每一个请求都创建一个新的线程显得不太明智,这会导致系统资源被快速消耗完。

综上所述,我们应该尽可能少的创建线程,并尽可能多地复用这些线程,避免频繁切换线程导致的开销,这样才能达到性能最大化。线程池就是基于这个问题引入的,当然,还有更优秀的协程解决方案,这个会在其他地方讲解。

线程池的风险

  • 死锁:想象一下,假如一个线程池只有200个线程数量,而现在有200个任务陷入了死锁(互相竞争资源),导致线程池不再有空闲线程,但那200个任务又陷于死锁无法恢复,那么此时系统就会处于瘫痪状态,因为线程池再也无法接收任何其他请求了。
    • 例如线程 A 持有对象 X 的独占锁,并且在等待对象 Y 的锁,而线程 B 持有对象 Y 的独占锁,却在等待对象 X 的锁,这就陷入了双向等待,无限卡死在这。
  • 线程泄漏:当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回池时,会发生这种情况。发生线程泄漏的一种情形出现在任务抛出一个 RuntimeException 或一个 Error 时。如果池类没有捕捉到它们,那么线程只会退出而线程池的大小将会永久减少一个。当这种情况发生的次数足够多时,线程池最终就为空,而且系统将停止,因为没有可用的线程来处理任务。
  • 请求过载:过多的请求同时涌入线程池时,会有大量任务需要在队列中排队等待资源,如果这个队列的大小设置不当,或者没有合理的拒绝机制,仍会导致资源耗尽。

    2. FutureTask

  • FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。

  • 一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。
  • FutureTask 常用来承载 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证。

线程池 - 图1

  • FutureTask实现了RunnableFuture接口,则RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的返回值。

    2.1 核心属性

    ```java // 内部持有的callable任务,运行完毕后置空 private Callable callable;

// 从get()中返回的结果或抛出的异常 private Object outcome;

// 运行callable的线程 private volatile Thread runner;

// 使用Treiber栈保存等待线程 private volatile WaitNode waiters;

//任务状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;

  1. ![](https://cdn.nlark.com/yuque/0/2021/png/772705/1614242758845-3002b4ce-a580-494e-9e7a-1da4f8344963.png#align=left&display=inline&height=350&margin=%5Bobject%20Object%5D&originHeight=436&originWidth=676&size=0&status=done&style=none&width=542)<br />其中需要注意的是state是volatile类型的,也就是说只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。7种状态具体表示:
  2. - `NEW`:表示是个新的任务或者还没被执行完的任务。这是初始状态。
  3. - `COMPLETING`:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
  4. - `NORMAL`:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
  5. - `EXCEPTIONAL`:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
  6. - `CANCELLED`:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
  7. - `INTERRUPTING`:任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
  8. - `INTERRUPTED`:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。
  9. <a name="4q4Ho"></a>
  10. ## 2.2 核心方法
  11. <a name="rWLeA"></a>
  12. ### run()
  13. 1. 首先判断当前state是否等于NEW状态,若不是则利用CAS操作将runner修改为当前线程并返回(任务更新)。
  14. 1. 重复1,直到再次进入方法拿到一个新任务为止,拿到新任务后就会尝试执行它。若执行无错误,则会set返回结果,并将state改为已完成,若出错,则set异常,并将state改为异常。这两个状态都代表当前线程已结束。
  15. 1. 执行期间可以中断,借助下述方法完成,通过不断的轮询state是否已变为中断,若没有,会不断的使用yield()让出cpu使用权:
  16. ```java
  17. private void handlePossibleCancellationInterrupt(int s) {
  18. //在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
  19. if (s == INTERRUPTING)
  20. while (state == INTERRUPTING)
  21. Thread.yield(); // 让出执行权
  22. }

get()

FutureTask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING),就调用awaitDone方法(下方讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。report源码如下:

  1. // 返回执行结果或抛出异常
  2. private V report(int s) throws ExecutionException {
  3. Object x = outcome;
  4. if (s == NORMAL)
  5. // 正常执行时对x进行类型强转后返回
  6. return (V)x;
  7. if (s >= CANCELLED)
  8. throw new CancellationException();
  9. throw new ExecutionException((Throwable)x);
  10. }

awaitDone()

awaitDone用于等待任务完成,或任务因为中断、超时而终止。返回任务的完成状态。函数执行逻辑如下:

  • 如果线程被中断,首先清除中断状态,调用removeWaiter移除等待节点,然后抛出InterruptedException。
  • 如果当前状态为结束状态(state>COMPLETING),则根据需要置空等待节点的线程,并返回 Future 状态;
  • 如果当前状态为正在完成(COMPLETING),说明此时 Future 还不能做出超时动作,为任务让出CPU执行时间片;
  • 如果state为NEW,先新建一个WaitNode,然后CAS修改当前waiters;
  • 如果等待超时,则调用removeWaiter移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park阻塞当前线程;
  • 其他情况直接阻塞当前线程。

    cancel()

    用于尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。

  • 如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED。

  • 如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion移除并唤醒所有等待线程。

    3. ThreadPoolExecutor

    Java是如何实现和管理线程池的?从JDK 5开始,工作单元与执行机制被分离开来,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供。

ThreadPoolExecutor就是其提供的线程池类,用来创建一个线程池。其实现原理很简单:

  • 定义了一个线程集合workerSet和一个阻塞队列workQueue。
  • 当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。
  • 然后workerSet中的线程在执行完后会直接从workQueue中获取新线程执行。
  • 当workQueue中没有任务的时候,workerSet里的线程就会阻塞,直到队列中有任务了就取出来继续执行。

线程池 - 图2

3.1 Execute原理

核心构造参数

  • corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize(即使有其他空闲线程能够执行新来的任务,也会继续创建线程);如果当前线程数等于corePoolSize,继续提交的任务就会被保存到阻塞队列中,等待被执行;
    • 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
  • workQueue:用来保存等待被执行的任务的阻塞队列,这里使用的就是JUC线程安全集合中的XXXQueue阻塞队列,种类比较多,可根据具体场景来选择适合的。
  • maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,直至线程数量达到该最大数;
    • 特别地,当阻塞队列是无界队列时,maximumPoolSize不会起作用。
  • keepAliveTime:线程处于空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用,超过这个时间的空闲线程将被终止,核心线程不会被杀死
  • unit:keepAliveTime的时间单位
  • threadFactory:创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory
  • handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

      执行过程

      当一个任务提交至线程池之后:
  1. 线程池首先检查当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2。
  2. 判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue,否则进入3。
  3. 如果创建一个新的工作线程会使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理拒绝任务,否则,创建新线程。
  4. 当ThreadPoolExecutor创建新线程时,会通过CAS来更新线程池的状态ctl。

    3.2 不要使用预置线程池

    JUC默认提供的几个预置线程池都有可能导致服务器资源耗尽,因此,创建线程池应该手动的进行,并明确参数,而不应该使用自带的线程池。

Executors各个预置线程池的弊端:

  • newFixedThreadPool和newSingleThreadExecutor:主要问题是使用了无界队列,堆积的请求会耗费非常大的内存,甚至OOM。
  • newCachedThreadPool和newScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    4. ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,为任务提供延迟或周期执行(定时任务),属于线程池的一种。和 ThreadPoolExecutor 相比,它还具有以下几种特性:

  • 使用专门的任务类型—ScheduledFutureTask 来执行周期任务,也可以接收不需要时间调度的任务(这些任务通过 ExecutorService 来执行)。

  • 使用专门的存储队列—DelayedWorkQueue 来存储任务,DelayedWorkQueue 是无界延迟队列DelayQueue 的一种。相比ThreadPoolExecutor也简化了执行机制(delayedExecute方法)。
  • 支持可选的run-after-shutdown参数,在池被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟任务。并且当任务提交操作与 shutdown 操作重叠时,复查逻辑也不相同。

    4.1 类结构

    线程池 - 图3
    ScheduledThreadPoolExecutor 内部构造了两个内部类 ScheduledFutureTaskDelayedWorkQueue

  • ScheduledFutureTask:继承了FutureTask,说明是一个异步运算任务;最上层分别实现了Runnable、Future、Delayed接口,说明它是一个可以延迟执行的异步运算任务。

  • DelayedWorkQueue:这是 ScheduledThreadPoolExecutor 为存储周期或延迟任务专门定义的一个延迟队列,继承了 AbstractQueue,为了契合 ThreadPoolExecutor 也实现了 BlockingQueue 接口。它内部只允许存储 RunnableScheduledFuture 类型的任务。

    • 与 DelayQueue 的不同之处就是它只允许存放 RunnableScheduledFuture 对象,并且自己实现了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆结构)。

      4.2 也不要用这个线程池

      ScheduledThreadPoolExecutor的构造参数没有ThreadPoolExecutor丰富,核心参数只有corePoolSize可定义。自然而然的,这个线程池也极易引发OOM,不要使用。

      5. Fork/Join框架

  • ForkJoinPool 是JDK 1.7新加入的一个线程池类,它采取分治算法(Divide-and-Conquer)的并行实现(一项可以获得良好的并行性能的简单且高效的设计技术)。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。

  • 本质是一个可以将大任务拆分为很多小任务来异步执行的工具。

    5.1 模块关系

    Fork/Join框架主要包含三个模块:

  • 任务对象:ForkJoinTask (包括RecursiveTaskRecursiveActionCountedCompleter)

  • 执行Fork/Join任务的线程:ForkJoinWorkerThread
  • 线程池:ForkJoinPool

这三者的关系是: ForkJoinPool可以通过池中的ForkJoinWorkerThread来处理ForkJoinTask任务。

ForkJoinPool 只接收 ForkJoinTask 任务(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务),RecursiveTask 是 ForkJoinTask 的子类,是一个可以递归执行的 ForkJoinTask,RecursiveAction 是一个无返回值的 RecursiveTask,CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数。

在实际运用中,我们一般都会继承 RecursiveTaskRecursiveActionCountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。

5.2 核心思想:

分治算法(Divide-and-Conquer)

分治算法(Divide-and-Conquer)把任务递归的拆分为各个子任务(著名的快速排序算法其实也是这种思想,将大的问题拆解成多个小问题,全部解决后再汇总到一起),这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。首先看一下 Fork/Join 框架的任务运行机制:
线程池 - 图4

work-stealing(工作窃取)算法

work-stealing(工作窃取)算法:

  • 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。
  • 这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。

在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务

具体思路如下:

  • 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。
  • 队列支持三个功能push、pop、poll
  • push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。
  • 划分的子任务调用fork时,都会被push到自己的队列中。
  • 默认情况下,工作线程从自己的双端队列获出任务并执行。
  • 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。

    执行流程

    线程池 - 图5
    上图可以看出ForkJoinPool 中的任务执行分两种:

  • 直接通过 ForkJoinPool 提交的外部任务(external/submissions task),存放在 workQueue 的偶数槽位;

  • 通过内部 fork 分割的子任务(Worker task),存放在 workQueue 的奇数槽位。