CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。
image.png

下面是代码实现,你先略过 runAsync()、supplyAsync()、thenCombine() 这些不太熟悉的方法,从大局上看,你会发现:

  1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
  1. // 任务 1:洗水壶 -> 烧开水
  2. CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
  3. System.out.println("T1: 洗水壶...");
  4. sleep(1, TimeUnit.SECONDS);
  5. System.out.println("T1: 烧开水...");
  6. sleep(15, TimeUnit.SECONDS);
  7. });
  8. // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
  9. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
  10. System.out.println("T2: 洗茶壶...");
  11. sleep(1, TimeUnit.SECONDS);
  12. System.out.println("T2: 洗茶杯...");
  13. sleep(2, TimeUnit.SECONDS);
  14. System.out.println("T2: 拿茶叶...");
  15. sleep(1, TimeUnit.SECONDS);
  16. return " 龙井 ";
  17. });
  18. // 任务 3:任务 1 和任务 2 完成后执行:泡茶
  19. CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{
  20. System.out.println("T1: 拿到茶叶:" + tf);
  21. System.out.println("T1: 泡茶...");
  22. return " 上茶:" + tf;
  23. });
  24. // 等待任务 3 执行结果
  25. System.out.println(f3.join());
  26. void sleep(int t, TimeUnit u) {
  27. try {
  28. u.sleep(t);
  29. }catch(InterruptedException e){}
  30. }
  31. // 一次执行结果:
  32. T1: 洗水壶...
  33. T2: 洗茶壶...
  34. T1: 烧开水...
  35. T2: 洗茶杯...
  36. T2: 拿茶叶...
  37. T1: 拿到茶叶: 龙井
  38. T1: 泡茶...
  39. 上茶: 龙井

创建 CompletableFuture 对象

  1. // 使用默认线程池
  2. static CompletableFuture<Void> runAsync(Runnable runnable)
  3. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. // 可以指定线程池
  5. static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  6. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  1. 它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。
  2. 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?

如何理解 CompletionStage 接口

我觉得,你可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

image.png

1. 描述串行关系

  1. CompletionStage<R> thenApply(fn);
  2. CompletionStage<R> thenApplyAsync(fn);
  3. CompletionStage<Void> thenAccept(consumer);
  4. CompletionStage<Void> thenAcceptAsync(consumer);
  5. CompletionStage<Void> thenRun(action);
  6. CompletionStage<Void> thenRunAsync(action);
  7. CompletionStage<R> thenCompose(fn);
  8. CompletionStage<R> thenComposeAsync(fn);

2. 描述并行关系

  1. CompletionStage<R> thenCombine(other, fn);
  2. CompletionStage<R> thenCombineAsync(other, fn);
  3. CompletionStage<Void> thenAcceptBoth(other, consumer);
  4. CompletionStage<Void> thenAcceptBothAsync(other, consumer);
  5. CompletionStage<Void> runAfterBoth(other, action);
  6. CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述or关系

  1. CompletionStage applyToEither(other, fn);
  2. CompletionStage applyToEitherAsync(other, fn);
  3. CompletionStage acceptEither(other, consumer);
  4. CompletionStage acceptEitherAsync(other, consumer);
  5. CompletionStage runAfterEither(other, action);
  6. CompletionStage runAfterEitherAsync(other, action);

image.png

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

  1. CompletableFuture<Integer>
  2. f0 = CompletableFuture.
  3. .supplyAsync(()->(7/0))
  4. .thenApply(r->r*10);
  5. System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

  1. CompletionStage exceptionally(fn);
  2. CompletionStage<R> whenComplete(consumer);
  3. CompletionStage<R> whenCompleteAsync(consumer);
  4. CompletionStage<R> handle(fn);
  5. CompletionStage<R> handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

  1. CompletableFuture<Integer>
  2. f0 = CompletableFuture
  3. .supplyAsync(()->7/0))
  4. .thenApply(r->r*10)
  5. .exceptionally(e->0);
  6. System.out.println(f0.join());