在上一章中,我们通过将自己的暂停任务(协程)编写为状态机来创建它们。通过要求它们实现 Future trait,我们为这些任务创建了一个通用API。我们还展示了如何使用一些关键字创建这些协程,并以编程方式重写它们,这样我们就不必手动实现这些状态机,而是以与通常相同的方式编写我们的程序。如果我们停下来看看我们目前所取得的进展,它在概念上非常简单:**我们有一个用于暂停任务(Future trait)的接口,并且我们有两个关键字(coroutine/wait)来指示我们想要重写为状态机的代码段,该状态机将我们的代码划分为可以暂停的段但是,我们没有事件循环,也没有调度器。我们将扩展这个例子,添加一个运行时,让我们能够高效地运行程序,并使我们能够比现在更高效地调度任务。将展示如何通过将运行时划分为两个部分:执行器(executor)反应器(reactor),来获得更灵活、更松耦合的设计。你将学习基本的运行时设计(runtime design)反应器(reactors)执行器(executors)唤醒器(wakers)衍生(spawning)。示例都是基于上一章的代码,所以需求是一样的。这些示例都是跨平台的,并且可以在 Rust (https://doc.rust-lang.org/beta/rustc/platform-support.html#tier-1-with-hosttools)) 和 mio** (https://github.com/tokio-rs/mio#platforms))支持的所有平台上运行。你只需要安装 Rust,并将代码下载到本地即可。要一步一步地学习这些例子,你还需要在你的机器上安装 corofy。如果你在第7章没有安装这个包,现在请进入仓库中的 ch08/corofy 文件夹并运行这个命令来安装

  1. cargo install --force --path .

在本例中,我们还将使用 delayserver(延迟服务器),因此需要打开一个单独的终端,进入仓库根目录下的 delayserver 文件夹,并 cargo run,以便为接下来的示例准备就绪。如果你因为某种原因必须更改 delayserver 监听的端口,请记住更改代码中的端口

介绍运行时以及我们为什么需要它们

如你所知,你需要在 Rust 中使用自己的运行时来驱动和调度异步任务。运行时有很多种,从流行的 Embassy 嵌入式运行时(https://github.com/embassy-rs/embassy))到 Tokio (https://github.com/tokio-rs/tokio)),前者更侧重于一般的多任务处理,可以取代许多平台上对实时操作系统(RTOS)的需求,后者侧重于流行的服务器和桌面操作系统上的非阻塞 I/O。Rust 中的所有运行时都需要做至少两件事:调度并驱动实现 Rust Future trait 的对象完成。我们主要关注流行的桌面和服务器操作系统(如Windows、Linux和macOS)上执行非阻塞 I/O 的运行时。这也是到目前为止大多数程序员在 Rust 中遇到的最常见的运行时类型。控制任务的安排是非常具有侵略性的,这几乎是一条单行道。如果你依赖用户层调度器来运行任务,就不能同时使用操作系统调度器(而不跳过几个步骤),因为将它们混合在代码中会造成严重破坏,可能会破坏编写异步程序的整个目的。下图说明了不同的调度器

Runtimes/Wakers/Reactor-Executor 模型 - 图1

屈服让步于操作系统调度器(yielding to the OS scheduler 交出控制权给到操作系统的调度器)的一个例子是使用默认的 **std::net::TcpStreamstd::thread::sleep 方法进行阻塞调用。即使使用标准库提供的互斥锁等原语进行潜在的阻塞调用,也可能会屈服于操作系统调度器。这就是为什么你经常会发现异步编程倾向于对它涉及的所有内容进行着色,并且很难只使用 async/await 运行程序的一部分。结果就是运行时必须使用标准库的非阻塞版本。理论上,您可以创建一个所有运行时使用的标准库的非阻塞版本,这是 async_std 计划的目标之一(https://book.async.rs/introduction))。然而,让社区就解决这一任务的方法达成一致是一个艰巨的任务,而且还没有真正取得成果。在开始实现示例之前,我们先讨论一下 Rust 中典型异步运行时的总体设计。大多数运行时,比如 TokioSmolasync-std,会将它们的运行时分为两部分。跟踪我们正在等待的事件并确保以高效的方式等待来自操作系统的通知的部分通常被称为反应器**驱动程序(reactor or driver)调度任务并轮询(poll)任务完成的部分称为执行器(executor)。让我们从宏观上看一下这个设计,以便我们知道在我们的示例中要实现什么

Reactors 和 executors

  1. 当我们看 Rust 如何建模异步任务时,将运行时分为两个不同的部分很有意义。如果你阅读 Future (https://doc.rustlang.org/std/future/trait.Future.html)) 和 Waker (https://doc.rust-lang.org/std/task/struct.Waker.html)) 的文档,你会看到 Rust 不仅定义了 Future trait 和 Waker 类型,还附带了关于如何使用它们的重要信息,比如之前提到的 Future trait 就是惰性 的。另一个例子是,调用 Waker::wake 将保证在相应的任务上至少调用一次 Future::poll
  2. 如果反应堆reactor)这个名字让你联想到核反应堆,你开始认为反应堆是一种动力或者驱动运行时的东西,现在就放弃这个想法。反应器只是对一组传入的事件作出反应,并将它们一个接一个地分发给处理程序。它是一个事件循环,在我们的例子中,它将事件分发给执行器。由反应器处理的事件可以是任何事情,从到期的定时器为嵌入式系统编写程序时的中断,或 I/O 事件(如TcpStream上的可读事件)。您可以在同一运行时内运行几种反应器
  3. 执行器**进程只是决定谁在 CPU 上得到时间,以及什么时候得到。执行器还必须调用 Future::poll 并将状态机推进到下一个状态这是一种调度器**
  4. 由于反应堆将响应事件,它们需要与事件源进行一些集成。如果我们继续使用 TcpStream 作为例子,一些东西将对其进行读取或写入,在这一点上,反应器需要知道它应该跟踪该源上的某些事件。出于这个原因,非阻塞I/O原语和反应器需要紧密集成,取决于你如何看待它,I/O原语要么必须有自己的反应器,要么你将有一个提供I/O原语(如套接字、端口和流)的反应器

现在我们已经介绍了一些总体设计,我们可以开始编写一些代码了。运行时往往很快就会变得复杂,所以为了保持尽可能简单,我们将在代码中避免任何错误处理,并对所有内容使用 unwrap 或 expect。我们还将尽可能地选择简单而不是聪明,选择可读性而不是效率

改进基础示例

  1. 创建一个新项目,并将其命名为 a-runtime (或者,在本书的源码库中找到 ch08/a-runtime)
  2. 复制 future.rs 和 http.rs。前面创建的第一个项目 a-coroutine(也可以从本书仓库的 ch07/a-coroutine 文件夹中复制文件)中的 src 文件夹下的 rs 文件
  3. 通过将以下内容添加到 Cargo.toml 中,确保将 mio 作为依赖项添加
  1. [dependencies]
  2. mio = { version = "0.8", features = ["net", "os-poll"] }
  1. 在 src 文件夹中创建一个名为 runtime.rs 的新文件
  2. 我们将使用 corofy 将以下 coroutine/await 程序更改为我们可以运行的状态机表示
  1. mod future;
  2. mod http;
  3. mod runtime;
  4. use future::{Future, PollState};
  5. use runtime::Runtime;
  6. fn main() {
  7. let future = async_main();
  8. let mut runtime = Runtime::new();
  9. runtime.block_on(future);
  10. }
  11. coroutine fn async_main() {
  12. println!("Program starting");
  13. let txt = http::Http::get("/600/HelloAsyncAwait").wait;
  14. println!("{txt}");
  15. let txt = http::Http::get("/400/HelloAsyncAwait").wait;
  16. println!("{txt}");
  17. }

只是这次我们从 coroutine/await 语法创建它,而不是手动编写状态机。接下来,我们需要使用 corofy 将其转换为相应代码,因为编译器不识别我们自己的 coroutine/wait 语法

  1. src
  2. |-- future.rs
  3. |-- http.rs
  4. |-- main.rs
  5. |-- runtime.rs

在我们继续之前,让我们可视化一下我们的系统当前是如何工作的,如果我们考虑由 coroutine/wait 创建两个 future 和两个对 Http::get 的调用。在 main 函数中轮询 Future trait 完成的循环在我们的可视化中扮演了执行器的角色,正如你所看到的,我们有一个 Future 链由

  1. Non-leaf futures created by async/await (or coroutine/wait in our example) that simply call poll on the next future until it reaches a leaf future
  2. Leaf futures that poll an actual source that’s either **Ready or NotReady**

下图展示了我们当前设计的简化概述

Runtimes/Wakers/Reactor-Executor 模型 - 图2

如果我们仔细观察 future 链,可以发现当一个 future 被轮询时,它会轮询所有的子 future,直到它到达一个表示我们实际正在等待的某个对象的 leaf future。如果 future 返回 NotReady,它会立即将其传播到链上。但是,如果它返回 Ready,状态机将一直进行下去,直到下一次 future 返回 NotReady。在所有的孩子都准备好回来之前,顶层的 future 不会解决。下面的图更仔细地观察了 future 链,并给出了它如何工作的简化概述

Runtimes/Wakers/Reactor-Executor 模型 - 图3

我们要做的第一个改进是避免持续轮询顶层 future 来推动它前进。我们将改变我们的设计,让它看起来更像这样

Runtimes/Wakers/Reactor-Executor 模型 - 图4

在这个设计中,我们使用前面学到的知识,但不是简单地使用 epoll,而是使用 mio 的跨平台抽象。它的工作方式我们现在应该很熟悉了,因为我们之前已经实现了它的简化版本。我们将在 Poll 实例中注册 interest(兴趣),并在返回 NotReady 结果时等待 Poll,而不是在顶层不断循环和轮询。这将使线程进入睡眠状态,在操作系统再次唤醒我们并通知我们正在等待的事件已经准备好之前,任何工作都不会完成。这种设计将更加高效和可扩展

改变当前的实现

现在我们对设计有了一个概览,也知道了要做什么,可以继续对程序进行必要的修改,下面我们来看看需要修改的每个文件。我们从 main.rs 开始。我们没有在 main 函数中轮询 future,而是创建了一个新的运行时结构体,并将 future 作为参数传递给 Runtime::block_on** **方法。在这个文件中我们不需要再做任何更改了。我们的主函数变成了这样

  1. fn main() {
  2. let future = async_main();
  3. let mut runtime = Runtime::new();
  4. runtime.block_on(future);
  5. }

The logic we had in the main function has now moved into the runtime module, and that’s also where we need to change the code that polls the future to completion from what we had earlier

  1. use crate::future::{Future, PollState};
  2. use mio::{Events, Poll, Registry};
  3. use std::sync::OnceLock;

下一步是创建一个名为 REGISTRY 的静态变量。如果你还记得,Registry 是我们在 Poll 实例中向事件注册兴趣的方式。在发出实际的 HTTP GET 请求时,我们希望在 TcpStream 上注册对事件的兴趣。我们可以让 Http::get 接受一个注册表结构,它存储以供以后使用,但我们希望保持 API 的干净,相反,我们希望在 HttpGetFuture 中访问注册表,而不必将其作为引用传递

  1. use crate::future::{Future, PollState};
  2. use mio::{Events, Poll, Registry};
  3. use std::sync::OnceLock;
  4. static REGISTRY: OnceLock<Registry> = OnceLock::new();
  5. pub fn registry() -> &'static Registry {
  6. REGISTRY.get().expect("Called outside a runtime context")
  7. }

我们使用 std::sync::OnceLock,这样就可以在运行时启动时初始化注册表,从而防止任何人(包括我们自己)在没有运行运行时实例的情况下调用 Http::get。如果我们在没有初始化运行时的情况下调用 Http::get,它将出现错误,因为在运行时模块之外访问它的唯一公开方法是通过 pub fn registry(){…} 函数,并且该调用将失败

我们也可以使用 thread_local! 宏,但在本章后面扩展示例时,需要从多线程访问它,所以在设计时要考虑到这一点

  1. use crate::future::{Future, PollState};
  2. use mio::{Events, Poll, Registry};
  3. use std::sync::OnceLock;
  4. static REGISTRY: OnceLock<Registry> = OnceLock::new();
  5. pub fn registry() -> &'static Registry {
  6. REGISTRY.get().expect("Called outside a runtime context")
  7. }
  8. pub struct Runtime {
  9. poll: Poll,
  10. }

现在,运行时只存储一个 Poll 实例。有趣的部分是 Runtime 的实现。由于它不太长,我将在这里介绍整个实现并在接下来进行解释

  1. use crate::future::{Future, PollState};
  2. use mio::{Events, Poll, Registry};
  3. use std::sync::OnceLock;
  4. static REGISTRY: OnceLock<Registry> = OnceLock::new();
  5. pub fn registry() -> &'static Registry {
  6. REGISTRY.get().expect("Called outside a runtime context")
  7. }
  8. pub struct Runtime {
  9. poll: Poll,
  10. }
  11. impl Runtime {
  12. pub fn new() -> Self {
  13. let poll = Poll::new().unwrap();
  14. let registry = poll.registry().try_clone().unwrap();
  15. REGISTRY.set(registry).unwrap();
  16. Self { poll }
  17. }
  18. pub fn block_on<F>(&mut self, future: F)
  19. where
  20. F: Future<Output = String>,
  21. {
  22. let mut future = future;
  23. loop {
  24. match future.poll() {
  25. PollState::NotReady => {
  26. println!("Schedule other tasks\n");
  27. let mut events = Events::with_capacity(100);
  28. self.poll.poll(&mut events, None).unwrap();
  29. },
  30. PollState::Ready(_) => break,
  31. }
  32. }
  33. }
  34. }

