10 ThreadPool 线程池

10.1 线程池简介

线程池(英语:thread pool)

  • 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。
  • 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务
  • 这避免了在处理短时间任务时创建与销毁线程的代价。
  • 线程池不仅能够保证内核的充分利用,还能防止过分调度。

例子

  • 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换,效率高。

线程池的优势

  • 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务
  • 如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:

  • 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
  • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  • Java 中的线程池是通过 Executor 框架实现的,该框架中用到了
    • Executor(接口)
    • ExecutorService(线程池接口)
    • ThreadPoolExecutorExecutorService接口的实现类,也即实际的线程池)
    • Executors(类似于工具类,用于操作线程池的各种操作)

image.png

10.2 线程池参数说明

本次介绍 5 种类型的线程池

10.2.1 常用的七个参数(重点)

  1. **corePoolSize** 线程池的核心线程数
  2. **maximumPoolSize** 能容纳的最大线程数
  3. **keepAliveTime** 空闲线程存活时间
  4. **unit** 存活的时间单位
  5. **workQueue** 存放提交但未执行任务的队列
  6. **threadFactory** 创建线程的工厂类
  7. **handler** 等待队列满后的拒绝策略

image.png
结合Executors工具类预设的五种线程池理解这七个参数
线程池中,有三个重要的参数,决定影响了拒绝策略:

  • **corePoolSize** - 核心线程数,也即最小的线程数。
  • **workQueue** - 阻塞队列 。
  • **maximumPoolSize** -最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

10.2.2 拒绝策略(重点)

  • **CallerRunsPolicy**:
    • 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务
    • 一般用于并发比较小,性能要求不高,不允许失败
    • 但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
  • **AbortPolicy**:
    • 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。
    • 线程池默认的拒绝策略
    • 必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
  • **DiscardPolicy**:
    • 直接丢弃,其他啥都没有
  • DiscardOldestPolicy:

    • 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

      10.3 线程池的种类与创建

      10.3.1 newFixedThreadPool(一池N线程)

      作用
  • 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。

  • 在任意点,在大多数线程会处于处理任务的活动状态。
  • 如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。
  • 如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。
  • 在某个线程被显式地关闭之前,池中的线程将一直存在。

场景

  • 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景

特征

  • 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  • 线程可以重复被使用,在显示关闭之前,都将一直存在
  • 超出一定量的线程被提交时候需在队列中等待

创建方式

  1. /**
  2. * 固定长度线程池
  3. * @return
  4. */
  5. public static ExecutorService newFixedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(
  16. 10,
  17. 10,
  18. 0L,
  19. TimeUnit.SECONDS,
  20. new LinkedBlockingQueue<>(), // 最大长度为 Integer.MAX_VALUE(),当任务太多时,容易导致OOM
  21. Executors.defaultThreadFactory(),
  22. new ThreadPoolExecutor.AbortPolicy()
  23. );
  24. }

