CompletableFuture
可以指定异步处理流程:
runAsync&supplyAsync
创建CompletableFuture对象可以通过CompletableFuture类的静态方法创建一个CompletableFuture对象。常用的两个方法:
- CompletableFuture.runAsync():创建一个CompletableFuture对象,该对象会在一个新线程中执行指定的任务,并且不返回结果。
- CompletableFuture.supplyAsync():创建一个CompletableFuture对象,该对象会在一个新线程中执行指定的任务,并且返回一个结果。
获取任务结果方法
// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)
// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)
// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)
thenAccept&thenAcceptAsync
thenAccept&thenAcceptAsync表示某个任务执行完成后执行的动作,即回调方法。会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。
区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
});
//等待任务1执行完成
System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
//------------------------------------------------
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
return 1;
});
CompletableFuture<Void> cf4 = cf3.thenAcceptAsync((result) -> {
System.out.println(Thread.currentThread() + " cf4 do something....");
});
//等待任务1执行完成
System.out.println("cf3结果->" + cf3.get());
//等待任务2执行完成
System.out.println("cf4结果->" + cf4.get());
}
thenApply&thenApplyAsync
表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。
区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return 1;
});
CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
System.out.println(Thread.currentThread() + " cf2 do something....");
result += 2;
return result;
});
//等待任务1执行完成
System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
//------------------------------------------------
CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf3 do something....");
return 1;
});
CompletableFuture<Integer> cf4 = cf3.thenApply((result) -> {
System.out.println(Thread.currentThread() + " cf4 do something....");
result += 2;
return result;
});
//等待任务1执行完成
System.out.println("cf3结果->" + cf3.get());
//等待任务2执行完成
System.out.println("cf4结果->" + cf4.get());
}
thenRun&thenRunAsync
该方法同 thenAccept 方法类似。区别thenRun 不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。
不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。
区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenAccept(r -> { //区别在这
System.out.println("thenRun r:" + r);
});
future1.get();
//thenAccept VS thenRun
CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future2.get();
}
whenComplete&whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
whenCompleteAsync和whenComplete区别也是whenCompleteAsync可能会另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
int a = 1/0;
return 1;
});
CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
System.out.println("上个任务结果:" + result);
System.out.println("上个任务抛出异常:" + e);
System.out.println(Thread.currentThread() + " cf2 do something....");
});
//等待任务1执行完成
//System.out.println("cf1结果->" + cf1.get());
//等待任务2执行完成
System.out.println("cf2结果->" + cf2.get());
}
handle&handleAsync
跟whenComplete基本一致,区别在于handle的回调方法有返回值
exceptional
exceptionally()方法来处理任务中发生的异常。该方法会在任务发生异常时,执行一个指定的函数并附带默认值。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("测试【thenAccept&exceptionally】创建方式!!!");
//int i = 1/0;
return "有返回值";
}).thenAccept(x -> {
System.out.println("x:" + x); //正常执行完毕进入此函数
}).exceptionally(e -> {
System.out.println("e:" + e); //正常异常进入此函数
return null;
});
}
CompletableFuture的组合
在实际开发中,我们经常需要将多个CompletableFuture对象组合在一起,以便于并行执行多个任务并等待所有任务完成后处理结果。在这种情况下,我们可以使用CompletableFuture的组合方法
thenCombine&thenCompose
- thenCombine():将两个CompletableFuture对象的结果合并为一个结果。
public static void main(String[] args) {
CompletableFuture<List<String>> painting = CompletableFuture.supplyAsync(() -> {
// 第一个任务获取美术课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("画笔");
stuff.add("颜料");
return stuff;
});
CompletableFuture<List<String>> handWork = CompletableFuture.supplyAsync(() -> {
// 第二个任务获取劳技课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("剪刀");
stuff.add("折纸");
return stuff;
});
CompletableFuture<List<String>> total = painting
// 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
.thenCombine(handWork, (stuff1, stuff2) -> {
// 合并成新的list
List<String> totalStuff = Stream.of(stuff1, stuff1).flatMap(Collection::stream).collect(Collectors.toList());
return totalStuff;
});
System.out.println(JSON.toJSONString(total.join()));
}
- thenCompose():将一个CompletableFuture对象的结果作为另一个CompletableFuture对象的输入。
public static void main(String[] args) {
CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
// 第一个任务获取美术课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("画笔");
stuff.add("颜料");
return stuff;
}).thenCompose(list -> {
// 向第二个任务传递参数list(上一个任务美术课所需的东西list)
CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
List<String> stuff = new ArrayList<>();
// 第二个任务获取劳技课所需的工具
stuff.add("剪刀");
stuff.add("折纸");
// 合并两个list,获取课程所需所有工具
List<String> allStuff = Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
return allStuff;
});
return insideFuture;
});
System.out.println(JSON.toJSONString(total.join()));
}
anyOf&allOf
anyOf() 任意一个执行完成,就可以进行下一步动作
allOf() 全部完成所有任务,才可以进行下一步任务
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步任务,无返回值,采用内部的forkjoin线程池
CompletableFuture c1 = CompletableFuture.runAsync(() -> {
System.out.println("打开开关,开始制作,就不用管了");
});
// 异步任务,无返回值,使用自定义的线程池
CompletableFuture c11 = CompletableFuture.runAsync(() -> {
System.out.println("打开开关,开始制作,就不用管了【自定义线程池版】");
}, newSingleThreadExecutor());
// 异步任务,有返回值,使用内部默认的线程池
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
System.out.println("清洗米饭");
int i = 1 / 0;
return "干净的米饭";
});
//anyOf:任意一个执行完成,就可以进行下一步动作。即使一个报异常也不影响继续执行
System.out.println("start");
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(c1, c2);
anyOf.thenAccept(r -> {
System.out.println("r:" + r);
}).exceptionally(e -> {
//e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
System.out.println("e:" + e);
return null;
});
//allOf:全部完成所有任务,并且不能有异常,才可以进行下一步任务
CompletableFuture<Void> allOf = CompletableFuture.allOf(c1, c2);
allOf.thenAccept(r -> {
System.out.println("r:" + r);
}).exceptionally(e -> {
System.out.println("e:" + e); //e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
return null;
});
System.out.println("end");
}
小结
常用语法
thenAccept()
处理正常结果;exceptional()
处理异常结果;thenApplyAsync()
用于串行化另一个CompletableFuture
;anyOf()
和allOf()
用于并行化多个CompletableFuture
。
练习代码