Java CompletableFuture
异步编程利器:CompletableFuture详解 - 图1

一个例子回顾 Future

因为CompletableFuture实现了Future接口,先来回顾Future吧。
Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。
来看个简单例子吧,假设有两个任务服务,一个查询用户基本信息,一个是查询用户勋章信息。如下,

  1. public class UserInfoService {
  2. public UserInfo getUserInfo(Long userId) throws InterruptedException {
  3. Thread.sleep(300);//模拟调用耗时
  4. return new UserInfo("666", "Hello", 27); //一般是查数据库,或者远程调用返回的
  5. }
  6. }
  7. public class MedalService {
  8. public MedalInfo getMedalInfo(long userId) throws InterruptedException {
  9. Thread.sleep(500); //模拟调用耗时
  10. return new MedalInfo("666", "守护勋章");
  11. }
  12. }

接下来,来演示下,在主线程中是如何使用Future来进行异步调用的。

  1. public class FutureTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. UserInfoService userInfoService = new UserInfoService();
  5. MedalService medalService = new MedalService();
  6. long userId =666L;
  7. long startTime = System.currentTimeMillis();
  8. //调用用户服务获取用户基本信息
  9. FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {
  10. @Override
  11. public UserInfo call() throws Exception {
  12. return userInfoService.getUserInfo(userId);
  13. }
  14. });
  15. executorService.submit(userInfoFutureTask);
  16. Thread.sleep(300); //模拟主线程其它操作耗时
  17. FutureTask<MedalInfo> medalInfoFutureTask = new FutureTask<>(new Callable<MedalInfo>() {
  18. @Override
  19. public MedalInfo call() throws Exception {
  20. return medalService.getMedalInfo(userId);
  21. }
  22. });
  23. executorService.submit(medalInfoFutureTask);
  24. UserInfo userInfo = userInfoFutureTask.get();//获取个人信息结果
  25. MedalInfo medalInfo = medalInfoFutureTask.get();//获取勋章信息结果
  26. System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  27. }
  28. }

运行结果:

  1. 总共用时806ms

如果不使用Future进行并行异步调用,而是在主线程串行进行的话,耗时大约为300+500+300 = 1100 ms。可以发现,future+线程池异步配合,提高了程序的执行效率。
但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFutureCompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

一个例子走进CompletableFuture

还是基于以上Future的例子,改用CompletableFuture 来实现

  1. public class FutureTest {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
  3. UserInfoService userInfoService = new UserInfoService();
  4. MedalService medalService = new MedalService();
  5. long userId =666L;
  6. long startTime = System.currentTimeMillis();
  7. //调用用户服务获取用户基本信息
  8. CompletableFuture<UserInfo> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));
  9. Thread.sleep(300); //模拟主线程其它操作耗时
  10. CompletableFuture<MedalInfo> completableMedalInfoFuture = CompletableFuture.supplyAsync(() -> medalService.getMedalInfo(userId));
  11. UserInfo userInfo = completableUserInfoFuture.get(2,TimeUnit.SECONDS);//获取个人信息结果
  12. MedalInfo medalInfo = completableMedalInfoFuture.get();//获取勋章信息结果
  13. System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  14. }
  15. }

可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuturesupplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool
CompletableFuture提供了几十种方法辅助异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等方面。

CompletableFuture使用场景

