Future接口
- 两种趋势不断的推动我们反思设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关,尤其是它们之间如何交互。
- 下一代网络应用都采用“混聚”(mash-up)的方式:它会使用来自多个来源的内容,将这些内容聚合在一起,方便用户的生活。
- Future接口Java5被引入,它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。
- 打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。
并发执行的实例 ```java public class Test1 { public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();Future<Double> future = service.submit(new Callable<Double>() {@Overridepublic Double call() throws Exception {return doSomeThingComputation();}});doSomeThingElse();try {Double num = future.get();System.out.println("num:" + num);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}service.shutdown();
}
private static Double doSomeThingComputation() {
try {Thread.sleep(2 * 1000);System.out.println("Test1.doSomeThingComputation");return 2D;} catch (InterruptedException e) {e.printStackTrace();}return 1D;
}
private static void doSomeThingElse() {
try {Thread.sleep(1 * 1000);System.out.println("Test1.doSomeThingElse");} catch (InterruptedException e) {e.printStackTrace();}
} }
// Test1.doSomeThingElse // Test1.doSomeThingComputation // num:2.0
- 这种编程方式让你的线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。- 这种场景是存在问题的,如果长时间没有查询到执行结果,那么就会阻塞。Future提供了无需任务参数的get方法,但是推荐使用重载方法,它接受一个超时参数。- Future接口的局限性:当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。- **想要完成的功能**- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。- 等待Future集合中的所有任务都完成- 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果- 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)- 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)- **使用CompletableFuture构建异步应用**- 为了展示CompletableFuture的强大特性,我们会创建一个"最佳价格查询器",他会查询多个在线商店,拿出最低价格。- 使用Spring Boot 构建2个应用,二者通过HTTP请求的方式通信,模拟真实业务场景。- **将学到什么**- 学到如何为你的客户提供异步API- 掌握如何让你使用了同步API的代码变为非阻塞代码- 使用流水线将两个接续的异步操作合并为一个异步计算操作- 以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格- **同步API和异步API**- 同步API:其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。- 异步API:异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。<a name="ALKl3"></a># 实现异步API- 定义商店,商店会返回指定产品的价格- getPrice方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其他外部服务```javapublic class Shop {@Getterprivate String shopName;public Shop(String shopName) {this.shopName = shopName;}public double getPrice(String product) {return 0D;}}
- 模拟1s延迟的方法 ```java
private static void delay() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }
- 在getPrice方法中引入一个模拟的延迟```javapublic class Shop {@Getterprivate String shopName;public Shop(String shopName) {this.shopName = shopName;}private static final Random r = new Random();public double getPrice(String product) {delay();return calculatePrice(product);}private double calculatePrice(String product) {delay();return r.nextDouble() * product.charAt(0) + product.charAt(1);}private static void delay() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
- 将同步方法转换为异步方法
getPrice转换为getPriceAsync方法,并修改它的返回值
public Future<Double> getPriceAsync(String product) {// TODO}
因为Future的值的获取发生在未来,所以需要返回一个Future,而非具体的目标类型,因为需要将获取未来这个值的策略在外面进行单独处理。
只有返回Future这样设计,才给了外部调用线程一个机会去做某些操作。
public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> future = new CompletableFuture();new Thread(() -> {try {double calculatePrice = calculatePrice(product);future.complete(calculatePrice);} catch (Exception ex) {// 如果出现异常了,进行异常的处理future.completeExceptionally(ex);}}).start();return future;}
使用异步API
public class Test2 {public static void main(String[] args) throws ExecutionException, InterruptedException {test1();}public static void test1() throws ExecutionException, InterruptedException {Shop shop = new Shop("BestShop");long start = System.currentTimeMillis();Future<Double> futurePrice = shop.getPriceAsync("my love icanci");long endFirst = System.currentTimeMillis();long invokeTime = endFirst - start;System.out.println(invokeTime + ":ms");// 其他操作doSomethingElse();Double price = futurePrice.get();System.out.println("price:" + price);System.out.println("future time:" + (System.currentTimeMillis() - endFirst) + ":ms");}private static final void doSomethingElse() {for (int i = 0; i < 10; i++) {System.out.print(i);}System.out.println();}}// 44:ms// 0123456789// price:126.27621982529686// future time:1006:ms
getPriceAsync方法的调用返回远远早于最终价格计算完成的时间
- 错误处理
- 如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞
- 可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。
为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> future = new CompletableFuture();new Thread(() -> {try {// 计算逻辑double calculatePrice = calculatePrice(product);future.complete(calculatePrice);} catch (Exception ex) {// 如果出现异常了,进行异常的处理future.completeExceptionally(ex);}}).start();return future;}
内部抛出异常测试
public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> future = new CompletableFuture();new Thread(() -> {try {// 计算逻辑int i = 1 / 0;double calculatePrice = calculatePrice(product);future.complete(calculatePrice);} catch (Exception ex) {// 如果出现异常了,进行异常的处理future.completeExceptionally(ex);}}).start();return future;}
执行结果如下
32:ms0123456789Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)at com.ldl.function.future.Test2.test1(Test2.java:23)at com.ldl.function.future.Test2.main(Test2.java:12)Caused by: java.lang.ArithmeticException: / by zeroat com.ldl.function.future.Shop.lambda$getPriceAsync$1(Shop.java:38)at java.lang.Thread.run(Thread.java:748)[ERROR] Command execution failed.
使用工厂方法supplyAsync创建CompletableFuture
public Future<Double> getPriceAsync2(String product) {return CompletableFuture.supplyAsync(() -> calculatePrice(product));}
```java public class Test3 { public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
}
public static void test1() throws ExecutionException, InterruptedException {
Shop shop = new Shop("BestShop");long start = System.currentTimeMillis();// getPriceAsync2Future<Double> futurePrice = shop.getPriceAsync2("my love icanci");long endFirst = System.currentTimeMillis();long invokeTime = endFirst - start;System.out.println(invokeTime + ":ms");doSomethingElse();Double price = futurePrice.get();System.out.println("price:" + price);System.out.println("future time:" + (System.currentTimeMillis() - endFirst) + ":ms");
}
private static final void doSomethingElse() {
for (int i = 0; i < 10; i++) {System.out.print(i);}System.out.println();
} }
- supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法```javapublic Future<Double> getPriceAsync2(String product) {return CompletableFuture.supplyAsync(() -> calculatePrice(product), Executors.newSingleThreadExecutor());}
去除阻塞测试
拥有一个商家列表,然后顺序查询结果 ```java public class Test4 { static final List
shops = Arrays.asList(new Shop(“icanci”), new Shop(“ldl”), new Shop(“one”), new Shop(“two”)); public static void main(String[] args) {
long start = System.currentTimeMillis();List<String> prices = findPrices(shops, "Mac Pro 2022");System.out.println(System.currentTimeMillis() - start + ":ms");for (String price : prices) {System.out.println(price);}
}
public static List
findPrices(List shops, String product) { return shops.stream().map(shop -> {return ("shopName:" + shop.getShopName() + ",price:" + shop.getPrice(product));}).collect(Collectors.toList());
} }
- 上述代码执行结果```java8070:msshopName:icanci,price:139.35156975647476shopName:ldl,price:133.7474336693779shopName:one,price:130.14441623539358shopName:two,price:108.93915925708032
- 上述代码执行,居然使用了8s多
接下来对流进行并行处理 ```java public class Test5 { static final List
shops = Arrays.asList(new Shop(“icanci”), new Shop(“ldl”), new Shop(“one”), new Shop(“two”)); public static void main(String[] args) {
long start = System.currentTimeMillis();List<String> prices = findPrices(shops, "Mac Pro 2022");System.out.println(System.currentTimeMillis() - start + ":ms");for (String price : prices) {System.out.println(price);}
}
public static List
findPrices(List shops, String product) { return shops.parallelStream().map(shop -> {return ("shopName:" + shop.getShopName() + ",price:" + shop.getPrice(product));}).collect(Collectors.toList());
} }
- 上述代码执行结果```java2053:msshopName:icanci,price:123.99168537743746shopName:ldl,price:112.76811461616849shopName:one,price:124.0312726480345shopName:two,price:129.19205996300218
- 快了4倍
使用CompletableFuture发起异步请求 ```java public class Test6 { static final List
shops = Arrays.asList(new Shop(“icanci”), new Shop(“ldl”), new Shop(“one”), new Shop(“two”)); public static void main(String[] args) {
long start = System.currentTimeMillis();List<String> prices = findPrices(shops, "Mac Pro 2022");System.out.println(System.currentTimeMillis() - start + ":ms");for (String price : prices) {System.out.println(price);}
}
public static List
findPrices(List shops, String product) { List<CompletableFuture<String>> completableFutures = shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> {try {return "shopName:" + shop.getShopName() + ",price:" + shop.getPriceAsync2(product).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return null;})).collect(Collectors.toList());return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
} }
- 上述代码执行结果```java2042:msshopName:icanci,price:100.10588494345834shopName:ldl,price:166.8870866246924shopName:one,price:157.38070850686452shopName:two,price:154.8300868085765
- 执行了2秒,也没快多少
- 但是要注意的是:上述的代码使用了2个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个的放置两个map操作,考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。
- 因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果

- 上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。与此相反,图的下半部分展示了如何先将CompletableFutures对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。
使用并行处理
public class Test6 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("ldl"), new Shop("one"), new Shop("two"));public static void main(String[] args) {long start = System.currentTimeMillis();List<String> prices = findPrices(shops, "Mac Pro 2022");System.out.println(System.currentTimeMillis() - start + ":ms");for (String price : prices) {System.out.println(price);}}public static List<String> findPrices(List<Shop> shops, String product) {List<CompletableFuture<String>> completableFutures = shops.parallelStream().map(shop -> CompletableFuture.supplyAsync(() -> {try {return "shopName:" + shop.getShopName() + ",price:" + shop.getPriceAsync2(product).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return null;})).collect(Collectors.toList());return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());}}
2066:msshopName:icanci,price:169.0741823736231shopName:ldl,price:141.53420950495shopName:one,price:173.6930822511241shopName:two,price:123.97103865172662
效果不是很ok
增加商店 ```java public class Test6 { static final List
shops = Arrays.asList(new Shop(“icanci”), new Shop(“ldl”), new Shop(“one”), new Shop(“two”), new Shop(“icanci”), new Shop(“ldl”), new Shop(“one”), new Shop("two"));
public static void main(String[] args) {
long start = System.currentTimeMillis();List<String> prices = findPrices(shops, "Mac Pro 2022");System.out.println(System.currentTimeMillis() - start + ":ms");for (String price : prices) {System.out.println(price);}
}
public static List
findPrices(List shops, String product) { List<CompletableFuture<String>> completableFutures = shops.parallelStream().map(shop -> CompletableFuture.supplyAsync(() -> {try {return "shopName:" + shop.getShopName() + ",price:" + shop.getPriceAsync2(product).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return null;})).collect(Collectors.toList());return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
} }
```java4059:msshopName:icanci,price:136.410924434729shopName:ldl,price:155.4459340047615shopName:one,price:139.82056903888338shopName:two,price:115.26508629258022shopName:icanci,price:118.2941480722545shopName:ldl,price:140.80036561462958shopName:one,price:155.33834086071445shopName:two,price:155.52112871359822
- 还是需要4秒,能不能再快一些呢?
- 并行和CompletableFuture,它们内部采用的是同样的通用线程池,默认都使用固定数目的线程。
- 好在CompletableFuture可以自定义执行器,可以配置线程池的大小。
- 使用定制的执行器
- 明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷
- 线程池个数估算公式 ```java N(threads) = N(cpu) U(cpu) (1 + W / C)
N(cpu)是处理器的核的数目,可以通过Runtime.getRuntime().availableProce-ssors()得到 U(cpu)是期望的CPU利用率(该值应该介于0和1之间) W / C是等待时间与计算时间的比率
- 定制执行器```javapublic class Test7 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("ldl"), new Shop("one"), new Shop("two"), new Shop("icanci"), new Shop("ldl"), new Shop("one"),new Shop("two"));public static void main(String[] args) {long start = System.currentTimeMillis();System.out.println(findPrices(shops, "Mac Pro 2022"));System.out.println(System.currentTimeMillis() - start + ":ms");}public static List<String> findPrices(List<Shop> shops, String product) {Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), r -> {Thread t = new Thread(r);t.setDaemon(true);return t;});List<CompletableFuture<String>> completableFutures = shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> {try {return "shopName:" + shop.getShopName() + ",price:" + shop.getPriceAsync2(product).get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return null;}, executor)).collect(Collectors.toList());return completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());}}
执行结果
[shopName:icanci,price:135.1279016391757, shopName:ldl,price:160.10692930306047, shopName:one,price:162.3093707366204, shopName:two,price:127.58959679064557, shopName:icanci,price:167.40878008141954, shopName:ldl,price:127.9710772727492, shopName:one,price:109.68156333152535, shopName:two,price:140.47878604319249]2043:ms
并行:使用流还是CompletableFutures
- 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)
并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待
对多个异步任务进行流水线操作
假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。使用一个枚举型变量Discount.Code来实现。 ```java public class Shop {
@Getter private String shopName;
public Shop(String shopName) {
this.shopName = shopName;
}
private static final Random r = new Random();
public String getPrice(String product) {
double price = calculatePrice(product);Discount.CODE code = Discount.CODE.values()[r.nextInt(Discount.CODE.values().length)];return String.format(shopName + ":" + price + ":" + code);
}
private double calculatePrice(String product) {
delay();return r.nextDouble() * product.charAt(0) + product.charAt(1);
}
private static void delay() {
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
} }
```javapublic class Discount {public enum CODE {NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20),;private final int percentage;CODE(int percentage) {this.percentage = percentage;}}public static String applyDiscount(Quote quote) {return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getCode());}public static double apply(double price, CODE code) {delay();return price * (100 - code.percentage) / 100;}private static void delay() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Dataclass Quote {private final String shopName;private final double price;private final Discount.CODE code;public Quote(String shopName, double price, Discount.CODE code) {this.shopName = shopName;this.price = price;this.code = code;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.CODE code = Discount.CODE.valueOf(split[2]);return new Quote(shopName, price, code);}}
- 使用Discount服务
- 因为Discount是一种远程服务,所以我们在上述的代码中加了1秒的延迟
以最简单的方式实现使用Discount服务的findPrices方法
public class Test1 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("ldl"), new Shop("one"), new Shop("two"));public static void main(String[] args) {log(() -> findPrices("icanci"));}public static List<String> findPrices(String product) {return shops.stream().map(shop -> shop.getPrice(product)).map(Quote::parse).map(Discount::applyDiscount).collect(Collectors.toList());}private static <T> T log(Supplier<T> supplier) {long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;}}// 8044:ms
通过在shop构成的流上采用流水线方式执行三次map操作,我们得到了期望的结果
- 第一个操作将每个shop对象转换成了一个字符串,该字符串包含了该shop中指定商品的价格和折扣代码
- 第二个操作对这些字符串进行了解析,在Quote对象中对它们进行转换
- 第三个map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回该价格及提供该价格商品的shop
- 但是耗时8s:4个商店耗时4秒,4个discount耗时4秒。
- 这一方案在商店的数目增加时,扩展性不好,因为Stream底层依赖的是线程数量固定的通用线程池。你也知道,如果自定义CompletableFutures调度任务执行的执行器能够更充分地利用CPU资源。
构造同步和异步操作
public class Test2 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("ldl"), new Shop("one"), new Shop("two"));public static void main(String[] args) {log(() -> findPrices("icanci"));}public static List<String> findPrices(String product) {ExecutorService ex = Executors.newCachedThreadPool();List<CompletableFuture<String>> priceFutures = shops.stream() //// 异步方式取得每个shop中指定的产品的原始价格.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), ex)) //// 使用另外一个异步任务工作期望的 Future,申请折扣.map(future -> future.thenApply(Quote::parse)) //.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), ex))) //.collect(Collectors.toList());//return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());}private static <T> T log(Supplier<T> supplier) {long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;}}// 2031:ms

获取价格
- 三个操作中的第一个,只需要将Lambda表达式作为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。第一个转换的结果是一个Stream
>,一旦运行结束,每个CompletableFuture对象中都会包含对应shop返回的字符串。此处新建了线程池,来进行处理。 - 解析报价
- 需要进行第二次转换将字符串转变为订单。由于一般情况下解析操作不涉及任何远程服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。由于这个原因,你可以对第一步中生成的CompletableFuture对象调用它的thenApply,将一个由字符串转换Quote的方法作为参数传递给它。
- 直到调用的CompletableFuture执行结束,使用的thenApply方法都不会阻塞你代码的执行。这意味着CompletableFuture最终结束运行时,你希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture
对象转换为对应的CompletableFuture 对象。你可以把这看成是为处理CompletableFuture的结果建立了一个菜单,就像你曾经为Stream的流水线所做的事儿一样。
- 为计算折扣价格构造Future
- 第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,它需要模拟远程调用带来的延迟),出于这一原因,你也希望它能够异步执行。
- 第一个调用传递getPrice给supplyAsync那样,将这一操作以Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个Completable-Future对象。到目前为止,你已经进行了两次异步操作,用了两个不同的CompletableFutures对象进行建模,你希望能把它们以级联的方式串接起来进行工作。
- 从shop对象中获取价格,接着把价格转换为Quote
- 拿到返回的Quote对象,将其作为参数传递给Discount服务,取得最终的折扣价格。
- Java8的CompletableFuture API 提供了名为thenCompose的方法,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作,换句话说,你可以创建两个CompletableFutures对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。
- 这三次map操作的返回的Stream元素收集到一个列表,你就得到了一个List
>,等这些CompletableFuture对象最终执行完毕,你就可以利用join取得它们的返回值。 - 上述代码执行只需要2秒。
- thenCompose方法像CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。就这个例子而言,第二个CompletableFuture对象的结果取决于第一个CompletableFuture,所以无论你使用哪个版本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少差别。我们选择thenCompose方法的原因是因为它更高效一些,因为少了很多线程切换的开销。
- 将两个CompletableFuture对象整合起来,无论它们是否存在依赖
- 上述案例对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。但是,另一种比较常见的情况是,需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且也不希望等到第一个任务完全结束才开始第二项任务。
- 这种情况,应该使用thenCombine方法,它接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供有一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。
一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。用这种方式,你需要使用第三个CompletableFuture对象,当前两个CompletableFuture计算出结果,并由BiFunction方法完成合并后,由它来最终结束这一任务。 ```java public class ExchangeService { public double getRate(Money from, Money to) {
delay();return 0.8;
}
private static void delay() {
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
} }
enum Money { EUR, USD }
```javapublic class Shop {// 增加方法public double getPriceDouble(String product) {return calculatePrice(product);}}
public class ExchangeService {public double getRate(Money from, Money to) {delay();return 0.8;}private static void delay() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}enum Money {EUR, USD}
public class Test3 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("ldl"), new Shop("one"), new Shop("two"));private static final ExchangeService exchangeService = new ExchangeService();public static void main(String[] args) {log(() -> findPrices("icanci"));}public static List<Double> findPrices(String product) {ExecutorService ex = Executors.newCachedThreadPool();Stream<CompletableFuture<Double>> priceFutures = shops.stream() //.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceDouble(product), ex).thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD), ex), (price, rate) -> price * rate));List<Double> list = priceFutures.map(CompletableFuture::join).collect(Collectors.toList());ex.shutdown();return list;}private static <T> T log(Supplier<T> supplier) {long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;}}// 4028:ms
CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起
响应CompletableFuture的completion事件
一个模拟生成0.5秒至2.5秒随机延迟的方法
private static final Random r = new Random();private static void randomDelay() {int delay = 500 + r.nextInt(2000);try {Thread.sleep(delay);} catch (InterruptedException e) {e.printStackTrace();}}
重构findPrices方法返回一个由Future构成的流 ```java public class Test4 { static final List
shops = Arrays.asList(new Shop(“icanci”), new Shop(“icanci”), new Shop(“icanci”), new Shop(“icanci”), new Shop(“icanci”), new Shop("icanci"));
private static final ExchangeService exchangeService = new ExchangeService();
public static void main(String[] args) {
CompletableFuture[] futures = findPrices("my love").map(f -> f.thenAccept(System.out::println)).toArray(size -> new CompletableFuture[size]);CompletableFuture.allOf(futures).join();
}
public static Stream
> findPrices(String product) { ExecutorService ex = Executors.newCachedThreadPool();return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), ex)).map(future -> future.thenApply(Quote::parse)).map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), ex)));
} }
- 打印每个执行的时间```javapublic class Test4 {static final List<Shop> shops = Arrays.asList(new Shop("icanci"), new Shop("icanci"), new Shop("icanci"), new Shop("icanci"), new Shop("icanci"),new Shop("icanci"));private static final ExchangeService exchangeService = new ExchangeService();public static void main(String[] args) {long start = System.currentTimeMillis();CompletableFuture[] futures = findPrices("my love").map(f -> f.thenAccept(s -> System.out.println(s + "(done in " + (System.currentTimeMillis() - start) + ":ms)"))).toArray(size -> new CompletableFuture[size]);CompletableFuture.allOf(futures).join();System.out.println("all done in " + (System.currentTimeMillis() - start) + ":ms");}public static Stream<CompletableFuture<String>> findPrices(String product) {ExecutorService ex = Executors.newCachedThreadPool();return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), ex)).map(future -> future.thenApply(Quote::parse)).map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), ex)));}}
icanci price is 127.33160045136277(done in 1675:ms)icanci price is 186.41685816597658(done in 1947:ms)icanci price is 190.38841540648255(done in 2290:ms)icanci price is 132.1967499956825(done in 2332:ms)icanci price is 178.940055677258(done in 2339:ms)icanci price is 144.71398857049564(done in 2431:ms)all done in 2431:ms
实践案例
- 应用场景:针对淘宝的订单,其可能有退订信息、修改信息、邮寄信息、保险信息等等,现在需要整合这些信息,吐出去一个合并信息。每个信息都来源于外部系统,此时可以使用并行查询
模拟定义外部服务 ```java public class BaseService { private static final Random r = new Random();
protected void randomDelay() {
int delay = 500 + r.nextInt(2000);try {Thread.sleep(delay);} catch (InterruptedException e) {e.printStackTrace();}
} }
```java@Datapublic class MailVO {private String orderId;private String province;private int processState;private String receiver;private String receiveCellphone;private String city;private String sendAddress;private String expressNumber;private String expressName;private String region;}
public class MailService extends BaseService {public MailVO queryByOrderId(String orderId) {randomDelay();return new MailVO();}}
@Datapublic class OrderVO {private String orderId;private double amount;private String memberId;private String createTime;}
public class OrderService extends BaseService {public OrderVO queryByOrderId(String orderId) {randomDelay();return new OrderVO();}}
@Datapublic class RefundVO {private String refundId;private String price;private String username;private String ext;}
public class RefundService extends BaseService {public RefundVO queryByOrderId(String orderId) {randomDelay();return new RefundVO();}}
@Datapublic class MergeInfoVO {private MailVO mailVO;private OrderVO orderVO;private RefundVO refundVO;}
合并数据服务 ```java public class MergeInfoService { private OrderService orderService = new OrderService(); private MailService mailService = new MailService(); private RefundService refundService = new RefundService();
public MergeInfoVO getInfo(String orderId) {
// 基于并行编排的查询流程:// 优先查询订单,(有)并行查询其它服务,(没有)抛异常返回结果// |---- stage2 ----|// stage1 -----| |----- stage4// |---- stage3 ----|try {return CompletableFuture.supplyAsync(() -> {OrderVO orderVO = log(() -> orderService.queryByOrderId(orderId));if (orderVO == null) {throw new IllegalArgumentException("orderVO is null");}return buildByOrder(orderVO);}).thenComposeAsync(vo -> {try {CompletableFuture.allOf( //CompletableFuture.runAsync(() -> log(() -> buildByMail(vo, mailService.queryByOrderId(orderId)))), //CompletableFuture.runAsync(() -> log(() -> buildByRefund(vo, refundService.queryByOrderId(orderId)))) //).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}return CompletableFuture.completedFuture(vo);}).handleAsync((c, e) -> {if (e != null) {return null;}return c;}).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}return null;
}
private MergeInfoVO buildByOrder(OrderVO orderVO) {
MergeInfoVO mergeInfoVO = new MergeInfoVO();mergeInfoVO.setOrderVO(orderVO);return mergeInfoVO;
}
private MergeInfoVO buildByMail(MergeInfoVO mergeInfoVO, MailVO mailVO) {
mergeInfoVO.setMailVO(mailVO);return mergeInfoVO;
}
private MergeInfoVO buildByRefund(MergeInfoVO mergeInfoVO, RefundVO refundVO) {
mergeInfoVO.setRefundVO(refundVO);return mergeInfoVO;
}
private static
T log(Supplier supplier) { long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;
} }
- 测试```javapublic class MergeInfoServiceTest {public static void main(String[] args) {MergeInfoService mergeInfoService = new MergeInfoService();log(() -> mergeInfoService.getInfo("MX00110"));}private static <T> T log(Supplier<T> supplier) {long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;}}// 1399:ms// 1293:ms// 1482:ms// 2886:ms
- 因为有先后依赖关系,所以需要先查询order,在查询其他的。所以总耗时为第一个时间+第二第三中慢的那个时间。
没有先后依赖关系,可以同时并行查,如下 ```java public class MergeInfoService2 { private OrderService orderService = new OrderService(); private MailService mailService = new MailService(); private RefundService refundService = new RefundService();
public MergeInfoVO getInfo(String orderId) {
// 基于并行编排的查询流程:// 没有优先级,可并行查// |---- stage1 ----|// |---- stage2 ----| ----- stage4// |---- stage3 ----|MergeInfoVO mergeInfoVO = new MergeInfoVO();try {CompletableFuture.allOf( //CompletableFuture.runAsync(() -> log(() -> buildByOrder(mergeInfoVO, orderService.queryByOrderId(orderId)))), //CompletableFuture.runAsync(() -> log(() -> buildByMail(mergeInfoVO, mailService.queryByOrderId(orderId)))), //CompletableFuture.runAsync(() -> log(() -> buildByRefund(mergeInfoVO, refundService.queryByOrderId(orderId)))) //).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}return null;
}
private MergeInfoVO buildByOrder(MergeInfoVO mergeInfoVO, OrderVO orderVO) {
mergeInfoVO.setOrderVO(orderVO);return mergeInfoVO;
}
private MergeInfoVO buildByMail(MergeInfoVO mergeInfoVO, MailVO mailVO) {
mergeInfoVO.setMailVO(mailVO);return mergeInfoVO;
}
private MergeInfoVO buildByRefund(MergeInfoVO mergeInfoVO, RefundVO refundVO) {
mergeInfoVO.setRefundVO(refundVO);return mergeInfoVO;
}
private static
T log(Supplier supplier) { long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;
} }
```javapublic class MergeInfoService2Test {public static void main(String[] args) {MergeInfoService2 mergeInfoService2 = new MergeInfoService2();log(() -> mergeInfoService2.getInfo("MX00110"));}private static <T> T log(Supplier<T> supplier) {long start = System.currentTimeMillis();T t = supplier.get();System.out.println(System.currentTimeMillis() - start + ":ms");return t;}}// 1196:ms// 1444:ms// 1593:ms// 1596:ms
小结
- 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度
- 你应该尽可能地为客户提供异步API。使用CompletableFuture类提供的特性,你能够轻松地实现这一目标
- CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常
- 将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果
- 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个
- 你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序
你可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行
参考文章
《Java 8 in Action》
-
推荐文章
CompletableFuture详解:https://zhuanlan.zhihu.com/p/344431341
