你已经看到了我关于 Rust 中 async/await 的系列文章的结尾。

(开始结束的部分)

在过去的文章中,我们提出了一系列问题。

这些问题的答案帮助我们理解了 async/await 的工作原理。

我们通过手动实现 futures 来进行探索。

(这就是 async/.await 语法为我们隐藏的内容)

但这些语法实际上只是对 Future trait 实现的语法糖。

大家都喜欢语法糖,那么我们为什么会有 不想 使用 async/await 的场景呢?

(我知道并不是每个人都喜欢语法糖)

(在这里,我的类比可能已经开始崩塌了)

这是我们最终的问题。

以下是我们迄今为止提出的问题:

那我为什么需要这么做呢?

让我们来看一下。


为什么需要手动实现一个 future?

Rust 的 async/await 语法让我们可以轻松地编写异步代码。

但这并不是它最好的地方。

这种语法让我们可以编写 容易阅读 的异步代码。

使用 async/await 编写的代码要比替代方案容易理解得多。

比如说回调函数。

(像 JavaScript 那样)

或者更丑陋的东西,比如 Objective-C 的委托。

(我会为好奇的人留下 这个链接

但在 Rust 中实现 Future trait 让这些模式看起来都变得很亲切!

(好了,现在停止抱怨)

实际上,async/await 允许你组合多个 future。

这些可能是其他的 async 函数或代码块。

(它们本质上只是 futures)

也可能是由聪明的人编写的 futures。

(比如编写异步运行时的人)

但有些事情在不了解实现细节的情况下是做不到的。

特别是有些情况下,你需要访问 waker


为什么我们需要 waker?

记住,当一个 future 返回 Poll::Pending 时,这通常会向上传递到任务中。

驱动任务的 future 将返回 Poll::Pending

然后,这个任务不会被立即重新轮询。

事实上,直到任务被唤醒之前,它都不会被重新轮询。

第 2 部分 中,我们构建了 YieldNow 这个 future。

它返回 Poll::Pending,但在此之前会唤醒任务。

这会导致运行时尽快调度任务重新轮询。

这个 future 本身并没有多大意思。

(尽管我们从中学到了很多)

一个有趣的 future 不会在返回 Poll::Pending 之前唤醒任务。

它会在稍后的某个时间,当某些东西准备好但尚未准备好时唤醒任务。

那么,让我们来构建一个有趣的 future。


一个有趣的 future

(这是在 pending future 解散后成立的乐队)

一个真正有趣的 future 应该处理网络或其他操作系统级的事情。

但这超出了本系列文章的范围。

(其实这是个谎言)

(事实上,是超出了我的能力)

所以让我们构建一些不太复杂的东西。

并且最好没有太多的依赖。

(无论是知识还是代码)

我们将构建一个异步通道(async channel)!


附注:通道

通道是一种消息传递形式。

它并不是异步代码或 Rust 独有的概念。

(显然消息传递也不是独有的)

大多数通道可以被认为是一个队列。

消息从一端进入,从另一端出来。

通道可以根据消息来源、消息去向以及消息复制的方式进行分类。

  • 有多少个生产者可以将消息放入通道?
  • 有多少个消费者可以从通道中取出消息?
  • 每条消息会被传递多少次?

Tokio 文档很好地描述了它们的 消息传递选项

让我们快速浏览一下其中的一些通道。

我们可以将它们可视化。

这会帮助我们理解目前缺少什么。

单次通道(oneshot)

单次通道支持从单个生产者向单个消费者发送单个值。

让我们来看一张时序图:

单次通道的时序图:一条消息从生产者经过通道传递到消费者

这张图本身并不特别有趣。

但它构成了后续讨论的基础。

关键点在于,每种角色只有一个。

  • 一个生产者。
  • 一个消费者。
  • 一条消息。

(实际上,Tokio 的 oneshot 通道可以重复使用,但这在这里并不重要)

这是 Tokio 的 oneshot 通道的引用:tokio::sync::oneshot


多生产者单消费者(mpsc)

这就是 MPSC 的意思!

(这个缩写通常不带任何描述就被使用)

(现在你知道它的含义了)

这种通道支持多个生产者。

但只有一个消费者。

并且可以通过它们发送多条消息。

顺便插一句。

我们通常会区分有界通道和无界通道。

  • 有界通道 有固定大小的缓冲区。
  • 无界通道 则有一个无限大小的缓冲区。

(因此它们会一直填充,直到系统内存耗尽)

在讨论通道的高级概念时,我们会略过这个细节。

让我们来看看它的工作原理。

异步理解之路四 - 图2

首要的不同点

第一个显著的区别是通道有多个生产者。

(没什么大惊喜)

每个生产者都会向通道发送一些消息。

通道然后将这些消息传递给消费者。

消息是按顺序传递的。

(我们通常说消息是按照它们被发送的顺序传递的)

(实际上是按照通道接收到消息的顺序传递的)

(这 大多数情况下 是一样的)

在 Tokio 中的实现是 tokio::sync::mpsc


广播通道(broadcast)

在多生产者单消费者通道之后是……

多生产者多消费者(MPMC)通道!

(某种意义上)

广播通道始终是一种多消费者通道。

但它有一个特别之处。

所有消费者都会收到所有的值。

有些广播通道可能只允许一个生产者(SPMC)。

(单生产者多消费者)

而其他的则允许多个生产者(MPMC)。

(多生产者多消费者)

但通常我们会将 MPMC 保留给另一种类型的通道。

现在,让我们来看一下它的时序图。

异步理解之路四 - 图3

两个接收者都会收到相同的消息序列

这是广播通道的关键点。

Tokio 也提供了一个实现:tokio::sync::broadcast


多生产者多消费者(mpmc)

另一个我们现在理解的缩写!

多生产者多消费者。

正如我们刚刚看到的,广播通道可以被看作是一种 MPMC 通道。

不同之处在于,这种通道中的每条消息 只会被一个消费者接收

这种通道通常用于在有限数量的任务中分配工作。

时序图展示了这种区别。

异步理解之路四 - 图4

像我们之前看到的所有通道一样,这个通道是有序的。

消费者接收消息的顺序与消息发送的顺序一致。

但在消费者之间没有公平性概念。

第一个尝试接收消息的消费者会获得下一个消息。

(我们也可以说,这里没有负载均衡)

那么,在 Tokio 中实现它的链接在哪里?

没有这样的链接!

Tokio 并没有实现一个 MPMC(多生产者多消费者)通道。

所以我们来自己构建一个!

但在开始之前,先说些真相。


旁注:async-channel

其实已经有一些 MPMC 通道的实现。

async-channel 是一个例子。

它是 smol-rs 项目的一部分。

Smol 是另一个为 Rust 提供的异步运行时。

它比 Tokio 更加模块化。

因此,Smol 的一些组件可以直接用于 Tokio 运行时并正常工作。

不过,自己构建一个通道也能学到很多东西。

所以,让我们开始吧!


自己实现一个 MPMC 通道

我们要自己编写一个多生产者多消费者通道。

而且会很简单。

(你知道我一向追求简单)

这是完整代码:understanding_async_await::mpmc

如果你喜欢从头开始查看代码。

那么我们从 API 开始。

我们的设计基于 std 库和 Tokio 通道的 API。


通道 API

以下是我们 channel() 函数的签名:

(用 Rust 文档的方式,毕竟谁会不写文档呢,对吧?)

  1. /// 创建一个新的异步有界多生产者多消费者通道,
  2. /// 返回发送端和接收端。
  3. ///
  4. /// 通道会缓冲最多 `capacity` 条消息。当缓冲区满了时,
  5. /// 再次尝试发送消息将会等待,直到通道中有消息被接收。
  6. /// 当通道为空时,尝试接收新消息将会等待,直到有消息被发送到通道中。
  7. ///
  8. /// 如果所有接收端或发送端都断开连接,通道将会关闭。
  9. /// 此后尝试发送消息会返回 [`ChannelClosedError`],
  10. /// 而尝试接收消息会先清空通道中的剩余消息,
  11. /// 然后在通道完全为空时也会返回 [`ChannelClosedError`]。
  12. pub fn channel(capacity: usize) -> (Sender, Receiver)

好了,文档比函数签名多很多。

我们来拆解一下。

这个函数返回发送端和接收端。

在 Rust 中,这是构建通道的标准实践。

这两个部分可以根据需要单独传递。

彼此独立。

(稍后我们会看这两个部分)

在文档的第二段中,我们说明通道会缓冲消息。

因此,这个函数需要传入缓冲区大小 capacity

这意味着可以在没有接收消息的情况下发送最多 capacity 条消息。

然后呢?

通道满了。

发送端会等待,直到有消息被接收。

(当然是异步等待)

另一方面,通道可能是空的。

接收端会等待,直到有消息被发送。

(是的,是异步等待)

最后,我们提到了通道的关闭机制。

通道有两种方式会关闭。

一种是所有接收端都断开连接。

(这意味着所有接收端都被销毁了)

此时,不再可能接收到任何消息。

所以继续发送消息也就没有意义。

(背景音乐《Unsent Letter》)

我们会通过在下次尝试发送时返回错误来提醒发送端。

另一种情况是,所有发送端都断开连接。

在这种情况下,不会再有新的消息发送到通道。

但通道中可能还有未接收的消息。

我们会允许接收端取走这些消息。

(接收通道中的所有消息)

但是一旦通道为空,新接收请求将会永远阻塞。

(这个场景我没想到合适的背景音乐)

(真的应该在这里放《Unsent Letter》)

因此,如果尝试从空的已关闭通道接收消息,会返回一个错误。

以上内容应该很清楚了。

接下来我们来看发送端和接收端的部分。

还有一点。

这个通道只能发送 String 类型的消息。

(我知道这有点无聊)

(但我们先关注异步编程,不去涉及泛型的内容)

通道的两部分

(本想给这一节起名 “你是我的另一半”)

首先来看发送端。

  1. /// [`mpmc::channel`] 类型的发送部分。
  2. ///
  3. /// 可以通过 [`send`] 方法向通道发送消息。
  4. ///
  5. /// 这个部分可以被克隆,从多个任务中发送消息。如果所有发送端都被丢弃,
  6. /// 通道将会关闭。
  7. ///
  8. /// [`mpmc::channel`]: fn@super::mpmc::channel
  9. /// [`send`]: fn@Self::send
  10. pub struct Sender

结构体本身没有什么特别的。

它可以被克隆。

(但我们没有直接派生 Clone,稍后你会明白原因)

可以用它来发送消息。

来看一下这个方法:

  1. impl Sender {
  2. /// 发送一个值,等待直到有可用容量。
  3. ///
  4. /// 当至少有一个 [`Receiver`] 仍然连接到通道时,发送才会成功。
  5. /// 如果返回 `Err`,则意味着该值永远不会被接收。
  6. /// 但即使返回 `Ok`,也不能保证值一定会被接收,
  7. /// 因为所有接收端可能会在此方法返回 `Ok` 后立即断开连接。
  8. pub async fn send(&self, value: String) -> Result<(), ChannelClosedError>
  9. }

记住,我们今天只发送字符串。

(让这个通道支持泛型作为读者的练习)

我们的公共 API 是一个异步函数,用于接收要发送的值。

它返回一个结果。

结果可以是 Ok,带有单元类型 ()

或者返回错误。

唯一可能的错误是通道已关闭。

接下来看接收端。

  1. /// [`mpmc::channel`] 类型的接收部分。
  2. ///
  3. /// 可以通过 [`recv`] 方法从通道接收消息。
  4. ///
  5. /// 这个部分可以被克隆,从多个任务中接收消息。
  6. /// 每条消息只能被一个接收端接收。如果所有接收端都被丢弃,
  7. /// 通道将会关闭。
  8. ///
  9. /// [`mpmc::channel`]: fn@super::mpmc::channel
  10. /// [`recv`]: fn@Self::recv
  11. pub struct Receiver

同样,是我们所期望的结构。

再次声明接收端可以被克隆。

但我们没有派生 Clone

我们在 Receiver 上也只实现了一个公共(异步)函数。

  1. impl Receiver {
  2. /// 接收一个值,等待直到有值可用。
  3. ///
  4. /// 当通道关闭(所有发送端被丢弃)时,此方法将继续返回
  5. /// 通道缓冲区中存储的剩余值。一旦通道为空,
  6. /// 此方法将返回 [`ChannelClosedError`]。
  7. pub async fn recv(&self) -> Result<String, ChannelClosedError>
  8. }

成功时,recv() 将返回一个 String

唯一可能返回的错误是 ChannelClosedError

只有当通道为空时,才会返回此错误。

现在我们已经了解了我们想要实现的 API。

让我们看一个示例序列图,了解它的使用方式。

我们会用一个生产者和一个消费者来更好地理解 async/await 的部分。

异步理解之路四 - 图5

(实现此序列的代码可以在 GitHub 上找到:channel_halves.rs

我们的主任务调用 channel(1),并返回发送端和接收端。

这是一个容量为 1 的通道。

(不算很大)

接收端被发送到自己的任务中,开始在一个循环中接收消息。

现在我们假设它尝试接收一次。

recv().await 的调用必须等待,因为通道为空。

接下来,发送端被发送到它自己的任务中,发送两个值。

第一个值被发送。

然后,它尝试发送第二个值。

(我们在这里选择了自己的并发交错方式)

(让故事更有趣)

第二个值无法发送,因为通道已满。

所以 send().await 调用等待。

现在,接收端接收到一个值。

由于通道现在有了空间,等待的 send().await 调用返回。

发送任务完成。

这时,接收端接收到第二个值。

(一切正常)

然后它尝试再次接收。

这次它返回了一个 Err(ChannelClosedError)

因为发送端已被丢弃,通道被关闭。

而且通道现在也为空了。

所以我们的接收循环任务也完成了。

通过这个,我们为理解我们的 API 如何工作打下了基础。

现在是时候实现这个异步功能了。

这将涉及实现这两个异步函数:

Sender::send()Receiver::recv()

为了做到这一点,我们需要手动实现 futures。

(这在某种意义上是本系列的重点)

(通过实现其底层部分来理解 async/await)

那么,我们来逐一看看这些异步函数以及它们下面的 futures。

内部通道

在我们开始实现 futures 之前,有一点要说。

所有的发送端和接收端将共享这个通道。

我们打算用一种简单的方式来实现。

使用一个 ArcMutex 包装的私有结构体。

我们在第 3 部分中学习了 互斥锁

(我们也学到了一些我们不应该做的事情)

(所以我们不会做那些事情)

以这种方式使用 Mutex 通常不是非常高效。

在高负载下,它的性能肯定不会很好。

但它将帮助我们编写发送和接收的 futures。

而且我们可以确保不会导致数据腐败或数据竞争。

所以我们将创建一个 Channel 结构体。

这个结构体不会是公共的。

稍后我们会查看它的实现。

发送 future

我们将从实现 Sender::send() 开始。

让我们看一下一个序列图,展示我们三种不同的结果。

当我们准备实现 Future 时,我们将扩展这个图。

异步理解之路四 - 图6

这个图是在 async/await 层级上的。

这三种结果代表通道可能的三种状态。

状态:通道有容量。

在这种情况下,异步函数会立即返回 Ok

(异步函数立即返回与其他函数立即返回不同)

(它可能仍然会在此期间让出控制权给运行时)

状态:通道已关闭。

这是内部通道的终态。

(这意味着它是最后一种状态,之后没有其他状态了)

异步函数将立即返回通道关闭错误。

(同样要注意使用“立即”与“异步”的 caveat)

状态:通道已满。

在这种情况下,异步函数会等待,不返回任何值。

(我们知道这里可能需要 Poll::Pending

(但我们先别急)

某个时刻,容量会被释放

(我们希望如此)

这通常发生在接收端接收到消息时。

然后异步函数会像第一种情况一样返回 Ok

(因为我们本质上已进入第一种状态)

接下来,我们实现我们的异步函数。

我们不会深入构造 Sender 结构体的细节。

现在,知道 Sender 具有一个私有字段可以访问内部通道就足够了。

  1. pub struct Sender {
  2. inner: Arc<Mutex<Channel>>,
  3. }

为了实现这个公共异步函数,我们将需要一个 Future

我们将把这个 Future 称为 Send

(不太有创意,但这可能是好事)

(富有创意的名称可能对别人来说不那么清晰)

首先,这是 Sender::send() 的实现。

  1. pub async fn send(&self, value: String) -> Result<(), ChannelClosedError> {
  2. Send {
  3. value,
  4. inner: self.inner.clone(),
  5. }
  6. .await
  7. }

很简单吧?

我们构造我们的 Send future。

然后等待它。

为了清晰地表明类型,这里是 Send 的结构体定义。

  1. struct Send {
  2. value: String,
  3. inner: Arc<Mutex<Channel>>,
  4. }

我们的 Send future 只有一个任务:

当通道有容量时发送一个值。

我们甚至不需要为此维护状态。

我们将依赖通道的状态。

(而且我们会相信我们的异步运行时在我们返回 Poll::Ready 后不会再调度我们)

我们确实需要值本身。

并且 Send future 可能会比 Sender 的生命周期更长。

(尽管这并不理想)

所以我们会获取内部通道的引用。

在查看实现之前,让我们扩展一下我们的序列图。

我们将深入异步/等待语法,并进入 future 的世界。

异步理解之路四 - 图7

Send future 的创建是在状态之外的。

这是因为它始终是相同的。

正如你所看到的,我们已经创建了内部通道的部分 API。

它完全基于我们知道需要的内容。

它有一个(同步的)send() 函数,返回一个结果。

结果要么是 Ok(()),要么是根据通道是否已关闭或已满返回的两个错误中的一个。

前两个状态是相对简单的。

(通道有容量和通道已关闭)

接下来,我们来看通道已满的状态。

在这种情况下,Channel::send() 将返回一个错误,表明通道已满。

我们的 Send future 将返回 Poll::Pending

但在此之前…

我们需要一种方式将我们的 waker 注册到通道中。

然后我们会期望当有空闲容量时,我们的任务会被唤醒。

为此,通道有另一个方法 Channel::register_sender_waker()

图表稍微有点作弊。

我们知道通道不会直接 唤醒我们的任务

我们还会跳过通道的实现细节。

现在重点是,我们有一个需求。

当我们注册一个发送方 waker 时,通道必须在有容量时唤醒它。

当然,可能会有多个发送方,它们可能都已经注册了 wakers。

所以我们不能期望为下一个空闲容量被唤醒。

但这只是内部通道的实现细节。

现在,让我们深入到 Future 的实现。

  1. impl Future for Send {
  2. type Output = Result<(), ChannelClosedError>;
  3. fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
  4. let Ok(mut guard) = self.inner.lock() else {
  5. panic!("MPMC Channel has become corrupted.");
  6. };
  7. match guard.send(self.value.clone()) {
  8. Ok(_) => Poll::Ready(Ok(())),
  9. Err(ChannelSendError::Closed) => Poll::Ready(Err(ChannelClosedError {})),
  10. Err(ChannelSendError::Full) => {
  11. guard.register_sender_waker(cx.waker().clone());
  12. Poll::Pending
  13. }
  14. }
  15. }
  16. }

Output 类型与 Sender::send() 返回的值相同。

首先,我们尝试锁住包装内部通道的互斥锁。

如果返回错误,说明内部通道已经损坏。

(实际上,互斥锁被污染了)

(我们在 如何 Rust 处理互斥锁 中稍微讨论过污染)

无论如何,我们会触发 panic 来处理。

然后,我们会在内部通道上调用 send()

我们已经在序列图中讨论了这里发生的细节。

一个实现细节是,我们会克隆要发送的值到内部通道。

这是可以避免的。

但这样实现会更加复杂。

(因为如果通道已满,我们需要再次获取值)

所以我们现在先保持这种方式。

Tokio 的通道使用信号量许可来正确实现这一点,而不需要克隆。

这就是我们的 Send future 的实现!

最后,它并没有那么可怕。

我们已经知道我们需要做什么。

现在我们可以看看接收的 future。

稍后我们会讨论内部通道的实现。

(这主要是为了完整性)

(也是因为我们想覆盖实际上使用我们所“注册”的 waker)

recv future

我们刚刚看到了异步发送函数的实现。

以及支撑它的 future。

现在让我们来看异步函数 Receiver::recv()

你可能已经猜到,这个函数在很多方面和 Sender::send() 类似。

我们也会以相似的详细程度讲解接收函数。

这样可以确保它易于理解。

首先,让我们看一下到达异步函数之前的序列图。

异步理解之路四 - 图8

就像前面的发送函数一样,这个图是基于 async/await 层级的。

我们同样有三个状态。

但它们稍微有所不同。

状态:通道有消息。

在这种情况下,recv 异步函数会返回 Ok(msg)

这里的 msg 是通道中的第一个消息。

它是立即异步完成的。

(这意味着在异步的角度下立即返回,但实际上可能在这期间做任何事情)

状态:通道已关闭且为空。

这个状态与发送函数中的通道关闭状态类似。

但有一个额外的条件。

通道已关闭 并且 没有更多消息。

如果通道中仍有剩余的消息,接收者仍然可以接收它们。

即使通道已经关闭。

然而,如果通道已关闭且为空,则会返回错误。

这与发送者在通道关闭时返回的错误相同。

状态:通道为空(但未关闭)。

对于接收操作,“有趣”的状态是通道为空时。

因此,无法接收消息。

异步函数会等待。

它不会返回。

(我们可以感觉到某个地方会有 Poll::Pending

(但我们现在还看不见)

在某个时刻,新的消息会被发送到通道。

然后我们的异步函数将返回 Ok(msg)

就像在“通道有消息”状态下那样。

现在是时候实现了。

以下是异步函数 Receiver::recv()

  1. pub async fn recv(&self) -> Result<String, ChannelClosedError> {
  2. Recv {
  3. inner: self.inner.clone(),
  4. }
  5. .await
  6. }

我们看到需要一个新的 future。

显然,我们将它命名为 Recv

请注意,Receiver::recv() 不接受任何参数。

(只有 &self,即对自身的引用)

因此,Recv future 只需要对内部通道的引用。

为了完整性,这里是结构体的定义。

  1. pub struct Receiver {
  2. inner: Arc<Mutex<Channel>>,
  3. }

当我们为 Send 实现 Future 时,并没有持有任何状态。

我们利用了内部通道的状态。

当我们为 Recv 实现 Future 时,我们也将这么做。

但在编写代码之前,让我们理解一下我们需要什么。

下面是一个序列图,展示了我们需要考虑的不同状态。

(揭开帷幕,看到下面的未来)

(这是另一个很棒的乐队名称)

(或许也是文化船的名字)

Send 图相同,Recv 的创建发生在状态选项之外。

因为它始终是一样的。

异步理解之路四 - 图9

我们进一步扩展了必要的内部通道 API。

我们还需要一个 Channel::recv() 函数。

就像 Channel::send() 一样,它可以返回 3 种值。

如果有消息可以接收,它返回 Ok(msg)

然后我们的 future 可以返回带有该 msgPoll::Ready

如果通道已关闭且为空,它返回 Err(ChannelRecvError::Closed)

然后我们的 future 也可以立即返回 Poll::Ready,但这次是关闭错误。

(这就是 Err(ChannelClosedError),与发送时相同)

现在有趣的状态是当通道为空时。

(当然,是为空但未关闭)

此时我们返回 Poll::Pending

但首先我们需要注册我们的 waker。

接收器的 waker 需要在不同的条件下被唤醒,而不是发送器的 waker。

所以我们需要不同的 API 来注册它。

(但我们已经在调用另一个方法 Channel::register_sender_waker() 时透露了这一点)

这就是为什么我们需要 Channel::register_receiver_waker()

我们期望当新消息进入通道时,接收器 waker 会被唤醒。

在这个序列图中,我们展示了内部通道唤醒消费者任务。

但我们知道这会经过运行时。

即使我们已经知道了一切,让我们看看 Future 的实现。

  1. impl Future for Recv {
  2. type Output = Result<String, ChannelClosedError>;
  3. fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
  4. let Ok(mut guard) = self.inner.lock() else {
  5. panic!("MPMC Channel has become corrupted.");
  6. };
  7. match guard.recv() {
  8. Ok(value) => Poll::Ready(Ok(value)),
  9. Err(ChannelRecvError::Closed) => Poll::Ready(Err(ChannelClosedError {})),
  10. Err(ChannelRecvError::Empty) => {
  11. guard.register_receiver_waker(cx.waker().clone());
  12. Poll::Pending
  13. }
  14. }
  15. }
  16. }

Send 一样,OutputReceiver::recv() 的返回类型相同。

我们锁住了内部通道的 mutex。

(并进行相同的 mutex 中毒检查)

(我们的线程就像是食品品尝员)

(如果 mutex 中毒了,它就死了来警告其他人)

(不过这可能导致整个程序崩溃)

(这就像大家都死了)

然后我们调用 Channel::recv()

我们已经处理了三种选项。

所以我们不会重复自己。

就这样。

我们刚刚写出了我们需要的第二个也是最后一个 future,用于我们的异步 mpmc 通道!

当然,我们还想更详细地了解一下内部通道。

所以现在我们来看一下!

内部通道实现

我们已经定义了内部通道需要的四个方法。

(我一直叫它内部通道,但没有外部通道结构)

(所以这个结构体就叫做 Channel

  1. impl Channel {
  2. fn send(&mut self, value: String) -> Result<(), ChannelSendError>
  3. fn recv(&mut self) -> Result<String, ChannelRecvError>
  4. fn register_sender_waker(&mut self, waker: Waker)
  5. fn register_receiver_waker(&mut self, waker: Waker)
  6. }

首先,我们有 send()recv() 的同步版本。

它们各自有自己的错误类型。

(我们在实现 SendRecv future 时已经看过这两个)

还有两个方法用于注册 wakers。

一个用于发送器 wakers。

另一个用于接收器 wakers。

现在我们有足够的信息来填充 Channel 结构体。

  1. /// The inner mpmc channel implementation.
  2. ///
  3. /// This is a sync object. All methods return immediately.
  4. struct Channel {
  5. /// The message buffer
  6. buffer: VecDeque<String>,
  7. /// The capacity of the channel, this many messages can be buffered before
  8. /// sending will error.
  9. capacity: usize,
  10. /// Indicates when the channel has been closed.
  11. closed: bool,
  12. /// The number of connected `Sender`s.
  13. senders: usize,
  14. /// The number of active `Receiver`s.
  15. receivers: usize,
  16. /// A queue of wakers for senders awaiting free capacity in the channel.
  17. sender_wakers: VecDeque<Waker>,
  18. /// A queue of wakers for receivers awaiting a new message in the channel.
  19. receiver_wakers: VecDeque<Waker>,
  20. }

我已经包含了 rustdoc 注释来解释每个字段。

但我们还是来简单讲解一下各个部分。

我们使用一个 VecDeque 作为消息缓冲区。

我们单独保持通道的总 capacity

(我们 可以 使用 VecDeque 上的容量来存储这个值,但似乎这样做可能会出错)

(因为我们无法控制该容量值的工作方式)

我们还使用一个布尔值来跟踪通道是否已关闭。

第二组是两个计数器。

我们跟踪发送者和接收者的数量。

(实际上,我们期望发送者和接收者能够自行跟踪自己)

(但计数器需要在通道中)

最后一组是发送器和接收器 wakers 的队列。

这些将用于唤醒等待通道空闲容量的发送器。

并唤醒等待通道新消息的接收器。

接下来我们从后向前添加我们的实现。

这两个注册方法很简单。

  1. impl Channel {
  2. /// Registers a waker to be woken when capacity is available.
  3. ///
  4. /// Senders are woken in FIFO order.
  5. fn register_sender_waker(&mut self, waker: Waker) {
  6. self.sender_wakers.push_back(waker);
  7. }
  8. /// Registers a waker to be woken when a message is available.
  9. ///
  10. /// Receivers are woken in FIFO order.
  11. fn register_receiver_waker(&mut self, waker: Waker) {
  12. self.receiver_wakers.push_back(waker);
  13. }
  14. }

每个方法都将 waker 推送到队列的末尾。

目前我们只需要这些。

现在让我们看看 Channel::send() 方法的实现。

  1. impl Channel {
  2. /// Sends a message across the channel.
  3. ///
  4. /// If the message can be sent, the next receiver waker in the queue (if
  5. /// any) will be woken as there is now an additional message which can be
  6. /// received.
  7. ///
  8. /// An error will be returned if the channel is full or closed.
  9. fn send(&mut self, value: String) -> Result<(), ChannelSendError> {
  10. if self.closed {
  11. return Err(ChannelSendError::Closed);
  12. }
  13. if self.buffer.len() < self.capacity {
  14. self.buffer.push_back(value);
  15. self.wake_next_receiver();
  16. Ok(())
  17. } else {
  18. Err(ChannelSendError::Full)
  19. }
  20. }
  21. }

我们检查通道是否已关闭。

如果是,则返回 Closed 错误。

接下来我们检查是否有容量。

如果有,就将值推入缓冲区的末尾。

然后唤醒下一个接收器。

(稍后会讲解这部分)

并返回 OK(()),我们完成了。

如果没有容量,就返回 Full 错误。

现在回到唤醒下一个接收器的实现。

  1. impl Channel {
  2. /// Wakes the receiver at the front of the queue.
  3. ///
  4. /// If no receiver wakers are registered, this method does nothing.
  5. fn wake_next_receiver(&mut self) {
  6. if let Some(waker) = self.receiver_wakers.pop_front() {
  7. waker.wake();
  8. }
  9. }
  10. }

如你所见,我们弹出下一个接收器 waker。

如果有,就唤醒它。

如果没有,就什么都不做。

没有接收器 waker 是最常见的情况。

(除非通道永远满了)

(这不是理想的情况,但也可能发生)

不过,我们还是尝试从队列中弹出下一个值,并使用它来检查是否有 waker。

请注意,所有这些方法都是同步的。

我们假设调用 Channel 方法的主体已经锁定了它的 mutex。

(“谁”就是 SenderReceiver 或它们的 futures SendRecv

因此我们不需要担心来自多个线程的访问。

(再说一次,这是多线程的作弊,但它让我们可以专注于 Future 实现)

Channel::recv() 的实现类似简单。

  1. impl Channel {
  2. /// Receives a message from the channel.
  3. ///
  4. /// If a message can be received, then the next sender waker in the queue
  5. /// (if any) will be woken as there is now additional free capacity to send
  6. /// another message.
  7. ///
  8. /// An error will be returned if the channel is empty. The error will
  9. /// depend on whether the channel is also closed.
  10. fn recv(&mut self) -> Result<String, ChannelRecvError> {
  11. match self.buffer.pop_front() {
  12. Some(value) => {
  13. self.wake_next_sender();
  14. Ok(value)
  15. }
  16. None => {
  17. if !self.closed {
  18. Err(ChannelRecvError::Empty)
  19. } else {
  20. Err(ChannelRecvError::Closed)
  21. }
  22. }
  23. }
  24. }
  25. }

我们尝试从缓冲区弹出一个结果。

(如果通道仍有消息,我们不在乎它是否已关闭)

如果有值,我们唤醒下一个发送器。

(缓冲区现在有额外的空闲容量)

然后返回该值。

如果没有值,说明缓冲区为空。

如果通道未关闭,则返回 Empty 错误。

否则,通道已关闭,我们返回 Closed 错误。

发送器唤醒方法基本与接收器的方法相同。

  1. impl Channel {
  2. /// Wakes the sender at the front of the queue.
  3. ///
  4. /// If no sender wakers are registered, this method does nothing.
  5. fn wake_next_sender(&mut self) {
  6. if let Some(waker) = self.sender_wakers.pop_front() {
  7. waker.wake();
  8. }
  9. }
  10. }

这就是内部通道实现的结尾。

(几乎是结尾)

(还有一点点)

计数

有几个不同的地方可以放置递增/递减的逻辑。

对于这段代码,我将递增逻辑放在了 new() 方法中。

而递减逻辑则放在了 Drop 实现中。

让我们通过时序图来看一下我们频道发送者的生命周期。

请注意,在这一点上,我们只关注 Sender

Receiver 的实现是相同的,因此没有必要重复说明。

异步理解之路四 - 图10

计数

在初始创建频道时,会创建一个带有内部频道的 Sender

Sender 负责调用 Channel::inc_senders()

此时,内部频道的发送者计数为 1。

接下来是发送者的克隆。

(这对于有多个生产者是很重要的)

(我们的接收者也可以以相同的方式克隆,从而获得多个消费者)

(这就是 MPMC!)

在这里,我们依赖 Sender::new() 来递增内部频道中的发送者计数。

(这就是为什么将该逻辑放在 new() 中是有意义的)

此时,内部频道的发送者计数为 2。

然后我们进入了销毁逻辑。

在 Rust 中,Drop 特性为结构体提供了一种析构函数。

我们不需要显式地调用 drop()

当对象超出作用域时,它会自动被调用。

所以我们将利用这个来递减计数器。

假设我们克隆的发送者被销毁。

计数器会递减。

所以,内部频道的发送者计数再次变为 1。

此时没有更多操作。

最后,原始发送者也被销毁。

这时,内部频道的发送者计数变为 0。

它会调用 Channel::close() 来关闭自己。

close() 内,频道还会唤醒任何仍然注册的唤醒器。

我们预计这些唤醒器应该只有接收器的唤醒器。

Send future 可以在被轮询之前被发送到另一个任务。

所以,可能会有一个发送者唤醒器注册在一个 Send future 上,而其对应的 Sender 已经被销毁。

为了更安全起见,最好唤醒所有剩下的唤醒器。

这样可以避免任务因丢失唤醒器而被卡住。

接下来,让我们深入看一下实现。

首先是前两阶段。

(新建频道和发送者克隆)

我们需要 newClone 特性的实现。

下面是代码:

  1. impl Sender {
  2. fn new(inner: Arc<Mutex<Channel>>) -> Self {
  3. {
  4. match inner.lock() {
  5. Ok(mut guard) => guard.inc_senders(),
  6. Err(_) => panic!("MPMC Channel has become corrupted."),
  7. }
  8. }
  9. Self { inner }
  10. }
  11. }

请注意,new() 需要锁住频道的互斥锁才能访问它。

它可能被破坏(poisoned)。

这会导致我们出现 panic。

我们不在 lock() 的结果上使用 expect 有原因。

我们不希望在错误信息中泄露实现细节。

(这里的实现指的是我们使用互斥锁)

因此,最好通过模式匹配来处理结果。

如果互斥锁没有被破坏,我们就会调用 Channel::inc_senders()

Clone 只会将 Arc-Mutex 传递给 new

这也是我们不能派生 Clone 的原因。

因为我们需要调用 Sender::new()

  1. impl Clone for Sender {
  2. fn clone(&self) -> Self {
  3. Self::new(self.inner.clone())
  4. }
  5. }

为了实现 Drop,我们也需要锁住互斥锁。

  1. impl Drop for Sender {
  2. fn drop(&mut self) {
  3. match self.inner.lock() {
  4. Ok(mut guard) => guard.dec_senders(),
  5. Err(_) => panic!("MPMC Channel has become corrupted."),
  6. }
  7. }
  8. }

只要互斥锁没有被破坏,我们就会调用 Channel::dec_senders()

剩下的逻辑在 Channel 的最后几个方法中:

  1. impl Channel {
  2. /// 递增发送者计数
  3. fn inc_senders(&mut self) {
  4. self.senders += 1;
  5. }
  6. /// 递减发送者计数。
  7. ///
  8. /// 如果计数达到零,关闭频道。
  9. fn dec_senders(&mut self) {
  10. self.senders -= 1;
  11. if self.senders == 0 {
  12. self.close();
  13. }
  14. }
  15. /// 关闭频道。
  16. ///
  17. /// 所有已注册但尚未唤醒的发送者和接收者唤醒器都会被唤醒。
  18. fn close(&mut self) {
  19. self.closed = true;
  20. while let Some(waker) = self.sender_wakers.pop_front() {
  21. waker.wake();
  22. }
  23. while let Some(waker) = self.receiver_wakers.pop_front() {
  24. waker.wake();
  25. }
  26. }
  27. }

inc_senders() 方法仅仅是递增计数器。

dec_senders() 方法还会检查计数是否为零。

如果计数为零,它会关闭频道。

最后,close() 方法将布尔标志设置为 true

然后它会唤醒所有的唤醒器。

这意味着它会一个一个地从各自的队列中弹出唤醒器,并唤醒它们。

这样可以避免任务被卡住。

它还避免了一个严重的引用循环问题。

等等,什么是引用循环?

严重的引用循环

我们的实现包含了一个严重的引用循环。

如果没有正确处理,这会导致内存泄漏。

(但别担心,它已经被妥善处理了)

让我们解释一下。

在一个任务完成之前,它是由运行时拥有的。

而我们的任务拥有它当前正在轮询的 future。

这将是一个生产者任务的 Send future。

而我们的 Send future 持有内部频道的一个 Arc

(通过互斥锁)

这将防止 Arc 的计数器降为零。

因此,唤醒器将永远不会被丢弃。

这会非常不幸。

因为即使是 Tokio Console 的 lost waker lint 也不会发现这一点。

(因为唤醒器计数仍然是 1)

(控制台无法知道它将不再被使用)

这是引用循环的图示。

异步理解之路四 - 图11

但是由于我们在关闭频道时会唤醒所有的唤醒器。

而频道的关闭并不依赖于可能引用内部频道的 future 数量。

因此,我们打破了循环,一切都可以被释放。

如上所述,接收器的实现几乎相同。

所以这里不再赘述。

不过你可以在完整的代码中找到它:understanding_async_await::mpmc

此外,还有一个包含多个生产者和消费者的小示例:channel

实际上,你可以查看我的博客仓库。

在其中你可以运行本系列中的大量代码:understanding-async-await

结束

这就是我们实现异步多生产者多消费者频道的结尾。

这也是这系列博客文章的结束。

标题是“我如何最终理解 Rust 中的 async/await”。

(你知道这是一个系列吧?)

(不知道?)

(幸运的是,你可以从开始开始)

在这系列文章中,我们提出了四个问题。

这里是简略版。

(剧透警告!)

如果我不等待它,我的任务为什么什么都不做?

(因为它返回的是一个处于初始状态的状态机)

挂起的 future 是如何被唤醒的?

(有一个叫做 waker 的东西唤醒拥有任务)

为什么不应该在 await 点持有互斥锁?

(因为我们实际上在每个 await 点都从一个函数返回)

为什么我还会想手动编写一个 future?

(因为没有写成“手动 future”的 async 原语,你实际上做不了任何异步操作)

当然,最后一个问题的真正原因是为了给我一个写它的借口。

现在我把这个系列的文章写完了。

(而且终于把写博客系列的冲动消化了)

我打算回到写一些更小的帖子,讲述我当时正在做的事情。