在 JDK 5 中,使用 Future 获取异步任务的结果时非常不方便,只能通过 get() 阻塞获取或轮询 isDone() 方法来获取结果,因此 JDK 8 中提供了 CompletableFuture 工具类,可以通过异步回调的方式来获取任务结果。

创建 CompletableFuture

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法:

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

其中 runAsync 和 supplyAsync 这两个方法的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。其对应的重载方法可以指定线程池参数,如果没有显示指定 Executor 参数,则默认使用 ForkJoinPool.commonPool() 线程池,这个线程池默认创建的线程数是 CPU 的核数,并且所有线程都是 Daemon 线程,意味着如果主线程退出,这些线程无论是否执行完毕都会退出系统。

如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。因此要根据不同的业务类型创建不同的线程池,避免互相干扰。

创建完 CompletableFuture 对象后,会自动地异步执行 Runnable.run() 方法或 Supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,我们该如何理解这个接口呢?

CompletionStage

CompletionStage 接口代表了一个特定的计算阶段,可以同步或异步的被完成。我们可以把它看成一个计算流水线上的一个工作单元。工作单元之间是有时序关系的,比如串行、并行、AND、OR 等,CompletionStage 接口可以清晰地描述这种时序关系,并且支持链式调用,这意味着你可以把几个 CompletionStage 串起来,一个完成的阶段可以触发下一阶段的执行,直到获取一个最终结果。

官方定义中,一个 Function,Comsumer 或 Runnable 都会被描述为一个 CompletionStage,相关方法比如有 apply,accept,run 等,这些方法的区别在于它们有些是需要传入参,有些则会产生“结果”。

1. 描述串行关系

image.png
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  3. public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  4. public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  5. public CompletableFuture<Void> thenRun(Runnable action)
  6. public CompletableFuture<Void> thenRunAsync(Runnable action)
  7. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
  8. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)

thenApply 系列函数里参数 fn 的类型是接口 Function,Function 接口与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值。

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture
  3. .supplyAsync(() -> "Hello")
  4. .thenApply(s -> s + " World");
  5. System.out.println(f1.join());
  6. }

thenApply 这个方法名中的 “then” 意味着这个阶段的动作发生在当前阶段正常完成之后,而 “Apply” 意味着返回的阶段将会对前一阶段的结果应用一个函数。

thenAccept 系列方法里参数 action 的类型是接口 Consumer,Consumer 接口里 CompletionStage 相关的方法是 void accept(T t),这个方法只支持参数,不支持返回值。

  1. public static void main(String[] args) {
  2. StringBuilder sb = new StringBuilder();
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. Thread.sleep(5000);
  6. } catch (Exception e) {
  7. //
  8. }
  9. return "Hello World";
  10. }).thenAccept(sb::append).join(); // 对上一阶段返回的结果进行操作,join用于等待异步任务返回结果
  11. System.out.println(sb.toString());
  12. }

thenRun 系列方法里参数 action 的类型是 Runnable,所以 action 既不能接收参数也不支持返回值。需要注意的是 thenCompose 系列方法,该系列方法会将结果进一步传递给后续新创建出的一个子流程。

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture
  3. .supplyAsync(() -> "Hello")
  4. .thenCompose(cf -> CompletableFuture.supplyAsync(() -> " World")
  5. .thenApply(s -> cf + s));
  6. System.out.println(f1.join());
  7. }

2. 描述 AND 汇聚关系

image.png
CompletionStage 接口里面描述 AND 汇聚关系的方法,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些系列接口的区别也是源自 fn、consumer、action 这三个核心参数不同。注意这里的 A 和 B 是并行执行的。

  1. public <U,V> CompletableFuture<V> thenCombine(
  2. CompletionStage<? extends U> other,
  3. BiFunction<? super T,? super U,? extends V> fn)
  4. public <U> CompletableFuture<Void> thenAcceptBoth(
  5. CompletionStage<? extends U> other,
  6. BiConsumer<? super T, ? super U> action)
  7. public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

thenCombine 方法首先完成当前 CompletableFuture 和 other 的执行,注意这两者是并行执行的。接着,将这两者的执行结果传递给 BiFunction 接口,该接口接收两个参数,并有一个返回值。代码示例如下:

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
  3. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
  4. // 接收两个阶段的结果并进行操作,有返回值
  5. CompletableFuture<String> f3 = f1.thenCombine(f2, (a, b) -> a + b);
  6. }