我们要做的第一件事是创建一个 new 函数。这将初始化我们的运行时并设置我们需要的一切。我们创建一个新的 Poll 实例,并从 Poll 实例中得到一个所有权的 Registry 版本。在这里,我们利用了将两个部分分开的能力。我们将 Registry 存储在 Registry 全局变量中,以便以后可以从 http 模块访问它,而无需引用运行时本身。下一个函数是 block_on 函数。我会一步一步来

  1. 首先,该函数接受一个泛型参数,并将阻塞任何以字符串输出类型实现我们 Future trait 的东西(请记住,这是目前我们支持的唯一 Future trait,因此如果没有数据可以返回,我们将返回一个空字符串)
  2. 不必将 mut future 作为参数,而是在函数体中定义一个声明为 mut 的变量。这只是为了保持 API 稍微干净一点,避免我们以后不得不进行一些小的更改
  3. 接下来,我们创建一个循环。我们将循环,直到我们收到的顶级 future 就绪。如果 future 返回了 NotReady,我们写一条消息让我们知道此时我们可以做其他事情,例如处理与 future 无关的事情,或者更有可能的是,如果我们的运行时支持多个顶级 future,则轮询另一个顶级 future

注意,我们需要向 mioPoll:: Poll 方法传递一个事件集合,但由于只有一个顶层 future 要运行,我们并不真正关心发生了哪个事件:我们只关心发生了什么,并且很可能意味着数据已经准备好了(记住无论如何,我们总是必须考虑错误唤醒)

