什么是 Future 接口

很多场景下,我们想去获得线程的运行结果,而通常使用 execute 去提交任务是无法获得结果的,这个时候我们常常会改用 submiet 去提交,以便获得线程运行的结果。而 submit 返回的就是 Future,使用 future.get() 去获取线程执行结果,包括如果出现异常,也会随 get 抛出。

Future 接口的缺陷

当我们使用future.get()方法去取得线程执行结果时,要知道get方法是阻塞的,也就是说为了拿到结果,当主线程执行到get()方法,当前线程会去等待异步任务执行完成,换言之,异步的效果在我们使用get()拿结果时,会变得无效。示例如下

  1. public static void main(String[] args) throws Exception{
  2. ExecutorService executorService = Executors.newSingleThreadExecutor();
  3. Future future = executorService.submit(()->{
  4. try {
  5. Thread.sleep(3000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. System.out.println("异步任务执行了");
  10. });
  11. future.get();
  12. System.out.println("主线任务执行了");
  13. }

打印结果是:异步任务执行了过后主线任务才执行。 就是因为get()在一直等待。那么如何解决我想要拿到结果,可以对结果进行处理,又不想被阻塞呢?

CompletableFuture 使一切变得可能

JDK1.8才新加入的一个实现类CompletableFuture,实现了Future, CompletionStage两个接口。

实际开发中,我们常常面对如下的几种场景:

  • 针对Future的完成事件,不想简单的阻塞等待,在这段时间内,我们希望可以正常继续往下执行,所以在它完成时,我们可以收到回调即可。
  • 面对Future集合来讲,这其中的每个Future结果其实很难去描述它们之间的依赖关系,而往往我们希望等待所有的Future集合都完成,然后做一些事情。
  • 在异步计算中,两个计算任务相互独立,但是任务二又依赖于任务一的结果。

如上的几种场景,单靠Future是解决不了的,而CompletableFuture则可以帮我们实现。

CompletableFuture 常见api 介绍

1. runAsync 和 supplyAsync

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync类似于execute方法,不支持返回值,而supplyAsync方法类似submit方法,支持返回值。也是我们的重点方法。 没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

  1. //无返回值
  2. CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
  3. System.out.println("runAsync无返回值");
  4. });
  5. future1.get();
  6. //有返回值
  7. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  8. System.out.println("supplyAsync有返回值");
  9. return "111";
  10. });
  11. String s = future2.get();

2、 异步任务执行完时的回调方法 whenComplete 和 exceptionally

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的任务

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  4. public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

这些方法都是上述创建的异步任务完成后 (也可能是抛出异常后结束) 所执行的方法。whenComplete和whenCompleteAsync方法的区别在于:前者是由上面的线程继续执行,而后者是将whenCompleteAsync的任务继续交给线程池去做决定。exceptionally则是上面的任务执行抛出异常后,所要执行的方法。

  1. CompletableFuture.supplyAsync(()->{
  2. int a = 10/0;
  3. return 1;
  4. }).whenComplete((r, e)->{
  5. System.out.println(r);
  6. }).exceptionally(e->{
  7. System.out.println(e);
  8. return 2;
  9. });

值得注意的是:哪怕supplyAsync抛出了异常,whenComplete也会执行,意思就是,只要supplyAsync执行结束,它就会执行,不管是不是正常执行完。exceptionally只有在异常的时候才会执行。其实,在whenComplete的参数内 e就代表异常了,判断它是否为null,就可以判断是否有异常,只不过这样的做法,我们不提倡。whenComplete和exceptionally这两个,谁在前,谁先执行。 此类的回调方法,哪怕主线程已经执行结束,已经跳出外围的方法体,然后回调方法依然可以继续等待异步任务执行完成再触发,丝毫不受外部影响。


3. thenApply 和 handle 方法

如果两个任务之间有依赖关系,比如B任务依赖于A任务的执行结果,那么就可以使用这两个方法

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  3. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  4. public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
  5. public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
  6. public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