thenAcceptBoth 代码示例:

  1. public static void main(String[] args) {
  2. StringBuilder sb = new StringBuilder();
  3. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
  4. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
  5. // 接收两个阶段的结果并进行操作,无返回值
  6. f1.thenAcceptBoth(f2, (a, b) -> sb.append(a).append(b)).join();
  7. System.out.println(sb.toString());
  8. }

runAfterBoth 代码示例:

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
  3. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
  4. // 当两个阶段都执行完成后,触发该操作
  5. f1.runAfterBoth(f2, () -> System.out.println("GG")).join();
  6. }

3. 描述 OR 汇聚关系

image.png
CompletionStage 接口里面描述 OR 汇聚关系的方法,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。注意这里的 A 和 B 谁先执行完就执行 C,后执行完的则不会再执行 C。

  1. public <U> CompletableFuture<U> applyToEither(
  2. CompletionStage<? extends T> other, Function<? super T, U> fn)
  3. public CompletableFuture<Void> acceptEither(
  4. CompletionStage<? extends T> other, Consumer<? super T> action)
  5. public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)

applyToEither 代码示例:

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
  3. Thread.sleep(500);
  4. return "Hello";
  5. }).thenApplyAsync(s -> {
  6. Thread.sleep(2000);
  7. return s + " World";
  8. });
  9. CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
  10. Thread.sleep(1000);
  11. return "Hello";
  12. }).thenApplyAsync(s -> {
  13. Thread.sleep(2000);
  14. return s + " Java";
  15. });
  16. // f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果,因此输出 Hello World!
  17. CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s + "!");
  18. System.out.println(f3.join());
  19. }

acceptEither 代码示例:

  1. public static void main(String[] args) {
  2. StringBuilder sb = new StringBuilder();
  3. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
  4. Thread.sleep(500);
  5. return "Hello";
  6. }).thenApplyAsync(s -> {
  7. Thread.sleep(2000);
  8. return s + " World";
  9. });
  10. CompletableFuture.supplyAsync(() -> {
  11. Thread.sleep(1000);
  12. return "Hello";
  13. }).thenApplyAsync(s -> {
  14. Thread.sleep(2000);
  15. return s + " Java";
  16. }).acceptEither(f1, s -> sb.append(s)).join();
  17. // f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果, 进行处理
  18. System.out.println(sb.toString());
  19. }

runAfterEither 代码示例:

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
  3. Thread.sleep(500);
  4. return "Hello";
  5. }).thenApplyAsync(s -> {
  6. Thread.sleep(2000);
  7. return s + " World";
  8. });
  9. CompletableFuture.supplyAsync(() -> {
  10. Thread.sleep(1000);
  11. return "Hello";
  12. }).thenApplyAsync(s -> {
  13. Thread.sleep(2000);
  14. return s + " Java";
  15. });
  16. // f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果,然后触发操作
  17. f1.runAfterEither(f2, () -> System.out.println("GG")).join();
  18. }

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制在执行任务时抛出运行时异常。非异步编程里面,我们可以使用 try{}-catch{} 来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

CompletionStage 接口给我们提供了一些方法用于进行异常处理:

  1. public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
  2. public CompletableFuture<T> whenComplete(
  3. BiConsumer<? super T, ? super Throwable> action)
  4. public CompletableFuture<T> whenCompleteAsync(
  5. BiConsumer<? super T, ? super Throwable> action)
  6. public <U> CompletableFuture<U> handle(
  7. BiFunction<? super T, Throwable, ? extends U> fn)
  8. public <U> CompletableFuture<U> handleAsync(
  9. BiFunction<? super T, Throwable, ? extends U> fn)

其中,exceptionally() 的作用类似于 try{}-catch{} 中的 catch{} 块,但是由于支持链式编程方式,所以相对更简单。而 whenComplete() 和 handle() 系列方法就类似于 try{}-finally{} 中的 finally{} 块,无论是否发生异常都会执行 whenComplete() 和 handle() 方法中的回调函数。两者的区别在于 whenComplete() 方法不支持返回结果而 handle() 方法支持返回结果。

exceptionally 代码示例:

  1. public static void main(String[] args) {
  2. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
  3. if (1 == 1) {
  4. throw new RuntimeException("计算异常");
  5. }
  6. return "Hello";
  7. }).exceptionally(t -> t.getMessage()); // 接收异常并输出异常信息
  8. System.out.println(f1.join());
  9. }