异步编程利器:CompletableFuture详解 - 图2

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法
异步编程利器:CompletableFuture详解 - 图3
创建异步任务

  • supplyAsync执行CompletableFuture任务,支持返回值
  • runAsync执行CompletableFuture任务,没有返回值。

    supplyAsync方法

    1. //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
    2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    3. //自定义线程,根据supplier构建执行任务
    4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    runAsync方法

    1. //使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
    2. public static CompletableFuture<Void> runAsync(Runnable runnable)
    3. //自定义线程,根据runnable构建执行任务
    4. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

    实例代码如下:

    1. public class FutureTest {
    2. public static void main(String[] args) {
    3. //可以自定义线程池
    4. ExecutorService executor = Executors.newCachedThreadPool();
    5. //runAsync的使用
    6. CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,Hello"), executor);
    7. //supplyAsync的使用
    8. CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
    9. System.out.print("supply,Hello");
    10. return "Hello"; }, executor);
    11. //runAsync的future没有返回值,输出null
    12. System.out.println(runFuture.join());
    13. //supplyAsync的future,有返回值
    14. System.out.println(supplyFuture.join());
    15. executor.shutdown(); // 线程池需要关闭
    16. }
    17. }
    18. //输出
    19. run,Hello
    20. null
    21. supply,Hello

    任务异步回调

    异步编程利器:CompletableFuture详解 - 图4

    1. thenRun/thenRunAsync

    1. public CompletableFuture<Void> thenRun(Runnable action);
    2. public CompletableFuture<Void> thenRunAsync(Runnable action);

    CompletableFuturethenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

    1. public class FutureThenRunTest {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
    4. ()->{
    5. System.out.println("先执行第一个CompletableFuture方法任务");
    6. return "Hello";
    7. }
    8. );
    9. CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
    10. System.out.println("接着执行第二个任务");
    11. });
    12. System.out.println(thenRunFuture.get());
    13. }
    14. }
    15. //输出
    16. 先执行第一个CompletableFuture方法任务
    17. 接着执行第二个任务
    18. null

    thenRunthenRunAsync有什么区别呢?可以看下源码: ```java private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public CompletableFuture thenRun(Runnable action) { return uniRunStage(null, action); }

public CompletableFuture thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); }

  1. 如果执行第一个任务的时候,传入了一个自定义线程池:
  2. - 调用`thenRun`方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
  3. - 调用`thenRunAsync`执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
  4. TIPS: 后面介绍的`thenAccept``thenAcceptAsync``thenApply``thenApplyAsync`等,它们之间的区别也是这个。
  5. <a name="PxaFH"></a>
  6. #### 2.`thenAccept`/`thenAcceptAsync`
  7. `CompletableFuture``thenAccept`方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
  8. ```java
  9. public class FutureThenAcceptTest {
  10. public static void main(String[] args) throws ExecutionException, InterruptedException {
  11. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
  12. ()->{
  13. System.out.println("原始CompletableFuture方法任务");
  14. return "Hello";
  15. }
  16. );
  17. CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
  18. if ("Hello".equals(a)) {
  19. System.out.println("World");
  20. }
  21. System.out.println("Hi");
  22. });
  23. System.out.println(thenAcceptFuture.get());
  24. }
  25. }

3. thenApply/thenApplyAsync

CompletableFuturethenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

  1. public class FutureThenApplyTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
  4. ()->{
  5. System.out.println("原始CompletableFuture方法任务");
  6. return "Hello";
  7. }
  8. );
  9. CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
  10. if ("Hello".equals(a)) {
  11. return "World";
  12. }
  13. return "Hi";
  14. });
  15. System.out.println(thenApplyFuture.get());
  16. }
  17. }
  18. //输出
  19. 原始CompletableFuture方法任务
  20. 关注了

4. exceptionally

CompletableFutureexceptionally方法表示,某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法。

  1. public class FutureExceptionTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
  4. ()->{
  5. System.out.println("当前线程名称:" + Thread.currentThread().getName());
  6. throw new RuntimeException();
  7. }
  8. );
  9. CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
  10. e.printStackTrace();
  11. return "你的程序异常啦";
  12. });
  13. System.out.println(exceptionFuture.get());
  14. }
  15. }
  16. //输出
  17. 当前线程名称:ForkJoinPool.commonPool-worker-1
  18. java.util.concurrent.CompletionException: java.lang.RuntimeException
  19. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
  20. at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
  21. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
  22. at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
  23. at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  24. at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  25. at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  26. at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
  27. Caused by: java.lang.RuntimeException
  28. at cn.eovie.future.FutureWhenTest.lambda$main$0(FutureWhenTest.java:13)
  29. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  30. ... 5 more
  31. 你的程序异常啦

5. whenComplete方法

CompletableFuturewhenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFutureresult是上个任务的结果。

  1. public class FutureWhenTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
  4. ()->{
  5. System.out.println("当前线程名称:" + Thread.currentThread().getName());
  6. try {
  7. Thread.sleep(2000L);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. return "Hello";
  12. }
  13. );
  14. CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
  15. System.out.println("当前线程名称:" + Thread.currentThread().getName());
  16. System.out.println("上个任务执行完啦,还把" + a + "传过来");
  17. if ("Hello".equals(a)) {
  18. System.out.println("666");
  19. }
  20. System.out.println("233333");
  21. });
  22. System.out.println(rstFuture.get());
  23. }
  24. }
  25. //输出
  26. 当前线程名称:ForkJoinPool.commonPool-worker-1
  27. 当前线程名称:ForkJoinPool.commonPool-worker-1
  28. 上个任务执行完啦,还把Hello传过来
  29. 666
  30. 233333
  31. Hello

6. handle方法

CompletableFuturehandle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFutureresult是回调方法执行的结果。

  1. public class FutureHandlerTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
  4. ()->{
  5. System.out.println("当前线程名称:" + Thread.currentThread().getName());
  6. try {
  7. Thread.sleep(2000L);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. return "Hello";
  12. }
  13. );
  14. CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
  15. System.out.println("上个任务执行完啦,还把" + a + "传过来");
  16. if ("Hello".equals(a)) {
  17. System.out.println("666");
  18. return "关注了";
  19. }
  20. System.out.println("233333");
  21. return null;
  22. });
  23. System.out.println(rstFuture.get());
  24. }
  25. }
  26. //输出
  27. 当前线程名称:ForkJoinPool.commonPool-worker-1
  28. 上个任务执行完啦,还把Hello传过来
  29. 666
  30. 关注了

