一,前置知识

1,用户线程和守护线程

1.所有用户线程执行完毕,程序就会退出,不再等待守护线程。

  1. public static void main(String[] args) {
  2. new Thread(()-> System.out.println("当前线程"+Thread.currentThread().getName()+"是"+(Thread.currentThread().isDaemon()?"守护线程":"用户线程"))).start();
  3. }

2.用户线程不结束,程序就不会退出

  1. public class DemoA {
  2. public static void main(String[] args) {
  3. Thread thread = new Thread(DemoA::run,"线程1");
  4. thread.start();
  5. }
  6. private static void run() {
  7. System.out.println("当前线程" + Thread.currentThread().getName() + "是" + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));
  8. while (true) ;
  9. }
  10. }

3.不管守护线程结束没有,只要用户线程结束,程序就会退出

  1. public class DemoA {
  2. public static void main(String[] args) {
  3. Thread thread = new Thread(DemoA::run,"线程1");
  4. //将线程1设置为守护线程
  5. thread.setDaemon(true);
  6. thread.start();
  7. }
  8. private static void run() {
  9. System.out.println("当前线程" + Thread.currentThread().getName() + "是" + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));
  10. while (true) ;
  11. }
  12. }

2.回首FutureTask

1.FutureTask存在的问题

  1. public class Demob {
  2. public static void main(String[] args)throws Exception {
  3. FutureTask<String> futureTask = new FutureTask<>(Demob::call);
  4. new Thread(futureTask,"t1").start();
  5. //异步结果集的展现 ---- 会阻塞
  6. //System.out.println(futureTask.get());
  7. //轮询的方式去问 --- 消耗cpu资源
  8. while (true) {
  9. if (futureTask.isDone()){
  10. System.out.println(futureTask.get());
  11. break;
  12. }
  13. }
  14. System.out.println("main结束");
  15. }
  16. private static String call() {
  17. try {
  18. TimeUnit.SECONDS.sleep(1);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. return "yhd";
  23. }
  24. }

从上面的代码可以看出FutureTask存在的问题:

  1. get()异步结果集的展现 —— 会阻塞
  2. futureTask.isDone()轮询的方式去问 —- 消耗cpu资源

2.想完成一些复杂的任务

  1. 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知。
  2. 将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
  3. 当Future集合中某个任务最快结束时,返回结果。
  4. 等待Future集合中的所有任务都完成。

二,CompletableFuture

1.介绍

异步函数式编程,Future接口的扩展与增强版。

可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

  1. 完成通知回调
  2. 类似Linux管道符的形式,可以异步的计算,上一个计算结果可以给下一个,结合lambda表达式和函数式编程串起来转换和组合获得结果。

image.png

2.实例化

如果不指定线程池,默认所有的线程都是守护线程。

  1. public class DemoC {
  2. private static ThreadPoolExecutor pool=new ThreadPoolExecutor(1,5,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));
  3. public static void main(String[] args) throws Exception{
  4. //创建一个有返回结果的实例
  5. CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "yhd", pool);
  6. //创建一个没有返回结果的实例
  7. CompletableFuture<Void> runAsync = CompletableFuture.runAsync(System.out::println);
  8. pool.shutdown();
  9. }
  10. }

3.异步+回调

whenComplete 和 join 的区别:

一个是执行完返回结果,一个是主动获取结果。

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

  1. public class DemoC {
  2. private static ThreadPoolExecutor pool=new ThreadPoolExecutor(1,5,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1));
  3. public static void main(String[] args) throws Exception{
  4. //创建一个有返回结果的实例 计算完成时回调 发生异常时执行
  5. CompletableFuture.supplyAsync(() -> "yhd", pool).whenComplete(DemoC::accept).exceptionally(Throwable::toString);
  6. pool.shutdown();
  7. }
  8. private static void accept(String result, Throwable e) {
  9. System.out.println(result);
  10. }
  11. }

4.结果处理

handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。

  1. /**
  2. * @author yhd
  3. * @createtime 2020/11/15 23:10
  4. */
  5. public class DemoB {
  6. public static void main(String[] args) {
  7. CompletableFuture.supplyAsync(()->"尹会东").handle(DemoB::apply).whenCompleteAsync(((s, throwable) -> System.out.println(s)));
  8. }
  9. private static String apply(String s, Throwable throwable) {
  10. if (null != throwable)
  11. return throwable.getMessage();
  12. return s + " 牛逼!";
  13. }
  14. }

5.线程串行化

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作
带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

  1. public class DemoE {
  2. public static void main(String[] args) {
  3. CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(() -> 1).thenApply(integer -> 1);
  4. CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> 1).thenAccept(System.out::println);
  5. CompletableFuture<Void> thenRun = CompletableFuture.supplyAsync(() -> 1).thenRun(System.out::println);
  6. System.out.println(thenApply.join());
  7. }
  8. }