我们需要做的最后一件事是在我们的 http 模块中向服务器写入请求后,为读取事件注册兴趣

  1. use crate::{future::PollState, runtime, Future};
  2. use mio::{Interest, Token};
  3. use std::io::{ErrorKind, Read, Write};

我们需要添加对运行时模块的依赖,以及来自 mio 的一些类型

  1. use crate::{future::PollState, runtime, Future};
  2. use mio::{Interest, Token};
  3. use std::io::{ErrorKind, Read, Write};
  4. impl Future for HttpGetFuture {
  5. type Output = String;
  6. fn poll(&mut self) -> PollState<Self::Output> {
  7. println!("FIRST POLL - START OPERATION");
  8. self.write_request();
  9. runtime::registry().register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE).unwrap();
  10. }
  11. let mut buff = vec![0u8; 4096];
  12. loop {
  13. match self.stream.as_mut().unwrap().read(&mut buff) {
  14. Ok(0) => {
  15. let s = String::from_utf8_lossy(&self.buffer);
  16. break PollState::Ready(s.to_string());
  17. },
  18. Ok(n) => {
  19. self.buffer.extend(&buff[0..n]);
  20. continue;
  21. },
  22. Err(e) if e.kind() == ErrorKind::WouldBlock => {
  23. break PollState::NotReady;
  24. },
  25. Err(e) => panic!("{e:?}"),
  26. }
  27. }
  28. }

在第一次轮询中,在我们写完请求之后,我们对这个 TcpStream 上的可读事件注册兴趣。我们还删除了这行代码

  1. return PollState::NotReady;

通过删除他的行,我们将立即轮询 TcpStream,这是有意义的,因为如果我们立即得到响应,我们并不是真的想将控制权返回给调度器。在这里,无论哪种方式您都不会出错,因为我们将 TcpStream 注册为我们的反应堆的事件源,并且在任何情况下都会得到唤醒。这些更改是让我们的示例重新启动和运行所需的最后一步

创建合适的运行时

因此,如果我们将运行时不同部分之间的依赖程度可视化,我们当前的设计可以这样描述

Runtimes/Wakers/Reactor-Executor 模型 - 图5