多个任务组合处理

异步编程利器:CompletableFuture详解 - 图5

AND组合关系

异步编程利器:CompletableFuture详解 - 图6
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。
区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth不会把执行结果当做方法入参,且没有返回值。

    1. public class ThenCombineTest {
    2. public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
    3. CompletableFuture<String> first = CompletableFuture.completedFuture("第一个异步任务");
    4. ExecutorService executor = Executors.newFixedThreadPool(10);
    5. CompletableFuture<String> future = CompletableFuture
    6. //第二个异步任务
    7. .supplyAsync(() -> "第二个异步任务", executor)
    8. // (w, s) -> System.out.println(s) 是第三个任务
    9. .thenCombineAsync(first, (s, w) -> {
    10. System.out.println(w);
    11. System.out.println(s);
    12. return "两个异步任务的组合";
    13. }, executor);
    14. System.out.println(future.join());
    15. executor.shutdown();
    16. }
    17. }
    18. //输出
    19. 第一个异步任务
    20. 第二个异步任务
    21. 两个异步任务的组合

    OR 组合的关系

    异步编程利器:CompletableFuture详解 - 图7
    applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
    区别在于:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither:不会把执行结果当做方法入参,且没有返回值。

    1. public class AcceptEitherTest {
    2. public static void main(String[] args) {
    3. //第一个异步任务,休眠2秒,保证它执行晚点
    4. CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
    5. try{
    6. Thread.sleep(2000L);
    7. System.out.println("执行完第一个异步任务");}
    8. catch (Exception e){
    9. return "第一个任务异常";
    10. }
    11. return "第一个异步任务";
    12. });
    13. ExecutorService executor = Executors.newSingleThreadExecutor();
    14. CompletableFuture<Void> future = CompletableFuture
    15. //第二个异步任务
    16. .supplyAsync(() -> {
    17. System.out.println("执行完第二个任务");
    18. return "第二个任务";}
    19. , executor)
    20. //第三个任务
    21. .acceptEitherAsync(first, System.out::println, executor);
    22. executor.shutdown();
    23. }
    24. }
    25. //输出
    26. 执行完第二个任务
    27. 第二个任务

    AllOf

    所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOfCompletableFuture,执行get方法,会抛出异常

    1. public class allOfFutureTest {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
    4. System.out.println("我执行完了");
    5. });
    6. CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
    7. System.out.println("我也执行完了");
    8. });
    9. CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
    10. System.out.println("finish");
    11. });
    12. }
    13. }
    14. //输出
    15. 我执行完了
    16. 我也执行完了
    17. finish

    AnyOf

    任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOfCompletableFuture,执行get方法,会抛出异常

    1. public class AnyOfFutureTest {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
    4. try {
    5. Thread.sleep(3000L);
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. System.out.println("我执行完了");
    10. });
    11. CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
    12. System.out.println("我也执行完了");
    13. });
    14. CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
    15. System.out.println("finish");
    16. // return "Hello";
    17. });
    18. anyOfFuture.join();
    19. }
    20. }
    21. //输出
    22. 我也执行完了
    23. finish

    thenCompose

    thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;

  • 如果该CompletableFuture实例为null,然后就执行这个新任务

    1. public class ThenComposeTest {
    2. public static void main(String[] args) throws ExecutionException, InterruptedException {
    3. CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
    4. //第二个异步任务
    5. ExecutorService executor = Executors.newSingleThreadExecutor();
    6. CompletableFuture<String> future = CompletableFuture
    7. .supplyAsync(() -> "第二个任务", executor)
    8. .thenComposeAsync(data -> {
    9. System.out.println(data); return f; //使用第一个任务作为返回
    10. }, executor);
    11. System.out.println(future.join());
    12. executor.shutdown();
    13. }
    14. }
    15. //输出
    16. 第二个任务
    17. 第一个任务

    CompletableFuture使用有哪些注意点

    CompletableFuture 使异步编程更加便利的、代码更加优雅的同时,也要关注下它的一些注意点。
    异步编程利器:CompletableFuture详解 - 图8

    1. Future需要获取返回值,才能获取异常信息

    1. ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
    2. TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    3. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    4. int a = 0;
    5. int b = 666;
    6. int c = b / a;
    7. return true;
    8. },executorService).thenAccept(System.out::println);
    9. //如果不加 get()方法这一行,看不到异常信息
    10. //future.get();

    Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。使用的时候,注意一下,考虑是否加try...catch...或者使用exceptionally方法。

    2. CompletableFutureget()方法是阻塞的。

    CompletableFutureget()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间~

    1. //反例
    2. CompletableFuture.get();
    3. //正例
    4. CompletableFuture.get(5, TimeUnit.SECONDS);

    3. 默认线程池的注意点

    CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

    4. 自定义线程池时,注意饱和策略

    CompletableFutureget()方法是阻塞的,一般建议使用future.get(3, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
    但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。