CompletableFuture,默认依靠fork/join框架启动新的线程实现异步与并发的。
它提供了函数式编程的能力,可以通过回调函数的方式处理返回结果,并且提供了转换和组合CompletableFuture的方法。

0.测试用例

创建一个DeptService,模拟根据Id获取部门的方法getById(Integer id)

  1. public class DeptService {
  2. public Dept getById(Integer id) {
  3. System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");
  4. if (id == 1){
  5. return new Dept(1, "研发一部");
  6. } else if (id == 2){
  7. return new Dept(2, "研发二部");
  8. } else {
  9. throw null;
  10. }
  11. }
  12. }

创建一个UserService ,模拟getById()和save()这2个方法

  1. public class UserService {
  2. //根据Id获取User
  3. public User getById(Integer id) throws Exception {
  4. System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");
  5. if (id == 1){
  6. return new User(1, "meizuna", 20);
  7. } else if (id == 2){
  8. return new User(2, "misa", 18);
  9. } else {
  10. throw new Exception("未能找到人员");
  11. }
  12. }
  13. //保存User
  14. public User save(User user){
  15. System.out.println("线程:" + Thread.currentThread().getName() + " save()," + user.toString());
  16. return user;
  17. }
  18. }

1. supplyAsync、runAsync

CompletableFuture创建线程有2种方式:supplyAsync(有返回值)和:runAsync(无返回值)

1.1 supplyAsync(有返回值)

supplyAsync有2种,第二个需要多传1个线程池的实现:

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  1. public class Thread01_SupplyAsync {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. CompletableFuture<Dept> deptCompletableFuture = CompletableFuture.supplyAsync(() -> {
  5. Dept dept = deptService.getById(1);
  6. return dept;
  7. });
  8. System.out.println("线程:" + Thread.currentThread().getName() +
  9. " 结果:" + deptCompletableFuture.get());
  10. }
  11. }
  12. //线程:ForkJoinPool.commonPool-worker-1 getById(1)
  13. //线程:main 结果:Dept{id=1, name='研发一部'}

1.2 runAsync(无返回值)

unAsync适用无返回值的情况

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

测试代码

  1. public class Thread02_RunAsync {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
  5. deptService.getById(1);
  6. });
  7. System.out.println("线程:" + Thread.currentThread().getName() +
  8. " 结果:" + voidCompletableFuture.get());
  9. }
  10. }
  11. //线程:ForkJoinPool.commonPool-worker-1 getById(1)
  12. //线程:main 结果:null

2. thenApply、thenAccept、thenRun

2.1 thenApply 转换结果

thenApply是同步的
thenApplyAsync是异步的

  1. public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
  3. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
  1. Function<? super T, ? extends U>
  2. T:上一个任务返回结果的类型
  3. U:当前任务的返回值类型

测试用例

  1. public class Thread03_SupplyAsync_ThenApply {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. UserService userService = new UserService();
  5. User user = new User(1, "meizuna", 20);
  6. CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
  7. Dept dept = deptService.getById(1);
  8. return dept;
  9. }).thenApplyAsync(dept -> {
  10. //注意这里用到了上个线程的返回值dept
  11. user.setDeptId(dept.getId());
  12. user.setDeptName(dept.getName());
  13. return userService.save(user);
  14. });
  15. System.out.println("线程:" + Thread.currentThread().getName() +
  16. " 结果:" + userCompletableFuture.get().toString());
  17. }
  18. }

2.2 thenAccept 消费结果

thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是回调方法无返回值

  1. public CompletionStage<Void> thenAccept(Consumer<? super T> action);
  2. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
  3. public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

测试用例

  1. public class Thread04_SupplyAsync_ThenAccept {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
  5. Dept dept = deptService.getById(1);
  6. return dept;
  7. })
  8. .thenAcceptAsync(dept -> {
  9. //注意这里用到了上个线程的返回值dept
  10. System.out.println("线程:" + Thread.currentThread().getName() +
  11. "假设把dept作为日志记录发给Kafka: " + dept.toString());
  12. //thenAccept是没有返回值的
  13. });
  14. System.out.println("线程:" + Thread.currentThread().getName() +
  15. " 结果:" + voidCompletableFuture.get());
  16. }
  17. }

2.3 thenRun 任务完成后触发的回调

thenRun 是上一个任务完成后触发的回调,没有入参和返回值

  1. public CompletionStage<Void> thenRun(Runnable action);
  2. public CompletionStage<Void> thenRunAsync(Runnable action);
  3. public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

测试用例

  1. public class Thread05_SupplyAsync_ThenRun {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
  5. Dept dept = deptService.getById(1);
  6. return dept;
  7. })
  8. .thenRun(() -> {//注意没有入参
  9. System.out.println("线程:" + Thread.currentThread().getName() + " do something");
  10. //thenRun注意没有入参,也没有返回值
  11. });
  12. System.out.println("线程:" + Thread.currentThread().getName() +
  13. " 结果:" + voidCompletableFuture.get());
  14. }
  15. }

