术语问题

术语“并发”,“并行”,“多任务”,“多处理”,“多线程”,分布式系统(可能还有其他)在整个编程文献中都以多种相互冲突的方式使用,并且经常被混为一谈。 Brian Goetz 在他 2016 年《从并发到并行》的演讲中指出了这一点,之后提出了合理的二分法:

  • 并发是关于正确有效地控制对共享资源的访问。
  • 并行是使用额外的资源来更快地产生结果。


并发的新定义

并发性是一系列性能技术,专注于减少等待

  • 不同的并发方法。

值得强调的是,这个定义的有效性取决于“等待”这个词。如果没有什么可以等待,那就没有机会去加速。如果有什么东西在等待,那么就会有很多方法可以加快速度,这取决于多种因素,包括系统运行的配置,你要解决的问题类型以及其他许多问题。

抽象泄露(Leaky Abstraction) 一词的完美候选。抽象的目标是“抽象出”那些对于手头想法不重要的东西,以屏蔽不必要的细节。如果抽象是有漏洞的,那些碎片和细节就会不断重新声明自己是重要的,无论你废了多少功夫来隐藏它们。

并发的超能力

到底什么是并发呢? 我的理解是:假如你是一个保洁员,有10层楼,每层有100个房间, 如果你自己干的话要干很久很久, 但是如果你会分身术,分成10个 ,那么就会缩短保洁所需的时间,当然前提是你能分身出10个,这就存在了抽象泄露现象, 也就是说你不确定你能不能分身,或者能分身几个,现实很骨感
这里假设我们只有4个处理器, 但是却有10个分身, 这10个分身就得共用这四个处理器,假设我们在打扫房间的时候需要先敲门,然后等待开门,就出现了一段需要等待的时间,这时候在这个分身上的处理器就会转移到其它可以工作的分身上去, 这就叫任务切换,任务切换就会消耗性能,多核还好,如果是单核处理器的话,任务切换所消耗的性能就会全部加在单核处理器身上,那样会让系统运行的更慢
在克隆体敲门等待的情况下,即使单核处理器系统也能从并发中受益,因为它可以从等待(阻塞)的任务切换到准备运行的任务。但是如果所有任务都可以一直运行那么切换的成本反而会使任务变慢,在这种情况下,如果你有多个进程,并发通常只会有意义。

如果你正在尝试破解某种密码,在同一时间内参与破解的线程越多,你越快得到答案的可能性就越大。每个线程都能持续使用你所分配的处理器时间,在这种情况下(CPU 密集型问题),你代码中的线程数应该和你拥有的处理器的核心数保持一致。
同时 还会有其它的问题,比如你有一个蛋糕工厂, 现在工人们做的都是把蛋糕放到盒子里的工作, 会出现那种两个工人都看好同一个盒子,然后,就会撞在一起, 所以这个适合就需要加锁,一个工人抱住盒子说 这是我的盒子 你换一个
而且你必须要考虑好 因为这种偶然性并不会在编译时被暴露出来

并发为速度而生

如果说有其他的能给程序提速的方法, 那么尽量不要用并发变成, 它是一柄双刃剑,并发会带来各种成本,包括复杂性成本,但是它也能带来更低的耦合性,同事带来的负面影响也会被程序设计, 资源平衡,用户便利性所抵消

速度问题已开始听起来很简单:如果想让一个程序运行的更快,就要把它分成多部分, 提高单个处理器的处理速度到达极限时, 解决速度问题不是让处理器更快,而是处理器更多,为了使程序运行的更快,你必须使用额外的处理器

但是通常情况下并发可以提高单处理器的性能, 为什么这么说呢? 早成这个结局的原因就是阻塞(程序中的某个任务因为程序之外的控制而导致失败,就叫阻塞), 针对单进程的程序, 这个时候整个程序都会停下来来等这个阻塞通, 才能继续任务, 但是如果是并发, 就会在发生阻塞的时候进行任务切换,一个倒下了换另一个人顶上, 使得程序可以继续进行, 实际上 如果程序中不存在阻塞,那么并发是没有意义的

并发的一种简单的实现是使用系统操作级的进程,与线程不同, 进程是在自己的独立的地址空间运行独立程序,进程的优势在于不同的进程之间是彼此隔离的,它们之间不会互相干扰
而线程则不同.线程之间会共享内存和I/O资源,因此编写多线程程序的困难就在于怎么协调不同线程对共享资源的调用,以免这些资源被多个线程同时访问
一些人推崇用进程进行并发的开发,但是问题在于系统通常存在进程的数量跟开销方面的限制

java语言实现并发的方式相对传统, 即在顺序的语言之上添加对线程的支持,而不是在多个操作系统中进行分叉进程,线程是表示在单个进程中内创建任务

Java 并发的四句格言

1.不要用它(避免使用并发)
2.没有什么是真的,一切可能都有问题
3.仅仅是它能运行,并不意味着它没有问题
4.你必须理解它(逃不掉并发)

1.不要用它

(而且不要自己去实现它)
尽量不要用它, 如果只是为了给程序提速, 你先看看有没有其他的解决途径,且为了使程序提速并不是使用并发的唯一理由,如果你被迫使用并发, 尽量使用最简单,最安全的方法来解决问题,使用知名的库, 尽量减少自己编写代码

2.没有什么是真的,一切可能都有问题

如果你没有使用并发, 那么程序都是按照顺序进行,你能拿得准, 但是出现并发, 它可能超出你的预期, 尽管结果可能会是你预想的结果,但是让需要小心

3.仅仅是它能运行,并不意味着它没有问题

  • 你不能验证出并发程序是正确的,你只能(有时)验证出它是不正确的。
  • 大多数情况下你甚至没办法验证:如果它出问题了,你可能无法检测到它。
  • 你通常无法编写有用的测试,因此你必须依靠代码检查和对并发的深入了解来发现错误。
  • 即使是有效的程序也只能在其设计参数下工作。当超出这些设计参数时,大多数并发程序会以某种方式失败。

    4.你必须理解它

    Java 是一种多线程语言,不管你有没有意识到并发问题,它就在那里。因此,有很多使用并发的 Java 程序,要么只是偶然运行,要么大部分时间都在运行,并且会因为未被发现的并发缺陷而时不时地神秘崩溃。

并行流

