异步与线程池
创建线程的4钟方式
- 继承Thread
- 实现 Runnable接口
- 实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
- 线程池
区别:
1、2不能得到返回值。3可以获取返回值
1、2、3都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)
4可以控制资源,性能稳定,不会一下子所有线程一起运行
结论:
实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
创建线程池的方式
创建固定线程数的线程池ExecutorService
固定线程数的线程池
Executors.newFixedThreadPool(10);
execute和submit区别
作用:都是提交异步任务的
- execute:只能提交Runnable任务,没有返回值
- submit:可以提交Runnable、Callable,返回值是FutureTask
创建原生线程池ThreadPoolExecutor
new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
七大参数
- corePoolSize: 核心线程数,不会被回收,接收异步任务时才会创建
- maximumPoolSize:最大线程数量,控制资源
- keepAliveime: maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放
- TimeUnitunit: 时间单位
- workQueue: 阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行
- threadFactory: 线程的创建工厂【可以自定义】
- RejectedExecutionHandler handler:队列满后执行的拒绝策略,默认为AbortPolicy策略
拒绝策略
- DiscardOldestPolicy:丢弃最老的任务
- AbortPolicy:丢弃当前任务,抛出异常【默认策略】
- CallerRunsPolicy:同步执行run方法
- DiscardPolicy:丢弃当前任务,不抛出异常
阻塞队列
new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小
线程池的分类
- 可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
- 定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
- 周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
- 单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)
对应的创建方式:
- Executors.newCachedThreadPool();
- Executors.newFixedThreadPool(10);
- Executors.newScheduledThreadPool(10);
- Executors.newSingleThreadExecutor();
注意:回收线程 = maximumPoolSize - corePoolSize
为什么使用线程池?
- 降低资源的消耗【减少创建销毁操作】
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 高并发状态下过多创建线程可能将资源耗尽
- 提高响应速度【控制线程个数】
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
- 提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
异步编排CompletableFuture
引入
举例:
例如:完成以下业务
如果单线程,那么会消耗5.5秒,所以使用多线程完成操作,但是4、5、6依赖1,得先知道sku是哪个spu下的。所以不能同时开启线程来执行任务,需要有顺序,所以使用CompletableFuture。
创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作。
- runXXX都是没有返回结果的,supplyXXX可以获取返回结果
- 可以传入自定义线程池,否则使用默认线程池
下面演示了2钟方法创建:
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程开始...");
// 没有返回值
CompletableFuture.runAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("结果:" + i);
}, executorService); // 使用自定义创建的线程
// 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
int i = 10 / 2;
return i;
}, executorService);
System.out.println("结果:" + future.get());
System.out.println("主线程结束...");
}
}
计算完成时的回调方法
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:
- whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
- whenCempleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池方法来执行
方法不以Async结尾,意味着Action 使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
// 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
int i = 10 / 2;
return i;
}, executorService)
// 虽然能得到异常信息,但是没法修改数据
.whenComplete((res, exception) -> {
System.out.println("异步方法完成了,结果是:" + res + "异常是:" + exception);
})
// 得到异常信息,同时返回默认值
.exceptionally(exception -> {
return 10;
});
handler方法
和complete一样,可对结果做最后的处理(可处理异常),可改变返回值。
// 有返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
// int i = 10 / 2; // 返回10
int i = 10 / 0; // 返回0
return i;
}, executorService).handle((res,thr) -> {
if (res != null) {
return res * 2;
}
if (thr != null){
return 0;
}
return 0;
});
System.out.println("结果:" + future.get());
System.out.println("主线程结束...");
线程串行化
- thenApply方法:当一个线程依赖另一个线性时,获取上一个任务返回的结果,并返回当前任务的返回值。
- thenAccent.方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
- thenRun方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun的后续操作。
- 带有Async默认是异步执行的。同之前。以上都要前置任务成功完成。
- Function<? super T,? extends U>
T:上一个任务返回结果的类型 U:返回泛型
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
int i = 10 / 2; // 返回10
return i;
}, executorService).thenApplyAsync(res -> {
// 能接收结果,有返回值
return "改变了结果" + res;
}, executorService);
System.out.println("结果:" + future.get());
System.out.println("主线程结束...");
两任务组合(一个完成)
- 当两个任务中,任意一个 future任务完成的时候,执行任务。
- applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
- acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
- runAfterEither:两个任务有一个执行完成,不需要获取future 的结果,处理任务,也没有返回值。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
int i = 10 / 2; // 返回10
return i;
}, executorService);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程开启:" + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = 10 / 2; // 返回10
return i;
}, executorService);
// 有返回值,能获取结果
CompletableFuture<String> future3 = future1.applyToEither(future2, res -> {
System.out.println("有返回值" + res);
return "有返回值:" + res ;
});
// 没有返回值,能获取结果
CompletableFuture<Void> future4 = future1.acceptEither(future2, res -> {
System.out.println("没有返回值,但接受了参数:" + res);
});
CompletableFuture<Void> future5 = future1.runAfterEither(future2, () -> {
System.out.println("有一个线程已经执行完了");
});
// System.out.println("结果:" + future3.get());
System.out.println("主线程结束...");
两任务组合(两个完成)
和上面一致
多任务组合
- allOf:等待所有任务完成
- anyof:只要有一个任务完成
allof测试
// 注意三个线程都要放入线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性...");
return "CPU";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品价格...");
return "20000";
}, executorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品品牌...");
return "华为";
}, executorService);
// 全部都执行完成,才会执行
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.get(); // 使主线程阻塞等待
System.out.println("结果:" + future1.get() + " " + future2.get() + " " + future3.get());
System.out.println("主线程结束...");
结果
查询商品属性...
查询商品价格...
查询商品品牌...
结果:CPU 20000 华为
主线程结束...
anyof测试
// 注意三个线程都要放入线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性...");
return "CPU";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品价格...");
return "20000";
}, executorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品品牌...");
return "华为";
}, executorService);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.get(); // 使主线程阻塞等待
System.out.println("其中一个线程结果:" + anyOf.get());
System.out.println("主线程结束...");
结果
查询商品属性...
查询商品价格...
其中一个线程结果:CPU
主线程结束...
查询商品品牌...