第 24 章 并发编程

爱丽丝:“但是我不想进入疯狂的人群众”

猫咪:“oh,你无能为力,我们都疯了,我疯了,你也疯了”

爱丽丝:“你怎么知道我疯了”。

猫咪:“你一定疯了,否则你不会来到这里”——爱丽丝梦游仙境 第 6 章。

到目前为止,我们一直在编程,就像文学中的意识流叙事设备一样:首先发生一件事,然后是下一件事。我们完全控制所有步骤及其发生的顺序。如果我们将值设置为 5,那么稍后会回来并发现它是 47,这将是非常令人惊讶的。

我们现在进入了一个奇怪的并发世界,在此这个结果并不令人惊讶。你信赖的一切都不再可靠。它可能有效,也可能没有。很可能它会在某些条件下有效,而不是在其他条件下,你必须知道和了解这些情况以确定哪些有效。

作为类比,你的正常生活是在牛顿力学中发生的。物体具有质量:它们会下降并移动它们的动量。电线具有阻力,过线可以直线传播。但是,如果你进入非常小、热、冷、或者大的世界(我们不能生存),这些事情会发生变化。我们无法判断某物体是粒子还是波,光是否受到重力影响,一些物质变为超导体。

而不是单一的意识流叙事,我们在同时多条故事线进行的间谍小说里。一个间谍在一个特殊的岩石下李璐下微缩胶片,当第二个间谍来取回包裹时,它可能已经被第三个间谍带走了。但是这部特别的小说并没有把事情搞得一团糟;你可以轻松地走到尽头,永远不会弄明白什么。

构建并发应用程序非常类似于游戏Jenga,每当你拉出一个块并将其放置在塔上时,一切都会崩溃。每个塔楼和每个应用程序都是独一无二的,有自己的作用。您从构建系统中学到的东西可能不适用于下一个系统。

本章是对并发性的一个非常基本的介绍。虽然我使用了最现代的 Java 8 工具来演示原理,但这一章远非对该主题的全面处理。我的目标是为你提供足够的基础知识,使你能够解决问题的复杂性和危险性,从而安全的通过这些鲨鱼肆虐的困难水域。

对于更多凌乱,低级别的细节,请参阅附录:并发底层原理。要进一步深入这个领域,你还必须阅读 Brian Goetz 等人的 Java Concurrency in Practice。虽然在写作时,这本书已有十多年的历史,但它仍然包含你必须了解和理解的必需品。理想情况下,本章和附录是该书的精心准备。另一个有价值的资源是Bill Venner的 Inside the Java Virtual Machine,它详细描述了 JVM 的最内部工作方式,包括线程。

术语问题

在编程文献中并发、并行、多任务、多处理、多线程、分布式系统(以及可能的其他)使用了许多相互冲突的方式,并且经常被混淆。Brian Goetz 在 2016 年的演讲中指出了这一点From Concurrent to Parallel,他提出了一个合理的解释:

  • 并发是关于正确有效地控制对共享资源的访问。
  • 并行是使用额外的资源来更快地产生结果。

这些都是很好的定义,但有几十年的混乱产生了反对解决问题的历史。一般来说,当人们使用“并发”这个词时,他们的意思是“一切变得混乱”,事实上,我可能会在很多地方自己陷入这种想法,大多数书籍,包括 Brian Goetz 的 Java Concurrency in Practice,都在标题中使用这个词。

并发通常意味着“不止一个任务正在执行中”,而并行性几乎总是意味着“不止一个任务同时执行。”你可以立即看到这些定义的区别:并行也有不止一个任务“正在进行”。区别在于细节,究竟是如何“执行”发生的。此外,重叠:为并行编写的程序有时可以在单个处理器上运行,而一些并发编程系统可以利用多个处理器。

这是另一种方法,在减速[原文:slowdown]发生的地方写下定义:

并发

同时完成多个任务。在开始处理其他任务之前,当前任务不需要完成。并发解决了阻塞发生的问题。当任务无法进一步执行,直到外部环境发生变化时才会继续执行。最常见的例子是 I/O,其中任务必须等待一些 input(在这种情况下会被阻止)。这个问题产生在 I/O 密集型。

并行

同时在多个地方完成多个任务。这解决了所谓的计算密集型问题,如果将程序分成多个部分并在不同的处理器上编辑不同的部分,程序可以运行得更快。

术语混淆的原因在上面的定义中显示:其中核心是“在同一时间完成多个任务。”并行性通过多个处理器增加分布。更重要的是,两者解决了不同类型的问题:解决 I/O 密集型问题,并行化可能对你没有任何好处,因为问题不是整体速度,而是阻塞。并且考虑到计算力限制问题并试图在单个处理器上使用并发来解决它可能会浪费时间。两种方法都试图在更短的时间内完成更多,但它们实现加速的方式是不同的,并且取决于问题所带来的约束。

这两个概念混合在一起的一个主要原因是包括 Java 在内的许多编程语言使用相同的机制线程来实现并发和并行。

我们甚至可以尝试添加细致的粒度去定义(但是,这不是标准化的术语):

  • 纯并发:任务仍然在单个 CPU 上运行。纯并发系统产生的结果比顺序系统更快,但如果有更多的处理器,则运行速度不会更快
  • 并发-并行:使用并发技术,结果程序利用更多处理器并更快地生成结果
  • 并行-并发:使用并行编程技术编写,如果只有一个处理器,结果程序仍然可以运行(Java 8 Streams就是一个很好的例子)。
  • 纯并行:除非有多个处理器,否则不会运行。

在某些情况下,这可能是一个有用的分类法。

对并发性的语言和库支持似乎Leaky Abstraction是完美候选者。抽象的目标是“抽象出”那些对于手头想法不重要的东西,从不必要的细节中汲取灵感。如果抽象是漏洞,那些碎片和细节会不断重新声明自己是重要的,无论你试图隐藏它们多少

我开始怀疑是否真的有高度抽象。当编写这些类型的程序时,你永远不会被底层系统和工具屏蔽,甚至关于 CPU 缓存如何工作的细节。最后,如果你非常小心,你创作的东西在特定的情况下起作用,但它在其他情况下不起作用。有时,区别在于两台机器的配置方式,或者程序的估计负载。这不是 Java 特有的-它是并发和并行编程的本质。

您可能会认为纯函数式语言没有这些限制。实际上,纯函数式语言解决了大量并发问题,所以如果你正在解决一个困难的并发问题,你可以考虑用纯函数语言编写这个部分。但最终,如果你编写一个使用队列的系统,例如,如果它没有正确调整并且输入速率要么没有被正确估计或被限制(并且限制意味着,在不同情况下不同的东西具有不同的影响),该队列将填满并阻塞或溢出。最后,您必须了解所有细节,任何问题都可能会破坏您的系统。这是一种非常不同的编程方式

并发的新定义

几十年来,我一直在努力解决各种形式的并发问题,其中一个最大的挑战一直是简单地定义它。在撰写本章的过程中,我终于有了这样的洞察力,我认为可以定义它:

并发性是一系列性能技术,专注于减少等待

这实际上是一个相当多的声明,所以我将其分解:

  • 这是一个集合:有许多不同的方法来解决这个问题。这是使定义并发性如此具有挑战性的问题之一,因为技术差别很大
  • 这些是性能技术:就是这样。并发的关键点在于让您的程序运行得更快。在 Java 中,并发是非常棘手和困难的,所以绝对不要使用它,除非你有一个重大的性能问题 - 即使这样,使用最简单的方法产生你需要的性能,因为并发很快变得无法管理。
  • “减少等待”部分很重要而且微妙。无论(例如)你运行多少个处理器,你只能在等待某个地方时产生结果。如果您发起 I/O 请求并立即获得结果,没有延迟,因此无需改进。如果您在多个处理器上运行多个任务,并且每个处理器都以满容量运行,并且任何其他任务都没有等待,那么尝试提高吞吐量是没有意义的。并发的唯一形式是如果程序的某些部分被迫等待。等待可以以多种形式出现 - 这解释了为什么存在如此不同的并发方法。

值得强调的是,这个定义的有效性取决于等待这个词。如果没有什么可以等待,那就没有机会了。如果有什么东西在等待,那么就会有很多方法可以加快速度,这取决于多种因素,包括系统运行的配置,你要解决的问题类型以及其他许多问题。

并发的超能力

想象一下,你是一部科幻电影。您必须在高层建筑中搜索一个精心巧妙地隐藏在建筑物的一千万个房间之一中的单个物品。您进入建筑物并移动走廊。走廊分开了。

你自己完成这项任务需要一百个生命周期。

现在假设你有一个奇怪的超级大国。你可以将自己分开,然后在继续前进的同时将另一半送到另一个走廊。每当你在走廊或楼梯上遇到分隔到下一层时,你都会重复这个分裂的技巧。最后,你会有一个人在整个建筑物的每个终点走廊。

每个走廊都有一千个房间。你的超级大国正在变得有点瘦,所以你只能让自己 50 个人同时搜索房间。

一旦克隆体进入房间,它必须搜索房间的所有裂缝和隐藏的口袋。它切换到第二个超级大国。它分成了一百万个纳米机器人,每个机器人都会飞到或爬到房间里一些看不见的地方。你不明白这种力量 - 一旦你启动它就会起作用。在他们自己的控制下,纳米机器人开始行动,搜索房间然后回来重新组装成你,突然,不知何故,你只知道物品是否在房间里

我很想能够说,“你在科幻小说中的超级大国?这就是并发性。“每当你有更多的任务要解决时,它就像分裂两个一样简单。问题是我们用来描述这种现象的任何模型最终都是抽象的

以下是其中一个漏洞:在理想的世界中,每次克隆自己时,您还会复制硬件处理器来运行该克隆。但当然不会发生这种情况 - 您的机器上可能有四个或八个处理器(通常在写入时)。您可能还有更多,并且仍有许多情况只有一个处理器。在抽象的讨论中,物理处理器的分配方式不仅可以泄漏,甚至可以支配您的决策

让我们在科幻电影中改变一些东西。现在当每个克隆搜索者最终到达一扇门时,他们必须敲门并等到有人回答。如果我们每个搜索者有一个处理器,这没有问题 - 处理器只是空闲,直到门被回答。但是如果我们只有 8 个处理器和数千个搜索者,那么只是因为搜索者恰好是因为处理器闲置了被锁,等待一扇门被接听。相反,我们希望将处理器应用于搜索,在那里它可以做一些真正的工作,因此需要将处理器从一个任务切换到另一个任务的机制。

许多型号能够有效地隐藏处理器的数量,并允许您假装您的数量非常大。但是有些情况会发生故障的时候,你必须知道处理器的数量,以便你可以解决这个问题。

其中一个最大的影响取决于您是单个处理器还是多个处理器。如果你只有一个处理器,那么任务切换的成本也由该处理器承担,将并发技术应用于你的系统会使它运行得更慢。

这可能会让您决定,在单个处理器的情况下,编写并发代码时没有意义。然而,有些情况下,并发模型会产生更简单的代码,实际上值得让它运行得更慢以实现。

在克隆体敲门等待的情况下,即使单处理器系统也能从并发中受益,因为它可以从等待(阻塞)的任务切换到准备好的任务。但是如果所有任务都可以一直运行那么切换的成本会降低一切,在这种情况下,如果你有多个进程,并发通常只会有意义。

在接听电话的客户服务部门,你只有一定数量的人,但是你可以拨打很多电话。那些人(处理器)必须一次拨打一个电话,直到完成电话和额外的电话必须排队。

在“鞋匠和精灵”的童话故事中,鞋匠做了很多工作,当他睡着时,一群精灵来为他制作鞋子。这里的工作是分布式的,但即使使用大量的物理处理器,在制造鞋子的某些部件时会产生限制 - 例如,如果鞋底需要制作鞋子,这会限制制鞋的速度并改变您设计解决方案的方式。

因此,您尝试解决的问题驱动解决方案的设计。打破一个“独立运行”问题的高级[原文:lovely ]抽象,然后就是实际发生的现实。物理现实不断侵入和震撼,这种抽象。

这只是问题的一部分。考虑一个制作蛋糕的工厂。我们不知何故在工人中分发了蛋糕制作任务,但是现在是时候让工人把蛋糕放在盒子里了。那里有一个盒子,准备收到蛋糕。但是,在工人将蛋糕放入盒子之前,另一名工人投入并将蛋糕放入盒子中!我们的工人已经把蛋糕放进去了,然后就开始了!这两个蛋糕被砸碎并毁了。这是常见的“共享内存”问题,产生我们称之为竞争条件的问题,其结果取决于哪个工作人员可以首先在框中获取蛋糕(通常使用锁定机制来解决问题,因此一个工作人员可以先抓住框并防止蛋糕砸)。

当“同时”执行的任务相互干扰时,会出现问题。他可以以如此微妙和偶然的方式发生,可能公平地说,并发性“可以说是确定性的,但实际上是非确定性的。”也就是说,你可以假设编写通过维护和代码检查正常工作的并发程序。然而,在实践中,编写仅看起来可行的并发程序更为常见,但是在适当的条件下,将会失败。这些情况可能会发生,或者很少发生,你在测试期间从未看到它们。实际上,编写测试代码通常无法为并发程序生成故障条件。由此产生的失败只会偶尔发生,因此它们以客户投诉的形式出现。 这是推动并发的最强有力的论据之一:如果你忽略它,你可能会被咬。

因此,并发似乎充满了危险,如果这让你有点害怕,这可能是一件好事。尽管 Java 8 在并发性方面做出了很大改进,但仍然没有像编译时验证或检查异常那样的安全网来告诉您何时出现错误。通过并发,您可以自己动手,只有知识渊博,可疑和积极,才能用 Java 编写可靠的并发代码。

并发为速度而生

在听说并发编程的问题之后,你可能会想知道它是否值得这么麻烦。答案是“不,除非你的程序运行速度不够快。”并且在决定它没有之前你会想要仔细思考。不要随便跳进并发编程的悲痛之中。如果有一种方法可以在更快的机器上运行您的程序,或者如果您可以对其进行分析并发现瓶颈并在该位置交换更快的算法,那么请执行此操作。只有在显然没有其他选择时才开始使用并发,然后仅在孤立的地方。

速度问题一开始听起来很简单:如果你想要一个程序运行得更快,将其分解成碎片并在一个单独的处理器上运行每个部分。由于我们能够提高时钟速度流(至少对于传统芯片),速度的提高是出现在多核处理器的形式而不是更快的芯片。为了使你的程序运行得更快,你必须学习利用那些超级处理器,这是并发性给你的一个建议。

使用多处理器机器,可以在这些处理器之间分配多个任务,这可以显着提高吞吐量。强大的多处理器 Web 服务器通常就是这种情况,它可以在程序中为 CPU 分配大量用户请求,每个请求分配一个线程。

但是,并发性通常可以提高在单个处理器上运行的程序的性能。这听起来像是一个双向的。如果考虑一下,由于上下文切换的成本增加(从一个任务更改为另一个任务),在单个处理器上运行的并发程序实际上应该比程序的所有部分顺序运行具有更多的开销。在表面上,将程序的所有部分作为单个任务运行并节省上下文切换的成本似乎更便宜。

可以产生影响的问题是阻塞。如果你的程序中的一个任务由于程序控制之外的某些条件(通常是 I/O)而无法继续,我们会说任务或线程阻塞(在我们的科幻故事中,克隆体已敲门而且是等待它打开)。如果没有并发性,整个程序就会停止,直到外部条件发生变化。但是,如果使用并发编写程序,则当一个任务被阻止时,程序中的其他任务可以继续执行,因此程序继续向前移动。实际上,从性能的角度来看,在单处理器机器上使用并发是没有意义的,除非其中一个任务可能阻塞。

单处理器系统中性能改进的一个常见例子是事件驱动编程,特别是用户界面编程。考虑一个程序执行一些长时间运行操作,从而最终忽略用户输入和无响应。如果你有一个“退出”按钮,你不想在你编写的每段代码中轮询它。这会产生笨拙的代码,无法保证程序员不会忘记执行检查。没有并发性,生成响应式用户界面的唯一方法是让所有任务定期检查用户输入。通过创建单独的执行线程来响应用户输入,该程序保证了一定程度的响应。

实现并发的直接方法是在操作系统级别,使用与线程不同的进程。进程是一个在自己的地址空间内运行的自包含程序。进程很有吸引力,因为操作系统通常将一个进程与另一个进程隔离,因此它们不会相互干扰,这使得进程编程相对容易。相比之下,线程共享内存和 I/O 等资源,因此编写多线程程序时遇到的困难是在不同的线程驱动的任务之间协调这些资源,一次不能通过多个任务访问它们。 有些人甚至提倡将进程作为并发的唯一合理方法[^1],但不幸的是,通常存在数量和开销限制,以防止它们在并发频谱中的适用性(最终你习惯了标准的并发性克制,“这种方法适用于一些情况但不适用于其他情况”)

一些编程语言旨在将并发任务彼此隔离。这些通常被称为函数式语言,其中每个函数调用不产生其他影响(因此不能与其他函数干涉),因此可以作为独立的任务来驱动。Erlang 就是这样一种语言,它包括一个任务与另一个任务进行通信的安全机制。如果您发现程序的一部分必须大量使用并发性并且您在尝试构建该部分时遇到了过多的问题,那么您可能会考虑使用专用并发语言创建程序的那一部分。 Java 采用了更传统的方法^2,即在顺序语言之上添加对线程的支持而不是在多任务操作系统中分配外部进程,线程在执行程序所代表的单个进程中创建任务交换。

并发性会带来成本,包括复杂性成本,但可以通过程序设计,资源平衡和用户便利性的改进来抵消。通常,并发性使您能够创建更加松散耦合的设计;否则,您的代码部分将被迫明确标注通常由并发处理的操作。

四句格言

在经历了多年的 Java 并发之后,我总结了以下四个格言:

1.不要这样做

2.没有什么是真的,一切可能都有问题

3.它起作用,并不意味着它没有问题

4.你仍然必须理解它

这些特别是关于 Java 设计中的问题,尽管它也可以应用于其他一些语言。但是,确实存在旨在防止这些问题的语言。

1.不要这样做

(不要自己动手)

避免纠缠于并发产生的深层问题的最简单方法就是不要这样做。虽然它是诱人的,并且似乎足够安全,可以尝试做简单的事情,但它存在无数、微妙的陷阱。如果你可以避免它,你的生活会更容易。

证明并发性的唯一因素是速度。如果你的程序运行速度不够快 - 在这里要小心,因为只是希望它运行得更快是不合理的 - 首先应用一个分析器(参见代码校验章中分析和优化)来发现你是否可以执行其他一些优化。

如果您被迫进行并发,请采取最简单,最安全的方法来解决问题。使用众所周知的库并尽可能少地编写自己的代码。有了并发性,就没有“太简单了”。自负才是你的敌人。

2.没有什么是真的,一切可能都有问题

没有并发性的编程,你会发现你的世界有一定的顺序和一致性。通过简单地将变量赋值给某个值,很明显它应该始终正常工作。

在并发领域,有些事情可能是真的而有些事情却不是,你必须认为没有什么是真的。你必须质疑一切。即使将变量设置为某个值也可能或者可能不会按预期的方式工作,并且从那里开始走下坡路。我已经很熟悉的东西,认为它显然有效但实际上并没有。

在非并发程序中你可以忽略的各种事情突然变得非常重要。例如,您必须知道处理器缓存以及保持本地缓存与主内存一致的问题。您必须了解对象构造的深度复杂性,以便您的构造对象不会意外地将数据暴露给其他线程进行更改。问题还有很多。

虽然这些主题太复杂,无法为您提供本章的专业知识(再次参见 Java Concurrency in Practice),但您必须了解它们。

3.它起作用,并不意味着它没有问题

您可以轻松编写一个似乎可以工作,但实际上是有问题的并发程序,并且该问题仅在最极限的条件下显示出来 - 在您部署程序后不可避免地会出现用户问题。

  • 你不能证明并发程序是正确的,你只能(有时)证明它是不正确的。
  • 大多数情况下你甚至不能这样做:如果它有问题,你可能无法检测到它。
  • 您通常不能编写有用的测试,因此您必须依靠代码检查结合深入的并发知识来发现错误。
  • 即使是有效的程序也只能在其设计参数下工作。当超出这些设计参数时,大多数并发程序会以某种方式失败。

在其他 Java 主题中,我们培养了一种感觉-决定论。一切都按照语言的承诺(或隐含)进行,这是令人欣慰和期待的 - 毕竟,编程语言的目的是让机器做我们想要的。从确定性编程的世界进入并发编程领域,我们遇到了一种称为Dunning-Kruger效应的认知偏差,可以概括为“你知道的越多,你认为你知道得越多。”这意味着“……相对不熟练的人拥有着虚幻的优越感,错误地评估他们的能力远高于实际。

我自己的经验是,无论你是多么确定你的代码是线程安全的,它可能已经无效了。你可以很容易地了解所有的问题,然后几个月或几年后你会发现一些概念让你意识到你编写的大多数内容实际上都容易受到并发错误的影响。当某些内容不正确时,编译器不会告诉您。为了使它正确,你必须在研究代码时掌握前脑的所有并发问题。

在 Java 的所有非并发领域,“没有明显的错误和没有明显的编译错误”似乎意味着一切都好。对于并发,它没有任何意义。你可以在这个情况下做的最糟糕的事情是“自信”。

4.你必须仍然理解

在格言 1-3 之后,您可能会对并发性感到害怕,并且认为,“到目前为止,我已经避免了它,也许我可以继续保留它。

这是一种理性的反应。您可能知道其他编程语言更好地设计用于构建并发程序 - 甚至是在 JVM 上运行的程序(从而提供与 Java 的轻松通信),例如 Clojure 或 Scala。为什么不用这些语言编写并发部分并将 Java 用于其他所有部分呢?

唉,你不能轻易逃脱:

  • 即使你从未明确地创建一个线程,你可能使用的框架 - 例如,Swing 图形用户界面(GUI)库,或者像Timer clas 那样简单的东西。
  • 这是最糟糕的事情:当您创建组件时,您必须假设这些组件可能在多线程环境中重用。即使你的解决方案是放弃并声明你的组件“不是线程安全的”,你仍然必须知道这样的声明是重要的,它是什么意思?

人们有时会认为并发性太难,不能包含在介绍该语言的书中。他们认为并发是一个可以独立对待的独立主题,并且它在日常编程中出现的少数情况(例如图形用户界面)可以用特殊的习语来处理。如果你可以避免它,为什么要介绍这样的复杂的主题。

唉,如果只是这样的话,那就太好了。但不幸的是,您无法选择何时在 Java 程序中出现线程。仅仅你从未写过自己的线程,并不意味着你可以避免编写线程代码。例如,Web 系统是最常见的 Java 应用程序之一,本质上是多线程的 Web 服务器通常包含多个处理器,而并行性是利用这些处理器的理想方式。就像这样的系统看起来那么简单,你必须理解并发才能正确地编写它。

Java 是一种多线程语言,如果您了解它们是否存在并发问题。因此,有许多 Java 程序正在使用中,或者只是偶然工作,或者大部分时间工作并且不时地发生问题,因为。有时这种问题是相对良性的,但有时它意味着丢失有价值的数据,如果你没有意识到并发问题,你最终可能会把问题放在其他地方而不是你的代码中。如果将程序移动到多处理器系统,则可以暴露或放大这些类型的问题。基本上,了解并发性使您意识到正确的程序可能会表现出错误的行为。

残酷的真相

当人类开始烹饪他们的食物时,他们大大减少了他们的身体分解和消化食物所需的能量。烹饪创造了一个“外化的胃”,从而释放出追去其他的的能力。火的使用促成了文明。

我们现在通过计算机和网络技术创造了一个“外化大脑”,开始了第二次基本转变。虽然我们只是触及表面,但已经引发了其他转变,例如设计生物机制的能力,并且已经看到文化演变的显着加速(过去,人们不得不前往混合文化,但现在他们开始混合互联网)。这些转变的影响和好处已经超出了科幻作家预测它们的能力(他们在预测文化和个人变化,甚至技术转变的次要影响方面都特别困难)。

有了这种根本性的人类变化,看到许多破坏和失败的实验并不令人惊讶。实际上,进化依赖于无数的实验,其中大多数都失败了。这些实验是向前发展的必要条件

Java 是在充满自信,热情和睿智的氛围中创建的。在发明一种编程语言时,很容易就像语言的初始可塑性会持续存在一样,你可以把某些东西拿出来,如果不能解决问题,那么就修复它。编程语言以这种方式是独一无二的 - 它们经历了类似水的改变:气态,液态和最终的固态。在气体相位期间,灵活性似乎是无限的,并且很容易认为它总是那样。一旦人们开始使用您的语言,变化就会变得更加严重,环境变得更加粘稠。语言设计的过程本身就是一门艺术。

紧迫感来自互联网的最初兴起。它似乎是一场比赛,第一个通过起跑线的人将“获胜”(事实上,Java,JavaScript 和 PHP 等语言的流行程度可以证明这一点)。唉,通过匆忙设计语言而产生的认知负荷和技术债务最终会赶上我们。

Turing completeness是不足够的;语言需要更多的东西:它们必须能够创造性地表达,而不是用不必要的东西来衡量我们。解放我们的心理能力只是为了扭转并再次陷入困境,这是毫无意义的。我承认,尽管存在这些问题,我们已经完成了令人惊奇的事情,但我也知道如果没有这些问题我们能做得更多。

热情使原始 Java 设计师因为看起来有必要而投入功能。信心(以及原始语言的气味)让他们认为任何问题都可以解决。在时间轴的某个地方,有人认为任何加入 Java 的东西是固定的和永久性的 - 这是非常有信心,相信第一个决定永远是正确的,因此我们看到 Java 的体系中充斥着糟糕的决策。其中一些决定最终没有什么后果;例如,您可以告诉人们不要使用 Vector,但保留了对之前版本的支持。

线程包含在 Java 1.0 中。当然,并发性是影响语言远角的基本语言设计决策,很难想象以后添加它。公平地说,当时并不清楚基本的并发性是多少。像 C 这样的其他语言能够将线程视为一个附加功能,因此 Java 设计师也纷纷效仿,包括一个 Thread 类和必要的 JVM 支持(这比你想象的要复杂得多)。

C 语言是面向过程语言,这限制了它的野心。这些限制使附加线程库合理。当采用原始模型并将其粘贴到复杂语言中时,Java 的大规模扩展迅速暴露了基本问题。在 Thread 类中的许多方法的弃用以及后续的高级库浪潮中,这种情况变得明显,这些库试图提供更好的并发抽象。

不幸的是,为了在更高级别的语言中获得并发性,所有语言功能都会受到影响,包括最基本的功能,例如标识符代表可变值。在函数和方法中,所有不变和防止副作用的方法都会导致简化并发编程(这些是纯函数式编程语言的基础)的变化,但当时对于主流语言的创建者来说似乎是奇怪的想法。最初的 Java 设计师要么对这些选择有所了解,要么认为它们太不同了,并且会抛弃许多潜在的语言采用者。我们可以慷慨地说,语言设计社区当时根本没有足够的经验来理解调整在线程库中的影响。

Java 实验告诉我们,结果是悄然灾难性的。程序员很容易陷入认为 Java 线程并不那么困难的陷阱。似乎工作的程序充满了微妙的并发 bug。

为了获得正确的并发性,语言功能必须从头开始设计并考虑并发性。这艘船航行了;Java 将不再是为并发而设计的语言,而只是一种允许它的语言。

尽管有这些基本的不可修复的缺陷,但令人印象深刻的是它还有多远。Java 的后续版本添加了库,以便在使用并发时提升抽象级别。事实上,我根本不会想到有可能在 Java 8 中进行改进:并行流和CompletableFutures - 这是惊人的史诗般的变化,我会惊奇地重复的查看它[^3]。

这些改进非常有用,我们将在本章重点介绍并行流和CompletableFutures。虽然它们可以大大简化您对并发和后续代码的思考方式,但基本问题仍然存在:由于 Java 的原始设计,代码的所有部分仍然容易受到攻击,您仍然必须理解这些复杂和微妙的问题。Java 中的线程绝不是简单或安全的;那种经历必须降级为另一种更新的语言。

本章其余部分

这是我们将在本章的其余部分介绍的内容。请记住,本章的重点是使用最新的高级 Java 并发结构。使用这些使得您的生活比旧的替代品更加轻松。但是,您仍会在遗留代码中遇到一些低级工具。有时,你可能会被迫自己使用其中的一些。附录:并发底层原理包含一些更原始的 Java 并发元素的介绍。

  • Parallel Streams(并发流) 到目前为止,我已经强调了 Java 8 Streams 提供的改进语法。现在您对该语法(作为一个粉丝,我希望)感到满意,您可以获得额外的好处:您可以通过简单地将 parallel()添加到表达式来并行化流。这是一种简单,强大,坦率地说是利用多处理器的惊人方式

添加 parallel()来提高速度似乎是微不足道的,但是,唉,它就像你刚刚在残酷的真相中学到的那样简单。我将演示并解释一些盲目添加 parallel()到 Stream 表达式的缺陷。

  • 创建和运行任务 任务是一段可以独立运行的代码。为了解释创建和运行任务的一些基础知识,本节介绍了一种比并行流或 CompletableFutures:Executor 更复杂的机制。执行者管理一些低级 Thread 对象(Java 中最原始的并发形式)。您创建一个任务,然后将其交给 Executorto 运行。

有多种类型的 Executor 用于不同的目的。在这里,我们将展示规范形式,代表创建和运行任务的最简单和最佳方法。

  • 终止长时间运行的任务 任务独立运行,因此需要一种机制来关闭它们。典型的方法使用了一个标志,这引入了共享内存的问题,我们将使用 Java 的“Atomic”库来回避它。
  • Completable Futures 当您将衣服带到干洗店时,他们会给您一张收据。你继续完成其他任务,最终你的衣服很干净,你可以拿起它。收据是您与干洗店在后台执行的任务的连接。这是 Java 5 中引入的 Future 的方法。

Future 比以前的方法更方便,但你仍然必须出现并用收据取出干洗,等待任务没有完成。对于一系列操作,Futures 并没有真正帮助那么多。

Java 8 CompletableFuture 是一个更好的解决方案:它允许您将操作链接在一起,因此您不必将代码写入接口排序操作。有了 CompletableFuture 完美的结合,就可以更容易地做出“采购原料,组合成分,烹饪食物,提供食物,清理菜肴,储存菜肴”等一系列链式操作。

  • 死锁 某些任务必须去等待 - 阻塞来获得其他任务的结果。被阻止的任务有可能等待另一个被阻止的任务,等待另一个被阻止的任务,等等。如果被阻止的任务链循环到第一个,没有人可以取得任何进展,你就会陷入僵局。

如果在运行程序时没有立即出现死锁,则会出现最大的问题。您的系统可能容易出现死锁,并且只会在某些条件下死锁。程序可能在某个平台上运行正常,例如您的开发机器,但是当您将其部署到不同的硬件时会开始死锁。

死锁通常源于细微的编程错误;一系列无辜的决定,最终意外地创建了一个依赖循环。本节包含一个经典示例,演示了死锁的特性。

我们将通过模拟创建披萨的过程完成本章,首先使用并行流实现它,然后是完成配置。这不仅仅是两种方法的比较,更重要的是探索你应该投入多少工作来加速计划。

  • 努力,复杂,成本

并行流

Java 8 流的一个显着优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,他们使用一种特殊的迭代器,称为 Spliterator,它被限制为易于自动分割。这产生了相当神奇的结果,即能够简单用 parallel()然后流中的所有内容都作为一组并行任务运行。如果您的代码是使用 Streams 编写的,那么并行化以提高速度似乎是一种琐事

例如,考虑来自 Streams 的 Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时:

  1. // concurrent/ParallelPrime.java
  2. import java.util.*;
  3. import java.util.stream.*;
  4. import static java.util.stream.LongStream.*;
  5. import java.io.*;
  6. import java.nio.file.*;
  7. import onjava.Timer;
  8. public class ParallelPrime {
  9. static final int COUNT = 100_000;
  10. public static boolean isPrime(long n){
  11. return rangeClosed(2, (long)Math.sqrt(n)).noneMatch(i -> n % i == 0);
  12. }
  13. public static void main(String[] args)
  14. throws IOException {
  15. Timer timer = new Timer();
  16. List<String> primes =
  17. iterate(2, i -> i + 1)
  18. .parallel() // [1]
  19. .filter(ParallelPrime::isPrime)
  20. .limit(COUNT)
  21. .mapToObj(Long::toString)
  22. .collect(Collectors.toList());
  23. System.out.println(timer.duration());
  24. Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
  25. }
  26. }
  27. /*
  28. Output:
  29. 1224
  30. */

请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止过激的优化;如果我们没有对结果做任何事情,那么一个高级的编译器可能会观察到程序没有意义并且消除了计算(这不太可能,但并非不可能)。请注意使用 nio2 库编写文件的简单性(在文件一章中有描述)。

当我注释掉[1] parallel()行时,我的结果大约是 parallel()的三倍。

并行流似乎是一个甜蜜的交易。您所需要做的就是将编程问题转换为流,然后插入 parallel()以加快速度。实际上,有时候这很容易。但遗憾的是,有许多陷阱。

  • parallel()不是灵丹妙药

作为对流和并行流的不确定性的探索,让我们看一个看似简单的问题:求和数字的增量序列。事实证明这是一个令人惊讶的数量,并且我将冒险将它们进行比较 - 试图小心,但承认我可能会在计时代码执行时遇到许多基本陷阱之一。结果可能有一些缺陷(例如 JVM 没有“升温”),但我认为它仍然提供了一些有用的指示。

我将从一个计时方法 rigorously 开始,它采用LongSupplier,测量getAsLong()调用的长度,将结果与checkValue进行比较并显示结果。

请注意,一切都必须严格使用long;我花了一些时间发现隐蔽的溢出,然后才意识到在重要的地方错过了long

所有关于时间和内存的数字和讨论都是指“我的机器”。你的经历可能会有所不同。

  1. // concurrent/Summing.java
  2. import java.util.stream.*;
  3. import java.util.function.*;
  4. import onjava.Timer;
  5. public class Summing {
  6. static void timeTest(String id, long checkValue, LongSupplier operation){
  7. System.out.print(id + ": ");
  8. Timer timer = newTimer();
  9. long result = operation.getAsLong();
  10. if(result == checkValue)
  11. System.out.println(timer.duration() + "ms");
  12. else
  13. System.out.format("result: %d%ncheckValue: %d%n", result, checkValue);
  14. }
  15. public static final int SZ = 100_000_000;// This even works://
  16. public static final int SZ = 1_000_000_000;
  17. public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula
  18. public static void main(String[] args){
  19. System.out.println(CHECK);
  20. timeTest("Sum Stream", CHECK, () ->
  21. LongStream.rangeClosed(0, SZ).sum());
  22. timeTest("Sum Stream Parallel", CHECK, () ->
  23. LongStream.rangeClosed(0, SZ).parallel().sum());
  24. timeTest("Sum Iterated", CHECK, () ->
  25. LongStream.iterate(0, i -> i + 1)
  26. .limit(SZ+1).sum());
  27. // Slower & runs out of memory above 1_000_000:
  28. // timeTest("Sum Iterated Parallel", CHECK, () ->
  29. // LongStream.iterate(0, i -> i + 1)
  30. // .parallel()
  31. // .limit(SZ+1).sum());
  32. }
  33. }
  34. /* Output:5000000050000000
  35. Sum Stream: 167ms
  36. Sum Stream Parallel: 46ms
  37. Sum Iterated: 284ms
  38. */

CHECK值是使用 Carl Friedrich Gauss 在 1700 年代后期仍在小学时创建的公式计算出来的.

main() 的第一个版本使用直接生成 Stream 并调用 sum() 的方法。我们看到流的好处在于十亿分之一的 SZ 在没有溢出的情况下处理(我使用较小的数字,因此程序运行时间不长)。使用 parallel() 的基本范围操跟快。

如果使用iterate()来生成序列,则减速是戏剧性的,可能是因为每次生成数字时都必须调用 lambda。但是如果我们尝试并行化,那么结果通常比非并行版本花费的时间更长,但是当SZ超过一百万时,它也会耗尽内存(在某些机器上)。当然,当你可以使用range()时,你不会使用iterate(),但如果你生成的东西不是简单的序列,你必须使用iterate()。应用parallel()是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:

  • 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
  • 阵列分割成本低廉,均匀且具有完美的分裂知识。
  • 链接列表没有这些属性;“拆分”一个链表仅仅意味着把它分成“第一元素”和“其余列表”,这相对无用。
  • 无状态生成器的行为类似于数组;使用上述范围是无可争议的。
  • 迭代生成器的行为类似于链表; iterate() 是一个迭代生成器。

现在让我们尝试通过在数组中填充值来填充数组来解决问题。因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。

首先我们将尝试一个充满原始long的数组:

  1. // concurrent/Summing2.java
  2. // {ExcludeFromTravisCI}import java.util.*;
  3. public class Summing2 {
  4. static long basicSum(long[] ia) {
  5. long sum = 0;
  6. int size = ia.length;
  7. for(int i = 0; i < size; i++)
  8. sum += ia[i];return sum;
  9. }
  10. // Approximate largest value of SZ before
  11. // running out of memory on mymachine:
  12. public static final int SZ = 20_000_000;
  13. public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
  14. public static void main(String[] args) {
  15. System.out.println(CHECK);
  16. long[] la = newlong[SZ+1];
  17. Arrays.parallelSetAll(la, i -> i);
  18. Summing.timeTest("Array Stream Sum", CHECK, () ->
  19. Arrays.stream(la).sum());
  20. Summing.timeTest("Parallel", CHECK, () ->
  21. Arrays.stream(la).parallel().sum());
  22. Summing.timeTest("Basic Sum", CHECK, () ->
  23. basicSum(la));// Destructive summation:
  24. Summing.timeTest("parallelPrefix", CHECK, () -> {
  25. Arrays.parallelPrefix(la, Long::sum)
  26. return la[la.length - 1];
  27. });
  28. }
  29. }
  30. /* Output:200000010000000
  31. Array Stream
  32. Sum: 104ms
  33. Parallel: 81ms
  34. Basic Sum: 106ms
  35. parallelPrefix: 265ms
  36. */

第一个限制是内存大小;因为数组是预先分配的,所以我们不能创建几乎与以前版本一样大的任何东西。并行化可以加快速度,甚至比使用 basicSum() 循环更快。有趣的是, Arrays.parallelPrefix() 似乎实际上减慢了速度。但是,这些技术中的任何一种在其他条件下都可能更有用 - 这就是为什么你不能做出任何确定性的声明,除了“你必须尝试一下”。”

最后,考虑使用盒装Long的效果:

  1. // concurrent/Summing3.java
  2. // {ExcludeFromTravisCI}
  3. import java.util.*;
  4. public class Summing3 {
  5. static long basicSum(Long[] ia) {
  6. long sum = 0;
  7. int size = ia.length;
  8. for(int i = 0; i < size; i++)
  9. sum += ia[i];
  10. return sum;
  11. }
  12. // Approximate largest value of SZ before
  13. // running out of memory on my machine:
  14. public static final int SZ = 10_000_000;
  15. public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
  16. public static void main(String[] args) {
  17. System.out.println(CHECK);
  18. Long[] aL = newLong[SZ+1];
  19. Arrays.parallelSetAll(aL, i -> (long)i);
  20. Summing.timeTest("Long Array Stream Reduce", CHECK, () ->
  21. Arrays.stream(aL).reduce(0L, Long::sum));
  22. Summing.timeTest("Long Basic Sum", CHECK, () ->
  23. basicSum(aL));
  24. // Destructive summation:
  25. Summing.timeTest("Long parallelPrefix",CHECK, ()-> {
  26. Arrays.parallelPrefix(aL, Long::sum);
  27. return aL[aL.length - 1];
  28. });
  29. }
  30. }
  31. /* Output:50000005000000
  32. Long Array
  33. Stream Reduce: 1038ms
  34. Long Basic
  35. Sum: 21ms
  36. Long parallelPrefix: 3616ms
  37. */

现在可用的内存量大约减半,并且所有情况下所需的时间都会很长,除了basicSum(),它只是循环遍历数组。令人惊讶的是, Arrays.parallelPrefix() 比任何其他方法都要花费更长的时间。

我将 parallel() 版本分开了,因为在上面的程序中运行它导致了一个冗长的垃圾收集,扭曲了结果:

  1. // concurrent/Summing4.java
  2. // {ExcludeFromTravisCI}
  3. import java.util.*;
  4. public class Summing4 {
  5. public static void main(String[] args) {
  6. System.out.println(Summing3.CHECK);
  7. Long[] aL = newLong[Summing3.SZ+1];
  8. Arrays.parallelSetAll(aL, i -> (long)i);
  9. Summing.timeTest("Long Parallel",
  10. Summing3.CHECK, () ->
  11. Arrays.stream(aL)
  12. .parallel()
  13. .reduce(0L,Long::sum));
  14. }
  15. }
  16. /* Output:50000005000000
  17. Long Parallel: 1014ms
  18. */

它比非 parallel()版本略快,但并不显着。

这种时间增加的一个重要原因是处理器内存缓存。使用Summing2.java中的原始long,数组la是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 Long parallelPrefix 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为Long生成一个超出缓存的引用。

使用Summing3.javaSumming4.javaaL是一个Long数组,它不是一个连续的数据数组,而是一个连续的Long对象引用数组。尽管该数组可能会在缓存中出现,但指向的对象几乎总是超出缓存。

这些示例使用不同的 SZ 值来显示内存限制。

为了进行时间比较,以下是 SZ 设置为最小值 1000 万的结果:

Sum Stream: 69msSum Stream Parallel: 18msSum Iterated: 277ms Array Stream Sum: 57ms Parallel: 14ms Basic Sum: 16ms parallelPrefix: 28ms Long Array Stream Reduce: 1046ms Long Basic Sum: 21ms Long parallelPrefix: 3287ms Long Parallel: 1008ms

虽然 Java 8 的各种内置“并行”工具非常棒,但我认为它们被视为神奇的灵丹妙药:“只需添加 parallel()并且它会更快!”我希望我已经开始表明情况并非所有都是如此,并且盲目地应用内置的“并行”操作有时甚至会使运行速度明显变慢。

  • parallel()/limit()交点

使用 parallel()时会有更复杂的问题。从其他语言中吸取的流是围绕无限流模型设计的。如果您拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果您使用无限流,则使用针对流优化的算法。

Java 8 将两者合并起来。例如,Collections没有内置的map()操作。在 Collection 和 Map 中唯一类似流的批处理操作是forEach()。如果要执行map()reduce()等操作,必须首先将 Collection 转换为存在这些操作的 Stream:

  1. // concurrent/CollectionIntoStream.java
  2. import onjava.*;
  3. import java.util.*;
  4. import java.util.stream.*;
  5. public class CollectionIntoStream {
  6. public static void main(String[] args) {
  7. List<String> strings = Stream.generate(new Rand.String(5))
  8. .limit(10)
  9. .collect(Collectors.toList());
  10. strings.forEach(System.out::println);
  11. // Convert to a Stream for many more options:
  12. String result = strings.stream()
  13. .map(String::toUpperCase)
  14. .map(s -> s.substring(2))
  15. .reduce(":", (s1, s2) -> s1 + s2)
  16. System.out.println(result);
  17. }
  18. }
  19. /* Output:btpen
  20. pccux
  21. szgvg
  22. meinn
  23. eeloz
  24. tdvew
  25. cippc
  26. ygpoa
  27. lkljl
  28. bynxt
  29. :PENCUXGVGINNLOZVEWPPCPOALJLNXT
  30. */

Collection确实有一些批处理操作,如removeAll()removeIf()retainAll(),但这些都是破坏性的操作.ConcurrentHashMapforEachandreduce操作有特别广泛的支持。

在许多情况下,只在集合上调用stream()或者parallelStream()没有问题。但是,有时将StreamCollection混合会产生意外。这是一个有趣的难题:

  1. // concurrent/ParallelStreamPuzzle.java
  2. import java.util.*;
  3. import java.util.function.*;
  4. import java.util.stream.*;
  5. public class ParallelStreamPuzzle {
  6. static class IntGenerator
  7. implements Supplier<Integer> {
  8. private int current = 0;
  9. public Integer get() {
  10. return current++;
  11. }
  12. }
  13. public static void main(String[] args) {
  14. List<Integer> x = Stream.generate(newIntGenerator())
  15. .limit(10)
  16. .parallel() // [1]
  17. .collect(Collectors.toList());
  18. System.out.println(x);
  19. }
  20. }
  21. /* Output:
  22. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  23. */

如果[1]注释运行它,它会产生预期的: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 每次。但是包含了 parallel(),它看起来像一个随机数生成器,带有输出(从一次运行到下一次运行不同),如: [0, 3, 6, 8, 11, 14, 17, 20, 23, 26] 这样一个简单的程序怎么会这么破碎呢?让我们考虑一下我们在这里要实现的目标:“并行生成。”“那意味着什么?一堆线程都在拉动一个生成器,在某种程度上选择一组有限的结果?代码使它看起来很简单,但它转向是一个特别凌乱的问题。

为了看到它,我们将添加一些仪器。由于我们正在处理线程,因此我们必须将任何跟踪信息捕获到并发数据结构中。在这里我使用ConcurrentLinkedDeque

  1. // concurrent/ParallelStreamPuzzle2.java
  2. import java.util.*;
  3. import java.util.function.*;
  4. import java.util.stream.*;
  5. import java.util.concurrent.*;
  6. import java.util.concurrent.atomic.*;
  7. import java.nio.file.*;
  8. public class ParallelStreamPuzzle2 {
  9. public static final Deque<String> trace =
  10. new ConcurrentLinkedDeque<>();
  11. static class
  12. IntGenerator implements Supplier<Integer> {
  13. private AtomicInteger current =
  14. new AtomicInteger();
  15. public Integerget() {
  16. trace.add(current.get() + ": " +Thread.currentThread().getName());
  17. return current.getAndIncrement();
  18. }
  19. }
  20. public static void main(String[] args) throws Exception {
  21. List<Integer> x = Stream.generate(newIntGenerator())
  22. .limit(10)
  23. .parallel()
  24. .collect(Collectors.toList());
  25. System.out.println(x);
  26. Files.write(Paths.get("PSP2.txt"), trace);
  27. }
  28. }
  29. /*
  30. Output:
  31. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  32. */

current 是使用线程安全的AtomicInteger类定义的,可以防止竞争条件;parallel()允许多个线程调用get()

在查看PSP2.txtIntGenerator.get()被调用 1024 次时,您可能会感到惊讶。

0: main 1: ForkJoinPool.commonPool-worker-1 2: ForkJoinPool.commonPool-worker-2 3: ForkJoinPool.commonPool-worker-2 4: ForkJoinPool.commonPool-worker-1 5: ForkJoinPool.commonPool-worker-1 6: ForkJoinPool.commonPool-worker-1 7: ForkJoinPool.commonPool-worker-1 8: ForkJoinPool.commonPool-worker-4 9: ForkJoinPool.commonPool-worker-4 10: ForkJoinPool.commonPool-worker-4 11: main 12: main 13: main 14: main 15: main…10 17: ForkJoinPool.commonPool-worker-110 18: ForkJoinPool.commonPool-worker-610 19: ForkJoinPool.commonPool-worker-610 20: ForkJoinPool.commonPool-worker-110 21: ForkJoinPool.commonPool-worker-110 22: ForkJoinPool.commonPool-worker-110 23: ForkJoinPool.commonPool-worker-1

这个块大小似乎是内部实现的一部分(尝试使用limit()的不同参数来查看不同的块大小)。将parallel()limit()结合使用可以预取一串值,作为流输出。

试着想象一下这里发生了什么:一个流抽象出无限序列,按需生成。当你要求它并行产生流时,你要求所有这些线程尽可能地调用 get()。添加 limit(),你说“只需要这些。”基本上,当你将 parallel()与 limit()结合使用时,你要求随机输出 - 这可能对你正在解决的问题很好。但是当你这样做时,你必须明白。这是一个仅限专家的功能,而不是要争辩说“Java 弄错了”。

什么是更合理的方法来解决问题?好吧,如果你想生成一个 int 流,你可以使用 IntStream.range(),如下所示:

  1. // concurrent/ParallelStreamPuzzle3.java
  2. // {VisuallyInspectOutput}
  3. import java.util.*;
  4. import java.util.stream.*;
  5. public class ParallelStreamPuzzle3 {
  6. public static void main(String[] args) {
  7. List<Integer> x = IntStream.range(0, 30)
  8. .peek(e -> System.out.println(e + ": " +Thread.currentThread()
  9. .getName()))
  10. .limit(10)
  11. .parallel()
  12. .boxed()
  13. .collect(Collectors.toList());
  14. System.out.println(x);
  15. }
  16. }
  17. /* Output:
  18. 8: main
  19. 6: ForkJoinPool.commonPool-worker-5
  20. 3: ForkJoinPool.commonPool-worker-7
  21. 5: ForkJoinPool.commonPool-worker-5
  22. 1: ForkJoinPool.commonPool-worker-3
  23. 2: ForkJoinPool.commonPool-worker-6
  24. 4: ForkJoinPool.commonPool-worker-1
  25. 0: ForkJoinPool.commonPool-worker-4
  26. 7: ForkJoinPool.commonPool-worker-1
  27. 9: ForkJoinPool.commonPool-worker-2
  28. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  29. */

为了表明parallel()确实有效,我添加了一个对peek()的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。

您还可以看到 boxed()的添加,它接受 int 流并将其转换为 Integer 流。

现在我们得到多个线程产生不同的值,但它只产生 10 个请求的值,而不是 1024 个产生 10 个值。

它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。当一个简单的数字序列并行生成时,有点难以想象。如果你使用昂贵的产品,它可能有意义 - 但这都是猜测。唯一知道的是通过测试。记住这句格言:“首先制作它,然后快速制作 - 但只有你必须这样做。”parallel()limit()仅供专家使用(并且要清楚,我不认为自己是这里的专家)。

  • 并行流只看起来很容易

实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如您所见,只需将parallel()打到您的 Stream 操作上并不一定是安全的事情。在使用parallel()之前,您必须了解并行性如何帮助或损害您的操作。有个错误认识是认为并行性总是一个好主意。事实上并不是。Stream 意味着您不需要重写所有代码以便并行运行它。流什么都不做的是取代理解并行性如何工作的需要,以及它是否有助于实现您的目标。

创建和运行任务

如果无法通过并行流实现并发,则必须创建并运行自己的任务。稍后您将看到运行任务的理想 Java 8 方法是 CompletableFuture,但我们将使用更基本的工具介绍概念。

Java 并发的历史始于非常原始和有问题的机制,并且充满了各种尝试的改进。这些主要归入附录:低级并发(Appendix: Low-Level Concurrency)。在这里,我们将展示一个规范形式,表示创建和运行任务的最简单,最好的方法。与并发中的所有内容一样,存在各种变体,但这些变体要么降级到该附录,要么超出本书的范围。

  • Tasks and Executors

在 Java 的早期版本中,您通过直接创建自己的 Thread 对象来使用线程,甚至将它们子类化以创建您自己的特定“任务线程”对象。你手动调用了构造函数并自己启动了线程。

创建所有这些线程的开销变得非常重要,现在不鼓励采用实际操作方法。在 Java 5 中,添加了类来为您处理线程池。您可以将任务创建为单独的类型,然后将其交给 ExecutorService 以运行该任务,而不是为每种不同类型的任务创建新的 Thread 子类型。ExecutorService 为您管理线程,并且在运行任务后重新循环线程而不是丢弃线程。

首先,我们将创建一个几乎不执行任务的任务。它“sleep”(暂停执行)100 毫秒,显示其标识符和正在执行任务的线程的名称,然后完成:

  1. // concurrent/NapTask.java
  2. import onjava.Nap;
  3. public class NapTask implements Runnable {
  4. finalint id;
  5. public NapTask(int id) {
  6. this.id = id;
  7. }
  8. @Override
  9. public void run() {
  10. new Nap(0.1);// Seconds
  11. System.out.println(this + " "+
  12. Thread.currentThread().getName());
  13. }
  14. @Override
  15. public String toString() {
  16. return"NapTask[" + id + "]";
  17. }
  18. }

这只是一个Runnable:一个包含run()方法的类。它没有包含实际运行任务的机制。我们使用Nap类中的“sleep”:

  1. // onjava/Nap.java
  2. package onjava;
  3. import java.util.concurrent.*;
  4. public class Nap {
  5. public Nap(double t) { // Seconds
  6. try {
  7. TimeUnit.MILLISECONDS.sleep((int)(1000 * t));
  8. } catch(InterruptedException e){
  9. throw new RuntimeException(e);
  10. }
  11. }
  12. public Nap(double t, String msg) {
  13. this(t);
  14. System.out.println(msg);
  15. }
  16. }

为了消除异常处理的视觉噪声,这被定义为实用程序。第二个构造函数在超时时显示一条消息

TimeUnit.MILLISECONDS.sleep()的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在您的计算机上运行另一个窗口。OS 任务管理器定期检查sleep()是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。

你可以看到sleep()抛出一个已检查的InterruptedException;这是原始 Java 设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。

要执行任务,我们将从最简单的方法—SingleThreadExecutor 开始:

  1. / concurrent/SingleThreadExecutor.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. import onjava.*;
  5. public class SingleThreadExecutor {
  6. public static void main(String[] args) {
  7. ExecutorService exec =
  8. Executors.newSingleThreadExecutor();
  9. IntStream.range(0, 10)
  10. .mapToObj(NapTask::new)
  11. .forEach(exec::execute);
  12. System.out.println("All tasks submitted");
  13. exec.shutdown();
  14. while(!exec.isTerminated()) {
  15. System.out.println(
  16. Thread.currentThread().getName()+
  17. " awaiting termination");
  18. new Nap(0.1);
  19. }
  20. }
  21. }
  22. /* Output:
  23. All tasks submitted
  24. main awaiting termination
  25. main awaiting termination
  26. NapTask[0] pool-1-thread-1
  27. main awaiting termination
  28. NapTask[1] pool-1-thread-1
  29. main awaiting termination
  30. NapTask[2] pool-1-thread-1
  31. main awaiting termination
  32. NapTask[3] pool-1-thread-1
  33. main awaiting termination
  34. NapTask[4] pool-1-thread-1
  35. main awaiting termination
  36. NapTask[5] pool-1-thread-1
  37. main awaiting termination
  38. NapTask[6] pool-1-thread-1
  39. main awaiting termination
  40. NapTask[7] pool-1-thread-1
  41. main awaiting termination
  42. NapTask[8] pool-1-thread-1
  43. main awaiting termination
  44. NapTask[9] pool-1-thread-1
  45. */

