一、创建线程的几种方式
1.1 继承Thread类
/**
* 继承Thread类方式
*
* @author maomaochong
* @date 2022/04/19 10:19
**/
public class extendsThreadMethod {
/**
* 继承Thread类,重写run()方法,填充线程任务
*/
static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
}
}
public static void main(String[] args) {
System.out.println("main。。。start。。。");
Thread01 thread01 = new Thread01();
thread01.start(); // 启动线程
System.out.println("main。。。end。。。");
}
}
运行结果:
main。。。start。。。
main。。。end。。。
线程运行了。。。22
1.2 实现Runnable接口方式
/**
* 实现Runnable接口方式
*
* @author maomaochong
* @date 2022/04/19 10:38
**/
public class implementsRunnableMethod {
/**
* 实现Runnable接口,重写其run()方法,填充线程要执行的任务
*/
static class Runnable01 implements Runnable {
@Override
public void run() {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
}
}
public static void main(String[] args) {
System.out.println("main。。。start。。。");
/**
* 1、需要先创建Runnable接口实现类对象
* 2、将上述对象作为新建Thread对象的入参,传入任务
* 3、启动线程
*/
Runnable01 runnable01 = new Runnable01();
new Thread(runnable01).start(); // 启动线程
System.out.println("main。。。end。。。");
}
}
运行结果:
main。。。start。。。
main。。。end。。。
线程运行了。。。22
1.3 实现Callable接口方式
/**
* 实现Runnable接口方式
* 可获取返回值及抛出异常
*
* @author maomaochong
* @date 2022/04/19 10:38
**/
public class implementsCallableMethod {
/**
* 实现Callable接口,指定返回值类型,重写其call()方法,填充线程要执行的任务
*/
static class Callable01 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
return 666;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main。。。start。。。");
/**
* 1、需要先创建Callable接口实现类对象
* 2、将上述对象作为新建FutureTask对象的入参,传入任务,通过此包装后续可以获取到返回值
* 3、将上述FutureTask对象作为新建Thread类对象的入参
* 3、启动线程
* 4、可以根据需要通过FutureTask去获取返回值
*/
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start(); // 启动线程
// 使用get()方法,阻塞式等待结果返回
Integer integer = futureTask.get();
System.out.println("integer = " + integer);
System.out.println("main。。。end。。。");
}
}
运行结果:
main。。。start。。。
线程运行了。。。22
integer = 666
main。。。end。。。
1.4. 使用线程池创建线程
1.4.1 原生线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
1. 参数分析
a. int corePoolSize
核心线程容量。
- 线程池创建好之后就准备就绪的线程数量,核心线程会一直存活,即使没有任务需要执行
除非
allowCoreThreadTimeOut
参数设置为true
,这样的话即使是核心线程也会被超时销毁b.
int maximumPoolSize
最大线程容量。
-
c.
long keepAliveTime
存活时间。
如果当前的线程数量大于
corePoolSize
数量,则会在线程空闲大于指定的存活时间时进行释放此释放操作只会释放(
maximumPoolSize
-corePoolSize
)范围的线程d.
TimeUnit unit
e.
BlockingQueue<Runnable> workQueue
任务队列、阻塞队列。
用来保存等待被执行任务的阻塞队列,如果当前的任务很多,就会将目前多出来的任务放在队列中,当有线程空闲的时候,就会去队列中取出新任务继续执行。
常用的有:
-
g.
RejectedExecutionHandler handler
拒绝策略。
如果阻塞队列满了,则会按照我们指定的拒绝策略拒绝执行任务
- 常用的拒绝策略有:
- AbortPolicy:默认实现,会直接抛出RejectedExecutionException异常;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardPolicy:直接抛弃,任务不执行;
- DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务。
2. 工作流程
- 线程池创建,准备好 core 数量的核心线程,准备接受任务
- 新的任务进来,用 core 准备好的空闲线程执行。
- core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队 列获取任务执行
- 阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
- max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终保持到 core 大小
- 如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策 略进行处理
- 所有的线程创建都是由指定的 factory 创建的。
1.4.2 几种常用的线程池
以下的线程池类型,在工作中并不常用,阿里巴巴开发手册中规定,不要直接使用Excutors来创建线程池,而是要使用ThreadPoolExcutor,指定相关的七个参数来实现线程池。
1. newSingleThreadExecutor
创建方式:
Executors.newSingleThreadExecutor();
内部实现:
// 单线程的线程池,后台从队列中获取任务依次执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。
2. newFixedThreadPool
创建方式:
Executors.newFixedThreadPool(5);
内部实现:
// 固定大小,core=max,都不可回收
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。
3. newCachedThreadPool
创建方式:
Executors.newCachedThreadPool();
内部实现
// core是0,所有都可回收
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 一个最大容量为Integer的最大值的线程池,比较适合处理执行时间比较小的任务。
- 创建一个可缓存线程池,
- 如果线程池长度超过处理需要,可灵活回收空闲线程,
- 若无可回收,则新建线程
4. newScheduledThreadPool
创建方式:
Executors.newScheduledThreadPool(5);
内部实现:
// 定时任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 使用延时队列
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
// 使用原生的线程池创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
5. newWorkStealingPool
创建方式:
Executors.newWorkStealingPool();
一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行。
1.4.3 线程池用法
// 可以通过execute()方法提交任务
void execute(Runnable command);
// 也可以通过submit()方法提交任务
Future<?> submit(Runnable task);
// 可以将Runnable接口实现类中的任务的返回值获取到
<T> Future<T> submit(Runnable task, T result);
// 可以获取Callable接口实现类的返回值
<T> Future<T> submit(Callable<T> task);
1.4.4 开发中使用线程池优势
- 降低资源的消耗
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
- 提高线程的可管理性
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
二、异步编排
使用多线程可以提高性能,但在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。所以我们需要借助于异步编排来指定多个异步调用的执行顺序,保证业务的正确完成。
比如:查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要如下标注的时间才能完成。那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。
Future
是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用isDone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。 虽然Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture
,提供了非常强大的 Future
的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture
的方法。 CompletableFuture
类实现了 Future
接口,所以你还是可以像以前一样通过get
方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture
和 FutureTask
同属于 Future
接口的实现类,都可以获取线程的执行结果。
2.1 创建异步对象
CompletableFuture
提供了四个静态方法来创建一个异步操作。
2.1.1 runXxx()方式
runXxxx 都是没有返回结果的。
// 使用默认的线程池
public static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用指定的线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
测试:
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
System.out.println("main...start...");
// 使用runAsync()执行任务
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(()->{
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
main...end...
线程运行了。。。22
2.1.2 supplyXxx()方式
supplyXxx 都是可以获取返回结果的。
// 使用默认的线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用指定的线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor);
测试:
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用supplyAsync()执行任务
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor);
Integer integer = integerCompletableFuture.get();
System.out.println("main...end..." + integer);
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:5
main...end...5
2.2 计算完成时回调方法
有如下几种不同的回调方式:
// 可以用来接收上步骤中得到的结果或抛出的异常,在 同一线程 中进行后续操作
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
// 可以用来接收上步骤中得到的结果或抛出的异常,使用 默认线程池 创建新线程进行后续操作
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
// 可以用来接收上步骤中得到的结果或抛出的异常,使用 指定线程池 创建新线程进行后续操作
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
2.2.1 测试示例:
1. 使用whenComplete()
方式,做计算完成时的回调:
此种方式们可以获取到上一步骤中的返回值以及抛出的异常,然后使用上一步骤中的线程继续执行传入新的任务。当有异常抛出的时候无法进一步做处理。
无异常的情况
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用whenComplete()进行回调执行
CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).whenComplete((result, exception) -> {
System.out.println("异步任务成功完成...结果是:" + result + ";异常是:" + exception);
});
System.out.println("main...end...");
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:5
异步任务成功完成...结果是:5;异常是:null
main...end...
存在异常的情况
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用whenComplete()进行回调执行 -- 存在异常的情况
CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).whenComplete((result, exception) -> {
System.out.println("异步任务成功完成...结果是:" + result + ";异常是:" + exception);
});
System.out.println("main...end...");
}
}
运行结果:
main...start...
线程运行了。。。22
异步任务成功完成...结果是:null;异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main...end...
2. 使用exceptionally()
进行异常的默认处理
此种方式可以让我们在上一步执行失败,抛出异常的时候,仍可以指定默认的处理逻辑,并返回结果。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用exceptionally()进行异常情况下的默认处理
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).whenComplete((result, exception) -> {
System.out.println("异步任务成功完成...结果是:" + result + ";异常是:" + exception);
}).exceptionally(exception -> {
return 10;
});
Integer integer = integerCompletableFuture.get();
System.out.println("main...end..." + integer);
}
}
运行结果:
main...start...
线程运行了。。。22
异步任务成功完成...结果是:null;异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main...end...10
3. 对比
whenComplete
可以处理正常和异常的计算结果,exceptionally
处理异常情况。
whenComplete
和 whenCompleteAsync
的区别: whenComplete
:是执行当前任务的线程执行继续执行 whenComplete
的任务。whenCompleteAsync
:是执行把 whenCompleteAsync
这个任务继续提交给线程池 来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
2.3 handle方法
和 complete
一样,可对结果做最后的处理(可处理异常),可改变返回值。handle()
方法可以同时处理正常和异常的情况,根据不同的情况做不同的处理逻辑。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
2.3.1 测试示例
有异常的情况
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用handle(),方法执行完成后的处理
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).handle((result, throwable) -> {
if (result != null) {
return result*2;
}
if (throwable != null) {
System.out.println("throwable = " + throwable);
return 0;
}
return 0;
});
Integer integer = integerCompletableFuture.get();
System.out.println("main...end..." + integer);
}
}
运行结果:
main...start...
线程运行了。。。22
throwable = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main...end...0
无异常的情况
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用handle(),方法执行完成后的处理
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).handle((result, throwable) -> {
if (result != null) {
return result*2;
}
if (throwable != null) {
System.out.println("throwable = " + throwable);
return 0;
}
return 0;
});
Integer integer = integerCompletableFuture.get();
System.out.println("main...end..." + integer);
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:2
main...end...4
2.4 线程串行化方法
带有 Async 默认是异步执行的。同之前。 下面的方法都要前置任务成功完成。
2.4.1 thenRun()
方法
只要上面的任务执行完成,就开始执行 thenRun
,只是处理完任务后,执行 thenRun
的后续操作,不能接收上一步的执行结果,自己也没有返回值。
// 指定接下来执行的任务,不能接收上一步返回值,无返回结果
// 无需新开线程处理
public CompletableFuture<Void> thenRun(Runnable action)
// 新开线程处理,使用默认的线程池
public CompletableFuture<Void> thenRunAsync(Runnable action)
// 新开线程处理,使用指定的线程池
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor)
1. 测试示例
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用thenRunAsync(),方法执行完成后的处理
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).thenRunAsync(() -> {
System.out.println("任务2启动了...");
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:2
main...end...
任务2启动了...
2.4.2 thenAccept()
方法
消费处理结果。可以接收上一步任务的执行结果,但自己无返回值。
// 指定接下来执行的任务,可以接收上一步的返回结果,无返回结果
// 无需新开线程处理
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 新开线程处理,使用默认的线程池
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// 新开线程处理,使用指定的线程池
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor)
1. 测试示例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用thenRunAsync(),方法执行完成后的处理
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).thenAcceptAsync((result) -> {
System.out.println("任务2启动了..." + result);
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:2
main...end...
任务2启动了...2
2.4.3 thenApply()
方法
当一个线程依赖另一个线程时,可以获取上一个任务的执行结果,并返回当前任务的返回值。
Function<? super T,? extends U>
:T
:上一个任务返回结果的类型U
:当前任务的返回值类型
// 指定接下来执行的任务,可以接收上一步的返回结果,可以返回结果
// 无需新开线程处理
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn)
// 新开线程处理,使用默认的线程池
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn)
// 新开线程处理,使用指定的线程池
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor)
1. 测试示例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 异步编排
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用thenRunAsync(),方法执行完成后的处理
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i; // 返回结果
}, executor).thenApplyAsync((result) -> {
System.out.println("任务2启动了..." + result);
return "Hello " + result;
}, executor);
String s = stringCompletableFuture.get();
System.out.println("main...end..." + s);
}
}
运行结果:
main...start...
线程运行了。。。22
运行结果:2
任务2启动了...2
main...end...Hello 2
2.5 两任务组合-都要完成
2.5.1 runAfterBoth()
方法
组合两个 future
,不需要获取 future
的结果,只需两个 future
处理完任务后, 再处理该任务。
// 先执行上一个任务和other指定的任务,都完成后再执行action中的任务,此种是在同一线程进行
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
// 先执行上一个任务和other指定的任务,都完成后再执行action中的任务,此种是使用新线程进行,使用默认的线程池
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action)
// 先执行上一个任务和other指定的任务,都完成后再执行action中的任务,此种是使用新线程进行,使用指定的线程池
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 都要完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo05 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02都完成后进行任务3,不能获取到前两个任务的执行结果
future01.runAfterBothAsync(future02, ()->{
System.out.println("任务3 开始...");
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
任务2 结束:
main...end...
任务3 开始...
2.5.2 thenAccept()
方法
组合两个 future
,获取两个future
任务的返回结果,然后处理任务,没有返回值。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 都要完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo05 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02都完成后进行任务3,可以获取到前两个任务的执行结果
future01.thenAcceptBothAsync(future02, (result01, result02)->{
System.out.println("任务3 开始...之前的结果:" + result01 + ",--》" + result02);
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
任务2 结束:
main...end...
任务3 开始...之前的结果:2,--》Hello
2.5.3 thenCombine()
方法
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 都要完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo05 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02都完成后进行任务3,可以获取到前两个任务的执行结果,还可以处理并返回值
CompletableFuture<String> future = future01.thenCombineAsync(future02, (result01, result02) -> {
return result01 + " : " + result02 + "-->" + "HAHA";
}, executor);
String s = future.get();
System.out.println("main...end..." + s);
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
任务2 结束:
main...end...2 : Hello-->HAHA
2.6 两任务组合 - 一个完成
当两个任务中,任意一个 future 任务完成的时候,执行任务。
2.6.1 runAfter()
方法
两个任务有一个执行完成,不需要获取future
的结果,处理任务,也没有返 回值。
// 任务1 和 任务2 有一个完成,即执行action中的任务,使用同一线程
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action)
// 任务1 和 任务2 有一个完成,即执行action中的任务,使用默认线程池,创建新线程执行
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action)
// 任务1 和 任务2 有一个完成,即执行action中的任务,使用指定线程池,创建新线程执行
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 一个完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo06 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02有任一完成后进行任务3,不能获取到前两个任务的执行结果
future01.runAfterEitherAsync(future02, ()->{
System.out.println("任务3 开始...");
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
main...end...
任务3 开始...
任务2 结束:
2.6.2 acceptEither()
方法
两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,使用同一线程
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,使用默认线程池创建新线程
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,使用指定线程池创建新线程
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 一个完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo06 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02有任一完成后进行任务3,可以获取到先完成的任务的执行结果
future01.acceptEitherAsync(future02, (result)->{
System.out.println("任务3 开始...之前的结果:" + result);
}, executor);
System.out.println("main...end...");
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
main...end...
任务3 开始...之前的结果:2
任务2 结束:
2.6.3 applyToEither()
方法
两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,可返回值,使用同一线程
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,可返回值,使用默认线程池创建新线程
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
// 任务1 和 任务2 有一个完成,即执行action中的任务,可获取前两个任务的执行结果,可返回值,使用指定线程池创建新线程
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)
1. 测试示例
/**
* 异步编排
* 两任务组合 - 一个完成
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo06 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
// 使用runAsync()执行任务
// 任务一
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1 线程运行了。。。" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("任务1 运行结果:" + i);
return i; // 返回结果
}, executor);
// 任务二
CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2 线程:" + Thread.currentThread().getId());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2 结束:");
return "Hello";
}, executor);
// 编排,当future01和future02有任一完成后进行任务3,可以获取到先完成任务的执行结果,还可以处理并返回值
CompletableFuture<String> future = future01.applyToEitherAsync(future02, (result) -> {
return result + "-->" + "HAHA";
}, executor);
String s = future.get();
System.out.println("main...end..." + s);
}
}
运行结果:
main...start...
任务1 线程运行了。。。22
任务1 运行结果:2
任务2 线程:23
main...end...2-->HAHA
任务2 结束:
2.7 多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
2.7.1 allOf()
方法
1. 测试示例
不使用get()方法:
/**
* 异步编排
* 多任务组合
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo07 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
CompletableFuture<String> imgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> attrFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的属性");
return "黑色 + 256G";
}, executor);
CompletableFuture<String> introFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品介绍");
return "华为";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(imgFuture, attrFuture, introFuture);
// allOf.get(); // 等待所有线程执行结束
System.out.println("main...end...");
}
}
运行结果:
main...start...
查询商品的图片信息
查询商品的属性
main...end...
查询商品介绍
使用get()
方法:
/**
* 异步编排
* 多任务组合
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo07 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
CompletableFuture<String> imgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> attrFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的属性");
return "黑色 + 256G";
}, executor);
CompletableFuture<String> introFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品介绍");
return "华为";
}, executor);
CompletableFuture<Void> allOf = CompletableFuture.allOf(imgFuture, attrFuture, introFuture);
allOf.get(); // 等待所有线程执行结束
System.out.println("main...end...");
}
}
运行结果:
main...start...
查询商品的图片信息
查询商品的属性
查询商品介绍
main...end...
2.7.2 anyOf()
方法
1. 测试示例
/**
* 异步编排
* 多任务组合
*
* @author maomaochong
* @date 2022/04/20 11:25
**/
public class CompletableFutureDemo07 {
// 创建线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main...start...");
CompletableFuture<String> imgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> attrFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品的属性");
return "黑色 + 256G";
}, executor);
CompletableFuture<String> introFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品介绍");
return "华为";
}, executor);
// CompletableFuture<Void> allOf = CompletableFuture.allOf(imgFuture, attrFuture, introFuture);
// allOf.get(); // 等待所有线程执行结束
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(imgFuture, attrFuture, introFuture);
anyOf.get(); // 等待所有线程执行结束
System.out.println("main...end...");
}
}
运行结果:
main...start...
查询商品的图片信息
查询商品的属性
main...end...
查询商品介绍