1. 什么是线程池?

  1. 降低资源的消耗【减少创建销毁操作】

    通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
    高并发状态下过多创建线程可能将资源耗尽

  2. 提高响应速度【控制线程个数】

    因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数 过多导致CPU调度慢)

  3. 提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】

    线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源, 还降低系统的稳定性,使用线程池进行统一分配

2. ThreadPoolExecutor

核心线程池,原生创建线程的方式中有**7**大核心参数和**4**个拒绝策略。
创建线程池的有参构造方式源码如下:
image.png

2.1. 核心参数说明

  1. corePoolSize (int):核心线程数

核心线程会一直存在。不受其他影响,除非指定了ThreadPoolExecutor#allowCoreThreadTimeOut(true)
改方法意味着允许核心线程超时后被释放。如果未指定则不受_keepAliveTime_参数影响

  1. maximumPoolSize (int)线程池中允许的最大线程数量

用来控制资源,当线程数达到最大值后,不会继续创建线程,剩余请求会进入阻塞队列中进行等待,直到线程池中有线程能够处理。

  1. keepAliveTime (long) 存活时间

指定了那些当现有线程池中线程数量>核心线程数的线程,如果闲置达到存活时间后,会被释放掉。

  1. unit (TimeUnit) :存活时间的单位

规定了大于核心线程数量的闲置线程的存活时间单位。

  1. workQueue (BlockingQueue):阻塞队列

阻塞队列中保存了那些被execute方法提交的任务。当现有线程池中的线程已经全部被使用无法处理的任务会进入阻塞队列中等待。

  1. threadFactory (ThreadFactory) :线程工厂

定义了线程池中创建新线程的方式

  1. handler (RejectedExecutionHandler):拒绝策略

当线程池中和阻塞队列都已满时,则使用指定的拒绝策略,拒绝执行任务。

2.2. 拒绝策略

  1. DiscardOldestPolicy:丢弃最老的任务
  2. AbortPolicy:丢弃当前任务,抛出异常【默认策略】
  3. CallerRunsPolicy:同步执行run方法
  4. DiscardPolicy:丢弃当前任务,不抛出异常

image.png

2.3. 阻塞队列

image.png
。。。

2.3. 执行流程

线程池的运行流程:

  1. 线程池创建好,准备好core数量的核心线程,准备接收任务。
  2. 新任务到来,使用core准备好的空闲线程去执行任务
  3. 如果core满,将再到来的任务放入阻塞队列中。空闲的core会从阻塞队列中获取任务执行。
  4. 如果阻塞队列满了,会直接创建新线程执行,线程池中线程数量直到max指定的数量。
    1. 如果max执行好任务空闲,达到了keepAliveTime指定的时间后会自动销毁。最终保留初始core指定的数量。
    2. 如果线程数量已经达到了max,再进入的新任务,会使用reject指定的拒绝策略进行处理。

2.4. demo

假设有一种情况:一个线程池,分配了core:7,max:20,queue:50。此时有100个并发进来怎么分配?

  1. 7个任务立即进行执行
  2. 50个任务进入队列
  3. 创建13个线程执行
  4. 可执行70,剩余30任务使用默认拒绝策略

3. 常见的四种线程池

3.1. newCachedThreadPool

说明:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

  1. public static void main(String[] args) {
  2. // 1. 使用Executors工具类
  3. Executors.newCachedThreadPool();
  4. // 2. 原生方式...
  5. }

进入Executors源码得知:
image.png

3.2. newFixedThreadPool

说明:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

  1. public static void main(String[] args) {
  2. Executors.newFixedThreadPool(50);
  3. }

进入源码得知,相当于直接创建了传入大小的固定的线程池
image.png

3.3. newScheduledThreadPool

说明:创建一个定长线程池,支持定时以及周期性任务执行
创建方式同上 Executors._newScheduledThreadPool_(20);
源码为:
image.png

3.4. singleThreadExecutor

说明:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务
创建方式同上:Executors._newSingleThreadExecutor_();
image.png

4. CompletableFuture 异步编排

4.1. 简述

cpu在切换时间片执行不同线程时,会比较浪费资源。使用线程池的好处就是可以更大的节省时间,提高cpu的执行效率。
但这种多线程不能满足所有的场景,让有既需要使用多线程又需要给某些线程编排顺序或者自定义的一些行为时,异步编排变得尤为重要。
使用概述:

  • runXXX开头的方法没有返回值,supplyXXX开头的方法可以接收返回值。
  • 可以传入自定义线程池,否则使用默认线程池

CompletableFuture提供了四个静态方法来创建一个异步操作:

public static _CompletableFuture runAsync(Runnable _runnable)
public static _CompletableFuture runAsync(Runnable _runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

4.2. 串行化任务

4.2.1. supplyAsync

  1. // 5.1.提交任务异步执行(supplyAsync)
  2. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "测试使用", executor);
  3. System.out.println(future1.get());

4.2.2. thenRunAsync

说明:不能获取上一步结果且返回值

4.2.3. thenAcceptAsync

说明:获取上一步结果,但返回值。

4.2.4. thenApplyAsync

说明:获取上一步结果,且返回值。

  1. // 5.3.获取上一步执行结果并获取异常信息(whenCompleteAsync)【无法处理异常返回默认值】
  2. CompletableFuture<String> future3 =
  3. future2.whenCompleteAsync((result, exception) -> System.out.println("结果是:" + result + "----异常是:" + exception));
  4. System.out.println(future2.get());

4.2.5. whenCompleteAsync

说明:可以获取上一步执行结果并感知异常,返回值。

  1. // 5.4.获取上一步异常,如果出现异常可返回默认值,不出现异常保持原值(exceptionally)
  2. CompletableFuture<Integer> future4 = future3.thenApplyAsync((s -> 1 / 0), executor);
  3. CompletableFuture<Integer> future5 = future4.exceptionally(exception -> {
  4. System.out.println("出现异常:" + exception);
  5. return 10;
  6. }); // 出现异常,使用默认返回值
  7. System.out.println("默认值:" + future5.get());

未完待续…