3. exceptionally

在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch
exceptionally的特点:
当出现异常时,会触发回调方法exceptionally
exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果

测试用例

  1. public class Thread01_Exceptionally {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
  4. if (Math.random() < 0.5) {
  5. throw new RuntimeException("抛出异常");
  6. }
  7. System.out.println("正常结束");
  8. return 1.1;
  9. })
  10. .thenApply(result -> {
  11. System.out.println("thenApply接收到的参数 = " + result);
  12. return result;
  13. })
  14. .exceptionally(new Function<Throwable, Double>() {
  15. @Override
  16. public Double apply(Throwable throwable) {
  17. System.out.println("异常:" + throwable.getMessage());
  18. return 0.0;
  19. }
  20. });
  21. System.out.println("最终返回的结果 = " + future.get());
  22. }
  23. }

4. whenComplete

当CompletableFuture的任务不论是正常完成还是出现异常都会调用whenComplete
正常完成:whenComplete返回结果和上级任务一致,异常为null;
出现异常:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要处理该异常

  1. public class Thread02_WhenComplete {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
  4. if (Math.random() < 0.5) {
  5. throw new RuntimeException("出错了");
  6. }
  7. System.out.println("正常结束");
  8. return 0.11;
  9. }).whenComplete(new BiConsumer<Double, Throwable>() {
  10. @Override
  11. public void accept(Double aDouble, Throwable throwable) {
  12. if (aDouble == null) {
  13. System.out.println("whenComplete aDouble is null");
  14. } else {
  15. System.out.println("whenComplete aDouble is " + aDouble);
  16. }
  17. if (throwable == null) {
  18. System.out.println("whenComplete throwable is null");
  19. } else {
  20. System.out.println("whenComplete throwable is " + throwable.getMessage());
  21. }
  22. }
  23. });
  24. System.out.println("最终返回的结果 = " + future.get());
  25. }
  26. }

4.1 whenComplete + exceptionally

  1. public class Thread03_WhenComplete_Exceptionally {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
  4. if (Math.random() < 0.5) {
  5. throw new RuntimeException("出错了");
  6. }
  7. System.out.println("正常结束");
  8. return 0.11;
  9. }).whenComplete(new BiConsumer<Double, Throwable>() {
  10. @Override
  11. public void accept(Double aDouble, Throwable throwable) {
  12. if (aDouble == null){
  13. System.out.println("whenComplete aDouble is null");
  14. } else {
  15. System.out.println("whenComplete aDouble is " + aDouble);
  16. }
  17. if (throwable == null){
  18. System.out.println("whenComplete throwable is null");
  19. } else {
  20. System.out.println("whenComplete throwable is " + throwable.getMessage());
  21. }
  22. }
  23. }).exceptionally(new Function<Throwable, Double>() {
  24. @Override
  25. public Double apply(Throwable throwable) {
  26. System.out.println("exceptionally中异常:" + throwable.getMessage());
  27. return 0.0;
  28. }
  29. });
  30. System.out.println("最终返回的结果 = " + future.get());
  31. }
  32. }

5. handle()

不论正常返回还是出异常都会进入handle,类似whenComplete

handle()一般接收new BiFunction()
T:就是任务传入的对象类型
Throwable:就是任务传入的异常
R:就是handle自己返回的对象类型

5.1 handle和thenApply的区别

thenApply:任务出现异常就不会进入thenApply
handle:任务出现异常也会进入handle,可对异常处理

5.2 handle和whenComplete的区别

handle对传入值进行转换,并产生自己的返回结果,T -> R
whenComplete的返回值和上级任务传入的结果一致,不能对其转换

测试用例

  1. public class Thread04_Handle {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. UserService userService = new UserService();
  5. CompletableFuture<User> future = CompletableFuture
  6. .supplyAsync(() -> {
  7. //int a = 1 / 0;//如果出现异常,那么thenApply则不执行
  8. return deptService.getById(1);
  9. }
  10. )
  11. .handle(new BiFunction<Dept, Throwable, User>() {
  12. @Override
  13. public User apply(Dept dept, Throwable throwable) {
  14. if (throwable != null){
  15. System.out.println(throwable.getMessage());
  16. return null;
  17. } else {
  18. User user = new User(1, "winter", 32, dept.getId(), dept.getName());
  19. return userService.save(user);
  20. }
  21. }
  22. }
  23. );
  24. System.out.println("线程:" + Thread.currentThread().getName() +
  25. " 结果:" + future.get());
  26. }
  27. }
  28. //正常
  29. //线程:ForkJoinPool.commonPool-worker-1 Dept.getById(1)
  30. //线程:main User.save(),User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)
  31. //线程:main 结果:User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)
  32. //异常
  33. //java.lang.ArithmeticException: / by zero
  34. //线程:main 结果:null

6. thenCompose、thenCombine

6.1 thenCompose

thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。

  1. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
  2. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
  3. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

6.2 thenApply和thenCompose的区别