如果我们想要在 reactor 和 executor 之间实现松耦合,我们需要提供一个接口来通知 executor,当一个允许 future 进行的事件发生时,它应该被唤醒。在 Rust 的标准库中,这种类型被称为 Waker (https://doc.rust-lang.org/stable/std/task/struct.Waker.html)),这不是巧合。如果我们改变我们的可视化来反映这一点,它看起来会像这样

Runtimes/Wakers/Reactor-Executor 模型 - 图6

我们采用了与 Rust 现在相同的设计并不是巧合。从 Rust 的角度来看,这是一个最小的设计,但它允许各种各样的运行时设计,而不会为 future 设置太多限制。Rust 有一个工作组,其任务是在标准库中包含广泛使用的 traits 和接口,您可以在这里找到更多信息:https://rust-lang.github.io/wg-async/welcome.html。您还可以在这里获得他们工作项目的概述并跟踪他们的进展:https://github.com/orgs/rust-lang/projects/28/views/1。在阅读了本书之后,你甚至可能想参与进来(https://rust-lang.github.io/wg-async/welcome.html#-getting-involved)),让 async Rust 更好地为每个人服务。如果我们更改系统图来反映我们需要对运行时进行的更改,它将如下所示

Runtimes/Wakers/Reactor-Executor 模型 - 图7

我们有两个相互之间没有直接依赖关系的部分。我们有一个执行器(Executor)来调度任务,并在轮询一个 future 对象时传递一个唤醒器(Waker),该 future 对象最终将被反应堆捕获并存储。当反应器收到事件就绪的通知时,它会定位与该任务相关联的唤醒器,并对其调用Wake::wake。这使我们能够

  • 运行几个 OS 线程,每个线程都有自己的执行器,但共享同一个反应器
  • 拥有多个反应器来处理不同类型的 leaf future,并确保在它可以继续运行时唤醒正确的 executor

现在,我们已经知道该做什么了,是时候开始用代码编写它了

Step 1 – 通过添加一个 Reactor 和一个 Waker 来改进我们的运行时设计

在这一步中,我们将做以下更改

  1. 更改项目结构,使其反映我们的新设计
  2. 找到一种不直接依赖轮询的方法让执行器进程休眠和唤醒,并基于此创建一个唤醒器,让我们能够唤醒执行器进程并识别哪些任务准备好继续执行
  3. 修改 Future 的特征定义,让 poll 接受 &Waker 作为参数

Runtimes/Wakers/Reactor-Executor 模型 - 图8

让我们首先对我们的项目结构进行一些更改,以便我们可以在此基础上继续开发。我们要做的第一件事是将运行时模块划分为两个子模块, reactor 和 executor

  1. mod executor;
  2. mod reactor;
  1. src
  2. |-- runtime
  3. |-- executor.rs
  4. |-- reactor.rs
  5. |-- future.rs
  6. |-- http.rs
  7. |-- main.rs
  8. |-- runtime.rs
  1. pub use executor::{spawn, Executor, Waker};
  2. pub use reactor::reactor;
  3. mod executor;
  4. mod reactor;
  5. pub fn init() -> Executor {
  6. reactor::start();
  7. Executor::new()
  8. }

Creating a Waker

我们需要找到一种不直接依赖轮询的方式来睡眠和唤醒执行器。事实证明这很简单。标准库为我们提供了工作所需的东西。通过调用std::thread::current(),我们可以得到一个 thread 对象。这个对象是当前线程的句柄,它让我们可以访问一些方法,其中之一是 unpark。标准库也提供了一个名为 std::thread::park() 的方法,该方法会让操作系统的调度器把线程停在那里,直到我们要求释放它为止。事实证明,如果我们把这些结合起来,我们就有了一种 park and unpark 执行器的方法,这正是我们所需要的。让我们基于此创建一个唤醒类型。在我们的示例中,我们将在执行器模块中定义 Waker,因为我们在那里创建了这种确切类型的 Waker,但你可以认为它属于 future 模块,因为它是 future trait 的一部分

Runtimes/Wakers/Reactor-Executor 模型 - 图9

我们不会将 Waker 实现为 trait,因为传递 trait 对象会显著增加我们示例的复杂性,而且它也不符合 Rust 中 Future 和 Waker 的当前设计

  1. use crate::future::{Future, PollState};
  2. use std::{
  3. cell::{Cell, RefCell},
  4. collections::HashMap,
  5. sync::{Arc, Mutex},
  6. thread::{self, Thread},
  7. };
  8. #[derive(Clone)]
  9. pub struct Waker {
  10. thread: Thread,
  11. id: usize,
  12. ready_queue: Arc<Mutex<Vec<usize>>>,
  13. }
  • thread – 我们前面提到的 Thread 对象的句柄
  • id – 一个 usize 属性,用于标识该唤醒器关联的任务
  • ready_queue – 这是一个可以在线程之间共享的对 Vec 的引用,其中 usize 表示就绪队列中任务的 ID。我们与 executor 共享这个对象,这样当队列就绪时,我们就可以将与唤醒器关联的任务 ID 推送到队列中
  1. use crate::future::{Future, PollState};
  2. use std::{
  3. cell::{Cell, RefCell},
  4. collections::HashMap,
  5. sync::{Arc, Mutex},
  6. thread::{self, Thread},
  7. };
  8. #[derive(Clone)]
  9. pub struct Waker {
  10. thread: Thread,
  11. id: usize,
  12. ready_queue: Arc<Mutex<Vec<usize>>>,
  13. }
  14. impl Waker {
  15. pub fn wake(&self) {
  16. self.ready_queue
  17. .lock()
  18. .map(|mut q| q.push(self.id))
  19. .unwrap();
  20. self.thread.unpark();
  21. }
  22. }

当调用 Waker::wake 时,我们首先对互斥量进行锁,以保护我们与执行器共享的就绪队列。然后,我们将标识该唤醒器关联的任务的 id 值推送到就绪队列。完成之后,我们在执行器线程上调用 unpark 并唤醒它。它现在会在就绪队列中找到与该唤醒器相关的任务,并对其进行轮询

Runtimes/Wakers/Reactor-Executor 模型 - 图10

Changing the Future definition

  1. use crate::runtime::Waker;
  2. pub trait Future {
  3. type Output;
  4. fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
  5. }

现在,你有一个选择。我们将不会使用 join_all 函数或 JoinAll 结构体。如果你不想保留它们,可以删除与 join_all 相关的所有内容,这就是 future.rs 中需要做的全部工作。如果你想保留它们进行进一步的实验,你需要更改 JoinAll 的 Future 实现,使其接受一个waker: &Waker 参数,并记住在 match fut.poll(waker) 中轮询已连接的 Future 时传递 waker 参数。在步骤1中要做的其余事情是进行一些小的更改,我们在其中实现 Future trait

  1. use crate::{future::PollState, runtime::{self, reactor, Waker}, Future};
  2. use mio::Interest;
  3. use std::io::{ErrorKind, Read, Write};
  4. impl Future for HttpGetFuture {
  5. type Output = String;
  6. fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
  7. ...
  8. }
  9. }