java8的优点之一是他可以很容易的使用并行, 这来自于库的仔细设计, 它们控制着自己的迭代器,称为Spliterator,它被限制为易于自动分割, 只需要用.parallel() ,就会使流中的所有内容作为一组并行任务运行
例如考虑来自 Streams 的 Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时:

  1. // concurrent/ParallelPrime.java
  2. import java.util.*;
  3. import java.util.stream.*;
  4. import static java.util.stream.LongStream.*;
  5. import java.io.*;
  6. import java.nio.file.*;
  7. import onjava.Timer;
  8. public class ParallelPrime {
  9. static final int COUNT = 100_000;
  10. public static boolean isPrime(long n){
  11. return rangeClosed(2, (long)Math.sqrt(n)).noneMatch(i -> n % i == 0);
  12. }
  13. public static void main(String[] args)
  14. throws IOException {
  15. Timer timer = new Timer();
  16. List<String> primes =
  17. iterate(2, i -> i + 1)
  18. .parallel() // [1]
  19. .filter(ParallelPrime::isPrime)
  20. .limit(COUNT)
  21. .mapToObj(Long::toString)
  22. .collect(Collectors.toList());
  23. System.out.println(timer.duration());
  24. Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
  25. }
  26. }
  27. 输出结果
  28. Output:
  29. 1224

我们将数据保存在磁盘上以防止编译器过激的优化;如果我们没有对结果做任何事情,当我注释掉[1] parallel() 行时,我的结果用时大约是 parallel() 的三倍。

  • parallel() 不是灵丹妙药 , 有很多的缺陷,并且盲目地应用内置的“并行”操作有时甚至会使运行速度明显变慢。

Java 8 将两者合并起来。例如,Collections 没有内置的 map() 操作。在 CollectionMap 中唯一类似流的批处理操作是 forEach() 。如果要执行 map()reduce() 等操作,必须首先将 Collection 转换为存在这些操作的 Stream :

  1. // concurrent/CollectionIntoStream.java
  2. import onjava.*;
  3. import java.util.*;
  4. import java.util.stream.*;
  5. public class CollectionIntoStream {
  6. public static void main(String[] args) {
  7. List<String> strings = Stream.generate(new Rand.String(5))
  8. .limit(10)
  9. .collect(Collectors.toList());
  10. strings.forEach(System.out::println);
  11. // Convert to a Stream for many more options:
  12. String result = strings.stream()
  13. .map(String::toUpperCase)
  14. .map(s -> s.substring(2))
  15. .reduce(":", (s1, s2) -> s1 + s2);
  16. System.out.println(result);
  17. }
  18. }
  19. 输出:
  20. btpen
  21. pccux
  22. szgvg
  23. meinn
  24. eeloz
  25. tdvew
  26. cippc
  27. ygpoa
  28. lkljl
  29. bynxt
  30. :PENCUXGVGINNLOZVEWPPCPOALJLNXT

Collection 确实有一些批处理操作,如 removeAll()removeIf()retainAll() ,但这些都是破坏性的操作。ConcurrentHashMapforEachreduce 操作有特别广泛的支持。
在许多情况下,只在集合上调用 stream() 或者 parallelStream() 没有问题。但是,有时将 StreamCollection 混合会产生意想不到的结果。这是一个有趣的难题:

  1. // concurrent/ParallelStreamPuzzle.java
  2. import java.util.*;
  3. import java.util.function.*;
  4. import java.util.stream.*;
  5. public class ParallelStreamPuzzle {
  6. static class IntGenerator
  7. implements Supplier<Integer> {
  8. private int current = 0;
  9. @Override
  10. public Integer get() {
  11. return current++;
  12. }
  13. }
  14. public static void main(String[] args) {
  15. List<Integer> x = Stream.generate(new IntGenerator())
  16. .limit(10)
  17. .parallel() // [1]
  18. .collect(Collectors.toList());
  19. System.out.println(x);
  20. }
  21. }
  22. /* Output:
  23. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  24. */

它并不会产生想注解那样的顺序 ,那意味着什么?一堆线程都在从一个生成器取值,然后以某种方式选择有限的结果集?代码看起来很简单,但它变成了一个特别棘手的问题。parallel拿到流的数据, 但是你无法保证让线程按顺序拿,所以会出现乱序的现象

创建和运行任务 - 开始进入重点

准备知识

异步计算(Async)

所谓的异步计算就是不用等被调用函数产生的结果,而直接可以让操作往下继续进行, 换成java中的话来说就是额外开启一个线程来执行被调用函数中的部分计算,使被调用的函数继续进行或者返回,而不需要等待计算结果,单调用者而需要从线栈中取回调用结果

回调函数

回调函数通常的解释是通过指针(地址)调用的函数
把指针作为参数传给另一个函数, 我们通过这个指针来调用这个指针所指向的函数, 这就叫做回调函数
回调函数不是由提供指针的函数的方法体来调用, 而是在特定的条件跟场景下由其他的函数调用的
所以在异步计算的过程中因为异步线程的原因, 会造成函数的调用顺序丢失, 也就是不知道谁先谁后, 所以通过存储函数的指针,也就是回调函数来解决这一问题, 如下图所示

回调机制:
1.定义一个回调函数
2.提供函数实现的一方,在函数初始化的时候将回调函数的函数指针注册给调用方
3.在特定的情况下,调用方使用指针对回调函数进行调用

我们将从最简单的方法—SingleThreadExecutor 开始:

  1. // onjava/Nap.java
  2. package onjava;
  3. import java.util.concurrent.*;
  4. public class Nap {
  5. public Nap(double t) { // Seconds
  6. try {
  7. TimeUnit.MILLISECONDS.sleep((int)(1000 * t));
  8. } catch(InterruptedException e){
  9. throw new RuntimeException(e);
  10. }
  11. }
  12. public Nap(double t, String msg) {
  13. this(t);
  14. System.out.println(msg);
  15. }
  16. }

TimeUnit.MILLISECONDS.sleep() 的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS 任务管理器定期检查 sleep() 是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。

  1. // concurrent/NapTask.java
  2. import onjava.Nap;
  3. public class NapTask implements Runnable {
  4. final int id;
  5. public NapTask(int id) {
  6. this.id = id;
  7. }
  8. @Override
  9. public void run() {
  10. new Nap(0.1);// Seconds
  11. System.out.println(this + " "+
  12. Thread.currentThread().getName());
  13. }
  14. @Override
  15. public String toString() {
  16. return"NapTask[" + id + "]";
  17. }
  18. }
  1. //concurrent/SingleThreadExecutor.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. import onjava.*;
  5. public class SingleThreadExecutor {
  6. public static void main(String[] args) {
  7. ExecutorService exec =
  8. Executors.newSingleThreadExecutor();
  9. IntStream.range(0, 10)
  10. .mapToObj(NapTask::new)
  11. .forEach(exec::execute);
  12. System.out.println("All tasks submitted");
  13. exec.shutdown();
  14. while(!exec.isTerminated()) {
  15. System.out.println(
  16. Thread.currentThread().getName()+
  17. " awaiting termination");
  18. new Nap(0.1);
  19. }
  20. }
  21. }
  22. 输出为:
  23. All tasks submitted
  24. main awaiting termination
  25. main awaiting termination
  26. NapTask[0] pool-1-thread-1
  27. main awaiting termination
  28. NapTask[1] pool-1-thread-1
  29. main awaiting termination
  30. NapTask[2] pool-1-thread-1
  31. main awaiting termination
  32. NapTask[3] pool-1-thread-1
  33. main awaiting termination
  34. NapTask[4] pool-1-thread-1
  35. main awaiting termination
  36. NapTask[5] pool-1-thread-1
  37. main awaiting termination
  38. NapTask[6] pool-1-thread-1
  39. main awaiting termination
  40. NapTask[7] pool-1-thread-1
  41. main awaiting termination
  42. NapTask[8] pool-1-thread-1
  43. main awaiting termination
  44. NapTask[9] pool-1-thread-1