操作实例

  • 10个任务,交给线程池中的5个线程执行
  • 10个任务会依次进入LinkedBlockingQueue阻塞队列中
  • 5个线程各自去阻塞队列中获取任务执行
  • 当线程手上的任务执行完后,会再去阻塞队列中获取任务;

    1. public static void main(String[] args) {
    2. ExecutorService threadPool = Excutors.newFixedThreadPool(5);
    3. try {
    4. for(int i = 1; i <= 10; i++) {
    5. threadPool.excutor(() -> {
    6. System.out.println(Thread.currentThread().getName() + " 办理业务");
    7. });
    8. }
    9. } finally {
    10. // 将所有的线程归还给线程池
    11. threadPool.shutdown();
    12. }
    13. }

    10.3.2 newSingleThreadExecutor(一池一线程)

    作用:

  • 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

  • (注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。
  • 可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
  • 与其他等效的 newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

场景

  • 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景

特征

  • 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此执行

创建方式

  1. /**
  2. * 单一线程池
  3. * @return
  4. */
  5. public static ExecutorService newSingleThreadExecutor(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(
  16. 1,
  17. 1,
  18. 0L,
  19. TimeUnit.SECONDS,
  20. new LinkedBlockingQueue<>(),// 最大长度为 Integer.MAX_VALUE(),当任务太多时,容易导致OOM
  21. Executors.defaultThreadFactory(),
  22. new ThreadPoolExecutor.AbortPolicy()
  23. );
  24. }

10.3.3 newCachedThreadPool(一池可扩容线程)

作用:

  • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.

特点:

  • 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
  • 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
  • 当线程池中,没有可用线程,会重新创建一个线程

场景:

  • 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景

创建方式

  1. /**
  2. * 可缓存线程池
  3. * @return
  4. */
  5. public static ExecutorService newCachedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(
  16. 0,
  17. Integer.MAX_VALUE,
  18. 60L,
  19. TimeUnit.SECONDS,
  20. new SynchronousQueue<>(),
  21. Executors.defaultThreadFactory(),
  22. new ThreadPoolExecutor.AbortPolicy()
  23. );
  24. }

10.3.4 newScheduleThreadPool(了解)

作用

  • 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池

场景

  • 适用于需要多个后台线程执行周期任务的场景

特征

  • 线程池中具有指定数量的线程,即便是空线程也将保留
  • 可定时或者延迟执行线程活动

创建方式

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  3. }

10.3.5 newWorkStealingPool

jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执行任务
场景

  • 适用于大耗时,可并行执行的场景

创建方式

  1. public static ExecutorService newWorkStealingPool(int parallelism) {
  2. /**
  3. * parallelism:并行级别,通常默认为 JVM 可用的处理器个数
  4. * factory:用于创建 ForkJoinPool 中使用的线程。
  5. * handler:用于处理工作线程未处理的异常,默认为 null
  6. * asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列*/
  7. return new ForkJoinPool(parallelism,
  8. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  9. null,
  10. true);
  11. }

10.4 线程池入门案例

场景: 火车站 3 个售票口, 10 个用户买票

  • 线程池中 ```java /**
  • 入门案例 / public class ThreadPoolDemo1 { /*
    • 火车站 3 个售票口, 10 个用户买票
    • @param args */ public static void main(String[] args) { //定时线程次:线程数量为 3—-窗口数为 3 // 自己手动定义线程池 ExecutorService threadService = new ThreadPoolExecutor(3,
      1. 3,
      2. 60L,
      3. TimeUnit.SECONDS,
      4. new LinkedBlockingQueue<>(),
      5. Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
      try {
      1. //10 个人买票
      2. for (int i = 1; i <= 10; i++) {
      3. threadService.execute(()->{
      4. try {
      5. System.out.println(Thread.currentThread().getName() + "窗口,开始卖票");
      6. Thread.sleep(5000);
      7. System.out.println(Thread.currentThread().getName() + "窗口买票结束");
      8. }catch (Exception e){
      9. e.printStackTrace();
      10. }
      11. });
      12. }
      }catch (Exception e){
      1. e.printStackTrace();
      }finally {
      1. //完成后结束
      2. threadService.shutdown();
      } } } ```

      10.5 线程池底层工作原理(重要)

      image.png这图要会画
      如图所示:常驻线程(corePool):2,非核心线程:3, 最大线程(maximumPool):5,阻塞队列长度(blockingQueue.size()):4
      线程执行流程:
  1. 在创建了线程池后,线程池中的线程数为零
  2. 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
    1. 如果正在运行的线程数量小于 corePoolSize(2),那么马上创建线程运行这个任务
    2. 如果正在运行的线程数量大于或等于 corePoolSize(2),那么将这个任务放入队列
    3. 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize(5),那么还是要创建非核心线程立刻运行这个任务
      • 此时,这个非核心线程运行的任务,是队列满了之后才来的任务
      • 当再队列满的状态持续时,又来任务时,依旧会再起一个非核心线程运行这个任务(运行的线程数还是小于 maximumPoolSize)
      • 队列中的数据,还是依旧通过核心线程来消费
      • 当队列不再满时,再来任务,则直接进队列
    4. 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    1. 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
    2. 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

      10.6 注意事项(重要)

  5. 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用LinkedBlockingQueue 实现的这个阻塞队列最大长度为 Integer.MAX_VALUE,请求太多时容易导致 OOM。所以实际生产一般自己通过ThreadPoolExecutor 的 7 个参数,自定义线程池

  6. 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建
    • corePoolSize 线程池的核心线程数
    • maximumPoolSize 能容纳的最大线程数
    • keepAliveTime 空闲线程存活时间
    • unit 存活的时间单位
    • workQueue 存放提交但未执行任务的队列
    • threadFactory 创建线程的工厂类
    • handler 等待队列满后的拒绝策略
  7. 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图