首先请注意,没有SingleThreadExecutor类。newSingleThreadExecutor()Executors中的工厂,它创建特定类型的^4

我创建了十个 NapTasks 并将它们提交给 ExecutorService,这意味着它们开始自己运行。然而,在此期间,main()继续做事。当我运行 callexec.shutdown()时,它告诉 ExecutorService 完成已经提交的任务,但不接受任何新任务。此时,这些任务仍然在运行,因此我们必须等到它们在退出 main()之前完成。这是通过检查 exec.isTerminated()来实现的,这在所有任务完成后变为 true。

请注意,main()中线程的名称是 main,并且只有一个其他线程 pool-1-thread-1。此外,交错输出显示两个线程确实同时运行。

如果你只是调用 exec.shutdown(),程序将完成所有任务。也就是说,虽然不需要(!exec.isTerminated())。

  1. // concurrent/SingleThreadExecutor2.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class SingleThreadExecutor2 {
  5. public static void main(String[] args)throws InterruptedException {
  6. ExecutorService exec
  7. =Executors.newSingleThreadExecutor();
  8. IntStream.range(0, 10)
  9. .mapToObj(NapTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. /* Output:
  15. NapTask[0] pool-1-thread-1
  16. NapTask[1] pool-1-thread-1
  17. NapTask[2] pool-1-thread-1
  18. NapTask[3] pool-1-thread-1
  19. NapTask[4] pool-1-thread-1
  20. NapTask[5] pool-1-thread-1
  21. NapTask[6] pool-1-thread-1
  22. NapTask[7] pool-1-thread-1
  23. NapTask[8] pool-1-thread-1
  24. NapTask[9] pool-1-thread-1
  25. */

一旦你 callexec.shutdown(),尝试提交新任务将抛出 RejectedExecutionException。

  1. // concurrent/MoreTasksAfterShutdown.java
  2. import java.util.concurrent.*;
  3. public class MoreTasksAfterShutdown {
  4. public static void main(String[] args) {
  5. ExecutorService exec
  6. =Executors.newSingleThreadExecutor();
  7. exec.execute(newNapTask(1));
  8. exec.shutdown();
  9. try {
  10. exec.execute(newNapTask(99));
  11. } catch(RejectedExecutionException e) {
  12. System.out.println(e);
  13. }
  14. }
  15. }
  16. /* Output:
  17. java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1
  18. */

exec.shutdown()的替代方法是exec.shutdownNow(),它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。

  • 使用更多线程

使用线程的重点是(几乎总是)更快地完成任务,那么我们为什么要限制自己使用 SingleThreadExecutor 呢?查看执行Executors的 Javadoc,您将看到更多选项。例如 CachedThreadPool:

  1. // concurrent/CachedThreadPool.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class CachedThreadPool {
  5. public static void main(String[] args) {
  6. ExecutorService exec
  7. =Executors.newCachedThreadPool();
  8. IntStream.range(0, 10)
  9. .mapToObj(NapTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. /* Output:
  15. NapTask[7] pool-1-thread-8
  16. NapTask[4] pool-1-thread-5
  17. NapTask[1] pool-1-thread-2
  18. NapTask[3] pool-1-thread-4
  19. NapTask[0] pool-1-thread-1
  20. NapTask[8] pool-1-thread-9
  21. NapTask[2] pool-1-thread-3
  22. NapTask[9] pool-1-thread-10
  23. NapTask[6] pool-1-thread-7
  24. NapTask[5] pool-1-thread-6
  25. */

当你运行这个程序时,你会发现它完成得更快。这是有道理的,而不是使用相同的线程来顺序运行每个任务,每个任务都有自己的线程,所以它们都并行运行。似乎没有缺点,很难看出为什么有人会使用 SingleThreadExecutor。

要理解这个问题,我们需要一个更复杂的任务:

  1. // concurrent/InterferingTask.java
  2. public class InterferingTask implements Runnable {
  3. final int id;
  4. private static Integer val = 0;
  5. public InterferingTask(int id) {
  6. this.id = id;
  7. }
  8. @Override
  9. public void run() {
  10. for(int i = 0; i < 100; i++)
  11. val++;
  12. System.out.println(id + " "+
  13. Thread.currentThread().getName() + " " + val);
  14. }
  15. }

每个任务增加 val 一百次。这似乎很简单。让我们用 CachedThreadPool 尝试一下:

  1. // concurrent/CachedThreadPool2.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class CachedThreadPool2 {
  5. public static void main(String[] args) {
  6. ExecutorService exec
  7. =Executors.newCachedThreadPool();
  8. IntStream.range(0, 10)
  9. .mapToObj(InterferingTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. /* Output:
  15. 0 pool-1-thread-1 200
  16. 1 pool-1-thread-2 200
  17. 4 pool-1-thread-5 300
  18. 5 pool-1-thread-6 400
  19. 8 pool-1-thread-9 500
  20. 9 pool-1-thread-10 600
  21. 2 pool-1-thread-3 700
  22. 7 pool-1-thread-8 800
  23. 3 pool-1-thread-4 900
  24. 6 pool-1-thread-7 1000
  25. */

输出不是我们所期望的,并且从一次运行到下一次运行会有所不同。问题是所有的任务都试图写入 val 的单个实例,并且他们正在踩着彼此的脚趾。我们说这样的类不是线程安全的。让我们看看 SingleThreadExecutor 会发生什么:

  1. // concurrent/SingleThreadExecutor3.java
  2. import java.util.concurrent.*;
  3. import java.util.stream.*;
  4. public class SingleThreadExecutor3 {
  5. public static void main(String[] args)throws InterruptedException {
  6. ExecutorService exec
  7. =Executors.newSingleThreadExecutor();
  8. IntStream.range(0, 10)
  9. .mapToObj(InterferingTask::new)
  10. .forEach(exec::execute);
  11. exec.shutdown();
  12. }
  13. }
  14. /* Output:
  15. 0 pool-1-thread-1 100
  16. 1 pool-1-thread-1 200
  17. 2 pool-1-thread-1 300
  18. 3 pool-1-thread-1 400
  19. 4 pool-1-thread-1 500
  20. 5 pool-1-thread-1 600
  21. 6 pool-1-thread-1 700
  22. 7 pool-1-thread-1 800
  23. 8 pool-1-thread-1 900
  24. 9 pool-1-thread-1 1000
  25. */

现在我们每次都得到一致的结果,尽管InterferingTask缺乏线程安全性。这是 SingleThreadExecutor 的主要好处 - 因为它一次运行一个任务,这些任务不会相互干扰,因此强加了线程安全性。这种现象称为线程限制,因为在单线程上运行任务限制了它们的影响。线程限制限制了加速,但可以节省很多困难的调试和重写。

  • 产生结果

因为InterferingTask是一个Runnable,它没有返回值,因此只能使用副作用产生结果 - 操纵缓冲值而不是返回结果。副作用是并发编程中的主要问题之一,因为我们看到了CachedThreadPool2.javaInterferingTask中的val被称为可变共享状态,这就是问题所在:多个任务同时修改同一个变量会产生竞争。结果取决于首先在终点线上执行哪个任务,并修改变量(以及其他可能性的各种变化)。

避免竞争条件的最好方法是避免可变的共享状态。我们可以称之为自私的孩子原则:什么都不分享。

使用InterferingTask,最好删除副作用并返回任务结果。为此,我们创建Callable而不是Runnable

  1. // concurrent/CountingTask.java
  2. import java.util.concurrent.*;
  3. public class CountingTask implements Callable<Integer> {
  4. final int id;
  5. public CountingTask(int id) { this.id = id; }
  6. @Override
  7. public Integer call() {
  8. Integer val = 0;
  9. for(int i = 0; i < 100; i++)
  10. val++;
  11. System.out.println(id + " " +
  12. Thread.currentThread().getName() + " " + val);
  13. return val;
  14. }
  15. }

call()完全独立于所有其他 CountingTasks 生成其结果,这意味着没有可变的共享状态

ExecutorService允许您使用invokeAll()启动集合中的每个 Callable:

  1. // concurrent/CachedThreadPool3.java
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4. import java.util.stream.*;
  5. public class CachedThreadPool3 {
  6. public static Integer extractResult(Future<Integer> f) {
  7. try {
  8. return f.get();
  9. } catch(Exception e) {
  10. throw new RuntimeException(e);
  11. }
  12. }
  13. public static void main(String[] args)throws InterruptedException {
  14. ExecutorService exec =
  15. Executors.newCachedThreadPool();
  16. List<CountingTask> tasks =
  17. IntStream.range(0, 10)
  18. .mapToObj(CountingTask::new)
  19. .collect(Collectors.toList());
  20. List<Future<Integer>> futures =
  21. exec.invokeAll(tasks);
  22. Integer sum = futures.stream()
  23. .map(CachedThreadPool3::extractResult)
  24. .reduce(0, Integer::sum);
  25. System.out.println("sum = " + sum);
  26. exec.shutdown();
  27. }
  28. }
  29. /* Output:
  30. 1 pool-1-thread-2 100
  31. 0 pool-1-thread-1 100
  32. 4 pool-1-thread-5 100
  33. 5 pool-1-thread-6 100
  34. 8 pool-1-thread-9 100
  35. 9 pool-1-thread-10 100
  36. 2 pool-1-thread-3 100
  37. 3 pool-1-thread-4 100
  38. 6 pool-1-thread-7 100
  39. 7 pool-1-thread-8 100
  40. sum = 1000
  41. */

只有在所有任务完成后,invokeAll()才会返回一个Future列表,每个任务一个FutureFuture是 Java 5 中引入的机制,允许您提交任务而无需等待它完成。在这里,我们使用ExecutorService.submit():

  1. // concurrent/Futures.java
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4. import java.util.stream.*;
  5. public class Futures {
  6. public static void main(String[] args)throws InterruptedException, ExecutionException {
  7. ExecutorService exec
  8. =Executors.newSingleThreadExecutor();
  9. Future<Integer> f =
  10. exec.submit(newCountingTask(99));
  11. System.out.println(f.get()); // [1]
  12. exec.shutdown();
  13. }
  14. }
  15. /* Output:
  16. 99 pool-1-thread-1 100
  17. 100
  18. */
  • [1] 当你的任务尚未完成的Future上调用get()时,调用会阻塞(等待)直到结果可用。

但这意味着,在CachedThreadPool3.java中,Future似乎是多余的,因为invokeAll()甚至在所有任务完成之前都不会返回。但是,这里的 Future 并不用于延迟结果,而是用于捕获任何可能发生的异常。

还要注意在CachedThreadPool3.java.get()中抛出异常,因此extractResult()在 Stream 中执行此提取。

因为当你调用get()时,Future会阻塞,所以它只能解决等待任务完成的问题。最终,Futures被认为是一种无效的解决方案,现在不鼓励,支持 Java 8 的CompletableFuture,我们将在本章后面探讨。当然,您仍会在遗留库中遇到 Futures

我们可以使用并行 Stream 以更简单,更优雅的方式解决这个问题:

  1. // concurrent/CountingStream.java
  2. // {VisuallyInspectOutput}
  3. import java.util.*;
  4. import java.util.concurrent.*;
  5. import java.util.stream.*;
  6. public class CountingStream {
  7. public static void main(String[] args) {
  8. System.out.println(
  9. IntStream.range(0, 10)
  10. .parallel()
  11. .mapToObj(CountingTask::new)
  12. .map(ct -> ct.call())
  13. .reduce(0, Integer::sum));
  14. }
  15. }
  16. /* Output:
  17. 1 ForkJoinPool.commonPool-worker-3 100
  18. 8 ForkJoinPool.commonPool-worker-2 100
  19. 0 ForkJoinPool.commonPool-worker-6 100
  20. 2 ForkJoinPool.commonPool-worker-1 100
  21. 4 ForkJoinPool.commonPool-worker-5 100
  22. 9 ForkJoinPool.commonPool-worker-7 100
  23. 6 main 100
  24. 7 ForkJoinPool.commonPool-worker-4 100
  25. 5 ForkJoinPool.commonPool-worker-2 100
  26. 3 ForkJoinPool.commonPool-worker-3 100
  27. 1000
  28. */

这不仅更容易理解,我们需要做的就是将parallel()插入到其他顺序操作中,然后一切都在同时运行。

  • Lambda 和方法引用作为任务

使用 lambdas 和方法引用,您不仅限于使用RunnablesCallables。因为 Java 8 通过匹配签名来支持 lambda 和方法引用(即,它支持结构一致性),所以我们可以将 notRunnables 或 Callables 的参数传递给 ExecutorService:

使用 lambdas 和方法引用,您不仅限于使用RunnablesCallables。因为 Java 8 通过匹配签名来支持 lambda 和方法引用(即,它支持结构一致性),所以我们可以将不是RunnablesCallables的参数传递给ExecutorService

  1. // concurrent/LambdasAndMethodReferences.java
  2. import java.util.concurrent.*;
  3. class NotRunnable {
  4. public void go() {
  5. System.out.println("NotRunnable");
  6. }
  7. }
  8. class NotCallable {
  9. public Integer get() {
  10. System.out.println("NotCallable");
  11. return 1;
  12. }
  13. }
  14. public class LambdasAndMethodReferences {
  15. public static void main(String[] args)throws InterruptedException {
  16. ExecutorService exec =
  17. Executors.newCachedThreadPool();
  18. exec.submit(() -> System.out.println("Lambda1"));
  19. exec.submit(newNotRunnable()::go);
  20. exec.submit(() -> {
  21. System.out.println("Lambda2");
  22. return 1;
  23. });
  24. exec.submit(newNotCallable()::get);
  25. exec.shutdown();
  26. }
  27. }
  28. /* Output:
  29. Lambda1
  30. NotCallable
  31. NotRunnable
  32. Lambda2
  33. */

这里,前两个submit()调用可以改为调用execute()。所有submit()调用都返回Futures,您可以在后两次调用的情况下提取结果。

终止耗时任务

并发程序通常使用长时间运行的任务。可调用任务在完成时返回值;虽然这给它一个有限的寿命,但仍然可能很长。可运行的任务有时被设置为永远运行的后台进程。您经常需要一种方法在正常完成之前停止RunnableCallable任务,例如当您关闭程序时。

最初的 Java 设计提供了中断运行任务的机制(为了向后兼容,仍然存在);中断机制包括阻塞问题。中断任务既乱又复杂,因为您必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。使用中断被视为反对模式,但我们仍然被迫接受。

InterruptedException,因为设计的向后兼容性残留。

任务终止的最佳方法是设置任务周期性检查的标志。然后任务可以通过自己的 shutdown 进程并正常终止。不是在任务中随机关闭线程,而是要求任务在到达了一个较好时自行终止。这总是产生比中断更好的结果,以及更容易理解的更合理的代码。

以这种方式终止任务听起来很简单:设置任务可以看到的boolean flag。编写任务,以便定期检查标志并执行正常终止。这实际上就是你所做的,但是有一个复杂的问题:我们的旧克星,共同的可变状态。如果该标志可以被另一个任务操纵,则存在碰撞可能性。

在研究 Java 文献时,你会发现很多解决这个问题的方法,经常使用volatile关键字。我们将使用更简单的技术并避免所有易变的参数,这些都在附录:低级并发中有所涉及。

Java 5 引入了Atomic类,它提供了一组可以使用的类型,而不必担心并发问题。我们将添加AtomicBoolean标志,告诉任务清理自己并退出。

  1. // concurrent/QuittableTask.java
  2. import java.util.concurrent.atomic.AtomicBoolean;import onjava.Nap;
  3. public class QuittableTask implements Runnable {
  4. final int id;
  5. public QuittableTask(int id) {
  6. this.id = id;
  7. }
  8. private AtomicBoolean running =
  9. new AtomicBoolean(true);
  10. public void quit() {
  11. running.set(false);
  12. }
  13. @Override
  14. public void run() {
  15. while(running.get()) // [1]
  16. new Nap(0.1);
  17. System.out.print(id + " "); // [2]
  18. }
  19. }

虽然多个任务可以在同一个实例上成功调用quit(),但是AtomicBoolean可以防止多个任务同时实际修改running,从而使quit()方法成为线程安全的。

  • [1]:只要运行标志为 true,此任务的 run()方法将继续。
  • [2]: 显示仅在任务退出时发生。

需要running AtomicBoolean证明编写 Java program 并发时最基本的困难之一是,如果running是一个普通的布尔值,你可能无法在执行程序中看到问题。实际上,在这个例子中,你可能永远不会有任何问题 - 但是代码仍然是不安全的。编写表明该问题的测试可能很困难或不可能。因此,您没有任何反馈来告诉您已经做错了。通常,您编写线程安全代码的唯一方法就是通过了解事情可能出错的所有细微之处。

作为测试,我们将启动很多 QuittableTasks 然后关闭它们。尝试使用较大的 COUNT 值

  1. // concurrent/QuittingTasks.java
  2. import java.util.*;
  3. import java.util.stream.*;
  4. import java.util.concurrent.*;
  5. import onjava.Nap;
  6. public class QuittingTasks {
  7. public static final int COUNT = 150;
  8. public static void main(String[] args) {
  9. ExecutorService es =
  10. Executors.newCachedThreadPool();
  11. List<QuittableTask> tasks =
  12. IntStream.range(1, COUNT)
  13. .mapToObj(QuittableTask::new)
  14. .peek(qt -> es.execute(qt))
  15. .collect(Collectors.toList());
  16. new Nap(1);
  17. tasks.forEach(QuittableTask::quit); es.shutdown();
  18. }
  19. }
  20. /* Output:24 27 31 8 11 7 19 12 16 4 23 3 28 32 15 20 63 60 68 6764 39 47 52 51 55 40 43 48 59 44 56 36 35 71 72 83 10396 92 88 99 100 87 91 79 75 84 76 115 108 112 104 107111 95 80 147 120 127 119 123 144 143 116 132 124 128
  21. 136 131 135 139 148 140 2 126 6 5 1 18 129 17 14 13 2122 9 10 30 33 58 37 125 26 34 133 145 78 137 141 138 6274 142 86 65 73 146 70 42 149 121 110 134 105 82 117106 113 122 45 114 118 38 50 29 90 101 89 57 53 94 4161 66 130 69 77 81 85 93 25 102 54 109 98 49 46 97
  22. */

我使用peek()QuittableTasks传递给ExecutorService,然后将这些任务收集到List.main()中,只要任何任务仍在运行,就会阻止程序退出。即使为每个任务按顺序调用 quit()方法,任务也不会按照它们创建的顺序关闭。独立运行的任务不会确定性地响应信号。

CompletableFuture 类

作为介绍,这里是使用 CompletableFutures 在 QuittingTasks.java 中:

  1. // concurrent/QuittingCompletable.java
  2. import java.util.*;
  3. import java.util.stream.*;
  4. import java.util.concurrent.*;
  5. import onjava.Nap;
  6. public class QuittingCompletable {
  7. public static void main(String[] args) {
  8. List<QuittableTask> tasks =
  9. IntStream.range(1, QuittingTasks.COUNT)
  10. .mapToObj(QuittableTask::new)
  11. .collect(Collectors.toList());
  12. List<CompletableFuture<Void>> cfutures =
  13. tasks.stream()
  14. .map(CompletableFuture::runAsync)
  15. .collect(Collectors.toList());
  16. new Nap(1);
  17. tasks.forEach(QuittableTask::quit);
  18. cfutures.forEach(CompletableFuture::join);
  19. }
  20. }
  21. /* Output:7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2526 27 28 29 30 31 32 33 34 6 35 4 38 39 40 41 42 43 4445 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 6263 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 8081 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 9899 100 101 102 103 104 105 106 107 108 109 110 111 1121 113 114 116 117 118 119 120 121 122 123 124 125 126127 128 129 130 131 132 133 134 135 136 137 138 139 140141 142 143 144 145 146 147 148 149 5 115 37 36 2 3*/

任务是一个List ,就像在QuittingTasks.java中一样,但是在这个例子中,没有peek()将每个QuittableTask提交给ExecutorService。相反,在创建 cfutures 期间,每个任务都交给CompletableFuture::runAsync。这执行VerifyTask.run()并返回CompletableFuture 。因为run()不返回任何内容,所以在这种情况下我只使用CompletableFuture调用join()来等待它完成。

在此示例中需要注意的重要事项是,运行任务不需要ExecutorService。这由CompletableFuture管理(尽管有提供自己的ExecutorService的选项)。你也不需要调用shutdown();事实上,除非你像我这样明确地调用join(),程序将尽快退出,而不必等待任务完成。

这个例子只是一个起点。你很快就会看到 ComplempleFutures 能够做得更多。

基本用法

这是一个带有静态方法work()的类,它对该类的对象执行某些工作:

  1. // concurrent/Machina.java
  2. import onjava.Nap;
  3. public class Machina {
  4. public enum State {
  5. START, ONE, TWO, THREE, END;
  6. State step() {
  7. if(equals(END))
  8. return END;
  9. return values()[ordinal() + 1];
  10. }
  11. }
  12. private State state = State.START;
  13. private final int id;
  14. public Machina(int id) {
  15. this.id = id;
  16. }
  17. public static Machina work(Machina m) {
  18. if(!m.state.equals(State.END)){
  19. new Nap(0.1);
  20. m.state = m.state.step();
  21. }
  22. System.out.println(m);return m;
  23. }
  24. @Override
  25. public StringtoString() {
  26. return"Machina" + id + ": " + (state.equals(State.END)? "complete" : state);
  27. }
  28. }

这是一个有限状态机,一个微不足道的机器,因为它没有分支……它只是从头到尾遍历一条路径。work()方法将机器从一个状态移动到下一个状态,并且需要 100 毫秒才能完成“工作”。

我们可以用CompletableFuture做的一件事是使用completedFuture()将它包装在感兴趣的对象中

  1. // concurrent/CompletedMachina.java
  2. import java.util.concurrent.*;
  3. public class CompletedMachina {
  4. public static void main(String[] args) {
  5. CompletableFuture<Machina> cf =
  6. CompletableFuture.completedFuture(
  7. new Machina(0));
  8. try {
  9. Machina m = cf.get(); // Doesn't block
  10. } catch(InterruptedException |
  11. ExecutionException e) {
  12. throw new RuntimeException(e);
  13. }
  14. }
  15. }

completedFuture()创建一个“已经完成”的CompletableFuture。对这样一个未来做的唯一有用的事情是get()里面的对象,所以这看起来似乎没有用。注意CompletableFuture被输入到它包含的对象。这个很重要。

通常,get()在等待结果时阻塞调用线程。此块可以通过InterruptedExceptionExecutionException中断。在这种情况下,阻止永远不会发生,因为 CompletableFutureis 已经完成,所以答案立即可用。

当我们将 Machina 包装在 CompletableFuture 中时,我们发现我们可以在 CompletableFuture 上添加操作来处理所包含的对象,事情变得更加有趣:

  1. // concurrent/CompletableApply.java
  2. import java.util.concurrent.*;
  3. public class CompletableApply {
  4. public static void main(String[] args) {
  5. CompletableFuture<Machina> cf =
  6. CompletableFuture.completedFuture(
  7. new Machina(0));
  8. CompletableFuture<Machina> cf2 =
  9. cf.thenApply(Machina::work);
  10. CompletableFuture<Machina> cf3 =
  11. cf2.thenApply(Machina::work);
  12. CompletableFuture<Machina> cf4 =
  13. cf3.thenApply(Machina::work);
  14. CompletableFuture<Machina> cf5 =
  15. cf4.thenApply(Machina::work);
  16. }
  17. }
  18. /* Output:
  19. Machina0: ONE
  20. Machina0: TWO
  21. Machina0: THREE
  22. Machina0: complete
  23. */

thenApply()应用一个接受输入并产生输出的函数。在这种情况下,work()函数产生与它相同的类型,因此每个得到的CompletableFuture仍然被输入为Machina,但是(类似于Streams中的map()Function也可以返回不同的类型,这将反映在返回类型

您可以在此处看到有关 CompletableFutures 的重要信息:它们会在您执行操作时自动解包并重新包装它们所携带的对象。这样你就不会陷入麻烦的细节,这使得编写和理解代码变得更加简单。

我们可以消除中间变量并将操作链接在一起,就像我们使用 Streams 一样:

  1. // concurrent/CompletableApplyChained.javaimport java.util.concurrent.*;
  2. import onjava.Timer;
  3. public class CompletableApplyChained {
  4. public static void main(String[] args) {
  5. Timer timer = new Timer();
  6. CompletableFuture<Machina> cf =
  7. CompletableFuture.completedFuture(
  8. new Machina(0))
  9. .thenApply(Machina::work)
  10. .thenApply(Machina::work)
  11. .thenApply(Machina::work)
  12. .thenApply(Machina::work);
  13. System.out.println(timer.duration());
  14. }
  15. }
  16. /* Output:
  17. Machina0: ONE
  18. Machina0: TWO
  19. Machina0: THREE
  20. Machina0: complete
  21. 514
  22. */

在这里,我们还添加了一个Timer,它向我们展示每一步增加 100 毫秒,还有一些额外的开销。 CompletableFutures的一个重要好处是它们鼓励使用私有子类原则(不分享任何东西)。默认情况下,使用thenApply()来应用一个不与任何人通信的函数 - 它只需要一个参数并返回一个结果。这是函数式编程的基础,并且它在并发性方面非常有效[^5]。并行流和 ComplempleFutures 旨在支持这些原则。只要您不决定共享数据(共享非常容易,甚至意外)您可以编写相对安全的并发程序。

回调thenApply()开始一个操作,在这种情况下,在完成所有任务之前,不会完成e CompletableFuture的创建。虽然这有时很有用,但是启动所有任务通常更有价值,这样就可以运行时继续前进并执行其他操作。我们通过在操作结束时添加 Async 来实现此目的:

  1. // concurrent/CompletableApplyAsync.java
  2. import java.util.concurrent.*;
  3. import onjava.*;
  4. public class CompletableApplyAsync {
  5. public static void main(String[] args) {
  6. Timer timer = new Timer();
  7. CompletableFuture<Machina> cf =
  8. CompletableFuture.completedFuture(
  9. new Machina(0))
  10. .thenApplyAsync(Machina::work)
  11. .thenApplyAsync(Machina::work)
  12. .thenApplyAsync(Machina::work)
  13. .thenApplyAsync(Machina::work);
  14. System.out.println(timer.duration());
  15. System.out.println(cf.join());
  16. System.out.println(timer.duration())
  17. }
  18. }
  19. /* Output:
  20. 116
  21. Machina0: ONE
  22. Machina0: TWO
  23. Machina0:THREE
  24. Machina0: complete
  25. Machina0: complete
  26. 552
  27. */

同步调用(我们通常使用得那种)意味着“当你完成工作时,返回”,而异步调用以意味着“立刻返回但是继续后台工作。”正如你所看到的,cf的创建现在发生得跟快。每次调用 thenApplyAsync() 都会立刻返回,因此可以进行下一次调用,整个链接序列的完成速度比以前快得快。

事实上,如果没有回调cf.join() t方法,程序会在完成其工作之前退出(尝试取出该行)对join()阻止了 main()进程的进行,直到 cf 操作完成,我们可以看到大部分时间的确在哪里度过。

这种“立即返回”的异步能力需要CompletableFuture库进行一些秘密工作。特别是,它必须将您需要的操作链存储为一组回调。当第一个后台操作完成并返回时,第二个后台操作必须获取生成的Machina并开始工作,当完成后,下一个操作将接管,等等。但是没有我们普通的函数调用序列,通过程序调用栈控制,这个顺序会丢失,所以它使用回调 - 一个函数地址表来存储。

幸运的是,您需要了解有关回调的所有信息。程序员将你手工造成的混乱称为“回调地狱”。通过异步调用,CompletableFuture 为您管理所有回调。除非你知道关于你的系统有什么特定的改变,否则你可能想要使用异步调用。

  • 其他操作 当您查看 CompletableFuture 的 Javadoc 时,您会看到它有很多方法,但这个方法的大部分来自不同操作的变体。例如,有 thenApply(),thenApplyAsync()和 thenApplyAsync()的第二种形式,它接受运行任务的 Executor(在本书中我们忽略了 Executor 选项)。

这是一个显示所有“基本”操作的示例,它们不涉及组合两个 CompletableFutures 或异常(我们将在稍后查看)。首先,我们将重复使用两个实用程序以提供简洁和方便:

  1. // concurrent/CompletableUtilities.java
  2. package onjava; import java.util.concurrent.*;
  3. public class CompletableUtilities {
  4. // Get and show value stored in a CF:
  5. public static void showr(CompletableFuture<?> c) {
  6. try {
  7. System.out.println(c.get());
  8. } catch(InterruptedException
  9. | ExecutionException e) {
  10. throw new RuntimeException(e);
  11. }
  12. }
  13. // For CF operations that have no value:
  14. public static void voidr(CompletableFuture<Void> c) {
  15. try {
  16. c.get(); // Returns void
  17. } catch(InterruptedException
  18. | ExecutionException e) {
  19. throw new RuntimeException(e);
  20. }
  21. }
  22. }

showr()在 CompletableFuture 上调用 get()并显示结果,捕获两个可能的异常。voidr()是 CompletableFuture 的 showr()版本,即 CompletableFutures,仅在任务完成或失败时显示。

为简单起见,以下 CompletableFutures 只包装整数。cfi()是一个方便的方法,它在完成的 CompletableFuture 中包装一个 int:

  1. // concurrent/CompletableOperations.java
  2. import java.util.concurrent.*;
  3. import static onjava.CompletableUtilities.*;
  4. public class CompletableOperations {
  5. static CompletableFuture<Integer> cfi(int i) {
  6. return CompletableFuture.completedFuture( Integer.valueOf(i));
  7. }
  8. public static void main(String[] args) {
  9. showr(cfi(1)); // Basic test
  10. voidr(cfi(2).runAsync(() ->
  11. System.out.println("runAsync")));
  12. voidr(cfi(3).thenRunAsync(() ->
  13. System.out.println("thenRunAsync")));
  14. voidr(CompletableFuture.runAsync(() ->
  15. System.out.println("runAsync is static")));
  16. showr(CompletableFuture.supplyAsync(() -> 99));
  17. voidr(cfi(4).thenAcceptAsync(i ->
  18. System.out.println("thenAcceptAsync: " + i)));
  19. showr(cfi(5).thenApplyAsync(i -> i + 42));
  20. showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
  21. CompletableFuture<Integer> c = cfi(7);
  22. c.obtrudeValue(111);
  23. showr(c);
  24. showr(cfi(8).toCompletableFuture());
  25. c = new CompletableFuture<>();
  26. c.complete(9);
  27. showr(c);
  28. c = new CompletableFuture<>();
  29. c.cancel(true);
  30. System.out.println("cancelled: " + c.isCancelled());
  31. System.out.println("completed exceptionally: " +
  32. c.isCompletedExceptionally());
  33. System.out.println("done: " + c.isDone());
  34. System.out.println(c);
  35. c = new CompletableFuture<>();
  36. System.out.println(c.getNow(777));
  37. c = new CompletableFuture<>();
  38. c.thenApplyAsync(i -> i + 42)
  39. .thenApplyAsync(i -> i * 12);
  40. System.out.println("dependents: " + c.getNumberOfDependents());
  41. c.thenApplyAsync(i -> i / 2);
  42. System.out.println("dependents: " + c.getNumberOfDependents());
  43. }
  44. }
  45. /* Output:
  46. 1
  47. runAsync
  48. thenRunAsync
  49. runAsync is static
  50. 99
  51. thenAcceptAsync: 4
  52. 47
  53. 105
  54. 111
  55. 8
  56. 9
  57. cancelled: true
  58. completed exceptionally: true
  59. done: true
  60. java.util.concurrent.CompletableFuture@6d311334[Complet ed exceptionally]
  61. 777
  62. dependents: 1
  63. dependents: 2
  64. */

main()包含一系列可由其 int 值引用的测试。cfi(1)演示了 showr()正常工作。cfi(2)是调用 runAsync()的示例。由于 Runnable 不产生返回值,因此结果是 CompletableFuture ,因此使用 voidr()。

死锁

构造函数非线程安全

复杂性和代价

本章小结

[^1]: 例如,Eric-Raymond 在“VIIX 编程艺术”(Addison-Wesley,2004)中提出了一个很好的案例。

[^3]: 有人谈论在 Java——10 中围绕泛型做一些类似的基本改进,这将是非常令人难以置信的。

[^5]: 不,永远不会有纯粹的功能性 Java。我们所能期望的最好的是一种在 JVM 上运行的全新语言。