首先请注意,没有 SingleThreadExecutor 类。newSingleThreadExecutor()Executors 中的一个工厂方法,它创建特定类型的 ExecutorService [^4]
请注意,main() 中线程的名称是 main,并且只有一个其他线程 pool-1-thread-1。此外,交错输出显示两个线程确实同时运行。
我创建了十个 NapTasks 并将它们提交给 ExecutorService,这意味着它们开始自己运行。然而,在此期间,main() 继续做事。当我运行 callexec.shutdown() 时,它告诉 ExecutorService 完成已经提交的任务,但不接受任何新任务。此时,这些任务仍然在运行,因此我们必须等到它们在退出 main() 之前完成。这是通过检查 exec.isTerminated() 来实现的,这在所有任务完成后变为 true。
至于开头为什么会出现两次main awaiting termination, 这是因为Nap是先sleep再system out, 而main是先system out再sleep,所以main会先输出一次
注意exec.shutdown(); 它代表关闭提交新任务,如果仍然提交的话会抛异常

  1. // concurrent/MoreTasksAfterShutdown.java
  2. import java.util.concurrent.*;
  3. public class MoreTasksAfterShutdown {
  4. public static void main(String[] args) {
  5. ExecutorService exec
  6. =Executors.newSingleThreadExecutor();
  7. exec.execute(newNapTask(1));
  8. exec.shutdown();
  9. try {
  10. exec.execute(newNapTask(99));
  11. } catch(RejectedExecutionException e) {
  12. System.out.println(e);
  13. }
  14. }
  15. }
  16. 输出为
  17. java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1

exec.shutdown() 的替代方法是 exec.shutdownNow() ,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。

  • 使用更多线程

使用线程的重点是(几乎总是)更快地完成任务,那么我们为什么要限制自己使用 SingleThreadExecutor 呢?查看执行 Executors 的 Javadoc,你将看到更多选项。例如 CachedThreadPool:

  1. // concurrent/CachedThreadPool.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class CachedThreadPool {
  5. public static void main(String[] args) {
  6. ExecutorService exec
  7. =Executors.newCachedThreadPool();
  8. IntStream.range(0, 10)
  9. .mapToObj(NapTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. 输出为
  15. NapTask[7] pool-1-thread-8
  16. NapTask[4] pool-1-thread-5
  17. NapTask[1] pool-1-thread-2
  18. NapTask[3] pool-1-thread-4
  19. NapTask[0] pool-1-thread-1
  20. NapTask[8] pool-1-thread-9
  21. NapTask[2] pool-1-thread-3
  22. NapTask[9] pool-1-thread-10
  23. NapTask[6] pool-1-thread-7
  24. NapTask[5] pool-1-thread-6

当你运行这个程序时,你会发现它完成得更快。这是有道理的,每个任务都有自己的线程,所以它们都并行运行,而不是使用相同的线程来顺序运行每个任务。这似乎没毛病,很难理解为什么有人会使用 SingleThreadExecutor。

要理解这个问题,我们需要一个更复杂的任务:

  1. // concurrent/InterferingTask.java
  2. public class InterferingTask implements Runnable {
  3. final int id;
  4. private static Integer val = 0;
  5. public InterferingTask(int id) {
  6. this.id = id;
  7. }
  8. @Override
  9. public void run() {
  10. for(int i = 0; i < 100; i++)
  11. val++;
  12. System.out.println(id + " "+
  13. Thread.currentThread().getName() + " " + val);
  14. }
  15. }

每个任务增加 val 一百次。这似乎很简单。让我们用 CachedThreadPool 尝试一下:

  1. // concurrent/CachedThreadPool2.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class CachedThreadPool2 {
  5. public static void main(String[] args) {
  6. ExecutorService exec
  7. =Executors.newCachedThreadPool();
  8. IntStream.range(0, 10)
  9. .mapToObj(InterferingTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. 输出结果:
  15. 0 pool-1-thread-1 200
  16. 1 pool-1-thread-2 200
  17. 4 pool-1-thread-5 300
  18. 5 pool-1-thread-6 400
  19. 8 pool-1-thread-9 500
  20. 9 pool-1-thread-10 600
  21. 2 pool-1-thread-3 700
  22. 7 pool-1-thread-8 800
  23. 3 pool-1-thread-4 900
  24. 6 pool-1-thread-7 1000

输出不是我们所期望的,并且从一次运行到下一次运行会有所不同。问题是所有的任务都试图写入 val 的单个实例,并且他们正在踩着彼此的脚趾。我们称这样的类是线程不安全的。让我们看看 SingleThreadExecutor 会发生什么:

  1. // concurrent/SingleThreadExecutor3.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class SingleThreadExecutor3 {
  5. public static void main(String[] args)throws InterruptedException {
  6. ExecutorService exec
  7. =Executors.newSingleThreadExecutor();
  8. IntStream.range(0, 10)
  9. .mapToObj(InterferingTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. 输出结果为:
  15. 0 pool-1-thread-1 100
  16. 1 pool-1-thread-1 200
  17. 2 pool-1-thread-1 300
  18. 3 pool-1-thread-1 400
  19. 4 pool-1-thread-1 500
  20. 5 pool-1-thread-1 600
  21. 6 pool-1-thread-1 700
  22. 7 pool-1-thread-1 800
  23. 8 pool-1-thread-1 900
  24. 9 pool-1-thread-1 1000

现在我们看到结果跟预期一致, 尽管InterferingTask是非线程安全的, 但是SingleThreadExecutor()的主要好处就是,它一次只完成一个任务,且任务与任务之间是彼此互不干扰的,因此加强了线程安全, 我们称这种现象叫做线程封闭
因为在单线程上运行限制了其影响, 线程封闭限制了加速, 但可以减少很多困难的调试和重写

  • 产生结果

因为InterferingTask是一个Runnable, 它无返回值,因此只能使用其副作用产生结果,也就是在overside里用System out - 操作返回值而不是直接返回结果, 副作用是并发编程中主要的问题之一, 因为我们看到了 CachedThreadPool2.java, 对val是共享的状态, 这就是问题所在, 多个线程对修改同一资源,会产生竞争,竞争就会产生不确定的结果
避免竞争,最好的方法就是避免资源的可变共享状态, 我们可以称之为自私孩子原则: 什么都不分享

使用InterferingTask,最好的就是删除其副作用,并返回任务结果,为此,我们创建 Callable 而不是 Runnable

  1. // concurrent/CountingTask.java
  2. import java.util.concurrent.*;
  3. public class CountingTask implements Callable<Integer> { //Callable<Interger>就代表override的
  4. 的返回值是Interger
  5. final int id;
  6. public CountingTask(int id) { this.id = id; }
  7. @Override
  8. public Integer call() {
  9. Integer val = 0;
  10. for(int i = 0; i < 100; i++)
  11. val++;
  12. System.out.println(id + " " +
  13. Thread.currentThread().getName() + " " + val);
  14. return val;
  15. }
  16. }

call() 完全独立于所有其他 CountingTasks 生成其结果,这意味着没有可变的共享状态
ExecutorService 允许你使用 invokeAll() 启动集合中的每个 Callable:

  1. // concurrent/CachedThreadPool3.java
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4. import java.util.stream.*;
  5. public class CachedThreadPool3 {
  6. public static Integer extractResult(Future<Integer> f) {
  7. try {
  8. return f.get();
  9. } catch(Exception e) {
  10. throw new RuntimeException(e);
  11. }
  12. }
  13. public static void main(String[] args)throws InterruptedException {
  14. ExecutorService exec =
  15. Executors.newCachedThreadPool();
  16. List<CountingTask> tasks =
  17. IntStream.range(0, 10)
  18. .mapToObj(CountingTask::new)
  19. .collect(Collectors.toList()); //先把对象存到list里面
  20. List<Future<Integer>> futures =
  21. exec.invokeAll(tasks); //这里是将集合中的所有Callable执行call方法
  22. Integer sum = futures.stream()
  23. .map(CachedThreadPool3::extractResult)
  24. .reduce(0, Integer::sum); //这里的reduce是后一位结果跟前一个结果相加,0是用来跟第一位相加
  25. System.out.println("sum = " + sum);
  26. exec.shutdown();
  27. }
  28. }
  29. 输出结果为:
  30. 1 pool-1-thread-2 100
  31. 0 pool-1-thread-1 100
  32. 4 pool-1-thread-5 100
  33. 5 pool-1-thread-6 100
  34. 8 pool-1-thread-9 100
  35. 9 pool-1-thread-10 100
  36. 2 pool-1-thread-3 100
  37. 3 pool-1-thread-4 100
  38. 6 pool-1-thread-7 100
  39. 7 pool-1-thread-8 100
  40. sum = 1000

只有在所有任务完成后,invokeAll() 才会返回一个 Future 列表,每个任务一个 FutureFuture 是 Java 5 中引入的机制,允许你提交任务而无需等待它完成。在这里,我们使用 ExecutorService.submit() :

  1. // concurrent/Futures.java
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4. import java.util.stream.*;
  5. public class Futures {
  6. public static void main(String[] args)throws InterruptedException, ExecutionException {
  7. ExecutorService exec
  8. =Executors.newSingleThreadExecutor();
  9. Future<Integer> f =
  10. exec.submit(newCountingTask(99));
  11. System.out.println(f.get()); // [1]
  12. exec.shutdown();
  13. }
  14. }
  15. 输出结果:
  16. 99 pool-1-thread-1 100
  17. 100
  • [1] 当你的任务在尚未完成的 Future 上调用 get() 时,调用会阻塞(等待)直到结果可用。

但这意味着,在 CachedThreadPool3.java 中,Future 似乎是多余的,因为 invokeAll() 甚至在所有任务完成之前都不会返回。但是,这里的 Future 并不用于延迟结果,而是用于捕获任何可能发生的异常。

Future

Future接口是java5中提出的, 用于描述一个异步处理的结果,但是它获取结果的方式是通过阻塞跟轮询的方式来获取结果, 阻塞的方式明显违背了我们对异步调用的初衷, 而轮询的方式又会耗费无谓的CPU资源

还要注意在
CachedThreadPool3.java.get() 中抛出异常,因此 extractResult() 在 Stream 中执行此提取。
因为当你调用
get() 时,Future 会阻塞,所以它只能解决等待任务完成才暴露问题。最终,Futures 被认为是一种无效的解决方案,现在不鼓励,我们推荐 Java 8 的 CompletableFuture** ,这将在本章后面探讨。当然,你仍会在遗留库中遇到 Futures。

我们可以使用并行 Stream 以更简单,更优雅的方式解决这个问题:

  1. // concurrent/CountingStream.java
  2. // {VisuallyInspectOutput}
  3. import java.util.*;
  4. import java.util.concurrent.*;
  5. import java.util.stream.*;
  6. public class CountingStream {
  7. public static void main(String[] args) {
  8. System.out.println(
  9. IntStream.range(0, 10)
  10. .parallel() //这里就是开辟多个线程来处理后面的任务
  11. .mapToObj(CountingTask::new)
  12. .map(ct -> ct.call())
  13. .reduce(0, Integer::sum));
  14. }
  15. }
  16. 输出结果为
  17. 1 ForkJoinPool.commonPool-worker-3 100
  18. 8 ForkJoinPool.commonPool-worker-2 100
  19. 0 ForkJoinPool.commonPool-worker-6 100
  20. 2 ForkJoinPool.commonPool-worker-1 100
  21. 4 ForkJoinPool.commonPool-worker-5 100
  22. 9 ForkJoinPool.commonPool-worker-7 100
  23. 6 main 100
  24. 7 ForkJoinPool.commonPool-worker-4 100
  25. 5 ForkJoinPool.commonPool-worker-2 100
  26. 3 ForkJoinPool.commonPool-worker-3 100
  27. 1000
  • Lambda 和方法引用作为任务

java8 有了 lambdas 和方法引用,你不需要受限于只能使用 RunnableCallable 。因为 java8 的 lambdas 和方法引用可以通过匹配方法签名来使用(即,它支持结构一致性),所以我们可以将非 RunnableCallable 的参数传递给 ExecutorService :

  1. // concurrent/LambdasAndMethodReferences.java
  2. import java.util.concurrent.*;
  3. class NotRunnable {
  4. public void go() {
  5. System.out.println("NotRunnable");
  6. }
  7. }
  8. class NotCallable {
  9. public Integer get() {
  10. System.out.println("NotCallable");
  11. return 1;
  12. }
  13. }
  14. public class LambdasAndMethodReferences {
  15. public static void main(String[] args)throws InterruptedException {
  16. ExecutorService exec =
  17. Executors.newCachedThreadPool();
  18. exec.submit(() -> System.out.println("Lambda1"));
  19. exec.submit(new NotRunnable()::go);
  20. exec.submit(() -> {
  21. System.out.println("Lambda2");
  22. return 1;
  23. });
  24. exec.submit(new NotCallable()::get);
  25. exec.shutdown();
  26. }
  27. }
  28. 输出结果为:
  29. Lambda1
  30. NotCallable
  31. NotRunnable
  32. Lambda2

终止耗时任务

并发程序通常使用长时间运行的任务,可调用的任务虽然在完成时会有返回值,但是仍会耗费大量的时间, 可运行的任务有时候会被设置为永久运行的后台程序,这时候你就需要一种关闭任务的方法
最初的 Java 设计提供了中断运行任务的机制(为了向后兼容,仍然存在);中断机制包括阻塞问题。中断任务既乱又复杂,因为你必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。使用中断被视为反对模式,但我们仍然被迫接受。
任务终止的最佳方法就是设置任务周期性检查标志,然后任务可以通过自己的shutdown进程并正常终止, 并不是随机的关闭任务, 而是任务达到某一状态再进行关闭, 以这种方式听起来简单, 但是也会遇到问题, 还是共享资源的问题, 如果该标志被另一个任务控制, 那么会导致碰撞可能性

java5引入了Atomic(原子)类,它提供了一组可以使用的类型, 而不必担心并发问题, 我们将添加AtomicBoolean标志来告诉任务清理自己并退出

  1. // concurrent/QuittableTask.java
  2. import java.util.concurrent.atomic.AtomicBoolean;import onjava.Nap;
  3. public class QuittableTask implements Runnable {
  4. final int id;
  5. public QuittableTask(int id) {
  6. this.id = id;
  7. }
  8. private AtomicBoolean running =
  9. new AtomicBoolean(true);
  10. public void quit() {
  11. running.set(false);
  12. }
  13. @Override
  14. public void run() {
  15. while(running.get()) // [1]
  16. new Nap(0.1);
  17. System.out.print(id + " "); // [2]
  18. }
  19. }

虽然quit()可以被多个任务调用,但是AtomicBoolean却可以防止多个任务对running的修改, 从而使quit()任务变成线程安全的

  • [1]:只要运行标志为 true,此任务的 run() 方法将继续。

需要 running AtomicBoolean 证明编写 Java program 并发时最基本的困难之一是,如果 running 是一个普通的布尔值,你可能无法在执行程序中看到问题。实际上,在这个例子中,你可能永远不会有任何问题 - 但是代码仍然是不安全的。编写表明该问题的测试可能很困难或不可能。

作为测试,我们将启动很多 QuittableTasks 然后关闭它们。尝试使用较大的 COUNT 值

  1. // concurrent/QuittingTasks.java
  2. import java.util.*;
  3. import java.util.stream.*;
  4. import java.util.concurrent.*;
  5. import onjava.Nap;
  6. public class QuittingTasks {
  7. public static final int COUNT = 150;
  8. public static void main(String[] args) {
  9. ExecutorService es =
  10. Executors.newCachedThreadPool();
  11. List<QuittableTask> tasks =
  12. IntStream.range(1, COUNT)
  13. .mapToObj(QuittableTask::new)
  14. .peek(qt -> es.execute(qt))
  15. .collect(Collectors.toList()); //这里是确保按id顺序塞对象
  16. new Nap(1);
  17. tasks.forEach(QuittableTask::quit); es.shutdown();
  18. }
  19. }
  20. 输出为:
  21. 24 27 31 8 11 7 19 12 16 4 23 3 28 32 15 20 63 60 68 6764 39 47 52 51 55 40 43 48 59 44 56 36 35 71 72 83 10396 92 88 99 100 87 91 79 75 84 76 115 108 112 104 107111 95 80 147 120 127 119 123 144 143 116 132 124 128
  22. 136 131 135 139 148 140 2 126 6 5 1 18 129 17 14 13 2122 9 10 30 33 58 37 125 26 34 133 145 78 137 141 138 6274 142 86 65 73 146 70 42 149 121 110 134 105 82 117106 113 122 45 114 118 38 50 29 90 101 89 57 53 94 4161 66 130 69 77 81 85 93 25 102 54 109 98 49 46 97

从这个demo中可以看出, 虽然用peek()来往tasks里塞QuittableTask对象, 但是他们的执行quit()的速度是不一样的,且每一个quit()都是独立的任务, 所以造成输出的结果是这种随机的

CompletableFuture 类 基本用法

java8 提供了CompletableFuture接口,它提供了非常强大的Future拓展功能,可以帮助我们简化异步编程的复杂性,并提供了函数式编程的能力, 可以通过回调的方式处理计算结果,也提供了转换和组合的CompletableFuture方法

这是一个带有静态方法 work() 的类,它对该类的对象执行某些工作:

  1. // concurrent/Machina.java
  2. import onjava.Nap;
  3. public class Machina {
  4. public enum State {
  5. START, ONE, TWO, THREE, END;
  6. State step() {
  7. if(equals(END))
  8. return END;
  9. return values()[ordinal() + 1];
  10. }
  11. }
  12. private State state = State.START;
  13. private final int id;
  14. public Machina(int id) {
  15. this.id = id;
  16. }
  17. public static Machina work(Machina m) {
  18. if(!m.state.equals(State.END)){
  19. new Nap(0.1);
  20. m.state = m.state.step();
  21. }
  22. System.out.println(m);
  23. return m;
  24. }
  25. @Override
  26. public String toString() {
  27. return"Machina" + id + ": " + (state.equals(State.END)? "complete" : state);
  28. }
  29. }

这是一个有限状态机,一个微不足道的机器,因为它没有分支……它只是从头到尾遍历一条路径。work() 方法将机器从一个状态移动到下一个状态,并且需要 100 毫秒才能完成
CompletableFuture 可以被用来做的一件事是, 使用 completedFuture() 将它感兴趣的对象进行包装。

  1. // concurrent/CompletedMachina.java
  2. import java.util.concurrent.*;
  3. public class CompletedMachina {
  4. public static void main(String[] args) {
  5. CompletableFuture<Machina> cf =
  6. CompletableFuture.completedFuture(
  7. new Machina(0));
  8. try {
  9. Machina m = cf.get(); // Doesn't block
  10. } catch(InterruptedException |
  11. ExecutionException e) {
  12. throw new RuntimeException(e);
  13. }
  14. }
  15. }

completedFuture() 创建一个“已经完成”的 CompletableFuture 。对这样一个未来做的唯一有用的事情是 get() 里面的对象,所以这看起来似乎没有用
当我们将 handle() 包装在 CompletableFuture 中时,发现我们可以在 CompletableFuture 上添加操作来处理所包含的对象,使得事情变得更加有趣:

  1. // concurrent/CompletableApply.java
  2. import java.util.concurrent.*;
  3. public class CompletableApply {
  4. public static void main(String[] args) {
  5. CompletableFuture<Machina> cf =
  6. CompletableFuture.completedFuture(
  7. new Machina(0)); //id为0的machina
  8. CompletableFuture<Machina> cf2 =
  9. cf.thenApply(Machina::work);
  10. CompletableFuture<Machina> cf3 =
  11. cf2.thenApply(Machina::work);
  12. CompletableFuture<Machina> cf4 =
  13. cf3.thenApply(Machina::work);
  14. CompletableFuture<Machina> cf5 =
  15. cf4.thenApply(Machina::work);
  16. }
  17. }
  18. 输出为:
  19. Machina0: ONE
  20. Machina0: TWO
  21. Machina0: THREE
  22. Machina0: complete

thenApply() 应用一个接收输入并产生输出的函数。在本例中,work() 函数产生的类型与它所接收的类型相同 (Machina),因此每个 CompletableFuture添加的操作的返回类型都为 Machina, 但是 (类似于流中的 map() ) 函数也可以返回不同的类型,这将体现在返回类型上。

你可以在此处看到有关 CompletableFutures 的重要信息:它们会在你执行操作时自动解包并重新包装它们所携带的对象。这使得编写和理解代码变得更加简单, 而不会在陷入在麻烦的细节中。

我们可以消除中间变量并将操作链接在一起,就像我们使用 Streams 一样:

  1. 我们可以消除中间变量并将操作链接在一起,就像我们使用 Streams 一样:
  2. // concurrent/CompletableApplyChained.javaimport java.util.concurrent.*;
  3. import onjava.Timer;
  4. public class CompletableApplyChained {
  5. public static void main(String[] args) {
  6. Timer timer = new Timer();
  7. CompletableFuture<Machina> cf =
  8. CompletableFuture.completedFuture(
  9. new Machina(0))
  10. .thenApply(Machina::work)
  11. .thenApply(Machina::work)
  12. .thenApply(Machina::work)
  13. .thenApply(Machina::work);
  14. System.out.println(timer.duration());
  15. }
  16. }
  17. 输出为:
  18. Machina0: ONE
  19. Machina0: TWO
  20. Machina0: THREE
  21. Machina0: complete
  22. 514

这里我们还添加了一个 Timer,它的功能在每一步都显性地增加 100ms 等待时间之外,还将 CompletableFuture 内部 thenApply 带来的额外开销给体现出来了。CompletableFuture的一个重要的好处就是它鼓励使用私有子类原则(不共享任何东西) , 默认情况下使用thenApply()来应用一个不对外通信的函数, 它只需一个参数并返回一个结果, 这是函数式编程的基础 并且它在并发情况下特别有效, 并行流和 ComplempleFutures 旨在支持这些原则 ->只要你不进行数据共享, 你的并发就是相对安全的

回调thenApply() ,一旦开始操作, 在完成所有任务之前, 不会完成completableFuture的构建, 虽然有时候这很有用, 但通常开始所有任务更为重要,这样就可以继续前进并执行其他操作, 我们可以通过thenApplyAsync()来实现此目的

  1. // concurrent/CompletableApplyAsync.java
  2. import java.util.concurrent.*;
  3. import onjava.*;
  4. public class CompletableApplyAsync {
  5. public static void main(String[] args) {
  6. Timer timer = new Timer();
  7. CompletableFuture<Machina> cf =
  8. CompletableFuture.completedFuture(
  9. new Machina(0))
  10. .thenApplyAsync(Machina::work)
  11. .thenApplyAsync(Machina::work)
  12. .thenApplyAsync(Machina::work)
  13. .thenApplyAsync(Machina::work);
  14. System.out.println(timer.duration());
  15. System.out.println(cf.join());
  16. System.out.println(timer.duration());
  17. }
  18. }
  19. 输出结果为:
  20. 116
  21. Machina0: ONE
  22. Machina0: TWO
  23. Machina0: THREE
  24. Machina0: complete
  25. Machina0: complete
  26. 552

同步调用意味着任务全部完成才返回, 而async异步代表完成一个就立即返回一个, 也就是说没触发一次thenApplyAsync就立刻返回一个结果,因此可以进行下一次的调用,这样整个链路完成的就比之前快
事实上,如果没有回调 cf.join() 方法,程序会在完成其工作之前退出。而 cf.join() 直到 cf 操作完成之前,阻止 main() 进程结束。我们还可以看出本示例大部分时间消耗在 cf.join() 这。
这种”立即返回”的异步能力需要completableFuture库进行一些秘密(客户无感)的操作,特别是,它将你需要的操作存储为一组回调, 当操作第一个链路(后台操作)完成并返回结果时,第二个链路(后台操作)必须获得新生的mechina,并开始工作,以此类推但这种异步机制没有我们可以通过程序调用栈控制的普通函数调用序列,它的调用链路顺序会丢失,因此它使用一个函数地址来存储的回调来解决这个问题。-参考回调函数

幸运的是,这就是你需要了解的有关回调的全部信息。程序员将这种人为制造的混乱称为 callback hell(回调地狱)。通过异步调用,CompletableFuture 帮你管理所有回调。 除非你知道你系统中的一些特定逻辑会导致某些改变,或许你更想使用异步调用来实现程序。

  • 其他操作

下面的示例展示了所有”基本”操作,这些操作既不涉及组合两个 CompletableFuture,也不涉及异常 (我们将在后面介绍)。首先,为了提供简洁性和方便性,我们应该重用以下两个实用程序:

package onjava;
import java.util.concurrent.*;

public class CompletableUtilities {
  // Get and show value stored in a CF:
  public static void showr(CompletableFuture<?> c) {
    try {
      System.out.println(c.get());
    } catch(InterruptedException
            | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
  // For CF operations that have no value:
  public static void voidr(CompletableFuture<Void> c) {
    try {
      c.get(); // Returns void
    } catch(InterruptedException
            | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}

showr()CompletableFuture<Integer> 上调用 get(),并显示结果,try/catch 两个可能会出现的异常。
voidr()CompletableFuture<Void>showr() 版本,也就是说,CompletableFutures 只为任务完成或失败时显示信息。
为简单起见,下面的 CompletableFutures 只包装整数。cfi() 是一个便利的方法,它把一个整数包装在一个完整的 CompletableFuture<Integer> :

// concurrent/CompletableOperations.java
import java.util.concurrent.*;
import static onjava.CompletableUtilities.*;

public class CompletableOperations {
    static CompletableFuture<Integer> cfi(int i) {
        return
                CompletableFuture.completedFuture(
                        Integer.valueOf(i));  // 这里就是包装int变成completableFuture<Interger>的
    }

    public static void main(String[] args) {
        showr(cfi(1)); // Basic test  输出为1 返回被包装的Interger
        voidr(cfi(2).runAsync(() ->
                System.out.println("runAsync"))); //调用runAsync,由于它的入参是runnable,它是静态方法,返回一个completableFuture<Void>
        voidr(cfi(3).thenRunAsync(() ->
                System.out.println("thenRunAsync"))); // 在这里看起来是跟runAsync的效果一样,但是then的含义会先返回一个新的状态,当此阶段完成时再执行给定操作
        voidr(CompletableFuture.runAsync(() ->
                System.out.println("runAsync is static")));
        showr(CompletableFuture.supplyAsync(() -> 99)); // supplyAsync的特点就是返回一个带类型的completableFuture<Interger>, 且它的入参是suppler
        voidr(cfi(4).thenAcceptAsync(i ->
                System.out.println("thenAcceptAsync: " + i)));thenAcceptAsync返回一个completableFuture<Void> 
        showr(cfi(5).thenApplyAsync(i -> i + 42));
        showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
        CompletableFuture<Integer> c = cfi(7);
        c.obtrudeValue(111);
        showr(c);
        showr(cfi(8).toCompletableFuture());
        c = new CompletableFuture<>();
        c.complete(9);
        showr(c);
        c = new CompletableFuture<>();
        c.cancel(true);
        System.out.println("cancelled: " +
                c.isCancelled());
        System.out.println("completed exceptionally: " +
                c.isCompletedExceptionally());
        System.out.println("done: " + c.isDone());
        System.out.println(c);
        c = new CompletableFuture<>();
        System.out.println(c.getNow(777));
        c = new CompletableFuture<>();
        c.thenApplyAsync(i -> i + 42)
                .thenApplyAsync(i -> i * 12);
        System.out.println("dependents: " +
                c.getNumberOfDependents());
        c.thenApplyAsync(i -> i / 2);
        System.out.println("dependents: " +
                c.getNumberOfDependents());
    }
}

输出结果
1
runAsync
thenRunAsync
runAsync is static
99
thenAcceptAsync: 4
47
105
111
8
9
cancelled: true
completed exceptionally: true
done: true
java.util.concurrent.CompletableFuture@6d311334[Complet ed exceptionally]
777
dependents: 1
dependents: 2

main() 包含一系列可由其 int 值引用的测试。

  • cfi(1) 演示了 showr() 正常工作。
  • cfi(2) 是调用 runAsync() 的示例。由于 Runnable 不产生返回值,因此使用了返回 CompletableFuture <Void>voidr() 方法。
  • 注意使用cfi(3),thenRunAsync()效果似乎与 上例cfi(2)使用的runAsync()相同,差异在后续的测试中体现:
    • runAsync() 是一个 static 方法,所以你通常不会像cfi(2)一样调用它。相反你可以在 QuittingCompletable.java 中使用它。
    • 后续测试中表明 supplyAsync() 也是静态方法,区别在于它需要一个 Supplier 而不是Runnable, 并产生一个CompletableFuture<Integer> 而不是 CompletableFuture<Void>
  • then系列方法将对现有的CompletableFuture<Integer>进一步操作。
    • thenRunAsync() 不同,cfi(4)cfi(5)cfi(6) “then” 方法的参数是未包装的 Integer
    • 通过使用voidr()方法可以看到:
      • AcceptAsync()接收了一个 Consumer,因此不会产生结果。
      • thenApplyAsync() 接收一个Function, 并生成一个结果(该结果的类型可以不同于其输入类型)。
      • thenComposeAsync()thenApplyAsync()非常相似,唯一区别在于其 Function 必须产生已经包装在CompletableFuture中的结果。
  • cfi(7) 示例演示了 obtrudeValue(),它强制将值作为结果。
  • cfi(8) 使用 toCompletableFuture()CompletionStage 生成一个CompletableFuture
  • c.complete(9) 显示了如何通过给它一个结果来完成一个taskfuture)(与 obtrudeValue() 相对,后者可能会迫使其结果替换该结果)。
  • 如果你调用 CompletableFuture中的 cancel()方法,如果已经完成此任务,则正常结束。 如果尚未完成,则使用 CancellationException 完成此 CompletableFuture
  • 如果任务(future)完成,则 getNow() 方法返回CompletableFuture的完成值,否则返回getNow()的替换参数。
  • 最后,我们看一下依赖 (dependents) 的概念。如果我们将两个thenApplyAsync()调用链路到CompletableFuture上,则依赖项的数量不会增加,保持为 1。但是,如果我们另外将另一个thenApplyAsync()直接附加到c,则现在有两个依赖项:两个一起的链路和另一个单独附加的链路。
    • 这表明你可以使用一个CompletionStage,当它完成时,可以根据其结果派生多个新任务。

      结合 CompletableFuture

      第二种类型的completableFuture方法采用两种completableFuture,并以各异的方式将它们组合在一起, 就像两个人在比赛一样,一个completableFuture通常比另一个更早地到达终点, 这些方法允许你以不同的方法处理结果.

异常

CompletableFuture 在处理链中包装对象的方式相同,它也会缓冲异常。这些在处理时调用者是无感的,但仅当你尝试提取结果时才会被告知。

为了说明它们是如何工作的,我们首先创建一个类

// concurrent/Breakable.java
import java.util.concurrent.*;
public class Breakable {
    String id;
    private int failcount;

    public Breakable(String id, int failcount) {
        this.id = id;
        this.failcount = failcount;
    }

    @Override
    public String toString() {
        return "Breakable_" + id + " [" + failcount + "]";
    }

    public static Breakable work(Breakable b) {
        if (--b.failcount == 0) {
            System.out.println(
                    "Throwing Exception for " + b.id + ""
            );
            throw new RuntimeException(
                    "Breakable_" + b.id + " failed"
            );
        }
        System.out.println(b);
        return b;
    }
}

failcount > 0,且每次将对象传递给 work() 方法时, failcount - 1 。当failcount - 1 = 0 时,work() 将抛出一个异常。如果传给 work()failcount = 0work() 永远不会抛出异常。
注意,异常信息此示例中被抛出( RuntimeException )
在下面示例 test() 方法中,work() 多次应用于 Breakable,因此如果 failcount 在范围内,就会抛出异常。然而,在测试AE中,你可以从输出中看到抛出了异常,但它们从未出现:

// concurrent/CompletableExceptions.java
import java.util.concurrent.*;
public class CompletableExceptions {
    static CompletableFuture<Breakable> test(String id, int failcount) {
        return CompletableFuture.completedFuture(
                new Breakable(id, failcount))
                .thenApply(Breakable::work)
                .thenApply(Breakable::work)
                .thenApply(Breakable::work)
                .thenApply(Breakable::work);
    }

    public static void main(String[] args) {
        // Exceptions don't appear ...
        test("A", 1);
        test("B", 2);
        test("C", 3);
        test("D", 4);
        test("E", 5);
        // ... until you try to fetch the value:
        try {
            test("F", 2).get(); // or join() //这里是只有get的时候 异常才会被抛出
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        // Test for exceptions:
        System.out.println(
                test("G", 2).isCompletedExceptionally()
        );
        // Counts as "done":
        System.out.println(test("H", 2).isDone());
        // Force an exception:
        CompletableFuture<Integer> cfi =
                new CompletableFuture<>();
        System.out.println("done? " + cfi.isDone());
        cfi.completeExceptionally(
                new RuntimeException("forced"));
        try {
            cfi.get();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
输出结果:
Throwing Exception for A
Breakable_B [1]
Throwing Exception for B
Breakable_C [2]
Breakable_C [1]
Throwing Exception for C
Breakable_D [3]
Breakable_D [2]
Breakable_D [1]
Throwing Exception for D
Breakable_E [4]
Breakable_E [3]
Breakable_E [2]
Breakable_E [1]
Breakable_F [1]
Throwing Exception for F
java.lang.RuntimeException: Breakable_F failed
Breakable_G [1]
Throwing Exception for G
true
Breakable_H [1]
Throwing Exception for H
true
done? false
java.lang.RuntimeException: forced

AE 运行到抛出异常,然后…并没有将抛出的异常暴露给调用方。只有在测试 F 中调用 get() 时,我们才会看到抛出的异常。 测试 G 表明,你可以首先检查在处理期间是否抛出异常,而不抛出该异常。然而,test H 告诉我们,不管异常是否成功,它仍然被视为已“完成”。 代码的最后一部分展示了如何将异常插入到 CompletableFuture 中,而不管是否存在任何失败。
在连接或获取结果时,我们使用 CompletableFuture 提供的更复杂的机制来自动响应异常,而不是使用粗糙的 try-catch

你可以使用与我们看到的所有 CompletableFuture 相同的表单来完成此操作:在链中插入一个 CompletableFuture 调用。有三个选项 exceptionally()handle()whenComplete():

// concurrent/CatchCompletableExceptions.java
import java.util.concurrent.*;
public class CatchCompletableExceptions {
    static void handleException(int failcount) {
        // Call the Function only if there's an
        // exception, must produce same type as came in:
        CompletableExceptions
                .test("exceptionally", failcount)
                .exceptionally((ex) -> { // Function
                    if (ex == null)
                        System.out.println("I don't get it yet");
                    return new Breakable(ex.getMessage(), 0);
                })
                .thenAccept(str ->
                        System.out.println("result: " + str));

        // Create a new result (recover):
        CompletableExceptions
                .test("handle", failcount)
                .handle((result, fail) -> { // BiFunction
                    if (fail != null)
                        return "Failure recovery object";
                    else
                        return result + " is good";
                })
                .thenAccept(str ->
                        System.out.println("result: " + str));

        // Do something but pass the same result through:
        CompletableExceptions
                .test("whenComplete", failcount)
                .whenComplete((result, fail) -> { // BiConsumer
                    if (fail != null)
                        System.out.println("It failed");
                    else
                        System.out.println(result + " OK");
                })
                .thenAccept(r ->
                        System.out.println("result: " + r));
    }

    public static void main(String[] args) {
        System.out.println("**** Failure Mode ****");
        handleException(2);
        System.out.println("**** Success Mode ****");
        handleException(0);
    }
}
输出结果
**** Failure Mode ****
Breakable_exceptionally [1]
Throwing Exception for exceptionally
result: Breakable_java.lang.RuntimeException:
Breakable_exceptionally failed [0]
Breakable_handle [1]
Throwing Exception for handle
result: Failure recovery object
Breakable_whenComplete [1]
Throwing Exception for whenComplete
It failed
**** Success Mode ****
Breakable_exceptionally [-1]
Breakable_exceptionally [-2]
Breakable_exceptionally [-3]
Breakable_exceptionally [-4]
result: Breakable_exceptionally [-4]
Breakable_handle [-1]
Breakable_handle [-2]
Breakable_handle [-3]
Breakable_handle [-4]
result: Breakable_handle [-4] is good
Breakable_whenComplete [-1]
Breakable_whenComplete [-2]
Breakable_whenComplete [-3]
Breakable_whenComplete [-4]
Breakable_whenComplete [-4] OK
result: Breakable_whenComplete [-4]

exceptionally :返回一个新的CompletableFuture,当CompletableFuture完成时完成,结果是异常触发此CompletableFuture的完成特殊功能的给定功能; 否则,如果此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。 注意:使用方法whenComplete和handle可以使用此功能更灵活的版本。 说白了就是异常会触发功能, 只是会返回一个相同的completableFuture

handle() 一致被调用来查看是否发生异常(必须检查 fail 是否为 true)。

  • 但是 handle() 可以生成任何新类型,所以它允许执行处理,而不是像使用 exceptionally()那样简单地恢复。
  • whenComplete() 类似于 handle(),同样必须测试它是否失败,但是参数是一个消费者,并且不修改传递给它的结果对象。

    检查性异常

    CompletableFutureparallel Stream 都不支持包含检查性异常的操作。相反,你必须在调用操作时处理检查到的异常,这会产生不太优雅的代码:手动去try/catch

    死锁

    由于任务会被阻塞,一个任务会等待另一个任务完成, 以此类推 ,这样一直下去,直到最后一个任务又在等第一个等待的任务.这得到了一个任务之间相互等待的连续循环, 没有哪个线程能继续, 这称之为死锁
    。真正的问题在于,程序看起来工作良好, 但是具有潜在的死锁危险。这时, 死锁可能发生,而事先却没有任何征兆, 所以 bug 会潜伏在你的程序例,直到客户发现它出乎意料的发生(以一种几乎肯定是很难重现的方式发生)。因此在编写并发程序的时候,进行仔细的程序设计以防止死锁是关键部分。

构造方法非线程安全