image.png

11 Fork/Join

11.1 Fork/Join 框架简介

Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事情:

  • Fork(分支):把一个复杂任务进行分拆,大事化小
  • Join(合并):把分拆任务的结果进行合并

image.png

  1. 任务分割:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
  2. 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

在 Java 的 Fork/Join 框架中,使用两个类完成上述操作

  • ForkJoinTask ():我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
    • RecursiveAction:用于没有返回结果的任务
    • RecursiveTask:用于有返回结果的任务
  • ForkJoinPool分支合并池):ForkJoinTask 需要通过 ForkJoinPool 来执行
  • RecursiveTask:继承后可以实现递归(自己调自己)调用的任务

image.pngimage.png
Fork/Join 框架的实现原理

  • ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成
  • ForkJoinTask 数组负责存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。

    11.2 Fork 方法

    Fork 方法的实现原理:

  • 当我们调用 ForkJoinTask 的 fork 方法时,程序会把任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地执行这个任务,然后立即返回结果

    1. public final ForkJoinTask<V> fork() {
    2. Thread t;
    3. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    4. ((ForkJoinWorkerThread)t).workQueue.push(this);
    5. else
    6. ForkJoinPool.common.externalPush(this);
    7. return this;
    8. }

    pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。
    代码如下:

    1. final void push(ForkJoinTask<?> task) {
    2. ForkJoinTask<?>[] a;
    3. ForkJoinPool p;
    4. int b = base, s = top, n;
    5. if ((a = array) != null) { // ignore if queue removed
    6. int m = a.length - 1; // fenced write for task visibility
    7. U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    8. U.putOrderedInt(this, QTOP, s + 1);
    9. if ((n = s - b) <= 1) {
    10. if ((p = pool) != null)
    11. p.signalWork(p.workQueues, this);//执行
    12. }
    13. else if (n >= m)
    14. growArray();
    15. }
    16. }

    11.3 join 方法

    Join 方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下:

    1. public final V join() {
    2. int s;
    3. if ((s = doJoin() & DONE_MASK) != NORMAL)
    4. reportException(s);
    5. return getRawResult();
    6. }

    它首先调用 doJoin 方法,通过 doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有 4 种:

  • 已完成(NORMAL)

  • 被取消(CANCELLED)
  • 信号(SIGNAL)
  • 出现异常(EXCEPTIONAL)

各个状态下,join() 方法会:

  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出 CancellationException
  • 如果任务状态是抛出异常,则直接抛出对应的异常

