所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让主线程继续运行的方法,简单的讲就是在主线程之外另启一个线程来完成调用中的部分计算,使调用继续运行或返回,主线程不需要等待异步计算结果,且主线程可以获取异步调用的返回结果。
JDK5新增了Future接口,用于描述一个异步计算的结果,虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于异步任务结果的获取却是很不方便,在JDK8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性。本文主要介绍CompletableFuture API的使用。
1、创建异步任务
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码;如果指定线程池,则使用指定的线程池运行;
- runAsync方法不支持返回值,supplyAsync可以支持返回值,U就是返回值的类型。
举例:
public static void main(String[] args) {
CompletableFuture<Void> completedFuture = CompletableFuture.runAsync(() ->
{
System.out.println(Thread.currentThread().getName() + "开始调用异步任务");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "调用异步任务结束");
});
System.out.println(Thread.currentThread().getName() + "正在执行主线程任务");
try {
completedFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
结果:
main正在执行主线程任务
ForkJoinPool.commonPool-worker-9开始调用异步任务
ForkJoinPool.commonPool-worker-9调用异步任务
说明:
用线程休眠来模拟异步线程执行任务所花费的时间,执行程序时,第一句话和第二句话几乎同时打印出来,主线程任务是第一句打印,原因是异步任务起线程池需要时间消耗,过了2秒后第三句话也打印了出来,证明同一时刻有两个任务在运行:主线程任务和异步任务,互不干扰,在主线程任务中可以用completedFuture.get()获取异步任务的执行结果。
举例:
public static void main(String[] args) {
CompletableFuture<Integer> completedFuture = CompletableFuture.supplyAsync(() ->
{
System.out.println(Thread.currentThread().getName() + "开始调用异步任务");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "调用异步任务结束");
return 1111;
});
System.out.println(Thread.currentThread().getName() + "正在执行主线程任务");
try {
System.out.println("异步任务的执行结果:" + completedFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
2、获取结果
获取CompletableFuture的结果主要有两种方式:
- join:抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出;
- get:抛出的是受查异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)。
其中get方法有三种重载的形式,CompletableFuture提供的get和join接口如下:
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
- get()和get(long timeout, TimeUnit unit) 方法在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常;
- getNow立即获取结果不阻塞,如果计算已完成将返回结果或计算过程中的异常,如果计算未完成将返回设定的valueIfAbsent值;
- join方法不会抛出受查异常。
举例:
public class GetJoinDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completedFuture = CompletableFuture.supplyAsync(() ->
{
int res = 10 / 2;
return res;
});
// join抛出的unchecked异常不需要代码中处理
int resJoin = completedFuture.join();
System.out.println(resJoin);
// get抛出的是受查异常,需要向上throws或者try-catch包围
int resGet = 0;
try {
resGet = completedFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(resGet);
}
}
3、计算结果完成后的处理方法
3.1 whenComplete
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action,主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
- whenComplete是执行当前任务的线程执行继续执行whenComplete的任务action;
- whenCompleteAsync是把whenCompleteAsync的任务action继续提交给线程池来执行,即执行CompletableFuture的任务和执行whenCompleteAsync的任务的线程可能是两个不同的线程;
- 前三个方法的BiConsumer接口类型入参action,要传入两个入参构造,第一个参数是supplyAsync/runAsync里返回的结果值,第二个参数是supplyAsync/runAsync时的异常。
举例:
public class whenCompleteDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() ->
{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int res = 10 / 2;
// int res = 10 / 0;
System.out.println(Thread.currentThread().getName() + ": 异步任务执行中,结果为:" + res);
return res;
}).whenComplete((result, ex) -> {
if (result != null) {
System.out.println("whenComplete: " + result);
}
if (ex != null) {
System.out.println("whenComplete: " + ex);
}
});
Integer res = null;
try {
System.out.println(Thread.currentThread().getName() + ": 主线程在做其他事情");
res = completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.print(Thread.currentThread().getName() + ": " + res);
}
}
当res = 10 / 2时,结果为:
main: 主线程在做其他事情
ForkJoinPool.commonPool-worker-1: 异步任务执行中,结果为:5
whenComplete: 5
main: 5
当res = 10 / 0时,结果为:
main: 主线程在做其他事情
whenComplete: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main: nulljava.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
...
注意:
- whenComplete方法返回的还是CompletableFuture类型的对象,因此可以使用链式写法。
3.2 thenApply
当线程B依赖于线程A的执行结果时,可以使用thenApply方法来把这两个线程串行化,thenApply方法如下:
thenApply方法接受一个Function类型的接口fn,参数T代表上一个CompletableFuture中的计算结果,参数U代表经过thenApply里函数式接口的方法处理后的计算结果。public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
举例: ```java package com.Jerry.CompletableFuture;
import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture
try {
String res = completableFuture.get();
System.out.println(Thread.currentThread().getName() + ": " + res + " " + res.getClass());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
结果:
ForkJoinPool.commonPool-worker-9 supplyAsync结果: 29 class java.lang.Integer ForkJoinPool.commonPool-worker-9 thenApply结果: 29 class java.lang.String main: 29 class java.lang.String
注意:
- thenApply方法返回的还是CompletableFuture类型的对象,因此可以使用链式写法;
- 上面例子里,将supplyAsync方法里的结果(Integer)作为thenApply方法里Function接口的apply方法的入参,apply方法返回String类型的结果,即thenApply方法返回String类型的结果,即链式写法的第一个completableFuture是CompletableFuture<String>类型而不是CompletableFuture<Integer>类型。
<a name="ol4sM"></a>
## 3.3 handle
handle方法和thenApply方法处理方式基本一样,不同的是handle里的方法是在supplyAsync/runAsync执行后一定会执行的,即使supplyAsync/runAsync里抛了异常也会执行handle里的方法,而thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。handle方法如下:
```java
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
举例:
package com.Jerry.CompletableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() ->
{
// int res = 10 / 0;
int res = 10 / 2;
return res;
}).handleAsync((res, ex) -> {
if (res != null) {
System.out.println(Thread.currentThread().getName() + " handle: " + res);
}
if (ex != null) {
System.out.println(Thread.currentThread().getName() + " handle: " + ex);
}
return res;
});
try {
int res = completableFuture.get();
System.out.println(Thread.currentThread().getName() + ": " + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
结果:
当不抛异常,即int res = 10 / 2时:
ForkJoinPool.commonPool-worker-9 handle: 5
main: 5
当抛异常,即int res = 10 / 0时:
ForkJoinPool.commonPool-worker-9 handle: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.lang.NullPointerException
at com.Jerry.CompletableFuture.HandleDemo.main(HandleDemo.java:25)
注意:
- handle方法返回的还是CompletableFuture类型的对象,因此可以使用链式写法;
- handle方法和handleAsync方法的区别在于前者是与supplyAsync/runAsync共用一个线程,后者是另起一个fork-join的线程;
上面例子里可以看到,即使supplyAsync/runAsync里抛了异常,handle里方法依然会执行,而thenApply则不会执行。
3.4 thenAccept
前面介绍的whenComplete、thenAccept和handle方法,处理完后都会返回一个值,而thenAccept方法同样也是对前面的supplyAsync/runAsync生成的结果进行消费,但是不同点在于thenAccept方法只是纯消费,不返回值,thenAccept方法如下:
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
从thenAccpet方法接受的函数式接口时Consumer可以知道,这个方法就是纯消费的,返回的值类型都是void的。
举例:public class ThenAcceptDemo { public static void main(String[] args) { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { Random random = new Random(); int res = random.nextInt(100); return res; }).thenAccept(System.out::println); try { completableFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
小结:
当只是纯消费,没有返回类型时用thenAccept;
- 不仅消费,还返回completableFuture时用whenComplete和handle,区别在于whenComplete接收的BiConsumer接口参数,handle接收BiFunction接口参数,但好像也没啥区别…..
当下一个任务的执行依赖上一个任务的结果时用thenApply。
4、组合任务
4.1 thenCompose
thenCombine方法功能上与thenApply方法类似,也是线程B的计算依赖上一个线程A的计算结果,但是不同之处在于:
thenCompose()用来连接两个CompletableFuture,返回值是新的CompletableFuture;
- thenApply()转换的是泛型中的类型,是同一个CompletableFuture。
thenCompose方法如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
例子不想举了,用到的地方不多,远没有thenCombine方法广泛。
4.2 thenCombine
thenCombine会把两个CompletableFuture的任务都执行完成后,把两个任务的结果一块交给thenCombine来处理,并生成新的CompletableFuture任务。thenCombine如下:
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
说明:
- 第一个参数other是上一个CompletableFuture任务;
- 第二个参数fn有两个入参T、U组成,T代表上一个上一个CompletableFuture任务的计算结果,U代表当前CompletableFuture任务的计算结果。
举例:
public class ThenCombineDemo {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> result = future2.thenCombine(future1, (t, u) -> t + " " + u);
System.out.println(result.join());
}
}
说明:
t代表上一个任务future1的计算结果,u代表当前任务future2的计算结果,因为是future2调用的thenCombine方法,因此future2定义为当前任务,future1定义为上一个任务。
5、辅助方法
5.1 allOf
allOf方法的入参是若干个CompletableFuture任务,返回类型是CompletableFuture
,allOf方法是等所有的CompletableFuture都执行完后再执行计算,一般后面会跟链式的thenApply方法或者thenAccept方法对所有的异步任务进行汇总处理。
举例:public class AllAnyOfDemo { public static void main(String[] args) { CompletableFuture<String> completedFuture1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务1完成"); return "sleep1"; }); CompletableFuture<String> completedFuture3 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务3完成"); return "sleep3"; }); CompletableFuture<String> completedFuture5 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务5完成"); return "sleep5"; }); CompletableFuture<String> completableFutureResult = CompletableFuture.allOf( completedFuture1, completedFuture3, completedFuture5).thenApply(x -> { StringBuilder sb = new StringBuilder(); sb.append(completedFuture1.join()).append(completedFuture3.join()).append(completedFuture5.join()); return sb.toString(); } ); System.out.println("主线程等待异步运算结果..."); String res = completableFutureResult.join(); System.out.println("主线程打印异步计算结果:" + res); } }
结果:
上面代码起了3个CompletableFuture异步任务,每个任务返回一个字符串,不同的任务依次休眠1s、3s、5s,在allOf里将三个异步任务组合起来,在thenApply方法里将三个异步任务返回的字符串结果拼接起来并返回给completableFutureResult,并在主线程获取打印结果;
- 运行结果首先会打印“主线程等待异步运算结果…”,然后在1s、3s、5s的时间段后一次打印”异步任务1完成”、”异步任务3完成”、”异步任务5完成”,最后打印总的结果“主线程打印异步计算结果:sleep1sleep3sleep5”
分析: