JUC(java.util.councurrent)是Java并发包的简称

JUC - 图1

锁 & 工具类

r7ghxmo8.bmp

Lock

接口: Condition

Condition为接口类型,它将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。可以通过await()/signal()来休眠/唤醒线程。

接口: Lock

Lock为接口类型,Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition对象。

锁常用类: ReentrantLock

ReentrantLock为常用类,它是一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

锁常用类: LockSupport

LockSupport为常用类,用来创建锁和其他同步类的基本线程阻塞原语。LockSupport的功能和”Thread中的 Thread.suspend()和Thread.resume()有点类似”,LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。但是park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。
详细分析请看: JUC锁: LockSupport详解

锁常用类: StampedLock

它是java8在java.util.concurrent.locks新增的一个API。StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。

接口: ReadWriteLock

ReadWriteLock为接口类型, 维护了一对相关的锁,一个用于只读操作, read lock可以由多个读线程同时进行属于共享锁,而write lock 同时只能单独线程执行属于独占锁。
注意:读-读 可以共存; 写-写 不能共存; 读-写 不能共存(否则会出现幻读,无法保证数据的实时一致性)。

  • 说明

    • 独占锁(写锁):一次只能被一个线程占有;
    • 共享锁(读锁):多个线程可以同时占有;

      锁常用类: ReentrantReadWriteLock

      ReentrantReadWriteLock是读写锁接口ReadWriteLock的实现类,它包括Lock子类ReadLock和WriteLock。ReadLock是共享锁,WriteLock是独占锁。
  • Demo

    1. /**
    2. * 加锁的自定义集合
    3. */
    4. public class MyCacheLock {
    5. private volatile Map<String, Object> map = new HashMap<>();
    6. /**
    7. * 读写锁
    8. * 比ReentrantLock更细粒度的控制
    9. */
    10. private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    11. /**
    12. * 写入
    13. * 写锁,同一时间只有一个线程可写
    14. */
    15. public void put(String key, Object value) {
    16. readWriteLock.readLock().lock(); //加写锁
    17. try {
    18. System.out.println(Thread.currentThread().getName() + "线程写入:" + key);
    19. map.put(key, value);
    20. System.out.println(Thread.currentThread().getName() + "线程写入完毕!");
    21. } catch (Exception e) {
    22. e.printStackTrace();
    23. } finally {
    24. readWriteLock.readLock().unlock(); //释放写锁
    25. }
    26. }
    27. /**
    28. * 读取
    29. * 同一时间所有线程都可以读
    30. */
    31. public Object get(String key) {
    32. readWriteLock.readLock().lock(); //加读锁
    33. Object value = null;
    34. try {
    35. System.out.println(Thread.currentThread().getName() + "线程读取:" + key);
    36. value = map.get(key);
    37. System.out.println(Thread.currentThread().getName() + "线程读取成功!值为:" + value);
    38. } catch (Exception e) {
    39. e.printStackTrace();
    40. } finally {
    41. readWriteLock.readLock().unlock(); //释放读锁
    42. }
    43. return value;
    44. }
    45. }

抽象类: AbstractOwnableSynchonizer

可以由线程以独占方式拥有的同步器。此类为创建锁和相关同步器(伴随着所有权的概念)提供了基础。AbstractOwnableSynchronizer 类本身不管理或使用此信息。但是,子类和工具可以使用适当维护的值帮助控制和监视访问以及提供诊断。

抽象类(long): AbstractQueuedLongSynchronizer

AbstractQueuedLongSynchronizer为抽象类,以 long 形式维护同步状态的一个 AbstractQueuedSynchronizer 版本。此类具有的结构、属性和方法与 AbstractQueuedSynchronizer 完全相同,但所有与状态相关的参数和结果都定义为 long 而不是 int。当创建需要 64 位状态的多级别锁和屏障等同步器时,此类很有用。

核心抽象类(int): AbstractQueuedSynchonizer

AbstractQueuedSynchonizer为抽象类,其为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。

Tool


描述 优势 劣势 场景
CountDownLatch 所有线程在调用.await()方法的地方阻塞等待计数器归零后再向下执行;归零后不会复位,只能用一次 计数器减小与阻塞 分开 只能用一次 并发计算,汇总结果
CyclicBarrier 所有线程在调用.await()方法的地方阻塞等待计数器满数后再向下执行;满数后会复位,可重复执行 可循环复用 计数器增加与阻塞 耦合 并发计算,汇总结果
Semaphore

CountDownLatch

减法计数器:
同步辅助类,允许一个或多个线程等待直到在其他线程中执行的一组操作完成。(即,当计数器归零后才能继续向下执行后面的的代码)

  1. /**
  2. * 减法计数器
  3. */
  4. class CountDownLatchDemo {
  5. public static void main(String[] args) throws InterruptedException {
  6. //总数为3的计数器
  7. CountDownLatch countDownLatch = new CountDownLatch(6);
  8. for (int i = 0; i < 3; i++) {
  9. new Thread(() -> {
  10. System.out.println(Thread.currentThread().getName() + ":Go out!");
  11. countDownLatch.countDown(); //计数器减1
  12. System.out.println(Thread.currentThread().getName() + ":计数器-1");
  13. }, i + "").start();
  14. }
  15. System.out.println("Close..."); //注意:在 .await()方法之前当前线程是不会阻塞的
  16. countDownLatch.await(); //当前线程阻塞等待计数器归零,然后再继续执行
  17. System.out.println("Close Door!");
  18. }
  19. }
  20. ======结果========
  21. 0Go out!
  22. Close...
  23. 1Go out!
  24. 1:计数器-1
  25. 2Go out!
  26. 0:计数器-1
  27. 2:计数器-1
  28. Close Door!

每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续 执行!

  • 使用场景

例如:一个服务需要从3个远程接口获取数据,就可以开三个线程并调用远程接口,等待所有远程接口数据返回之后,服务线程再继续执行。
在比如:并发计算,汇总结果。

CyclicBarrier

加法计数器:
同步辅助类,允许一组线程互相等待彼此直到达到共同屏障点 (common barrier point)。即 指定个数线程执行完毕再执行操作。
在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

  1. /**
  2. * 加法计数器
  3. */
  4. class CyclicBarrierDemo {
  5. CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
  6. System.out.println("每次突破屏障的回调!");
  7. });
  8. for (int i = 0; i < 2; i++) {
  9. new Thread(() -> {
  10. try {
  11. System.out.println(Thread.currentThread().getName() + ":第一回合");
  12. cyclicBarrier.await(); //CyclicBarrier计数器加1,并阻塞当前线程
  13. System.out.println(Thread.currentThread().getName() + ":第二回合");
  14. cyclicBarrier.await();
  15. System.out.println(Thread.currentThread().getName() + ":第三回合");
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }).start();
  20. }
  21. System.out.println("我没阻塞哦.。。");
  22. Thread.sleep(2000); //避免junit中子线程没执行完进程就结束了
  23. }
  24. ========结果===========
  25. 我没阻塞哦.。。
  26. Thread-1:第一回合
  27. Thread-2:第一回合
  28. 每次突破屏障的回调!
  29. Thread-2:第二回合
  30. Thread-1:第二回合
  31. 每次突破屏障的回调!
  32. Thread-1:第三回合
  33. Thread-2:第三回合

Phaser

Phaser是JDK 7新增的一个同步辅助类,它可以实现CyclicBarrier和CountDownLatch类似的功能,而且它支持对任务的动态调整,并支持分层结构来达到更高的吞吐量。

Semaphore

一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。
简单来说,就是获得了信号量的许可证线程才能执行,否则一直阻塞,直到获取到许可证为止。

作用

  1. 多个共享资源互斥的使用;
  2. 并发限流,控制最大的线程数。

How

  1. /**
  2. * 信号量
  3. */
  4. class SemaphoreDemo {
  5. public static void main(String[] args) {
  6. Semaphore semaphore = new Semaphore(3); //创建一个拥有三张许可证的信号灯
  7. for (int i = 0; i < 6; i++) {
  8. new Thread(() -> {
  9. try {
  10. semaphore.acquire(); //获取一张许可证
  11. System.out.println(Thread.currentThread().getName() + "抢到车位!");
  12. TimeUnit.SECONDS.sleep(2); //逻辑执行种...
  13. System.out.println(Thread.currentThread().getName() + "离开车位!");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } finally {
  17. semaphore.release(); //释放
  18. }
  19. }).start();
  20. }
  21. }
  22. }
  • semaphore.acquire();//获得,假设如果已经满了,等待,等待有线程被被释放许可,然后获取到为止!
  • semaphore.release(); //释放许可,会将当前的信号量释放 + 1,然后唤醒等待的线程!

Exchanger

Exchanger是用于线程协作的工具类, 主要用于两个线程之间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了。

Executors

Executors是一个工具类,用其可以创建ExecutorService、ScheduledExecutorService、ThreadFactory、Callable等对象。它的使用融入到了ThreadPoolExecutor, ScheduledThreadExecutor和ForkJoinPool中。

并发集合

JUC - 图3

CopyOnWriteArrayList

写-写 加lock锁,读-写& 读-读 无锁。也就是写入是线程安全的,读取是不能保证线程安全。因此CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。

  • vs 读写锁

传统读写锁 是 写-写 & 读-写 都加锁,而 读-读 不加锁。所以读写锁是能够保证数据的时时一致性,而CopyOnWriteArrayList只能保证最终一致性。

  • 原理
    CopyOnWriteArrayList使用了一种叫 写时复制 的方法,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。
  • 场景
    • 合适读多写少的场景。且需慎用 ,因为没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障;
    • 由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc;

ArrayBolckingQueue


会抛出异常 不抛异常 阻塞等待 超时等待
添加元素 boolean add(E e) boolean offer(E e) put(E e) offer(E e,long timeout,TimeUnit unit)
移除元素 E remove() E poll() E take() E poll(long timeout,TimeUnit unit)
检测首元素 E element() E peek() —- —-

SynchronousQueue

队列中只能有一个元素, 进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
api:
E put()、E take()

BlockingQueue

  • 当阻塞队列是空时,从队列中获取元素的操作会被阻塞;
  • 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。

ekn.pngdcn.png

原子类

其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。

Executor

Executor接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。

JUC - 图6

ExecutorService🌟

ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其停止接受新任务。关闭后,执行程序将最后终止,这时没有任务在执行,也没有任务在等待执行,并且无法提交新任务。

常见方法
Future submit(Callable task); 提交任务 有返回值
Future submit(Runnable task, T result);
Future<?> submit(Runnable task); 无返回值
void execute(Runnable command); 执行任务,继承自父类

AbstractExecutorService

AbstractExecutorService继承自ExecutorService接口,其提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。

ThreadPoolExecutor🌟

ThreadPoolExecutor类实现了AbstractExecutorService接口,也是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题: 由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。

ScheduledExecutorService

ScheduledExecutorService接口继承自ExecutorService接口,可安排在给定的延迟后运行或定期执行的命令。

ScheduledThreadExecutor🌟

ScheduledThreadPoolExecutor实现ScheduledExecutorService接口,可安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。

Future

Future接口定义了获取任务的返回值等方法,具体如下:

V get() 获取方法返回值
V get(long timeout, TimeUnit unit); 延迟获取方法返回值
boolean isDone();� 方法是否执行完毕
boolean isCancelled(); 方法是否已经取消
boolean cancel(boolean mayInterruptIfRunning); 取消方法

⚠️:Future的get方法都是阻塞获取结果的

FutureTask

FutureTask 为 Future接口提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。FutureTask 常用来封装 Callable 和 Runnable,将Callable接口适配为Runnable接口,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证。

Fork/Join🌟

分支合并框架,Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。 核心思想:分治,将一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。 小任务并行执行,以提高整体任务的效率。 since jdk1.7

  • 特点工作窃取(通过双端队列来实现)
    一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的快,有些干得慢。于是干得快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。 实现能者多劳,最大化利用cpu资源。
  • 两个核心类
    • ForkJoinPool
      既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool,它与其它的ExecutorService区别主要在于它使用“工作窃取“。
      ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
    • ForkJoinTask
      ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction和RecursiveTask 使用这两个子类都可以实现我们的任务分配和计算。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果
      • 两个核心实现类:
        • RecursiveAction:递归事件,没有返回值
        • RecursiveTask:递归任务,有返回值
  • 代码示例:

    1. class MyForkJoin extends RecursiveTask<Long> {
    2. private final long start; //开始值
    3. private final long end; //结束值
    4. public MyForkJoin(long start, long end) {
    5. this.start = start;
    6. this.end = end;
    7. }
    8. /**
    9. * 拆分/合并的逻辑
    10. * 将技术拆分为左右两部分,各自计算再合并
    11. *
    12. * @return 结果
    13. */
    14. @Override
    15. protected Long compute() {
    16. long middle = (start + end) / 2; //中间值
    17. MyForkJoin task1 = new MyForkJoin(start, middle);
    18. task1.fork();//拆分任务,把任务压入线程队列
    19. MyForkJoin task2 = new MyForkJoin(middle, end);
    20. task2.fork();//拆分任务,把任务压入线程队列
    21. //执行任务并合并
    22. return task1.join() + task2.join();
    23. }
    24. public static void main(String[] args) throws ExecutionException, InterruptedException {
    25. long start = System.currentTimeMillis();
    26. ForkJoinPool forkJoinPool = new ForkJoinPool(); //创建分支合并池
    27. ForkJoinTask<Long> task = new MyForkJoin(0L, 1000000000L);
    28. ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务
    29. Long sum = submit.get();//获取结果
    30. System.out.println(sum);
    31. forkJoinPool.shutdown(); //关闭池对象
    32. System.out.println("forkJoin消耗时间:" + (System.currentTimeMillis() - start));
    33. //使用stream流更快
    34. // long start2 = System.currentTimeMillis();
    35. // LongStream longStream = LongStream.rangeClosed(0, 1000000000);
    36. // long sum2 = longStream.parallel().reduce(0, Long::sum);
    37. // System.out.println(sum2);
    38. // System.out.println("stream消耗时间:" + (System.currentTimeMillis() - start2));
    39. }
    40. }