6.任务组合

1.两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

  1. public class DemoF {
  2. public static void main(String[] args) {
  3. CompletableFuture
  4. .supplyAsync(() -> "hello")
  5. .thenApplyAsync(t -> t + " world!")
  6. .thenCombineAsync(
  7. CompletableFuture
  8. .completedFuture(" CompletableFuture"), (t, u) -> t + u)
  9. .whenComplete(DemoF::accept);
  10. }
  11. private static void accept(String t, Throwable u) {
  12. System.out.println(t);
  13. }
  14. }

2.两任务组合 - 一个完成

当两个任务中,任意一个future任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值

3.多任务组合

allOf:等待所有任务完成
anyOf:只要有一个任务完成

7.计算接口性能

1.一淘

  1. /**
  2. * @author yhd
  3. * @createtime 2020/11/15 15:11
  4. * 查询多个电商网站同一个商品的价格
  5. * 同步和异步两种方式
  6. */
  7. public class DemoD {
  8. static List<Gmall> gmalls= Arrays.asList(
  9. new Gmall("京东"),
  10. new Gmall("拼多多"),
  11. new Gmall("淘宝"),
  12. new Gmall("唯品会"),
  13. new Gmall("分期乐"));
  14. /**
  15. * 同步
  16. * @param gmalls
  17. * @param productName
  18. * @return
  19. */
  20. public static List<String> getPriceSync(List<Gmall> gmalls,String productName){
  21. return gmalls.stream().map(gmall->String.format(productName+"in %s price is %d ",gmall.getGmallName(),gmall.getPriceByProductName(productName))).collect(Collectors.toList());
  22. }
  23. /**
  24. * 异步
  25. * @param gmalls
  26. * @param productName
  27. * @return
  28. */
  29. public static List<String> getPriceAsync(List<Gmall> gmalls,String productName){
  30. return gmalls.stream().map(gmall->CompletableFuture.supplyAsync(()->String.format(productName+"in %s price is %d ",gmall.getGmallName(),gmall.getPriceByProductName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
  31. }
  32. public static void main(String[] args) {
  33. long startTime = System.currentTimeMillis();
  34. getPriceSync(gmalls,"金鳞岂是池中物").forEach(System.out::println);
  35. long endTime = System.currentTimeMillis();
  36. System.out.println("同步"+ (endTime-startTime));
  37. System.out.println("------------------------");
  38. long s = System.currentTimeMillis();
  39. getPriceAsync(gmalls,"金鳞岂是池中物").forEach(System.out::println);
  40. long e = System.currentTimeMillis();
  41. System.out.println("异步"+ (e-s));
  42. }
  43. }
  44. @Data
  45. @NoArgsConstructor
  46. @AllArgsConstructor
  47. class Gmall{
  48. private String gmallName;
  49. public Integer getPriceByProductName(String productName){
  50. try {
  51. TimeUnit.SECONDS.sleep(1);
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. return (int)(Math.random()*100)+1;
  56. }
  57. }

结果:

  1. =================================
  2. 金鳞岂是池中物in 京东 price is 100
  3. 金鳞岂是池中物in 拼多多 price is 12
  4. 金鳞岂是池中物in 淘宝 price is 25
  5. 金鳞岂是池中物in 唯品会 price is 35
  6. 金鳞岂是池中物in 分期乐 price is 60
  7. 同步5081
  8. ------------------------
  9. 金鳞岂是池中物in 京东 price is 57
  10. 金鳞岂是池中物in 拼多多 price is 68
  11. 金鳞岂是池中物in 淘宝 price is 84
  12. 金鳞岂是池中物in 唯品会 price is 16
  13. 金鳞岂是池中物in 分期乐 price is 20
  14. 异步1387
  15. =================================

2.检索

  1. /**
  2. * @author yhd
  3. * @createtime 2020/11/15 23:40
  4. */
  5. public class DemoC {
  6. //模拟数据库
  7. private static Map<String,Company> db=new HashMap<>();
  8. //模拟查询条件
  9. private static List<String> persons;
  10. //初始化数据
  11. static {
  12. persons=Arrays.asList("马云","马化腾","李彦宏","张朝阳","刘强东","王兴");
  13. db.put("马云",new Company("马云","阿里巴巴"));
  14. db.put("马化腾",new Company("马化腾","腾讯"));
  15. db.put("李彦宏",new Company("李彦宏","百度"));
  16. db.put("张朝阳",new Company("张朝阳","搜狐"));
  17. db.put("刘强东",new Company("刘强东","京东"));
  18. db.put("王兴",new Company("王兴","美团"));
  19. }
  20. //模拟去数据库查询
  21. public static Company selectDB(String key){
  22. try {
  23. TimeUnit.SECONDS.sleep(1);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. return db.get(key);
  28. }
  29. public static void main(String[] args) {
  30. Sync();
  31. System.out.println();
  32. Async();
  33. System.out.println();
  34. System.out.println(result());
  35. }
  36. //同步查询
  37. public static void Sync(){
  38. Long startTime=System.currentTimeMillis();
  39. persons.stream().map(key->selectDB(key).getWorks()).collect(Collectors.toList()).forEach(System.out::println);
  40. Long endTime=System.currentTimeMillis();
  41. System.out.println(endTime-startTime);
  42. }
  43. //异步查询
  44. public static void Async(){
  45. Long startTime=System.currentTimeMillis();
  46. persons.stream().map(key->CompletableFuture.supplyAsync(()->selectDB(key).getWorks())).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()).forEach(System.out::println);
  47. Long endTime=System.currentTimeMillis();
  48. System.out.println(endTime-startTime);
  49. }
  50. //转换成map返回
  51. public static Map<String,String> result(){
  52. return persons.stream().map(key -> CompletableFuture.supplyAsync(() -> selectDB(key))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toMap(Company::getPerson,Company::getWorks));
  53. }
  54. }
  55. @Data
  56. @NoArgsConstructor
  57. @AllArgsConstructor
  58. class Company{
  59. private String person;
  60. private String works;
  61. }

结果

  1. 阿里巴巴
  2. 腾讯
  3. 百度
  4. 搜狐
  5. 京东
  6. 美团
  7. 6053
  8. 阿里巴巴
  9. 腾讯
  10. 百度
  11. 搜狐
  12. 京东
  13. 美团
  14. 1406
  15. {张朝阳=搜狐, 马云=阿里巴巴, 王兴=美团, 李彦宏=百度, 刘强东=京东, 马化腾=腾讯}

3.商品详情

  1. /**
  2. * @author 二十
  3. * @since 2021/8/28 7:54 下午
  4. */
  5. public class CompletableFutureTest {
  6. private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
  7. public static void main(String[] args) {
  8. Map<String, Result> result = new HashMap<>(4);
  9. Long s = System.currentTimeMillis();
  10. CompletableFuture<Result> skuInfoFuture = CompletableFuture.supplyAsync(() -> {
  11. Result skuInfo = getSkuInfo(1);
  12. result.put(skuInfo.getName(), skuInfo);
  13. return skuInfo;
  14. }, pool);
  15. CompletableFuture<Void> skuPriceFuture = CompletableFuture.runAsync(() -> {
  16. Result skuPrice = getSkuPrice(1);
  17. result.put(skuPrice.getName(), skuPrice);
  18. }, pool);
  19. CompletableFuture<Void> skuImgsFuture = skuInfoFuture.thenAcceptAsync(r -> {
  20. Result skuImgs = getSkuImgs(r.getId());
  21. result.put(skuImgs.getName(), skuImgs);
  22. }, pool);
  23. CompletableFuture<Void> spuInfoFuture = skuInfoFuture.thenAcceptAsync(r -> {
  24. Result spuInfo = getSpuInfo(r.getId());
  25. result.put(spuInfo.getName(), spuInfo);
  26. }, pool);
  27. CompletableFuture.allOf(skuInfoFuture, skuPriceFuture, skuImgsFuture, spuInfoFuture).join();
  28. System.out.println(result);
  29. Long e = System.currentTimeMillis();
  30. System.out.println("查詢總計耗時:" + (e - s));
  31. pool.shutdown();
  32. }
  33. /**
  34. * 查询sku基本信息
  35. */
  36. public static Result getSkuInfo(Integer key) {
  37. sleep();
  38. return new Result(1, "skuInfo");
  39. }
  40. /**
  41. * 查询sku图片信息
  42. */
  43. public static Result getSkuImgs(Integer key) {
  44. sleep();
  45. return new Result(1, "skuImgs");
  46. }
  47. /**
  48. * 查询spu销售属性
  49. */
  50. public static Result getSpuInfo(Integer key) {
  51. sleep();
  52. return new Result(1, "spuInfo");
  53. }
  54. /**
  55. * 查询sku价格
  56. */
  57. public static Result getSkuPrice(Integer key) {
  58. sleep();
  59. return new Result(1, "skuPrice");
  60. }
  61. public static void sleep() {
  62. try {
  63. TimeUnit.SECONDS.sleep(2);
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. @Data
  69. @NoArgsConstructor
  70. @AllArgsConstructor
  71. private static class Result {
  72. private Integer id;
  73. private String name;
  74. }
  75. }

夜拍.jpeg
哪有什么岁月静好,不过是在你看不见的地方负重前行。