让我们分析一下 doJoin 方法的实现

  1. private int doJoin() {
  2. int s; Thread t; ForkJoinWorkerThread wt;
  3. ForkJoinPool.WorkQueuew;
  4. return (s = status) < 0 ? s :
  5. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  6. (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :
  7. wt.pool.awaitJoin(w, this, 0L) :
  8. externalAwaitDone();
  9. }
  10. final int doExec() {
  11. int s; boolean completed;
  12. if ((s = status) >= 0) {
  13. try {
  14. completed = exec();
  15. } catch (Throwable rex) {
  16. return setExceptionalCompletion(rex);
  17. }
  18. if (completed)
  19. s = setCompletion(NORMAL);
  20. }
  21. return s;
  22. }

在 doJoin()方法流程如下:

  1. 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
  2. 如果没有执行完,则从任务数组里取出任务并执行。
  3. 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

    11.4 Fork/Join 异常

    框架的异常处理ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的getException 方法获取异常。
    getException 方法返回 Throwable 对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回 null。

    11.5 入门案例

    场景: 生成一个计算任务,计算 1+2+3………+1000,每 100 个数切分一个子任务 ```java /**
  • 继承RecursiveTask类,重写 computer 方法 */ public class TaskExample extends RecursiveTask { private int start; private int end; private long sum;

    /**

    • 构造函数
    • @param start
    • @param end */ public TaskExample(int start, int end){ this.start = start; this.end = end; }

      /**

    • The main computation performed by this task. *
    • @return the result of the computation */@Override protected Long compute() { System.out.println(“任务” + start + “=========” + end + “累加开始”); //大于 100 个数相加切分,小于直接加 if(end - start <= 100){
      1. for (int i = start; i <= end; i++) {
      2. //累加
      3. sum += i;
      4. }
      }else {
      1. //切分为 2 块
      2. int middle = start + 100;
      3. //递归调用,切分为 2 个小任务
      4. TaskExample taskExample1 = new TaskExample(start, middle);
      5. TaskExample taskExample2 = new TaskExample(middle + 1, end);
      6. //执行:异步
      7. taskExample1.fork();
      8. taskExample2.fork();
      9. //同步阻塞获取执行结果
      10. sum = taskExample1.join() + taskExample2.join();
      } //加完返回 return sum; } }

/**

  • 分支合并案例 /public class ForkJoinPoolDemo { /*
    • 生成一个计算任务,计算 1+2+3………+1000
    • @param args */ public static void main(String[] args) { //定义任务 TaskExample taskExample = new TaskExample(1, 1000); //定义执行对象 ForkJoinPool forkJoinPool = new ForkJoinPool(); //加入任务执行 ForkJoinTask result = forkJoinPool.submit(taskExample); //输出结果 try {
      1. System.out.println(result.get());
      }catch (Exception e){
      1. e.printStackTrace();
      }finally {
      1. forkJoinPool.shutdown();
      } } } ```

      12 CompletableFuture (重点-java 真正的异步编程)

      研读文章
      https://cloud.tencent.com/developer/article/1366581
      https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650

      12.1 CompletableFuture 简介

  • CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻,可以使得我们的任务单独运行在与主线程分离的其他线程中
  • 为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎Future粉墨登场。
  • 然而Future提供的方式是主线程主动问询新线程,要是有个回调函数就爽了(等做完了,子线程主动告诉主线程)。所以,为了满足Future的某些遗憾,强大的CompletableFuture随着Java8一起来了。
  • 同时,CompletableFuture 也是结合了 java8 的流特性

CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者结合,从而打造出了强大的 CompletableFuture 类。

12.2 Future 与 CompletableFuture

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:

  1. 不支持手动完成
    • 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
  2. 不支持进一步的非阻塞调用
    • 通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
  3. 不支持链式调用
    • 对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
  4. 不支持多个 Future 合并
    • 比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。
  5. 不支持异常处理
    • Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

      12.3 CompletableFuture 入门

      12.3.1 使用 CompletableFuture

      场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止。 ```java /**
  • 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止
  • @param args */ public static void main(String[] args) throws Exception{ CompletableFuture future = new CompletableFuture<>(); new Thread(() -> {
    1. try{
    2. System.out.println(Thread.currentThread().getName() + "子线程开始干活");
    3. //子线程睡 5 秒
    4. Thread.sleep(5000);
    5. //在子线程中完成主线程
    6. future.complete("success");
    7. }catch (Exception e){
    8. e.printStackTrace();
    9. }
    }, “A”).start(); //主线程调用 get 方法阻塞 System.out.println(“主线程调用 get 方法获取结果为: “ + future.get()); System.out.println(“主线程完成,阻塞结束!!!!!!”); } ```

    12.3.2 没有返回值的异步任务

    ```java /**
  • 没有返回值的异步任务
  • @param args */ public static void main(String[] args) throws Exception{ System.out.println(“主线程开始”); //运行一个没有返回值的异步任务 CompletableFuture future = CompletableFuture.runAsync(() -> {
    1. try {
    2. System.out.println("子线程启动干活");
    3. Thread.sleep(5000);
    4. System.out.println("子线程完成");
    5. } catch (Exception e) {
    6. e.printStackTrace();
    7. }
    }); //主线程阻塞 future.get(); System.out.println(“主线程结束”); } ```

    12.3.3 有返回值的异步任务

    ```java /**
  • 没有返回值的异步任务
  • @param args */ public static void main(String[] args) throws Exception{ System.out.println(“主线程开始”); //运行一个有返回值的异步任务 CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    1. try {
    2. System.out.println("子线程开始任务");
    3. Thread.sleep(5000);
    4. } catch (Exception e) {
    5. e.printStackTrace();
    6. }
    7. return "子线程完成了!";
    }); //主线程阻塞 String s = future.get(); System.out.println(“主线程结束, 子线程的结果为:” + s); } ```

    12.3.4 线程依赖

    当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。 ```java private static Integer num = 10; /**
  • 先对一个数加 10,然后取平方
  • @param args */ public static void main(String[] args) throws Exception{ System.out.println(“主线程开始”); CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    1. try {
    2. System.out.println("加 10 任务开始");
    3. num += 10;
    4. } catch (Exception e) {
    5. e.printStackTrace();
    6. }
    7. return num;
    }).thenApply(integer -> {
    1. return num * num;
    }); Integer integer = future.get(); System.out.println(“主线程结束, 子线程的结果为:” + integer); }
    1. <a name="a185157a"></a>
    2. ### 12.3.5 消费处理结果
    3. thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
    4. ```java
    5. public static void main(String[] args) throws Exception{
    6. System.out.println("主线程开始");
    7. CompletableFuture.supplyAsync(() -> {
    8. try {
    9. System.out.println("加 10 任务开始");
    10. num += 10;
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. }
    14. return num;
    15. }).thenApply(integer -> {
    16. return num * num;
    17. }).thenAccept(new Consumer<Integer>() {
    18. @Override
    19. public void accept(Integer integer) {
    20. System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" + integer);
    21. }
    22. });
    23. }

    12.3.6 异常处理

    exceptionally 异常处理,出现异常时触发
    1. public static void main(String[] args) throws Exception{
    2. System.out.println("主线程开始");
    3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    4. int i= 1/0;
    5. System.out.println("加 10 任务开始");
    6. num += 10;
    7. return num;
    8. }).exceptionally(ex -> {
    9. System.out.println(ex.getMessage());
    10. return -1;
    11. });
    12. System.out.println(future.get());
    13. }
    handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常
    1. public static void main(String[] args) throws Exception{
    2. System.out.println("主线程开始");
    3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    4. System.out.println("加 10 任务开始");
    5. num += 10;
    6. return num;
    7. }).handle((i,ex) ->{
    8. System.out.println("进入 handle 方法");
    9. if(ex != null){
    10. System.out.println("发生了异常,内容为:" + ex.getMessage());
    11. return -1;
    12. }else{
    13. System.out.println("正常完成,内容为: " + i);
    14. return i;
    15. }
    16. });
    17. System.out.println(future.get());
    18. }

    12.3.7 结果合并

    thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果
    1. public static void main(String[] args) throws Exception{
    2. System.out.println("主线程开始");
    3. //第一步加 10
    4. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    5. System.out.println("加 10 任务开始");
    6. num += 10;
    7. return num;
    8. });
    9. //合并
    10. CompletableFuture<Integer> future1 = future.thenCompose(i ->
    11. //再来一个 CompletableFuture
    12. CompletableFuture.supplyAsync(() -> {
    13. return i + 1;
    14. })
    15. );
    16. System.out.println(future.get());
    17. System.out.println(future1.get());
    18. }
    thenCombine 合并两个没有依赖关系的 CompletableFutures 任务
    1. public static void main(String[] args) throws Exception{
    2. System.out.println("主线程开始");
    3. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
    4. System.out.println("加 10 任务开始");
    5. num += 10;
    6. return num;
    7. });
    8. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
    9. System.out.println("乘以 10 任务开始");
    10. num = num * 10;
    11. return num;
    12. });
    13. //合并两个结果
    14. CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
    15. @Override
    16. public List<Integer> apply(Integer a, Integer b) {
    17. List<Integer> list = new ArrayList<>();
    18. list.add(a);
    19. list.add(b);
    20. return list;
    21. }
    22. });
    23. System.out.println("合并结果为:" + future.get());
    24. }
    合并多个任务的结果 allOf 与 anyOf
  • allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情 ```java /**
  • 先对一个数加 10,然后取平方
  • @param args */ public static void main(String[] args) throws Exception{ System.out.println(“主线程开始”); List list = new ArrayList<>(); CompletableFuture job1 = CompletableFuture.supplyAsync(() -> {
    1. System.out.println("加 10 任务开始");
    2. num += 10;
    3. return num;
    }); list.add(job1); CompletableFuture job2 = CompletableFuture.supplyAsync(() -> {
    1. System.out.println("乘以 10 任务开始");num = num * 10;
    2. return num;
    }); list.add(job2); CompletableFuture job3 = CompletableFuture.supplyAsync(() -> {
    1. System.out.println("减以 10 任务开始");
    2. num = num * 10;
    3. return num;
    }); list.add(job3); CompletableFuture job4 = CompletableFuture.supplyAsync(() -> {
    1. System.out.println("除以 10 任务开始");
    2. num = num * 10;
    3. return num;
    }); list.add(job4); //多任务合并 List collect = list.stream().map(CompletableFuture::join).collect(Collectors.toList()); System.out.println(collect); } ```
  • anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束 ```java /**
  • 先对一个数加 10,然后取平方
  • @param args */ public static void main(String[] args) throws Exception{ System.out.println(“主线程开始”); CompletableFuture[] futures = new CompletableFuture[4]; CompletableFuture job1 = CompletableFuture.supplyAsync(() -> {
    1. try{
    2. Thread.sleep(5000);
    3. System.out.println("加 10 任务开始");num += 10;
    4. return num;
    5. }catch (Exception e){
    6. return 0;
    7. }
    }); futures[0] = job1; CompletableFuture job2 = CompletableFuture.supplyAsync(() -> {
    1. try{
    2. Thread.sleep(2000);
    3. System.out.println("乘以 10 任务开始");
    4. num = num * 10;
    5. return num;
    6. }catch (Exception e){
    7. return 1;
    8. }
    }); futures[1] = job2; CompletableFuture job3 = CompletableFuture.supplyAsync(() -> {
    1. try{
    2. Thread.sleep(3000);
    3. System.out.println("减以 10 任务开始");
    4. num = num * 10;
    5. return num;
    6. }catch (Exception e){
    7. return 2;
    8. }
    }); futures[2] = job3; CompletableFuture job4 = CompletableFuture.supplyAsync(() -> {
    1. try{
    2. Thread.sleep(4000);
    3. System.out.println("除以 10 任务开始");num = num * 10;
    4. return num;
    5. }catch (Exception e){
    6. return 3;
    7. }
    }); futures[3] = job4; CompletableFuture future = CompletableFuture.anyOf(futures); System.out.println(future.get()); } ```