thenApply转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture 转换成CompletableFuture
thenCompose用来连接两个CompletableFuture,是生成一个新的CompletableFuture
测试用例

  1. public class Thread06_SupplyAsync_ThenCompose {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. UserService userService = new UserService();
  5. User user = new User(1, "meizuna", 20);
  6. CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
  7. Dept dept = deptService.getById(1);
  8. return dept;
  9. })
  10. .thenCompose(dept -> CompletableFuture.supplyAsync(() -> {
  11. //注意这里用到了上个线程的返回值dept
  12. user.setDeptId(dept.getId());
  13. user.setDeptName(dept.getName());
  14. return userService.save(user);
  15. }));
  16. System.out.println("线程:" + Thread.currentThread().getName() +
  17. " 结果:" + userCompletableFuture.get().toString());
  18. }
  19. }

6.3 thenCombine

thenCombine会在两个任务都执行完成后,把两个任务的结果合并
注意:
两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
两个任务是并行执行的,它们之间并没有先后依赖顺序
测试用例

  1. public class Thread10_ThenCombine {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. DeptService deptService = new DeptService();
  4. UserService userService = new UserService();
  5. //第1个任务:获取id=1的部门
  6. CompletableFuture<Dept> deptFuture = CompletableFuture
  7. .supplyAsync(() -> {
  8. return deptService.getById(1);
  9. }
  10. );
  11. //第2个任务:获取id=1的人员
  12. CompletableFuture<User> userFuture = CompletableFuture
  13. .supplyAsync(() -> {
  14. try {
  15. //int a = 1 / 0;//出了异常就报错
  16. return userService.getById(1);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. return null;
  21. });
  22. //将上面2个任务的返回结果dept和user合并,返回新的user
  23. CompletableFuture<User> resultFuture = deptFuture
  24. .thenCombine(userFuture,
  25. new BiFunction<Dept, User, User>() {
  26. @Override
  27. public User apply(Dept dept, User user) {
  28. user.setDeptId(dept.getId());
  29. user.setDeptName(dept.getName());
  30. return userService.save(user);
  31. }
  32. }
  33. );
  34. System.out.println("线程:" + Thread.currentThread().getName() + " 结果:" + resultFuture.get());
  35. }
  36. }

7. allOf

场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果

  • Future的get()时阻塞的,如果循环get()每一个线程的结果,一个线程会卡住后面所有线程
  • CompletionService的take().get()虽然不会因为某个线程阻塞后面的线程,但是功能不丰富
  • CompletableFuture提供的功能丰富,使用简单,代码优雅
  • Java9中CompletableFuture还添加了completeOnTimeout、orTimeout,方便对超时任务的处理

测试用例

  • 一串数字1, 2, 3, 4, 5, 6, 7, 8, 9, 10
  • 开启线程,执行乘以2的计算
  • 其中任务2会抛出异常
  • 要获取所有线程的返回结果和异常结果

这里用handle方法来处理线程的结果

  1. public class Thread17_AllOf {
  2. public static void main(String[] args) {
  3. System.out.println("==========begin==========");
  4. //记录开始时间
  5. Long start = System.currentTimeMillis();
  6. //定长10线程池
  7. ExecutorService executor = Executors.newFixedThreadPool(3);
  8. //任务
  9. final List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  10. List<String> resultList = new ArrayList<>();
  11. Map<String, String> errorList = new HashMap<>();
  12. Stream<CompletableFuture<String>> completableFutureStream = taskList.stream()
  13. .map(num -> CompletableFuture
  14. .supplyAsync(() -> getDouble(num), executor)
  15. .handle((s, throwable) -> {
  16. if (throwable == null) {
  17. System.out.println("任务" + num + "完成! result=" + s + ", " + new Date());
  18. resultList.add(s.toString());
  19. } else {
  20. System.out.println("任务" + num + "异常! e=" + throwable + ", " + new Date());
  21. errorList.put(num.toString(), throwable.getMessage());
  22. }
  23. return "";
  24. })
  25. );
  26. CompletableFuture[] completableFutures = completableFutureStream.toArray(CompletableFuture[]::new);
  27. CompletableFuture.allOf(completableFutures)
  28. .whenComplete((v, th) -> {
  29. System.out.println("所有任务执行完成触发\n resultList=" + resultList + "\n errorList=" + errorList + "\n耗时=" + (System.currentTimeMillis() - start));
  30. }).join();
  31. System.out.println("==========end==========");
  32. //根据数字判断线程休眠的时间
  33. public static Integer getDouble(Integer i) {
  34. try {
  35. if (i == 1) {
  36. //任务1耗时3秒
  37. Thread.sleep(3000);
  38. } else if (i == 2) {
  39. //任务2耗时1秒,还出错
  40. Thread.sleep(1000);
  41. throw new RuntimeException("出异常了");
  42. } else {
  43. //其它任务耗时1秒
  44. Thread.sleep(1000);
  45. }
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. return 2 * i;
  50. }
  51. }