最后一个需要修改的文件是 main.rs。由于 corofy 不知道 Waker 的类型,我们需要在 main.rs 中为我们生成的协程中更改几行

  1. use runtime::Waker;

在 Coroutine block 的 impl Future 中,更改我突出显示的以下三行代码

  1. fn poll(&mut self, waker: &Waker)
  2. match f1.poll(waker)
  3. match f2.poll(waker)

Step 2 – 实现一个合适的执行器

在这一步中,我们将创建一个执行器

  • 持有许多顶层 future 并在它们之间转换
  • 使我们能够从异步程序中的任何位置派生新的顶层 future
  • 当没有事情做的时候提交 Waker 以至于可以 sleep 并且可以唤醒顶层 future 让它们执行
  • 让每个执行器进程运行在各自的 OS 线程上,从而可以同时运行多个执行器进程

Runtimes/Wakers/Reactor-Executor 模型 - 图11

  1. type Task = Box<dyn Future<Output = String>>;
  2. thread_local! {
  3. static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
  4. }

thread_local ! 宏允许我们定义一个静态变量,该变量对于第一次调用它的线程是唯一的。这意味着我们创建的所有线程都有自己的实例,并且一个线程不可能访问另一个线程的 CURRENT_EXEC 变量。我们调用变量 CURRENT_EXEC,因为它持有当前线程上运行的执行器

  1. type Task = Box<dyn Future<Output = String>>;
  2. thread_local! {
  3. static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
  4. }
  5. #[derive(Default)]
  6. struct ExecutorCore {
  7. tasks: RefCell<HashMap<usize, Task>>,
  8. ready_queue: Arc<Mutex<Vec<usize>>>,
  9. next_id: Cell<usize>,
  10. }

ExecutorCore holds all the state for our Executor

  • tasks:这是一个 HashMap,键为 usize,数据为 Task(还记得我们之前创建的别名吗)。这将保存与该线程上的执行器关联的所有 top-level futures,并允许我们为每个 future 指定一个 id 属性来标识它们。我们不能简单地改变一个静态变量,所以我们需要内部可变性。因为这只能从一个线程调用,所以 RefCell 将执行此操作,因为不需要同步
  • ready_queue:这是一个简单的 Vec,它存储了执行器要轮询的任务的id。回头看一看图8 - 7,你就会明白这是如何融入我们在其中概述的设计的。如前所述,我们可以在这里存储诸如 Arc> 之类的东西,但这给我们的示例增加了相当多的复杂性。当前设计唯一的缺点是不能直接获取任务的引用,我们必须在tasks集合中查找它,这需要时间。对这个集合的一个 Arc<…> (共享引用)会被传递给执行器创建的每个唤醒器。由于唤醒器可以(也将会)被发送到不同的线程,并且通过将任务的ID添加到 ready_queue 来表明特定的任务已经准备好了,我们需要将它包装在一个 Arc> 中
  • next_id:这是一个计数器,它会给出下一个可用的 I,这意味着它不应该为这个执行器实例两次给出相同的 ID。我们将使用它给每个top-level future 一个唯一的 ID。由于执行器实例只能在创建它的同一个线程上访问,一个简单的 Cell 就足以满足我们所需的内部可变性

ExecutorCore 继承了 Default trait,因为这里没有我们需要的特殊初始状态,它使代码保持简洁。下一个函数很重要。spawn 函数允许我们在程序的任何地方向执行器注册新的 top-level futures

  1. type Task = Box<dyn Future<Output = String>>;
  2. thread_local! {
  3. static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
  4. }
  5. #[derive(Default)]
  6. struct ExecutorCore {
  7. tasks: RefCell<HashMap<usize, Task>>,
  8. ready_queue: Arc<Mutex<Vec<usize>>>,
  9. next_id: Cell<usize>,
  10. }
  11. pub fn spawn<F>(future: F)
  12. where
  13. F: Future<Output = String> + 'static,
  14. {
  15. CURRENT_EXEC.with(|e| {
  16. let id = e.next_id.get();
  17. e.tasks.borrow_mut().insert(id, Box::new(future));
  18. e.ready_queue.lock().map(|mut q| q.push(id)).unwrap();
  19. e.next_id.set(id + 1);
  20. });
  21. }

Runtimes/Wakers/Reactor-Executor 模型 - 图12

  1. type Task = Box<dyn Future<Output = String>>;
  2. thread_local! {
  3. static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
  4. }
  5. #[derive(Default)]
  6. struct ExecutorCore {
  7. tasks: RefCell<HashMap<usize, Task>>,
  8. ready_queue: Arc<Mutex<Vec<usize>>>,
  9. next_id: Cell<usize>,
  10. }
  11. pub fn spawn<F>(future: F)
  12. where
  13. F: Future<Output = String> + 'static,
  14. {
  15. CURRENT_EXEC.with(|e| {
  16. let id = e.next_id.get();
  17. e.tasks.borrow_mut().insert(id, Box::new(future));
  18. e.ready_queue.lock().map(|mut q| q.push(id)).unwrap();
  19. e.next_id.set(id + 1);
  20. });
  21. }
  22. pub struct Executor;
  23. impl Executor {
  24. pub fn new() -> Self {
  25. Self {}
  26. }
  27. fn pop_ready(&self) -> Option<usize> {
  28. CURRENT_EXEC.with(|q| q.ready_queue.lock().map(|mut q| q.pop()).unwrap())
  29. }
  30. fn get_future(&self, id: usize) -> Option<Task> {
  31. CURRENT_EXEC.with(|q| q.tasks.borrow_mut().remove(&id))
  32. }
  33. fn get_waker(&self, id: usize) -> Waker {
  34. Waker {
  35. id,
  36. thread: thread::current(),
  37. ready_queue: CURRENT_EXEC.with(|q| q.ready_queue.clone()),
  38. }
  39. }
  40. fn insert_task(&self, id: usize, task: Task) {
  41. CURRENT_EXEC.with(|q| q.tasks.borrow_mut().insert(id, task));
  42. }
  43. fn task_count(&self) -> usize {
  44. CURRENT_EXEC.with(|q| q.tasks.borrow().len())
  45. }
  46. }

执行器实现的最后一部分是 block_on 函数。这也是我们关闭 impl Executor block 的地方

  1. type Task = Box<dyn Future<Output = String>>;
  2. thread_local! {
  3. static CURRENT_EXEC: ExecutorCore = ExecutorCore::default();
  4. }
  5. #[derive(Default)]
  6. struct ExecutorCore {
  7. tasks: RefCell<HashMap<usize, Task>>,
  8. ready_queue: Arc<Mutex<Vec<usize>>>,
  9. next_id: Cell<usize>,
  10. }
  11. pub fn spawn<F>(future: F)
  12. where
  13. F: Future<Output = String> + 'static,
  14. {
  15. CURRENT_EXEC.with(|e| {
  16. let id = e.next_id.get();
  17. e.tasks.borrow_mut().insert(id, Box::new(future));
  18. e.ready_queue.lock().map(|mut q| q.push(id)).unwrap();
  19. e.next_id.set(id + 1);
  20. });
  21. }
  22. pub struct Executor;
  23. impl Executor {
  24. pub fn new() -> Self {
  25. Self {}
  26. }
  27. fn pop_ready(&self) -> Option<usize> {
  28. CURRENT_EXEC.with(|q| q.ready_queue.lock().map(|mut q| q.pop()).unwrap())
  29. }
  30. fn get_future(&self, id: usize) -> Option<Task> {
  31. CURRENT_EXEC.with(|q| q.tasks.borrow_mut().remove(&id))
  32. }
  33. fn get_waker(&self, id: usize) -> Waker {
  34. Waker {
  35. id,
  36. thread: thread::current(),
  37. ready_queue: CURRENT_EXEC.with(|q| q.ready_queue.clone()),
  38. }
  39. }
  40. fn insert_task(&self, id: usize, task: Task) {
  41. CURRENT_EXEC.with(|q| q.tasks.borrow_mut().insert(id, task));
  42. }
  43. fn task_count(&self) -> usize {
  44. CURRENT_EXEC.with(|q| q.tasks.borrow().len())
  45. }
  46. pub fn block_on<F>(&mut self, future: F)
  47. where
  48. F: Future<Output = String> + 'static,
  49. {
  50. spawn(future);
  51. loop {
  52. while let Some(id) = self.pop_ready() {
  53. let mut future = match self.get_future(id) {
  54. Some(f) => f,
  55. // guard against false wakeups
  56. None => continue,
  57. };
  58. let waker = self.get_waker(id);
  59. match future.poll(&waker) {
  60. PollState::NotReady => self.insert_task(id, future),
  61. PollState::Ready(_) => continue,
  62. }
  63. }
  64. let task_count = self.task_count();
  65. let name = thread::current().name().unwrap_or_default().to_string();
  66. if task_count > 0 {
  67. println!("{name}: {task_count} pending tasks. Sleep until notified.");
  68. thread::park();
  69. } else {
  70. println!("{name}: All tasks are finished");
  71. break;
  72. }
  73. }
  74. }
  75. }

block_on 是执行器的入口。通常,你会先传入一个 top-level future,当 top-level future 执行时,它会在执行器上生成新的 top-level future。当然,每个新的 future 也可以衍生(spawn)出新的 future 到执行器进程中,这就是异步程序的基本工作方式。在许多方面,你可以像在普通 Rust 程序中查看 main 函数一样查看这个第一个 top-level future。spawn 与 thread::spawn 类似,只是在这个例子中,任务保持在同一个操作系统线程中。这意味着任务将无法并行运行,这反过来允许我们避免任何任务之间同步的需要,以避免数据竞争。让我们一步一步来看这个函数

  1. 我们做的第一件事就是在自己身上 spawn 出我们所接受的 future。有很多方法可以实现这一点,但这是最简单的方法
  2. 然后,只要异步程序在运行,循环就会一直运行
  3. 每次循环时,我们都创建一个内部的 while let Some(…) 循环,只要 ready_queue 中有任务,这个循环就会运行
  4. 如果 ready_queue 中有任务,则将 Future 对象从集合中删除,从而获得该对象的所有权。如果没有 Future,我们通过继续进行来防止错误的唤醒(这意味着我们已经完成了它,但仍然得到唤醒)。例如,这将发生在 Windows 上,因为当连接关闭时我们会得到一个可读事件,但即使我们可以过滤这些事件,mio 也不保证不会发生错误唤醒,所以我们无论如何都必须处理这种可能性
  5. 接下来,创建一个新的唤醒器实例,把它传给 Future::poll()。请记住,此唤醒实例现在持有标识此特定 Future 特征的 id 属性和我们当前正在运行的线程的句柄
  6. 下一步是调用 Future::poll
  7. 如果返回值不是 ready,就把任务插入到 tasks 集合中。我想强调的是,当 Future trait 返回 NotReady 时,我们知道它会对其进行安排,以便在稍后的时间点调用 Waker::wake
  8. 如果 Future trait 返回 Ready,我们只需继续处理 Ready 队列中的下一项。由于我们获得了 Future trait 的所有权,这将在我们进入 while let 循环的下一次迭代之前删除对象
  9. 现在我们已经轮询了就绪队列中的所有任务,我们要做的第一件事是获取任务计数,看看还剩下多少个任务
  10. 我们还获取了当前线程的名称,以便将来记录日志(这与我们的执行器如何工作无关)
  11. 如果任务数大于 0,我们向终端打印一条消息,并调用 thread::park()。停止线程将把控制权交给操作系统的调度器,我们的执行器不会做任何事情,直到它被再次唤醒
  12. 如果任务计数为 0,则异步程序结束,退出主循环

