异步与线程池

创建线程的4钟方式

  1. 继承Thread
  2. 实现 Runnable接口
  3. 实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
  4. 线程池

区别:
1、2不能得到返回值。3可以获取返回值
1、2、3都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)
4可以控制资源,性能稳定,不会一下子所有线程一起运行

结论:
实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】

创建线程池的方式

创建固定线程数的线程池ExecutorService

  1. 固定线程数的线程池
  2. Executors.newFixedThreadPool(10);

execute和submit区别

作用:都是提交异步任务的

  • execute:只能提交Runnable任务,没有返回值
  • submit:可以提交Runnable、Callable,返回值是FutureTask

创建原生线程池ThreadPoolExecutor

异步与线程池 - 图1

  1. new ThreadPoolExecutor(5,
  2. 200,
  3. 10,
  4. TimeUnit.SECONDS,
  5. new LinkedBlockingDeque<>(100000),
  6. Executors.defaultThreadFactory(),
  7. new ThreadPoolExecutor.AbortPolicy());

七大参数

  • corePoolSize: 核心线程数,不会被回收,接收异步任务时才会创建
  • maximumPoolSize:最大线程数量,控制资源
  • keepAliveime: maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放
  • TimeUnitunit: 时间单位
  • workQueue: 阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行
  • threadFactory: 线程的创建工厂【可以自定义】
  • RejectedExecutionHandler handler:队列满后执行的拒绝策略,默认为AbortPolicy策略

拒绝策略

  • DiscardOldestPolicy:丢弃最老的任务
  • AbortPolicy:丢弃当前任务,抛出异常【默认策略】
  • CallerRunsPolicy:同步执行run方法
  • DiscardPolicy:丢弃当前任务,不抛出异常

阻塞队列

异步与线程池 - 图2

new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小

线程池的分类

  • 可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
  • 定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
  • 周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
  • 单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)

对应的创建方式:

  • Executors.newCachedThreadPool();
  • Executors.newFixedThreadPool(10);
  • Executors.newScheduledThreadPool(10);
  • Executors.newSingleThreadExecutor();

注意:回收线程 = maximumPoolSize - corePoolSize

为什么使用线程池?

  • 降低资源的消耗【减少创建销毁操作】
    • 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
    • 高并发状态下过多创建线程可能将资源耗尽
  • 提高响应速度【控制线程个数】
    • 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
  • 提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】
    • 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

异步编排CompletableFuture

引入

举例:

例如:完成以下业务

异步与线程池 - 图3

如果单线程,那么会消耗5.5秒,所以使用多线程完成操作,但是4、5、6依赖1,得先知道sku是哪个spu下的。所以不能同时开启线程来执行任务,需要有顺序,所以使用CompletableFuture。

创建异步对象

CompletableFuture提供了四个静态方法来创建一个异步操作。

异步与线程池 - 图4

  • runXXX都是没有返回结果的,supplyXXX可以获取返回结果
  • 可以传入自定义线程池,否则使用默认线程池

下面演示了2钟方法创建:

  1. private static ExecutorService executorService = Executors.newFixedThreadPool(10);
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. System.out.println("主线程开始...");
  4. // 没有返回值
  5. CompletableFuture.runAsync(() -> {
  6. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  7. int i = 10 / 2;
  8. System.out.println("结果:" + i);
  9. }, executorService); // 使用自定义创建的线程
  10. // 有返回值
  11. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  12. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  13. int i = 10 / 2;
  14. return i;
  15. }, executorService);
  16. System.out.println("结果:" + future.get());
  17. System.out.println("主线程结束...");
  18. }
  19. }

计算完成时的回调方法

异步与线程池 - 图5

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。

whenComplete和whenCompleteAsync的区别:

  • whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
  • whenCempleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池方法来执行

方法不以Async结尾,意味着Action 使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

  1. // 有返回值
  2. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  3. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  4. int i = 10 / 2;
  5. return i;
  6. }, executorService)
  7. // 虽然能得到异常信息,但是没法修改数据
  8. .whenComplete((res, exception) -> {
  9. System.out.println("异步方法完成了,结果是:" + res + "异常是:" + exception);
  10. })
  11. // 得到异常信息,同时返回默认值
  12. .exceptionally(exception -> {
  13. return 10;
  14. });

handler方法

异步与线程池 - 图6

