一、前言
本文参考:https://blog.csdn.net/qq_31865983/article/details/106137777
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
多线程的基本编程参考:
7-多线程
二、创建异步任务
1、Future
通常的线程池接口类ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个重载的submit方法的返回值是Future,可以据此获取任务执行的状态和结果,示例如下:
public class callableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
int a = 1 / 0;
return Thread.currentThread().getName();
}
});
// 每次执行一个线程
System.out.println("main thread start,time->" + System.currentTimeMillis());
//等待子任务执行完成,如果已完成则直接返回结果
//如果执行任务异常,则get方法会把之前捕获的异常重新抛出
System.out.println(future.get());
System.out.println("main thread end,time->" + System.currentTimeMillis());
executorService.shutdown();
}
}
执行结果如下(get方法抛出异常导致主线程异常终止),若无异常则正常输出
子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果后退出。
2、supplyAsync/runAsync
supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable
(1)runAsync
public class Test01 {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executorService);
executorService.shutdown();
}
}
main....start.....
当前线程 :13
运行结果:5
(2)supplyAsync
CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService);
System.out.println(supplyAsyncFuture.get());
main....start.....
当前线程 :13
运行结果:5
5
(3)默认线程池
2-ForkJoinPool
上述两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程。测试用例如下:
CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
});
System.out.println(supplyAsyncFuture.get());
三、异步回调函数
1、thenApply / thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,当使用 ExecutorService 线程池的时候,这两个方法没有什么区别,当使用 ForkJoinPool 线程池的使用,thenApply的方法使用的是和上一个方法相同的线程,而 thenApplyAsync 使用的是新的线程。
(1)thenApply
ExecutorService
public class Test03 {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("a、当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).thenApply(res -> {
System.out.println("b、当前线程 :" + Thread.currentThread().getId());
System.out.println("任务四启动了。。。" + res);
return "hello" + res;
});
System.out.println("main....end...." + feature.get());
}
}
main....start.....
a、当前线程 :13
运行结果:5
b、当前线程 :14
任务四启动了。。。5
main....end....hello5
ForkJoinPool
public class Test03 {
private static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("a、当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).thenApply(res -> {
System.out.println("b、当前线程 :" + Thread.currentThread().getId());
System.out.println("任务四启动了。。。" + res);
return "hello" + res;
});
System.out.println("main....end...." + feature.get());
}
}
main....start.....
a、当前线程 :13
运行结果:5
b、当前线程 :13
任务四启动了。。。5
main....end....hello5
(2)thenApplyAsync
ExecutorService
public class Test03 {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("a、当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).thenApplyAsync(res -> {
System.out.println("b、当前线程 :" + Thread.currentThread().getId());
System.out.println("任务四启动了。。。" + res);
return "hello" + res;
}, executorService);
System.out.println("main....end...." + feature.get());
}
}
main....start.....
a、当前线程 :13
运行结果:5
b、当前线程 :14
任务四启动了。。。5
main....end....hello5
ForkJoinPool
public class Test03 {
private static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("a、当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).thenApplyAsync(res -> {
System.out.println("b、当前线程 :" + Thread.currentThread().getId());
System.out.println("任务四启动了。。。" + res);
return "hello" + res;
}, executorService);
System.out.println("main....end...." + feature.get());
}
}
main....start.....
a、当前线程 :13
运行结果:5
b、当前线程 :14
任务四启动了。。。5
main....end....hello5
2、thenAccept/thenApplyAsync
和 �thenApply / thenApplyAsync 的用法一致,只是,没有返回值,下面只举一个例子。
public class Test03 {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<Void> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).thenAcceptAsync(res -> {
System.out.println("任务三执行。。。。" + res);
}, executorService);
System.out.println("main....end...." + feature.get());
}
}
main....start.....
当前线程 :13
运行结果:5
任务三执行。。。。5
main....end....null
3、exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,测试用例如下:
public class Test03 {
private static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
CompletableFuture<Integer> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}).exceptionally(throwable -> {
throwable.printStackTrace();
// 抛出异常
throw new RuntimeException(throwable);
// 异常时候的处理
// return Integer.parseInt("1");
});
System.out.println(supplyAsyncFuture.get());
}
}
main....start.....
当前线程 :13
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.ArithmeticException: / by zero
at demo08_completableFuture.Test01.lambda$main$0(Test01.java:26)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
。。。。。。。。
4、whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。测试用例如下:
public class Test03 {
private static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
/**
* 方法完成后的感知
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executorService).whenComplete((res, exception) -> {
// 虽然能得到异常信息,但是没法修改结果
System.out.println("异步任务成功完成了....结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
// 感知异常,同时返回默认值
return 10;
});
Integer integer = future.get();
System.out.println("main....end...." + integer);
}
}
结果:
main....start.....
当前线程 :13
异步任务成功完成了....结果是:null异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main....end....10
5、handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。测试用例如下:
public class Test03 {
private static final ForkJoinPool pool = new ForkJoinPool();
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main....start.....");
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> feature = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程 :" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executorService).handle((a, b) -> {
if (b != null) {
// 删除异常
b.printStackTrace();
}
return a;
});
System.out.println(feature.get());
}
}
四、组合处理
1、thenCombine/thenAcceptBoth/runAfterBoth
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
public class Test07 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + "start cf time->" + System.currentTimeMillis());
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread() + "end cf time->" + System.currentTimeMillis());
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
return 3.2;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {
System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
System.out.println("job3 param a->" + a + ",b->" + b);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
return a + b;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
CompletableFuture cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {
System.out.println("job4 param a->" + a + ",b->" + b);
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
}
});
//cf4和cf3都执行完成后,执行cf5,无入参,无返回值
CompletableFuture cf5 = cf4.runAfterBoth(cf3, () -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
System.out.println("cf5 do something");
});
//等待子任务执行完成
System.out.println("cf run result->" + cf.get());
System.out.println("cf3 run result->" + cf3.get());
System.out.println("cf5 run result->" + cf5.get());
System.out.println("main thread exit,time->" + System.currentTimeMillis());
}
}
执行结果:
Thread[ForkJoinPool.commonPool-worker-5,5,main] start job2,time->1639575226010
Thread[ForkJoinPool.commonPool-worker-19,5,main]start cf time->1639575226010
Thread[ForkJoinPool.commonPool-worker-5,5,main] exit job2,time->1639575241012
Thread[ForkJoinPool.commonPool-worker-19,5,main]end cf time->1639575246012
Thread[main,5,main] start job3,time->1639575246013
job4 param a->1.2,b->3.2
job3 param a->1.2,b->3.2
Thread[main,5,main] exit job3,time->1639575266020
cf5 do something
cf run result->1.2
cf3 run result->4.4
cf5 run result->null
main thread exit,time->1639575276022
2、applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
public class Test08 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
return 3.2;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {
System.out.println("job3 param result->" + result);
return result;
});
//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
CompletableFuture cf4 = cf.acceptEither(cf2, (result) -> {
System.out.println("job4 param result->"+result);
});
//cf4和cf3都执行完成后,执行cf5,无入参,无返回值
CompletableFuture cf5 = cf4.runAfterEither(cf3, () -> {
System.out.println("cf5 do something");
});
//等待子任务执行完成
System.out.println("cf run result->" + cf.get());
System.out.println("cf3 run result->" + cf3.get());
System.out.println("cf5 run result->" + cf5.get());
}
}
// 结果
job4 param result->3.2
cf5 do something
job3 param result->3.2
cf run result->1.2
cf3 run result->3.2
cf5 run result->null
3、thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务,测试用例如下:
public class Test09 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<String> cf2 = cf.thenCompose((param) -> {
System.out.println("job3 .....param:" + param);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return "job3 test";
});
});
//等待子任务执行完成
System.out.println("cf run result->" + cf.get());
System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
System.out.println("cf2 run result->" + cf2.get());
}
}
// 结果
cf run result->1.2
job3 .....param:1.2
main thread start cf2.get(),time->1640015940253
cf2 run result->job3 test
4、allOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
public class Test10 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
return 2.2;
});
CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
return 3.2;
});
// 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常
CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {
System.out.println("cf4 a-->" + a);
System.out.println("cf4 b-->" + b);
});
//等待子任务执行完成
System.out.println("cf run result->" + cf.get());
System.out.println("cf2 run result->" + cf2.get());
System.out.println("cf3 run result->" + cf3.get());
System.out.println("cf4 run result->" + cf4.get());
}
}
// 结果
cf4 a-->null
cf4 b-->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
cf run result->1.2
cf2 run result->2.2
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at demo08_completableFuture.Test10.main(Test10.java:45)
Caused by: java.lang.ArithmeticException: / by zero
at demo08_completableFuture.Test10.lambda$main$2(Test10.java:28)
...........
5、anyof
anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。
public class Test11 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
return 2.2;
});
CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
return 3.2;
});
// 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常
CompletableFuture<Object> cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) -> {
System.out.println("cf4 a-->" + a);
System.out.println("cf4 b-->" + b);
});
//等待子任务执行完成
System.out.println("cf run result->" + cf.get());
System.out.println("cf2 run result->" + cf2.get());
System.out.println("cf3 run result->" + cf3.get());
System.out.println("cf4 run result->" + cf4.get());
}
}
结果
cf4 a-->3.2
cf4 b-->null
cf run result->1.2
cf2 run result->2.2
cf3 run result->3.2
cf4 run result->3.2