CompletableFuture,默认依靠fork/join框架启动新的线程实现异步与并发的。
它提供了函数式编程的能力,可以通过回调函数的方式处理返回结果,并且提供了转换和组合CompletableFuture的方法。
0.测试用例
创建一个DeptService,模拟根据Id获取部门的方法getById(Integer id)
public class DeptService {public Dept getById(Integer id) {System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");if (id == 1){return new Dept(1, "研发一部");} else if (id == 2){return new Dept(2, "研发二部");} else {throw null;}}}
创建一个UserService ,模拟getById()和save()这2个方法
public class UserService {//根据Id获取Userpublic User getById(Integer id) throws Exception {System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");if (id == 1){return new User(1, "meizuna", 20);} else if (id == 2){return new User(2, "misa", 18);} else {throw new Exception("未能找到人员");}}//保存Userpublic User save(User user){System.out.println("线程:" + Thread.currentThread().getName() + " save()," + user.toString());return user;}}
1. supplyAsync、runAsync
CompletableFuture创建线程有2种方式:supplyAsync(有返回值)和:runAsync(无返回值)
1.1 supplyAsync(有返回值)
supplyAsync有2种,第二个需要多传1个线程池的实现:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class Thread01_SupplyAsync {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();CompletableFuture<Dept> deptCompletableFuture = CompletableFuture.supplyAsync(() -> {Dept dept = deptService.getById(1);return dept;});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + deptCompletableFuture.get());}}//线程:ForkJoinPool.commonPool-worker-1 getById(1)//线程:main 结果:Dept{id=1, name='研发一部'}
1.2 runAsync(无返回值)
unAsync适用无返回值的情况
public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
测试代码
public class Thread02_RunAsync {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {deptService.getById(1);});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + voidCompletableFuture.get());}}//线程:ForkJoinPool.commonPool-worker-1 getById(1)//线程:main 结果:null
2. thenApply、thenAccept、thenRun
2.1 thenApply 转换结果
thenApply是同步的
thenApplyAsync是异步的
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
Function<? super T, ? extends U>T:上一个任务返回结果的类型U:当前任务的返回值类型
测试用例
public class Thread03_SupplyAsync_ThenApply {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();UserService userService = new UserService();User user = new User(1, "meizuna", 20);CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {Dept dept = deptService.getById(1);return dept;}).thenApplyAsync(dept -> {//注意这里用到了上个线程的返回值deptuser.setDeptId(dept.getId());user.setDeptName(dept.getName());return userService.save(user);});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + userCompletableFuture.get().toString());}}
2.2 thenAccept 消费结果
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是回调方法无返回值
public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
测试用例
public class Thread04_SupplyAsync_ThenAccept {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {Dept dept = deptService.getById(1);return dept;}).thenAcceptAsync(dept -> {//注意这里用到了上个线程的返回值deptSystem.out.println("线程:" + Thread.currentThread().getName() +"假设把dept作为日志记录发给Kafka: " + dept.toString());//thenAccept是没有返回值的});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + voidCompletableFuture.get());}}
2.3 thenRun 任务完成后触发的回调
thenRun 是上一个任务完成后触发的回调,没有入参和返回值
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
测试用例
public class Thread05_SupplyAsync_ThenRun {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {Dept dept = deptService.getById(1);return dept;}).thenRun(() -> {//注意没有入参System.out.println("线程:" + Thread.currentThread().getName() + " do something");//thenRun注意没有入参,也没有返回值});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + voidCompletableFuture.get());}}
3. exceptionally
在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch
exceptionally的特点:
当出现异常时,会触发回调方法exceptionally
exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果
测试用例
public class Thread01_Exceptionally {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("抛出异常");}System.out.println("正常结束");return 1.1;}).thenApply(result -> {System.out.println("thenApply接收到的参数 = " + result);return result;}).exceptionally(new Function<Throwable, Double>() {@Overridepublic Double apply(Throwable throwable) {System.out.println("异常:" + throwable.getMessage());return 0.0;}});System.out.println("最终返回的结果 = " + future.get());}}
4. whenComplete
当CompletableFuture的任务不论是正常完成还是出现异常都会调用whenComplete
正常完成:whenComplete返回结果和上级任务一致,异常为null;
出现异常:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要处理该异常
public class Thread02_WhenComplete {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");}System.out.println("正常结束");return 0.11;}).whenComplete(new BiConsumer<Double, Throwable>() {@Overridepublic void accept(Double aDouble, Throwable throwable) {if (aDouble == null) {System.out.println("whenComplete aDouble is null");} else {System.out.println("whenComplete aDouble is " + aDouble);}if (throwable == null) {System.out.println("whenComplete throwable is null");} else {System.out.println("whenComplete throwable is " + throwable.getMessage());}}});System.out.println("最终返回的结果 = " + future.get());}}
4.1 whenComplete + exceptionally
public class Thread03_WhenComplete_Exceptionally {public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("出错了");}System.out.println("正常结束");return 0.11;}).whenComplete(new BiConsumer<Double, Throwable>() {@Overridepublic void accept(Double aDouble, Throwable throwable) {if (aDouble == null){System.out.println("whenComplete aDouble is null");} else {System.out.println("whenComplete aDouble is " + aDouble);}if (throwable == null){System.out.println("whenComplete throwable is null");} else {System.out.println("whenComplete throwable is " + throwable.getMessage());}}}).exceptionally(new Function<Throwable, Double>() {@Overridepublic Double apply(Throwable throwable) {System.out.println("exceptionally中异常:" + throwable.getMessage());return 0.0;}});System.out.println("最终返回的结果 = " + future.get());}}
5. handle()
不论正常返回还是出异常都会进入handle,类似whenComplete
handle()一般接收new BiFunction
T:就是任务传入的对象类型
Throwable:就是任务传入的异常
R:就是handle自己返回的对象类型
5.1 handle和thenApply的区别
thenApply:任务出现异常就不会进入thenApply
handle:任务出现异常也会进入handle,可对异常处理
5.2 handle和whenComplete的区别
handle对传入值进行转换,并产生自己的返回结果,T -> R
whenComplete的返回值和上级任务传入的结果一致,不能对其转换
测试用例
public class Thread04_Handle {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();UserService userService = new UserService();CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> {//int a = 1 / 0;//如果出现异常,那么thenApply则不执行return deptService.getById(1);}).handle(new BiFunction<Dept, Throwable, User>() {@Overridepublic User apply(Dept dept, Throwable throwable) {if (throwable != null){System.out.println(throwable.getMessage());return null;} else {User user = new User(1, "winter", 32, dept.getId(), dept.getName());return userService.save(user);}}});System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + future.get());}}//正常//线程:ForkJoinPool.commonPool-worker-1 Dept.getById(1)//线程:main User.save(),User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)//线程:main 结果:User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)//异常//java.lang.ArithmeticException: / by zero//线程:main 结果:null
6. thenCompose、thenCombine
6.1 thenCompose
thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
6.2 thenApply和thenCompose的区别
thenApply转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture
thenCompose用来连接两个CompletableFuture,是生成一个新的CompletableFuture
测试用例
public class Thread06_SupplyAsync_ThenCompose {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();UserService userService = new UserService();User user = new User(1, "meizuna", 20);CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {Dept dept = deptService.getById(1);return dept;}).thenCompose(dept -> CompletableFuture.supplyAsync(() -> {//注意这里用到了上个线程的返回值deptuser.setDeptId(dept.getId());user.setDeptName(dept.getName());return userService.save(user);}));System.out.println("线程:" + Thread.currentThread().getName() +" 结果:" + userCompletableFuture.get().toString());}}
6.3 thenCombine
thenCombine会在两个任务都执行完成后,把两个任务的结果合并
注意:
两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
两个任务是并行执行的,它们之间并没有先后依赖顺序
测试用例
public class Thread10_ThenCombine {public static void main(String[] args) throws ExecutionException, InterruptedException {DeptService deptService = new DeptService();UserService userService = new UserService();//第1个任务:获取id=1的部门CompletableFuture<Dept> deptFuture = CompletableFuture.supplyAsync(() -> {return deptService.getById(1);});//第2个任务:获取id=1的人员CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> {try {//int a = 1 / 0;//出了异常就报错return userService.getById(1);} catch (Exception e) {e.printStackTrace();}return null;});//将上面2个任务的返回结果dept和user合并,返回新的userCompletableFuture<User> resultFuture = deptFuture.thenCombine(userFuture,new BiFunction<Dept, User, User>() {@Overridepublic User apply(Dept dept, User user) {user.setDeptId(dept.getId());user.setDeptName(dept.getName());return userService.save(user);}});System.out.println("线程:" + Thread.currentThread().getName() + " 结果:" + resultFuture.get());}}
7. allOf
场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果
- Future的get()时阻塞的,如果循环get()每一个线程的结果,一个线程会卡住后面所有线程
- CompletionService的take().get()虽然不会因为某个线程阻塞后面的线程,但是功能不丰富
- CompletableFuture提供的功能丰富,使用简单,代码优雅
- Java9中CompletableFuture还添加了completeOnTimeout、orTimeout,方便对超时任务的处理
测试用例
- 一串数字1, 2, 3, 4, 5, 6, 7, 8, 9, 10
- 开启线程,执行乘以2的计算
- 其中任务2会抛出异常
- 要获取所有线程的返回结果和异常结果
这里用handle方法来处理线程的结果
public class Thread17_AllOf {public static void main(String[] args) {System.out.println("==========begin==========");//记录开始时间Long start = System.currentTimeMillis();//定长10线程池ExecutorService executor = Executors.newFixedThreadPool(3);//任务final List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);List<String> resultList = new ArrayList<>();Map<String, String> errorList = new HashMap<>();Stream<CompletableFuture<String>> completableFutureStream = taskList.stream().map(num -> CompletableFuture.supplyAsync(() -> getDouble(num), executor).handle((s, throwable) -> {if (throwable == null) {System.out.println("任务" + num + "完成! result=" + s + ", " + new Date());resultList.add(s.toString());} else {System.out.println("任务" + num + "异常! e=" + throwable + ", " + new Date());errorList.put(num.toString(), throwable.getMessage());}return "";}));CompletableFuture[] completableFutures = completableFutureStream.toArray(CompletableFuture[]::new);CompletableFuture.allOf(completableFutures).whenComplete((v, th) -> {System.out.println("所有任务执行完成触发\n resultList=" + resultList + "\n errorList=" + errorList + "\n耗时=" + (System.currentTimeMillis() - start));}).join();System.out.println("==========end==========");//根据数字判断线程休眠的时间public static Integer getDouble(Integer i) {try {if (i == 1) {//任务1耗时3秒Thread.sleep(3000);} else if (i == 2) {//任务2耗时1秒,还出错Thread.sleep(1000);throw new RuntimeException("出异常了");} else {//其它任务耗时1秒Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}return 2 * i;}}