Step 3 – 实现一个合适的反应堆

  • Efficiently wait and handle events that our runtime is interested in
  • Store a collection of Waker types and make sure to wake the correct Waker when it gets a notification on a source it’s tracking
  • Provide the necessary mechanisms for leaf futures such as HttpGetFuture, to register and deregister interests in events
  • Provide a way for leaf futures to store the last received Waker
  1. use crate::runtime::Waker;
  2. use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
  3. use std::{
  4. collections::HashMap,
  5. sync::{
  6. atomic::{AtomicUsize, Ordering},
  7. Arc, Mutex, OnceLock,
  8. },
  9. thread,
  10. }
  11. type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
  12. static REACTOR: OnceLock<Reactor> = OnceLock::new();

该变量将保存一个 OnceLock。与静态变量 CURRENT_EXEC 不同的是,这可以从不同的线程访问。OnceLock 允许我们定义一个可以写入一次的静态变量,以便在启动反应器时对其进行初始化。通过这样做,我们也可以确保在我们的程序中只能有一个特定的反应器实例在运行

  1. use crate::runtime::Waker;
  2. use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
  3. use std::{
  4. collections::HashMap,
  5. sync::{
  6. atomic::{AtomicUsize, Ordering},
  7. Arc, Mutex, OnceLock,
  8. },
  9. thread,
  10. }
  11. type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
  12. static REACTOR: OnceLock<Reactor> = OnceLock::new();
  13. pub fn reactor() -> &'static Reactor {
  14. REACTOR.get().expect("Called outside an runtime context")
  15. }
  16. pub struct Reactor {
  17. wakers: Wakers,
  18. registry: Registry,
  19. next_id: AtomicUsize,
  20. }
  • wakers – 一个 Waker 对象的 HashMap,每个对象由一个整数标识
  • registry – 保存一个注册表实例,以便我们可以与 mio 中的事件队列进行交互
  • next_id – 存储下一个可用的 ID,以便我们可以跟踪哪个事件发生了,哪个唤醒器应该被唤醒
  1. use crate::runtime::Waker;
  2. use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
  3. use std::{
  4. collections::HashMap,
  5. sync::{
  6. atomic::{AtomicUsize, Ordering},
  7. Arc, Mutex, OnceLock,
  8. },
  9. thread,
  10. }
  11. type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
  12. static REACTOR: OnceLock<Reactor> = OnceLock::new();
  13. pub fn reactor() -> &'static Reactor {
  14. REACTOR.get().expect("Called outside an runtime context")
  15. }
  16. pub struct Reactor {
  17. wakers: Wakers,
  18. registry: Registry,
  19. next_id: AtomicUsize,
  20. }
  21. impl Reactor {
  22. pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
  23. self.registry.register(stream, Token(id), interest).unwrap();
  24. }
  25. pub fn set_waker(&self, waker: &Waker, id: usize) {
  26. let _ = self
  27. .wakers
  28. .lock()
  29. .map(|mut w| w.insert(id, waker.clone()).is_none())
  30. .unwrap();
  31. }
  32. pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
  33. self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
  34. self.registry.deregister(stream).unwrap();
  35. }
  36. pub fn next_id(&self) -> usize {
  37. self.next_id.fetch_add(1, Ordering::Relaxed)
  38. }
  39. }

Runtimes/Wakers/Reactor-Executor 模型 - 图13

  1. use crate::runtime::Waker;
  2. use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
  3. use std::{
  4. collections::HashMap,
  5. sync::{
  6. atomic::{AtomicUsize, Ordering},
  7. Arc, Mutex, OnceLock,
  8. },
  9. thread,
  10. }
  11. type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
  12. static REACTOR: OnceLock<Reactor> = OnceLock::new();
  13. pub fn reactor() -> &'static Reactor {
  14. REACTOR.get().expect("Called outside an runtime context")
  15. }
  16. pub struct Reactor {
  17. wakers: Wakers,
  18. registry: Registry,
  19. next_id: AtomicUsize,
  20. }
  21. impl Reactor {
  22. pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
  23. self.registry.register(stream, Token(id), interest).unwrap();
  24. }
  25. pub fn set_waker(&self, waker: &Waker, id: usize) {
  26. let _ = self
  27. .wakers
  28. .lock()
  29. .map(|mut w| w.insert(id, waker.clone()).is_none())
  30. .unwrap();
  31. }
  32. pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
  33. self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
  34. self.registry.deregister(stream).unwrap();
  35. }
  36. pub fn next_id(&self) -> usize {
  37. self.next_id.fetch_add(1, Ordering::Relaxed)
  38. }
  39. }
  40. fn event_loop(mut poll: Poll, wakers: Wakers) {
  41. let mut events = Events::with_capacity(100);
  42. loop {
  43. poll.poll(&mut events, None).unwrap();
  44. for e in events.iter() {
  45. let Token(id) = e.token();
  46. let wakers = wakers.lock().unwrap();
  47. if let Some(waker) = wakers.get(&id) {
  48. waker.wake();
  49. }
  50. }
  51. }
  52. }

