在 JDK 5 中,使用 Future 获取异步任务的结果时非常不方便,只能通过 get() 阻塞获取或轮询 isDone() 方法来获取结果,因此 JDK 8 中提供了 CompletableFuture 工具类,可以通过异步回调的方式来获取任务结果。
创建 CompletableFuture
创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
其中 runAsync 和 supplyAsync 这两个方法的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。其对应的重载方法可以指定线程池参数,如果没有显示指定 Executor 参数,则默认使用 ForkJoinPool.commonPool() 线程池,这个线程池默认创建的线程数是 CPU 的核数,并且所有线程都是 Daemon 线程,意味着如果主线程退出,这些线程无论是否执行完毕都会退出系统。
如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。因此要根据不同的业务类型创建不同的线程池,避免互相干扰。
创建完 CompletableFuture 对象后,会自动地异步执行 Runnable.run() 方法或 Supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,我们该如何理解这个接口呢?
CompletionStage
CompletionStage 接口代表了一个特定的计算阶段,可以同步或异步的被完成。我们可以把它看成一个计算流水线上的一个工作单元。工作单元之间是有时序关系的,比如串行、并行、AND、OR 等,CompletionStage 接口可以清晰地描述这种时序关系,并且支持链式调用,这意味着你可以把几个 CompletionStage 串起来,一个完成的阶段可以触发下一阶段的执行,直到获取一个最终结果。
官方定义中,一个 Function,Comsumer 或 Runnable 都会被描述为一个 CompletionStage,相关方法比如有 apply,accept,run 等,这些方法的区别在于它们有些是需要传入参,有些则会产生“结果”。
1. 描述串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
thenApply 系列函数里参数 fn 的类型是接口 Function,Function 接口与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值。
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World");
System.out.println(f1.join());
}
thenApply 这个方法名中的 “then” 意味着这个阶段的动作发生在当前阶段正常完成之后,而 “Apply” 意味着返回的阶段将会对前一阶段的结果应用一个函数。
thenAccept 系列方法里参数 action 的类型是接口 Consumer,Consumer 接口里 CompletionStage 相关的方法是 void accept(T t),这个方法只支持参数,不支持返回值。
public static void main(String[] args) {
StringBuilder sb = new StringBuilder();
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (Exception e) {
//
}
return "Hello World";
}).thenAccept(sb::append).join(); // 对上一阶段返回的结果进行操作,join用于等待异步任务返回结果
System.out.println(sb.toString());
}
thenRun 系列方法里参数 action 的类型是 Runnable,所以 action 既不能接收参数也不支持返回值。需要注意的是 thenCompose 系列方法,该系列方法会将结果进一步传递给后续新创建出的一个子流程。
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(cf -> CompletableFuture.supplyAsync(() -> " World")
.thenApply(s -> cf + s));
System.out.println(f1.join());
}
2. 描述 AND 汇聚关系
CompletionStage 接口里面描述 AND 汇聚关系的方法,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些系列接口的区别也是源自 fn、consumer、action 这三个核心参数不同。注意这里的 A 和 B 是并行执行的。
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
thenCombine 方法首先完成当前 CompletableFuture 和 other 的执行,注意这两者是并行执行的。接着,将这两者的执行结果传递给 BiFunction 接口,该接口接收两个参数,并有一个返回值。代码示例如下:
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
// 接收两个阶段的结果并进行操作,有返回值
CompletableFuture<String> f3 = f1.thenCombine(f2, (a, b) -> a + b);
}
thenAcceptBoth 代码示例:
public static void main(String[] args) {
StringBuilder sb = new StringBuilder();
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
// 接收两个阶段的结果并进行操作,无返回值
f1.thenAcceptBoth(f2, (a, b) -> sb.append(a).append(b)).join();
System.out.println(sb.toString());
}
runAfterBoth 代码示例:
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> " World");
// 当两个阶段都执行完成后,触发该操作
f1.runAfterBoth(f2, () -> System.out.println("GG")).join();
}
3. 描述 OR 汇聚关系
CompletionStage 接口里面描述 OR 汇聚关系的方法,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。注意这里的 A 和 B 谁先执行完就执行 C,后执行完的则不会再执行 C。
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
applyToEither 代码示例:
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " World";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " Java";
});
// f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果,因此输出 Hello World!
CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s + "!");
System.out.println(f3.join());
}
acceptEither 代码示例:
public static void main(String[] args) {
StringBuilder sb = new StringBuilder();
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " World";
});
CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " Java";
}).acceptEither(f1, s -> sb.append(s)).join();
// f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果, 进行处理
System.out.println(sb.toString());
}
runAfterEither 代码示例:
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(500);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " World";
});
CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Hello";
}).thenApplyAsync(s -> {
Thread.sleep(2000);
return s + " Java";
});
// f1执行任务花费2.5s,f2执行任务花费3s,所以这里会先接收到f1的结果,然后触发操作
f1.runAfterEither(f2, () -> System.out.println("GG")).join();
}
4. 异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制在执行任务时抛出运行时异常。非异步编程里面,我们可以使用 try{}-catch{} 来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
CompletionStage 接口给我们提供了一些方法用于进行异常处理:
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn)
其中,exceptionally() 的作用类似于 try{}-catch{} 中的 catch{} 块,但是由于支持链式编程方式,所以相对更简单。而 whenComplete() 和 handle() 系列方法就类似于 try{}-finally{} 中的 finally{} 块,无论是否发生异常都会执行 whenComplete() 和 handle() 方法中的回调函数。两者的区别在于 whenComplete() 方法不支持返回结果而 handle() 方法支持返回结果。
exceptionally 代码示例:
public static void main(String[] args) {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
if (1 == 1) {
throw new RuntimeException("计算异常");
}
return "Hello";
}).exceptionally(t -> t.getMessage()); // 接收异常并输出异常信息
System.out.println(f1.join());
}