和complete一样,可对结果做最后的处理(可处理异常),可改变返回值。

  1. // 有返回值
  2. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  3. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  4. // int i = 10 / 2; // 返回10
  5. int i = 10 / 0; // 返回0
  6. return i;
  7. }, executorService).handle((res,thr) -> {
  8. if (res != null) {
  9. return res * 2;
  10. }
  11. if (thr != null){
  12. return 0;
  13. }
  14. return 0;
  15. });
  16. System.out.println("结果:" + future.get());
  17. System.out.println("主线程结束...");

线程串行化

异步与线程池 - 图7

  • thenApply方法:当一个线程依赖另一个线性时,获取上一个任务返回的结果,并返回当前任务的返回值。
  • thenAccent.方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
  • thenRun方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun的后续操作。
  • 带有Async默认是异步执行的。同之前。以上都要前置任务成功完成。
  • Function<? super T,? extends U>
    T:上一个任务返回结果的类型 U:返回泛型
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  3. int i = 10 / 2; // 返回10
  4. return i;
  5. }, executorService).thenApplyAsync(res -> {
  6. // 能接收结果,有返回值
  7. return "改变了结果" + res;
  8. }, executorService);
  9. System.out.println("结果:" + future.get());
  10. System.out.println("主线程结束...");

两任务组合(一个完成)

异步与线程池 - 图8

异步与线程池 - 图9

  • 当两个任务中,任意一个 future任务完成的时候,执行任务。
  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither:两个任务有一个执行完成,不需要获取future 的结果,处理任务,也没有返回值。
  1. CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
  2. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  3. int i = 10 / 2; // 返回10
  4. return i;
  5. }, executorService);
  6. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
  7. System.out.println("当前线程开启:" + Thread.currentThread().getId());
  8. try {
  9. Thread.sleep(2000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. int i = 10 / 2; // 返回10
  14. return i;
  15. }, executorService);
  16. // 有返回值,能获取结果
  17. CompletableFuture<String> future3 = future1.applyToEither(future2, res -> {
  18. System.out.println("有返回值" + res);
  19. return "有返回值:" + res ;
  20. });
  21. // 没有返回值,能获取结果
  22. CompletableFuture<Void> future4 = future1.acceptEither(future2, res -> {
  23. System.out.println("没有返回值,但接受了参数:" + res);
  24. });
  25. CompletableFuture<Void> future5 = future1.runAfterEither(future2, () -> {
  26. System.out.println("有一个线程已经执行完了");
  27. });
  28. // System.out.println("结果:" + future3.get());
  29. System.out.println("主线程结束...");

两任务组合(两个完成)

和上面一致

异步与线程池 - 图10

多任务组合

异步与线程池 - 图11

  • allOf:等待所有任务完成
  • anyof:只要有一个任务完成

allof测试

  1. // 注意三个线程都要放入线程池
  2. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println("查询商品属性...");
  4. return "CPU";
  5. }, executorService);
  6. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  7. System.out.println("查询商品价格...");
  8. return "20000";
  9. }, executorService);
  10. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
  11. try {
  12. Thread.sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println("查询商品品牌...");
  17. return "华为";
  18. }, executorService);
  19. // 全部都执行完成,才会执行
  20. CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
  21. allOf.get(); // 使主线程阻塞等待
  22. System.out.println("结果:" + future1.get() + " " + future2.get() + " " + future3.get());
  23. System.out.println("主线程结束...");

结果

  1. 查询商品属性...
  2. 查询商品价格...
  3. 查询商品品牌...
  4. 结果:CPU 20000 华为
  5. 主线程结束...

anyof测试

  1. // 注意三个线程都要放入线程池
  2. CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
  3. System.out.println("查询商品属性...");
  4. return "CPU";
  5. }, executorService);
  6. CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
  7. System.out.println("查询商品价格...");
  8. return "20000";
  9. }, executorService);
  10. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
  11. try {
  12. Thread.sleep(2000);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println("查询商品品牌...");
  17. return "华为";
  18. }, executorService);
  19. CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
  20. anyOf.get(); // 使主线程阻塞等待
  21. System.out.println("其中一个线程结果:" + anyOf.get());
  22. System.out.println("主线程结束...");

结果

  1. 查询商品属性...
  2. 查询商品价格...
  3. 其中一个线程结果:CPU
  4. 主线程结束...
  5. 查询商品品牌...