这两个方法,效果是一样的,区别在于,当A任务执行出现异常时,thenApply方法不会执行,而handle 方法一样会去执行,因为在handle方法里,我们可以处理异常,而前者不行。

  1. CompletableFuture.supplyAsync(()->{
  2. return 5;
  3. }).thenApply((r)->{
  4. r = r + 1;
  5. return r;
  6. });
  7. //出现了异常,handle方法可以拿到异常 e
  8. CompletableFuture.supplyAsync(()->{
  9. int i = 10/0;
  10. return 5;
  11. }).handle((r, e)->{
  12. System.out.println(e);
  13. r = r + 1;
  14. return r;
  15. });

这里延伸两个方法 thenAccept 和 thenRun。其实 和上面两个方法差不多,都是等待前面一个任务执行完 再执行。区别就在于thenAccept接收前面任务的结果,且无需return。而thenRun只要前面的任务执行完成,它就执行,不关心前面的执行结果如何 如果前面的任务抛了异常,非正常结束,这两个方法是不会执行的,所以处理不了异常情况

4. 合并操作方法 thenCombine 和 thenAcceptBoth

我们常常需要合并两个任务的结果,在对其进行统一处理,简言之,这里的回调任务需要等待两个任务都完成后再会触发。

  1. public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  2. public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  3. public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
  4. public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  5. public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  6. public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

这两者的区别 在于 前者是有返回值的,后者没有(就是个消耗工作)

  1. private static void thenCombine() throws Exception {
  2. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
  3. try {
  4. Thread.sleep(4000);
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. return "future1";
  9. });
  10. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
  11. return "future2";
  12. });
  13. CompletableFuture<String> result = future1.thenCombine(future2, (r1, r2)->{
  14. return r1 + r2;
  15. });
  16. //这里的get是阻塞的,需要等上面两个任务都完成
  17. System.out.println(result.get());
  18. }
  1. private static void thenAcceptBoth() throws Exception {
  2. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
  3. try {
  4. Thread.sleep(4000);
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. return "future1";
  9. });
  10. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
  11. return "future2";
  12. });
  13. //值得注意的是,这里是不阻塞的
  14. future1.thenAcceptBoth(future2, (r1, r2)->{
  15. System.out.println(r1 + r2);
  16. });
  17. System.out.println("继续往下执行");
  18. }

这两个方法 都不会形成阻塞。就是个回调方法。只有get()才会阻塞。

5. allOf (重点,个人觉得用的场景很多)

很多时候,不止存在两个异步任务,可能有几十上百个。我们需要等这些任务都完成后,再来执行相应的操作。那怎么集中监听所有任务执行结束与否呢? allOf方法可以帮我们完成。

  1. public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

它接收一个可变入参,既可以接收CompletableFuture单个对象,可以接收其数组对象。 结合例子说明其作用。

  1. public static void main(String[] args) throws Exception{
  2. long start = System.currentTimeMillis();
  3. CompletableFutureTest test = new CompletableFutureTest();
  4. // 结果集
  5. List<String> list = new ArrayList<>();
  6. List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  7. // 全流式处理转换成CompletableFuture[]
  8. CompletableFuture[] cfs = taskList.stream()
  9. .map(integer -> CompletableFuture.supplyAsync(() -> test.calc(integer))
  10. .thenApply(h->Integer.toString(h))
  11. .whenComplete((s, e) -> {
  12. System.out.println("任务"+s+"完成!result="+s+",异常 e="+e+","+new Date());
  13. list.add(s);
  14. })
  15. ).toArray(CompletableFuture[]::new);
  16. CompletableFuture.allOf(cfs).join();
  17. System.out.println("list="+list+",耗时="+(System.currentTimeMillis()-start));
  18. }
  19. public int calc(Integer i) {
  20. try {
  21. if (i == 1) {
  22. Thread.sleep(3000);//任务1耗时3秒
  23. } else if (i == 5) {
  24. Thread.sleep(5000);//任务5耗时5秒
  25. } else {
  26. Thread.sleep(1000);//其它任务耗时1秒
  27. }
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. return i;
  32. }

全流式写法,综合了以上的一些方法,使用allOf集中阻塞,等待所有任务执行完成,取得结果集list。 这里有些CountDownLatch的感觉。

CompletableFuture 总结

CompletableFuture构建异步应用 - 图1