创建异步对象方式
CompletableFuture提供了四个静态方法来创建一个异步操作
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都雷同。
- runAsync()不支持返回值。
- supplyAsync()可以支持返回值。
示例:
//无返回值
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
});
System.out.println(future.get());
}
//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});
long time = future.get();
System.out.println("time = "+time);
}
计算完成时回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。
CompletableFuture 有以下几个方法可以在计算完成时回调
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是把当前任务的线程执行完后,继续执行 whenComplete 的任务,是同一个线程
- whenCompleteAsync:是把当前任务的线程执行完后,把 whenCompleteAsync 这个任务继续提交给线程池来进行执行
- exceptionally:可以得到异常,同时返回默认值
示例:
// 方法完成后的感知
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
log.info("当前线程" + Thread.currentThread().getId());
int i = 10 / 0;
log.info("运行结果" + i);
return i;
}, exector).whenComplete((result, exception) -> {
// 可以得到异常信息,没有返回值
log.info("异步任务完成了....结果是:" + result + ", 异常是:" + exception);
}).exceptionally((throwable)->{
// 可以得到异常,同时返回默认值
return 10;
});
log.info("supplyAsync=" + supplyAsync.get());
Handler()方法
handle()方法是执行任务完成时对结果的处理,handle() 方法和 thenApply()方法处理方式基本一样。不同的是 handle() 是在任务完成后再执行,还可以处理异常的任务。thenApply()只可以执行正常的任务,任务出现异常则不执行 thenApply方法
示例:
// 方法执行完成后的处理
CompletableFuture<Integer> supplyAsyncHandler = CompletableFuture.supplyAsync(() -> {
log.info("当前线程" + Thread.currentThread().getId());
int i = 10 / 5;
log.info("运行结果" + i);
return i;
}, exector).handle((result, thr) -> {
if (result != null) {
return result * 2;
}
if (thr != null) {
return 0;
}
return 0;
});
log.info("supplyAsyncHandler=" + supplyAsyncHandler.get());
从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply方法,如果上个任务出现错误,则不会执行 thenApply方法
串行化执行
所谓串行化的意思是多个线程执行时,他们之间存在先后执行顺序,按照编排顺序执行依次这些线程,并且下一个线程可以可以获取上一个线程的执行结果。
实现线程的串行化方式可以使用 thenApply()、thenAccept()、thenRun()方法来实现
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U> 泛型T:表示上一个任务返回结果的类型,泛型U:表示当前任务的返回值类型
示例:
/**
* 线程串行化
* 1.thenRun 不能获取到上一步的执行结果,无返回值
*/
CompletableFuture.supplyAsync(() -> {
log.info("当前线程" + Thread.currentThread().getId());
int i = 10 / 5;
log.info("运行结果" + i);
return i;
}, exector).thenRunAsync(()->{
log.info("任务二启动了");
});
/**
* 2.thenAcceptAsync 能接受上一步结果,但无返回值
*/
CompletableFuture.supplyAsync(() -> {
log.info("当前线程" + Thread.currentThread().getId());
int i = 10 / 5;
log.info("运行结果" + i);
return i;
},exector).thenAcceptAsync((result)->{
log.info("任务三启动了...我没有返回值" + result );
});
/**
* 3.thenApplyAsync 能接受上一步结果,有返回值
*/
CompletableFuture<String> thenApplyAsync = CompletableFuture.supplyAsync(() -> {
log.info("当前线程" + Thread.currentThread().getId());
int i = 10 / 5;
log.info("运行结果" + i);
return i;
}, exector).thenApplyAsync(result -> {
log.info("任务四启动了...我可以有返回值");
return "hello" + result;
}, exector);
log.info("任务四的返回值" + thenApplyAsync.get());
两个任务组合执行
所有任务都要完成
两组任务执行完成,交给第三个任务来处理这两个任务的结果
thenCombine
thenCombine 会把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理,有返回值
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行处理,无法回执
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
示例:
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
log.info("任务一线程开始" + Thread.currentThread().getId());
int i = 10 / 5;
log.info("任务一线程结束" + i);
return i;
}, exector);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, exector);
future01.runAfterBothAsync(future02, ()->{
log.info("任务三开始...");
},exector);
// 当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗,无返回值
future01.thenAcceptBothAsync(future02, (f1, f2) -> {
log.info("任务三开始,之前结果:" + f1 + "---->" + f2);
}, exector);
// 当两个CompletionStage的任务都执行完成后,把两个任务的结果一块交给thenCombine来处理,有返回值
CompletableFuture<String> completableFuture = future01.thenCombineAsync(future02, (f1, f2) -> {
log.info("任务三开始,之前结果:" + f1 + "---->" + f2);
return f1 + ":" + f2 + "->Hello";
}, exector);
log.info("任务三开始,返回值" + completableFuture.get());
其中一个任务完成
两组任务执行时,谁先返回结果(只要一个任务执行完毕),第三组任务就开始执行
applyToEither
两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的转化操作,有返回值
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
acceptEither
两组任务,谁执行返回的结果快,我就用那个任务的结果进行下一步的消耗操作 ,无返回值
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例:
/**
* 两个任务只要一个完成就执行任务三
* runAfterEitherAsync 不感知结果,自己业务返回值
*/
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
log.info("两个任务,完成一个。future1");
return "两个任务,完成一个。future1";
}, exector);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("两个任务,完成一个,future2");
return "两个任务,完成一个,future2";
}, exector);
//不感知结果,自己没有返回值
future1.runAfterEitherAsync(future2, () -> {
log.info("两个任务任务完成一个即可,任务三开始....");
}, exector);
// 感知结果,自己没有返回值
future1.acceptEitherAsync(future2, (result) -> {
log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);
}, exector);
// 自己感知结果,自己有返回值
CompletableFuture<String> applyToEitherAsync = future1.applyToEitherAsync(future2, (result) -> {
log.info("两个任务任务完成一个即可,任务三开始....之前的值:" + result);
return "两个任务任务完成一个即可,任务三开始....之前的值:" + result + ",返回值->哈哈";
}, exector);
log.info("两个任务任务完成一个即可,任务三开始....之前的值" + applyToEitherAsync.get());
多组任务执行
多组任务一起执行,当所有任务或者其中一个任务完成后获取任务结果
CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
log.info("查询商品图片信息");
return "hello.jpg";
});
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
log.info("查询商品属性");
return "黑色256g";
});
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
log.info("查询商品介绍");
return "华为";
});
//futureImage.get();futureAttr.get();futureDesc.get();
// 所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);
allOf.get(); // 等待所有结果完成
log.info("查询商品图片信息"+futureImage.get()+",查询商品属性"+futureAttr.get()+",查询商品介绍"+futureDesc.get());
// 任何一个任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);
anyOf.get(); // 等待任何一个任务完成
log.info("查询商品图片信息" + futureImage.get() + ",查询商品属性" + futureAttr.get() + ",查询商品介绍" + futureDesc.get());