ExecutorCompletionService

实现了CompletionService接口,作用与Executor Service类似,不同点是在获取Future返回值时支持异步非阻塞式。

  • 底层实现

ExecutorCompletionService内部持有一个BlockingQueue> 用于存放那些执行完毕的Future,因此每次从该队列头中获取就会获取到最新执行完毕的任务。

  • 示例 ```java public class TemporaryTest {
  1. @Test
  2. public void test1() throws InterruptedException {
  3. ExecutorService executorService = Executors.newCachedThreadPool();
  4. ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
  5. ArrayList<Future<String>> futureList = new ArrayList<>();
  6. System.out.println("公司让你通知大家聚餐 你开车去接人");
  7. Future<String> future10 = completionService.submit(() -> {
  8. System.out.println("总裁:好的,1个小时后来接我吧,等我通知");
  9. TimeUnit.SECONDS.sleep(10);
  10. System.out.println("总裁:忙完了来接吧");
  11. return "总裁忙完了";
  12. });
  13. futureList.add(future10);
  14. Future<String> future3 = completionService.submit(() -> {
  15. System.out.println("研发:好的,3分钟后来接我吧,等我通知");
  16. TimeUnit.SECONDS.sleep(3);
  17. System.out.println("研发:忙完了来接吧");
  18. return "研发忙完了";
  19. });
  20. futureList.add(future3);
  21. Future<String> future6 = completionService.submit(() -> {
  22. System.out.println("主任:好的,10分钟后来接我吧,等我通知");
  23. TimeUnit.SECONDS.sleep(6);
  24. System.out.println("主任:忙完了来接吧");
  25. return "主任忙完了";
  26. });
  27. futureList.add(future6);
  28. TimeUnit.SECONDS.sleep(1);
  29. System.out.println("通知完毕,等着接吧!");
  30. try {
  31. //方式一:按照futureList中Future的顺序阻塞获取返回值

// for (Future future : futureList) { // String returnStr = future.get(); //阻塞等待获取 // System.out.println(returnStr + “,你去接他”); // }

  1. //方式二:按照Future执行完毕后的先后顺序异步获取返回值
  2. for (int i = 0; i < futureList.size(); i++) {
  3. String returnStr = completionService.take().get();
  4. System.out.println(returnStr + ",你去接他");
  5. }
  6. Thread.currentThread().join(); //保证主线程不结束
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }
  10. }

}

  1. - **踩坑**
  2. 没有返回值时最好不要用ExecutorCompletionService,因为每次任务执行完毕后都会将其Future存入阻塞队列中,如果不将其取出他将越来越多,有oom的风险。即使要用,也一定要使用take()或poll()方法将其执行完毕的Future取出来,切记。
  3. <a name="RzcPn"></a>
  4. # CompletableFuture
  5. **定义**:ajax是实现的服务间的异步回调,而Future是**线程间的异步回调**<br />since jdk8
  6. JDK5中增加了Future异步获取结果的功能,但是这种方式在获取的时候是阻塞的,在正常场景下这种实现方式肯定是不太友好的,当然可以通过轮询的方式去获取异步结果,但是这种方式比较消耗CPU并且获取结果也不会太及时,所以也不提倡使用;<br />在jdk7中提供了CompletionServicetakepool方法,来获取执行的结果;<br />jdk8中的**CompletableFuture**也是非常强大的实现了异步回调的功能;另外很多开源框架自己也实现了异步回调功能,像NettychannelFuture,通过addListener来实现异步回调的编程方式;Guava也是同样的通过ListenableFuture实现了异步回调的编程方式。
  7. <a name="dtm1G"></a>
  8. ### How
  9. **说明**
  10. 1. 带有 Async后缀的方法会新起线程去执行方法中的逻辑;
  11. 1. 存在 Executor 参数的重载方法用于自定线程池来执行该方法,否则采用默认的;
  12. - **创建& 执行异步任务**
  13. | static <U> CompletableFuture<U> **supplyAsync**(Supplier<U> supplier) | 有返回值的异步任务 |
  14. | --- | --- |
  15. | static CompletableFuture<Void> **runAsync**(Runnable runnable) | 无返回值的异步任务 |
  16. | 注:各自还存在重载方法,用于自定线程池 | |
  17. - **执行完毕的回调**
  18. | CompletableFuture<T> **whenComplete**(BiConsumer<? super T, ? super Throwable> action) | 正常回调 | 仅能感知结果 |
  19. | --- | --- | --- |
  20. | <U> CompletableFuture<U> **handle**(BiFunction<? super T, Throwable, ? extends U> fn) | | 可处理结果 |
  21. | CompletableFuture<T> **exceptionally**(Function<Throwable, ? extends T> fn) | 异常回调 | |
  22. | 参数BiConsumer中两个范型分别指:正常返回结果 & 异常信息 | | |
  23. - **任务串行化**
  24. | CompletableFuture<Void> **thenRun**(Runnable action) | 不能获取上一步结果 | 无返回值 |
  25. | --- | --- | --- |
  26. | CompletableFuture<Void> **thenAccept**(Consumer<? super T> action) | 能获取上一步结果 | |
  27. | <U> CompletableFuture<U> **thenApply**(Function<? super T, ? extends U> fn) | | 有返回值 |
  28. | 将每个方法串行执行 | | |
  29. - **任务组合**
  30. | **两任务组合**_都完成才能执行 | | |
  31. | --- | --- | --- |
  32. | CompletableFuture<Void> **runAfterBoth**(CompletionStage<?> other, Runnable action) | 不能获取结果 | 无返回值 |
  33. | <U> CompletableFuture<Void> **thenAcceptBoth**(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 能获取结果 | |
  34. | <U,V> CompletableFuture<V> **thenCombine**(CompletionStage<? extends U> other, BiFunction<? super T,? super U, ? extends V> fn) | | 有返回值 |
  35. | **两任务组合**_一个完成就可执行 | | |
  36. | CompletableFuture<Void> **runAfterEither**(CompletionStage<?> other, Runnable action) | 不能获取结果 | 无返回值 |
  37. | CompletableFuture<Void> **acceptEither**(CompletionStage<? extends T> other, Consumer<? super T> action) | 能获取结果 | |
  38. | <U> CompletableFuture<U> **applyToEither**(CompletionStage<? extends T> other, Function<? super T, U> fn) | | 有返回值 |
  39. | **多任务组合** | | |
  40. | static CompletableFuture<Void> **allOf**(CompletableFuture<?>... cfs) | 等待所有任务完成 | |
  41. | static CompletableFuture<Object> **anyOf**(CompletableFuture<?>... cfs) | 只要一个任务完成即可 | |
  42. 参数 CompletionStage接口 就是指的 future任务
  43. <a name="SQcCf"></a>
  44. ### 示例
  45. ```java
  46. class FutureDemo {
  47. @SneakyThrows
  48. public static void main(String[] args) {
  49. voidRunAsync();
  50. // returnException();
  51. }
  52. /**
  53. * 没有返回值的异步调用
  54. */
  55. @SneakyThrows
  56. public static void voidRunAsync() {
  57. CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
  58. try {
  59. TimeUnit.SECONDS.sleep(2); //模拟线程执行任务耗时
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. System.out.println(Thread.currentThread().getName() + "runAsync=>void");
  64. });
  65. }
  66. /**
  67. * 有返回值的异步调用
  68. */
  69. @SneakyThrows
  70. public static void returnException() {
  71. CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
  72. System.out.println(Thread.currentThread().getName() + "runAsync=>void");
  73. int i = 1 / 0;
  74. return 200;
  75. }).whenComplete((t, u) -> {
  76. //正常执行完毕的回调
  77. //t:正常返回结果;u:错误信息(未使用exceptionally的情况下,出现异常直接抛出此信息)
  78. System.out.println("t:" + t + "-----u:" + u);
  79. System.out.println("执行完毕!");
  80. }).exceptionally(e -> {
  81. //异常时候的回调,并可指定返回值
  82. System.out.println("出现异常!");
  83. return 500;
  84. });
  85. Integer integer = completableFuture.get(); //阻塞获取返回结果
  86. System.out.println(integer);
  87. }
  88. }

说明

  1. 显然whenComplete()的异步回调比get()的阻塞等待更优雅;
  2. 如果在调用get()方法的时候线程没有执行,那么,此时也会触发线程执行。