Runtimes/Wakers/Reactor-Executor 模型 - 图14

  1. pub fn start() {
  2. use thread::spawn;
  3. let wakers = Arc::new(Mutex::new(HashMap::new()));
  4. let poll = Poll::new().unwrap();
  5. let registry = poll.registry().try_clone().unwrap();
  6. let next_id = AtomicUsize::new(1);
  7. let reactor = Reactor {
  8. wakers: wakers.clone(),
  9. registry,
  10. next_id,
  11. };
  12. REACTOR.set(reactor).ok().expect("Reactor already running");
  13. spawn(move || event_loop(poll, wakers));
  14. }
  1. struct HttpGetFuture {
  2. stream: Option<mio::net::TcpStream>,
  3. buffer: Vec<u8>,
  4. path: String,
  5. id: usize,
  6. }
  7. impl HttpGetFuture {
  8. fn new(path: String) -> Self {
  9. let id = reactor().next_id();
  10. Self {
  11. stream: None,
  12. buffer: vec![],
  13. path,
  14. id,
  15. }
  16. }
  17. }

我们需要做的第一件事是确保我们在 Poll 实例中注册 interest,并在 future 第一次被轮询时在 Reactor 中注册我们接收到的 Waker。由于我们不再直接向 Registry 注册,我们删除了这行代码,并添加了这些新行

  1. if self.stream.is_none() {
  2. println!("FIRST POLL - START OPERATION");
  3. self.write_request();
  4. let stream = self.stream.as_mut().unwrap();
  5. runtime::reactor().register(stream, Interest::READABLE, self.id);
  6. runtime::reactor().set_waker(waker, self.id);
  7. }
  1. match self.stream.as_mut().unwrap().read(&mut buff) {
  2. Ok(0) => {
  3. let s = String::from_utf8_lossy(&self.buffer);
  4. runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
  5. break PollState::Ready(s.to_string());
  6. },
  7. Ok(n) => {
  8. self.buffer.extend(&buff[0..n]);
  9. continue;
  10. },
  11. Err(e) if e.kind() == ErrorKind::WouldBlock => {
  12. runtime::reactor().set_waker(waker, self.id);
  13. break PollState::NotReady;
  14. },
  15. Err(e) => panic!("{e:?}"),
  16. }

让我们回到 main.rs 并更改 main 函数,以便我们在新的运行时中正确运行程序

  1. mod future;
  2. mod http;
  3. mod runtime;
  4. use future::{Future, PollState};
  5. use runtime::Waker;
  6. fn main() {
  7. let mut executor = runtime::init();
  8. executor.block_on(async_main());
  9. }
  10. // cargo run

然而,我们还没有真正使用运行时的任何新功能,所以在结束本章之前,让我们来玩一些有趣的东西,看看它能做什么

尝试使用我们的新运行时

An example using concurrency

  1. fn main() {
  2. let mut executor = runtime::init();
  3. executor.block_on(async_main());
  4. }
  5. coro fn request(i: usize) {
  6. let path = format!("/{}/HelloWorld{i}", i * 1000);
  7. let txt = Http::get(&path).wait;
  8. println!("{txt}");
  9. }
  10. coro fn async_main() {
  11. println!("Program starting");
  12. for i in 0..5 {
  13. let future = request(i);
  14. runtime::spawn(future);
  15. }
  16. }

Runtimes/Wakers/Reactor-Executor 模型 - 图15

同时并行地运行多个 future

这一次,我们生成了多个线程,并为每个线程提供了自己的执行器,以便我们可以使用相同的反应器为所有执行器实例同时并行运行前面的例子。我们还将对打印输出做一个小调整,以免被数据淹没

  1. mod future;
  2. mod http;
  3. mod runtime;
  4. use crate::http::Http;
  5. use future::{Future, PollState};
  6. use runtime::{Executor, Waker};
  7. use std::thread::Builder;
  8. fn main() {
  9. let mut executor = runtime::init();
  10. let mut handles = vec![];
  11. for i in 1..12 {
  12. let name = format!("exec-{i}");
  13. let h = Builder::new().name(name).spawn(move || {
  14. let mut executor = Executor::new();
  15. executor.block_on(async_main());
  16. }).unwrap();
  17. handles.push(h);
  18. }
  19. executor.block_on(async_main());
  20. handles.into_iter().for_each(|h| h.join().unwrap());
  21. }
  22. coroutine fn request(i: usize) {
  23. let path = format!("/{}/HelloWorld{i}", i * 1000);
  24. let txt = Http::get(&path).wait;
  25. let txt = txt.lines().last().unwrap_or_default();
  26. println!(«{txt}»);
  27. }
  28. coroutine fn async_main() {
  29. println!("Program starting");
  30. for i in 0..5 {
  31. let future = request(i);
  32. runtime::spawn(future);
  33. }
  34. }

我当前运行的机器有12个内核,所以当我创建11个新线程来运行相同的异步任务时,我将使用我机器上的所有内核。你会注意到,我们还为每个线程提供了一个唯一的名称,在记录日志时使用,以便更容易跟踪幕后发生的事情

Runtimes/Wakers/Reactor-Executor 模型 - 图16

总结

  1. 在本章中,你学习了很多关于运行时的知识,以及 Rust 为什么要这样设计 Future trait 和 wake。你还了解了反应器和执行器进程、唤醒器类型、future 特性,以及通过 join_all 函数实现并发以及在执行器进程上生成新的 top-level futures 的不同方式。到目前为止,你也已经了解了如何通过结合我们自己的运行时和操作系统线程来实现并发和并行
  2. 现在,我们已经创建了我们自己的异步宇宙,包括 coro/wait,我们自己的 Future trait,我们自己的唤醒器定义和我们自己的运行时。我确保了我们不会偏离 Rust 中异步编程背后的核心思想,以便一切都直接适用于 async/await、Future traits、Waker 类型和日常编程中的运行时。到目前为止,我们已经到了本书的最后一部分
  3. 最后一章将把我们的例子转换为使用真正的 Future trait、Waker、async/await 等,而不是我们自己的版本。在那一章中,我们还将留出一些篇幅来讨论异步 Rust 的现状,包括一些最流行的运行时,但在此之前,我想再介绍一个主题:pinning。钉扎(pinning)的概念似乎是最难理解且与所有其他语言最不同的主题之一。在编写异步 Rust 时,您将在某些时候必须处理这样一个事实,即 Rust 中 Future traits 必须在轮询之前固定