CompletableFuture是 java8 中新增的一个类,算是对 Future 的一种增强,用起来很方便,也是会经常用到的一个工具类,熟悉一下。

目录

  • CompletionStage 接口
  • CompletableFuture 类
  • CompletableFuture 常见的方法
    • runAsync 和 supplyAsync 方法
    • 计算结果完成时的回调方法
    • thenApply 方法
    • handle 方法
    • thenAccept 消费处理结果
    • thenRun 方法
    • thenCombine 合并任务
    • thenAcceptBoth
    • applyToEither 方法
    • acceptEither 方法
    • runAfterEither 方法
    • runAfterBoth
    • thenCompose 方法
  • 最新资料

    CompletionStage 接口

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

    CompletableFuture 类

  • 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

  • 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了 Future 和 CompletionStage 接口

    CompletableFuture 常见的方法

    runAsync 和 supplyAsync 方法

    CompletableFuture 提供了四个静态方法来创建一个异步操作。
    public static CompletableFuture runAsync(Runnable runnable)
    public static CompletableFuture runAsync(Runnable runnable, Executor executor)
    public static CompletableFuture supplyAsync(Supplier supplier)
    public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync 方法不支持返回值。
  • supplyAsync 可以支持返回值。

示例代码
//无返回值
public static void runAsync() throws Exception {
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println(“run end …”);
});

  1. future.get();<br />}

//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println(“run end …”);
return System.currentTimeMillis();
});

  1. **long** time = future.get();<br /> System.out.println("time = "+time);<br />}

计算结果完成时的回调方法

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function fn)

可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

示例代码
public static void whenComplete() throws Exception {
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if(new Random().nextInt()%2>=0) {
int i = 12/0;
}
System.out.println(“run end …”);
});

  1. future.whenComplete(**new** BiConsumer<Void, Throwable>() {<br /> @Override<br /> **public** **void** **accept**(Void t, Throwable action) {<br /> System.out.println("执行完成!");<br /> }
  2. });<br /> future.exceptionally(**new** Function<Throwable, Void>() {<br /> @Override<br /> **public** Void **apply**(Throwable t) {<br /> System.out.println("执行失败!"+t.getMessage());<br /> **return** **null**;<br /> }<br /> });
  3. TimeUnit.SECONDS.sleep(2);<br />}

thenApply 方法

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Function<? super T,? extends U> T:上一个任务返回结果的类型 U:当前任务的返回值类型
示例代码
private static void thenApply() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Long get() {
long result = new Random().nextInt(100);
System.out.println(“result1=”+result);
return result;
}
}).thenApply(new Function() {
@Override
public Long apply(Long t) {
long result = t5;
System.out.println(“result2=”+result);
*return
result;
}
});

  1. **long** result = future.get();<br /> System.out.println(result);<br />}

第二个任务依赖第一个任务的结果。

handle 方法

handle 是执行任务完成时对结果的处理。handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

示例代码
public static void handle() throws Exception{
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {

  1. @Override<br /> **public** Integer **get**() {<br /> **int** i= 10/0;<br /> **return** **new** Random().nextInt(10);<br /> }<br /> }).handle(**new** BiFunction<Integer, Throwable, Integer>() {<br /> @Override<br /> **public** Integer **apply**(Integer param, Throwable throwable) {<br /> **int** result = -1;<br /> **if**(throwable==**null**){<br /> result = param * 2;<br /> }**else**{<br /> System.out.println(throwable.getMessage());<br /> }<br /> **return** result;<br /> }<br /> });<br /> System.out.println(future.get());<br />}

从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。

thenAccept 消费处理结果

接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);

示例代码
public static void thenAccept() throws Exception{
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenAccept(integer -> {
System.out.println(integer);
});
future.get();
}

从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。

thenRun 方法

跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);

示例代码
public static void thenRun() throws Exception{
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println(“thenRun …”);
});
future.get();
}

该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理完任务后,执行 thenAccept 的后续操作。

thenCombine 合并任务

thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public CompletionStage thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

示例代码
private static void thenCombine() throws Exception {
CompletableFuture future1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
return “hello”;
}
});
CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
return “hello”;
}
});
CompletableFuture result = future1.thenCombine(future2, new BiFunction() {
@Override
public String apply(String t, String u) {
return t+” “+u;
}
});
System.out.println(result.get());
}

thenAcceptBoth

当两个 CompletionStage 都执行完成后,把结果一块交给 thenAcceptBoth 来进行消耗
public CompletionStage thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

示例代码
private static void thenAcceptBoth() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f1=”+t);
return t;
}
});

  1. CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.thenAcceptBoth(f2, **new** BiConsumer<Integer, Integer>() {<br /> @Override<br /> **public** **void** **accept**(Integer t, Integer u) {<br /> System.out.println("f1="+t+";f2="+u+";");<br /> }<br /> });<br />}

applyToEither 方法

两个 CompletionStage,谁执行返回的结果快,我就用那个 CompletionStage 的结果进行下一步的转化操作。
public CompletionStage applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

示例代码
private static void applyToEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f1=”+t);
return t;
}
});
CompletableFuture f2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f2=”+t);
return t;
}
});

  1. CompletableFuture<Integer> result = f1.applyToEither(f2, **new** Function<Integer, Integer>() {<br /> @Override<br /> **public** Integer **apply**(Integer t) {<br /> System.out.println(t);<br /> **return** t * 2;<br /> }<br /> });
  2. System.out.println(result.get());<br />}

acceptEither 方法

两个 CompletionStage,谁执行返回的结果快,我就用那个 CompletionStage 的结果进行下一步的消耗操作。
public CompletionStage acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

示例代码
private static void acceptEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f1=”+t);
return t;
}
});

  1. CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.acceptEither(f2, **new** Consumer<Integer>() {<br /> @Override<br /> **public** **void** **accept**(Integer t) {<br /> System.out.println(t);<br /> }<br /> });<br />}

runAfterEither 方法

两个 CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
public CompletionStage runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例代码
private static void runAfterEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f1=”+t);
return t;
}
});

  1. CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.runAfterEither(f2, **new** Runnable() {
  2. @Override<br /> **public** **void** **run**() {<br /> System.out.println("上面有一个已经完成了。");<br /> }<br /> });<br />}

runAfterBoth

两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
public CompletionStage runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例代码
private static void runAfterBoth() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“f1=”+t);
return t;
}
});

  1. CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(**new** Supplier<Integer>() {<br /> @Override<br /> **public** Integer **get**() {<br /> **int** t = **new** Random().nextInt(3);<br /> **try** {<br /> TimeUnit.SECONDS.sleep(t);<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> System.out.println("f2="+t);<br /> **return** t;<br /> }<br /> });<br /> f1.runAfterBoth(f2, **new** Runnable() {
  2. @Override<br /> **public** **void** **run**() {<br /> System.out.println("上面两个任务都执行完成了。");<br /> }<br /> });<br />}

thenCompose 方法

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
public CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn);
public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn) ;
public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor) ;

示例代码
private static void thenCompose() throws Exception {
CompletableFuture f = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
System.out.println(“t1=”+t);
return t;
}
}).thenCompose(new Function>() {
@Override
public CompletionStage apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = param 2;
System.out.println(“t2=”+t);
*return
t;
}
});
}

  1. });<br /> System.out.println("thenCompose result : "+f.get());<br /> }