已经简要了解了 Rust 的异步模型,Rust 的 future 是基于无堆栈协程的异步模型的一个例子。接下来,我们将了解它的真正含义以及它与堆栈协程(fiber/绿色线程)的区别。我们将以一个基于 futureasync/await 简化模型的示例为中心,看看如何使用它来创建可挂起和可恢复的任务,就像我们在创建自己的 fiber 时所做的那样。好消息是,这比实现我们自己的 fibers/green threads 容易得多

  • Introduction to stackless coroutines
  • An example of hand-written coroutines
  • async/await

Introduction to stackless coroutines

无堆栈协程是一种表示可以被中断和恢复的任务的方式。如果你还记得之前提到的,如果希望任务并发执行(同时进行,但不一定是并行执行),就需要能够暂停恢复任务。在最简单的形式中,协程只是一个任务,它可以通过将控制权交给(yield control)其调用者(caller)另一个协程(coroutine)或调度器(scheduler)来停止和恢复。许多语言都有协程实现,它也提供了一个运行时来处理调度和非阻塞 I/O,但区分协程和创建异步系统所涉及的其他机制是很有帮助的。在 Rust 中尤其如此,因为 Rust 没有运行时,只提供了创建具有语言原生支持的协程所需的基础设施。Rust 确保使用 Rust 编程的每个人都对可以**暂停和恢复的任务使用相同的抽象,但它为程序员留下了启动和运行异步系统**的所有其他细节

Fibers/green threads 表示这种可恢复任务的方式与操作系统非常相似。任务有一个堆栈,用于存储/恢复其当前执行状态,使暂停和恢复任务成为可能。状态机最简单的形式是一种数据结构,它可以处于一组预先确定的状态中。在协程的情况下,每个状态表示一个可能的暂停/恢复点。我们没有将暂停/恢复任务所需的状态存储在单独的堆栈中,我们将它保存在一个数据结构中。这有一些优点,我之前已经提到过,但最突出的是它们非常高效和灵活。这样做的缺点是,你永远不想手动编写这些状态机(因为麻烦),所以你需要编译器的某种支持,或者其他机制来重写状态机代码,而不是普通的函数调用

An example of hand-written coroutines

我们接下来要使用的示例是 Rust 异步模型的简化版本。我们将创建并实现以下内容

  • Our own simplified Future trait
  • A simple HTTP client that can only make GET requests
  • A task we can pause and resume implemented as a state machine
  • Our own simplified async/await syntax called coroutine/wait
  • 一个自制的预处理器(呼应了上述需要编译器某种支持的论述),以与 async/await 相同的方式将 async/await 函数转换为状态机

因此,为了真正揭开 coroutines、future 和 async/await 的神秘面纱,我们将不得不做出一些妥协。如果我们不这样做,我们最终会在 Rust 中重新实现所有的 async/await 和 future,这对于理解底层技术和概念来说太难了。因此,我们的示例将执行以下操作

  • Avoid error handling. If anything fails, we panic
  • 要具体而不是通用。创建通用的解决方案会引入很多复杂性,并使底层的概念更难推理,因为我们不得不创建额外的抽象级别
  • 限制它的能力。当然,你可以自由地扩展、修改和使用所有示例(我鼓励你这样做),但在示例中,我们只介绍我们需要的内容
  • Avoid macros

Creating coroutines:例子一

  1. mod http;
  2. mod future;
  1. [dependencies]
  2. mio = { version = "0.8", features = ["net", "os-poll"] }
  1. pub trait Future {
  2. type Output;
  3. fn poll(&mut self) -> PollState<Self::Output>;
  4. }

如果我们将它与 Rust 标准库中的 Future trait 进行对比,你会发现它非常相似,除了我们没有将 cx: &mut Context<’_> 作为参数,我们返回一个名称略有不同的枚举,以便区分它们,以便我们不会将它们混淆

  1. pub trait Future {
  2. type Output;
  3. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  4. }
  1. pub trait Future {
  2. type Output;
  3. fn poll(&mut self) -> PollState<Self::Output>;
  4. }
  5. pub enum PollState<T> {
  6. Ready(T),
  7. NotReady,
  8. }

同样,如果我们将其与 Rust 标准库中的 Poll 枚举进行比较,我们会发现它们几乎是一样的

  1. pub enum Poll<T> {
  2. Ready(T),
  3. Pending,
  4. }
  1. // URL: http://127.0.0.1:8080/1000/HelloWorld
  2. use crate::future::{Future, PollState};
  3. use std::io::{ErrorKind, Read, Write};
  4. fn get_req(path: &str) -> String {
  5. format!(
  6. "GET {path} HTTP/1.1\r\n\
  7. Host: localhost\r\n\
  8. Connection: close\r\n\
  9. \r\n"
  10. )
  11. }
  12. pub struct Http;
  13. impl Http {
  14. pub fn get(path: &str) -> impl Future<Output = String> {
  15. HttpGetFuture::new(path)
  16. }
  17. }
  18. struct HttpGetFuture {
  19. stream: Option<mio::net::TcpStream>,
  20. buffer: Vec<u8>,
  21. path: String,
  22. }
  23. impl HttpGetFuture {
  24. fn new(path: &'static str) -> Self {
  25. Self {
  26. stream: None,
  27. buffer: vec![],
  28. Path: path.to_string(),
  29. }
  30. }
  31. fn write_request(&mut self) {
  32. let stream = std::net::TcpStream::connect("127.0.0.1:8080").unwrap();
  33. stream.set_nonblocking(true).unwrap();
  34. let mut stream = mio::net::TcpStream::from_std(stream);
  35. stream.write_all(get_req(&self.path).as_bytes()).unwrap();
  36. self.stream = Some(stream);
  37. }
  38. }
  39. impl Future for HttpGetFuture {
  40. type Output = String;
  41. fn poll(&mut self) -> PollState<Self::Output> {
  42. if self.stream.is_none() {
  43. println!("FIRST POLL - START OPERATION");
  44. self.write_request();
  45. return PollState::NotReady;
  46. }
  47. let mut buff = vec![0u8; 4096];
  48. loop {
  49. match self.stream.as_mut().unwrap().read(&mut buff) {
  50. Ok(0) => {
  51. let s = String::from_utf8_lossy(&self.buffer);
  52. break PollState::Ready(s.to_string());
  53. }
  54. Ok(n) => {
  55. self.buffer.extend(&buff[0..n]);
  56. continue;
  57. }
  58. Err(e) if e.kind() == ErrorKind::WouldBlock => {
  59. break PollState::NotReady;
  60. }
  61. Err(e) if e.kind() == ErrorKind::Interrupted => {
  62. continue;
  63. }
  64. Err(e) => panic!("{e:?}"),
  65. }
  66. }
  67. }
  68. }
  1. 调用成功返回,读取了0个字节。我们已经从流中读取了所有数据,并收到了整个GET响应。我们从读取的数据中创建了一个字符串,并将其包装在 PollState::Ready 中,然后返回
  2. 调用成功返回,读取 n > 0字节。如果是这样,我们将数据读入缓冲区,并将数据附加到 self 中。缓冲,并立即尝试从流中读取更多数据
  3. 我们会得到一个类似 WouldBlock 的错误。如果是这种情况,我们知道因为我们将流设置为非阻塞,数据还没有准备好,或者有更多数据但我们还没有收到。在这种情况下,我们返回 PollState::NotReady,表示需要更多的轮询调用才能完成操作
  4. 我们得到一个类似中断的错误。这是一个有点特殊的情况,因为读取可能会被信号中断。如果发生了错误,通常的处理方法是尝试再读取一次
  5. We get an error that we can’t handle, and since our example does no error handling, we simply panic!

有一点想指出。我们可以将 要建模的协程 视为一个具有三个状态的非常简单的状态机

  • Not started, indicated by self.stream being None
  • Pending, indicated by self.stream being Some and a read to stream.read returning WouldBlock
  • Resolved, indicated by self.stream being Some and a call to stream.read returning 0 bytes

如您所见,当尝试读取我们的 TcpStream 时,此模型很好地映射到操作系统报告的状态。大多数像这样的 leaf future 将非常简单,尽管我们在这里没有明确表示状态,但它仍然适合我们基于协程的状态机模型

Creating coroutines:例子二

我们将继续从零开始建立我们的知识和理解。我们要做的第一件事是创建一个任务,我们可以通过手动将其建模为状态机来停止和恢复任务。完成这些之后,我们来看看这种对暂停任务进行建模的方式如何使我们能够编写类似于 async/await 的语法,并依靠代码转换来创建这些状态机,而不是手动编写它们。我们将创建一个简单的程序,完成以下任务

  1. 当我们的暂缓任务开始时打印一条消息
  2. 向我们的 delayserver(模拟异步系统中发送网络延迟 IO 的服务器) 发出 GET 请求
  3. 等待 GET 请求
  4. 打印来自服务器的响应
  5. 向 delayserver 发出第二个 GET 请求
  6. 等待服务器的第二个响应
  7. 打印来自服务器的响应
  8. 退出程序

此外,我们将通过对我们手工编写的协程调用 Future::poll 来执行我们的程序,只要运行它到完成所需的次数即可。目前还没有 runtimereactorexecutor,因为我们将在下一章中介绍它们。如果我们将程序编写为一个 async 函数,它将如下所示

  1. async fn async_main() {
  2. println!("Program starting");
  3. let txt = Http::get("/1000/HelloWorld").await;
  4. println!("{txt}");
  5. let txt2 = Http::("500/HelloWorld2").await;
  6. println!("{txt2}");
  7. }
  1. use std::time::Instant;
  2. mod future;
  3. mod http;
  4. use crate::http::Http;
  5. use future::{Future, PollState};
  6. struct Coroutine {
  7. state: State,
  8. }
  9. enum State {
  10. Start,
  11. Wait1(Box<dyn Future<Output = String>>),
  12. Wait2(Box<dyn Future<Output = String>>),
  13. Resolved,
  14. }
  • Start: 协程已经创建,但还没有轮询
  • Wait1: 当我们调用 Http::get 时,我们将得到一个 HttpGetFuture 返回值,并将其存储在 State 枚举中。在这一点上,我们将控制权返回给调用函数,以便它可以在需要时做其他事情。我们选择在所有输出字符串的 Future 函数中使其泛型,但由于我们现在只有一种Future,我们可以使它简单地持有 HttpGetFuture,它将以相同的方式工作
  • Wait2: 第二次调用 Http::get 是我们将控制权传递回调用函数的第二次地方
  • Resolved: The future is resolved(已解决) and there is no more work to do
  1. use std::time::Instant;
  2. mod future;
  3. mod http;
  4. use crate::http::Http;
  5. use future::{Future, PollState};
  6. struct Coroutine {
  7. state: State,
  8. }
  9. enum State {
  10. Start,
  11. Wait1(Box<dyn Future<Output = String>>),
  12. Wait2(Box<dyn Future<Output = String>>),
  13. Resolved,
  14. }
  15. impl Coroutine {
  16. fn new() -> Self {
  17. Self {
  18. state: State::Start,
  19. }
  20. }
  21. }
  22. impl Future for Coroutine {
  23. type Output = ();
  24. fn poll(&mut self) -> PollState<Self::Output> {
  25. loop {
  26. match self.state {
  27. State::Start => {
  28. println!("Program starting");
  29. let fut = Box::new(Http::get("/600/HelloWorld1"));
  30. self.state = State::Wait1(fut);
  31. },
  32. State::Wait1(ref mut fut) => match fut.poll() {
  33. PollState::Ready(txt) => {
  34. println!("{txt}");
  35. let fut2 = Box::new(Http::get("/400/HelloWorld2"));
  36. self.state = State::Wait2(fut2);
  37. }
  38. PollState::NotReady => break PollState::NotReady,
  39. },
  40. State::Wait2(ref mut fut2) => match fut2.poll() {
  41. PollState::Ready(txt2) => {
  42. println!("{txt2}");
  43. self.state = State::Resolved;
  44. break PollState::Ready(());
  45. }
  46. PollState::NotReady => break PollState::NotReady,
  47. },
  48. State::Resolved => panic!("Polled a resolved future"),
  49. }
  50. }
  51. }
  52. }
  53. fn async_main() -> impl Future<Output = ()> {
  54. Coroutine::new()
  55. }
  56. fn main() {
  57. let mut future = async_main();
  58. loop {
  59. match future.poll() {
  60. PollState::NotReady => {
  61. println!("Schedule other tasks");
  62. },
  63. PollState::Ready(_) => break,
  64. }
  65. thread::sleep(Duration::from_millis(100));
  66. }
  67. }
  68. // 输出:
  69. Program starting
  70. FIRST POLL - START OPERATION
  71. Schedule other tasks
  72. Schedule other tasks
  73. Schedule other tasks
  74. Schedule other tasks
  75. Schedule other tasks
  76. Schedule other tasks
  77. HTTP/1.1 200 OK
  78. content-length: 11
  79. connection: close
  80. content-type: text/plain; charset=utf-8
  81. date: Tue, 24 Oct 2023 20:39:13 GMT
  82. HelloWorld1
  83. FIRST POLL - START OPERATION
  84. Schedule other tasks
  85. Schedule other tasks
  86. Schedule other tasks
  87. Schedule other tasks
  88. HTTP/1.1 200 OK
  89. content-length: 11
  90. connection: close
  91. content-type: text/plain; charset=utf-8
  92. date: Tue, 24 Oct 2023 20:39:13 GMT
  93. HelloWorld2

通过查看打印输出,您可以了解程序流程

  1. First, we see Program starting, which executes at the start of our coroutine
  2. We then see that we immediately move on to the FIRST POLL – START OPERATION message that we only print when the future returned from our HTTP client is polled the first time
  3. 接下来,可以看到我们回到了主函数中,理论上可以继续执行其他任务
  4. 每100 毫秒,我们检查任务是否完成,并收到相同的消息,告诉我们可以调度其他任务
  5. 大约 600 毫秒后,我们收到了打印出来的响应
  6. 我们再次重复此过程,直到收到并打印来自服务器的第二个响应

恭喜你,现在你已经创建了一个任务,可以在不同的地方暂停和恢复,让它继续进行。谁会想写这样的代码来完成一个简单的任务呢?是的,这有点夸大其词,但我敢打赌,当你将55行状态机与完成同样的事情需要编写的7行普通顺序代码进行比较时,很少有程序员喜欢编写55行状态机。如果我们回想一下大多数用户层对并发操作的抽象的目标,就会发现这种方法只检查了我们的目标中的一个

  • Efficient
  • Expressive
  • Easy to use and hard to misuse

我们的状态机将是高效的,但也仅此而已。我们标记每个函数的开始每个我们想要将控制权交还给调用者的点,并使用几个关键字为我们生成状态机,那么我们编写的代码可能会简单得多。这就是 async/await 背后的基本思想。让我们看看这个例子是如何工作的

对比 async/await

前面的例子可以用 async/await 关键字简单地写成下面这样

  1. async fn async_main() {
  2. println!("Program starting");
  3. let txt = Http::get("/1000/HelloWorld").await;
  4. println!("{txt}");
  5. let txt2 = Http::("500/HelloWorld2").await;
  6. println!("{txt2}");
  7. }

这是7行代码,看起来与你在普通子程序/函数中编写的代码非常熟悉。事实证明,我们可以让编译器为我们编写这些状态机,而不是自己编写它们。不仅如此,仅使用简单的宏来帮助我们就可以走得很远,这正是当前的 async/await 语法在成为语言的一部分之前的原型设计方式。缺点当然是这些函数看起来像普通的子程序,但实际上在本质上非常不同。对于像 Rust 这样的强类型语言,借用语义而不是使用垃圾收集器,不可能隐藏这些函数不同的事实。这可能会给程序员带来一些困惑,因为他们希望所有东西的行为都是一样的(即抽象统一)

协程的额外例子

为了展示我们的示例与我们在 Rust 中使用 std::future::future traitasync/await 得到的行为有多么接近,我创建了与我们刚刚在 a-coroutines 中使用“正确的”future 和 async/await 语法所做的完全相同的示例。首先你会注意到,它只需要对代码进行非常小的更改。其次,您可以看到,输出显示了与我们自己手写状态机示例中完全相同的程序流程。那么,让我们更进一步。为了避免混淆,由于我们的协程现在只屈从于调用函数(还没有调度器、事件循环或类似的东西),我们使用一种略有不同的语法,称为 coroutine/wait,并创建一种方法来为我们生成这些状态机

再次手写 coroutine/wait

coroutine/wait 语法与 async/await 语法有明显的相似之处,尽管它有很多限制。基本规则如下

  • 每个以协程为前缀的函数都将被重写为我们所写的状态机
  • coroutine 标记的函数的返回类型将被重写,以便它们返回 -> impl Future (我们的语法将只处理输出字符串的 Future)
  • 只有实现了 Future 特征 的对象才能加上 .wait 后缀。这些点将在我们的状态机中表示为单独的阶段
  • 带 coroutine 前缀的函数可以调用普通函数,但是普通函数不能调用 coroutine 函数并期望发生任何事情,除非它们反复调用 coroutine 函数的 poll,直到它们返回 PollState::Ready

我们的实现将确保如果我们编写以下代码,它将编译到本章开始时编写的相同状态机(除了所有的 coroutine 将返回一个字符串)

  1. coroutine fn async_main() {
  2. println!("Program starting");
  3. let txt = Http::get("/1000/HelloWorld").await;
  4. println!("{txt}");
  5. let txt2 = Http::("500/HelloWorld2").await;
  6. println!("{txt2}");
  7. }

coroutine/wait 在 Rust 中不是有效的关键字。如果我这样写,我会得到一个编译错误!你是对的。因此,我创建了一个名为 corofy 的小程序,它为我们重写了 coroutine/wait 函数到这些状态机中。让我们快速解释一下

corofy:协程预处理器

在 Rust 中重写代码的最佳方法是使用宏系统。缺点是不清楚它编译成什么,扩展宏不是我们的用例的最佳选择,因为主要目标之一是查看我们编写的代码和它转换成的代码之间的差异。除此之外,宏的阅读和理解可能会相当复杂,除非你经常使用它们。相反,corofy 是一个普通的 Rust 程序,你可以在 ch07/corofy 的仓库中找到。如果您进入该文件夹,您可以通过编写以下代码来全局安装该工具

  1. cargo install --path .

现在你可以在任何地方使用这个工具。它通过提供一个包含 coroutine/wait 语法的输入文件来工作,例如 corofy ./src/main.rs [optional output file]。如果不指定输出文件,它会在同一个文件夹中创建一个后缀为 _corofied 的文件。该程序读取你提供的文件,并搜索 coroutine 关键字的用法。它接受这些函数,将它们注释掉(因此它们仍然在文件中),最后将它们放在文件中,并直接在下面写出状态机的实现,指出状态机的哪些部分是您在等待点(wait points)之间实际编写的代码

an example of a coroutine/wait transformation

下面的示例将基于与第一个示例完全相同的代码。这个例子在代码仓库的 ch07/b-async-await 文件夹中。如果你编写了本书中的每个示例,并且不依赖仓库中的现有代码,你可以做到以下两件事之一。无论你选择什么,你面前的代码都应该是相同的

  • Keep changing the code in the first example
  • Create a new cargo project called b-async-await and copy everything in the src folder and the dependencies section from Cargo.toml from the previous example over to the new one
  1. use std::time::Instant;
  2. mod http;
  3. mod future;
  4. use future::*;
  5. use crate::http::Http;
  6. fn get_path(i: usize) -> String {
  7. format!("/{}/HelloWorld{i}", i * 1000)
  8. }
  9. coroutine fn async_main() {
  10. println!("Program starting");
  11. let txt = Http::get(&get_path(0)).wait;
  12. println!("{txt}");
  13. let txt = Http::get(&get_path(1)).wait;
  14. println!("{txt}");
  15. let txt = Http::get(&get_path(2)).wait;
  16. println!("{txt}");
  17. let txt = Http::get(&get_path(3)).wait;
  18. println!("{txt}");
  19. let txt = Http::get(&get_path(4)).wait;
  20. println!("{txt}");
  21. }
  22. fn main() {
  23. let start = Instant::now();
  24. let mut future = async_main();
  25. loop {
  26. match future.poll() {
  27. PollState::NotReady => (),
  28. PollState::Ready(_) => break,
  29. }
  30. }
  31. println!("\nELAPSED TIME: {}", start.elapsed().as_secs_f32());
  32. }
  • 这段代码有一些变化。首先,我们添加了一个方便的函数 get_path,用于为我们的 GET 请求创建新路径,它创建了一个我们可以在 GET 请求中使用的路径,该路径具有延迟和基于我们传入的整数的消息
  • 接下来,在我们的 async_main 函数中,我们创建了5个延迟从0到4秒不等的请求
  • 我们做的最后一项修改是在 main 函数中。我们不再在每次调用轮询时都打印一条消息,因此我们不再使用 thread::sleep 来限制调用的数量。相反,我们测量从进入主函数到退出主函数的时间,因为我们可以用它来证明代码是否并发运行

现在我们的 main.rs 和前面的例子类似,我们可以用 corofy 把它重写成一个状态机。假设我们在根目录下的 ch07/b-async-await 文件夹中,可以写如下代码

  1. corofy ./src/main.rs

这将输出一个名为 main_corofied.rs 的文件。你可以打开并检查 src 文件夹。现在,可以复制 main_corofied.rs 的所有内容。并将其粘贴到 main.rs 中。为了方便起见,有一个名为 original_main.rs 的文件。在项目的根目录中包含 main.rs 的代码。所以你不需要保存 main.rs 的原始内容。如果每个示例都是从书中复制到自己的项目中编写出来的,那么将 main 的原始内容保存起来是明智的。在你覆盖它之前的某个地方。我不会在这里展示整个状态机,因为使用 coroutine/wait 的39行代码在写为状态机时最终变成了170行代码,但我们的状态枚举(State enum)现在看起来像这样

  1. enum State0 {
  2. Start,
  3. Wait1(Box<dyn Future<Output = String>>),
  4. Wait2(Box<dyn Future<Output = String>>),
  5. Wait3(Box<dyn Future<Output = String>>),
  6. Wait4(Box<dyn Future<Output = String>>),
  7. Wait5(Box<dyn Future<Output = String>>),
  8. Resolved,
  9. }

如果使用 cargo run 运行该程序,将得到以下输出

  1. Program starting
  2. FIRST POLL - START OPERATION
  3. HTTP/1.1 200 OK
  4. content-length: 11
  5. connection: close
  6. content-type: text/plain; charset=utf-8
  7. date: Tue, xx xxx xxxx 21:05:55 GMT
  8. HelloWorld0
  9. FIRST POLL - START OPERATION
  10. HTTP/1.1 200 OK
  11. content-length: 11
  12. connection: close
  13. content-type: text/plain; charset=utf-8
  14. date: Tue, xx xxx xxxx 21:05:56 GMT
  15. HelloWorld1
  16. FIRST POLL - START OPERATION
  17. HTTP/1.1 200 OK
  18. content-length: 11
  19. connection: close
  20. content-type: text/plain; charset=utf-8
  21. date: Tue, xx xxx xxxx 21:05:58 GMT
  22. HelloWorld2
  23. FIRST POLL - START OPERATION
  24. HTTP/1.1 200 OK
  25. content-length: 11
  26. connection: close
  27. content-type: text/plain; charset=utf-8
  28. date: Tue, xx xxx xxxx 21:06:01 GMT
  29. HelloWorld3
  30. FIRST POLL - START OPERATION
  31. HTTP/1.1 200 OK
  32. content-length: 11
  33. connection: close
  34. content-type: text/plain; charset=utf-8
  35. date: Tue, xx xxx xxxx 21:06:05 GMT
  36. HelloWorld4
  37. ELAPSED TIME: 10.043025

所以,你可以看到我们的代码按预期运行。由于我们在每次调用 Http::get 时都调用了 wait,所以代码按顺序运行,当我们查看10秒的运行时间时,这一点很明显。这是有道理的,因为我们要求的延迟是 0 + 1 + 2 + 3 + 4,等于10秒。如果我们想让 future 并发运行呢?你还记得我们说过 futurelazy 吗?好。所以,你知道仅仅通过创造 future 是无法实现并发的。我们得 轮询(poll)他们才能开始行动。为了解决这个问题,我们从 Tokio 那里获得了一些灵感,创建了一个名为 join_all 的函数。它需要一个 future 的集合,并驱动它们并发地完成。让我们创建本章最后一个这样做的例子

concurrent futures(并发 future)

好,我们在上一个例子的基础上做同样的事情。创建一个名为 c-async-await 的新项目,并复制 Cargo.toml。然后将 src 文件夹中的所有内容删除。The first thing we’ll do is go to future.rs and add a join_all function below our existing code

  1. pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
  2. let futures = futures.into_iter().map(|f| (false, f)).collect();
  3. JoinAll {
  4. futures,
  5. finished_count: 0,
  6. }
  7. }

This function takes a collection of futures as an argument and returns a JoinAll future. The function simply creates a new collection. In this collection, we will have tuples consisting of the original futures we received and a bool value indicating whether the future is resolved or not

  1. pub struct JoinAll<F: Future> {
  2. futures: Vec<(bool, F)>,
  3. finished_count: usize,
  4. }

This struct will simply store the collection we created and a finished_count. The last field will make it a little bit easier to keep track of how many futures have been resolved. As we’re getting used to by now, most of the interesting parts happen in the Future implementation for JoinAll

  1. impl<F: Future> Future for JoinAll<F> {
  2. type Output = String;
  3. fn poll(&mut self) -> PollState<Self::Output> {
  4. for (finished, fut) in self.futures.iter_mut() {
  5. if *finished {
  6. continue;
  7. }
  8. match fut.poll() {
  9. PollState::Ready(_) => {
  10. *finished = true;
  11. self.finished_count += 1;
  12. },
  13. PollState::NotReady => continue,
  14. }
  15. }
  16. if self.finished_count == self.futures.len() {
  17. PollState::Ready(String::new())
  18. } else {
  19. PollState::NotReady
  20. }
  21. }
  22. }

我们将 Output 设置为 String。这可能会让你感到奇怪,因为我们实际上并没有从这个实现中返回任何东西。原因是 corofy 只能处理返回字符串的 future (这是它很多很多缺点之一),所以我们接受它并在完成时返回一个空字符串。接下来是轮询(poll)实现。我们要做的第一件事是循环遍历每个 (flag, future) 元组。在循环内部,我们首先检查 future 的标志是否设置为 finished。如果是,则直接访问集合中的下一项。如果它没有完成,we poll the future。If we get PollState::Ready back, we set the flag for this future to true so that we won’t poll it again and we increase the finished count。If we get PollState::NotReady, we simply continue to the next future in the collection。After iterating through the entire collection, we check if we’ve resolved all the futures we originally received in

  1. if self.finished_count == self.futures.len()

If all our futures have been resolved, we return PollState::Ready with an empty string (to make corofy happy). If there are still unresolved futures, we return PollState::NotReady。这里有一点需要注意。第一次调用 JoinAll::poll 时,它会对集合中的每个 future 调用 poll。轮询每个 future 将启动它们代表的任何操作,并允许它们并发地进行(progress concurrently)。这是通过惰性协程(lazy coroutines)实现并发的一种方法,就像我们在这里处理的那些协程

Next up are the changes we’ll make in main.rs. The main function will be the same, as well as the imports and declarations at the start of the file, so I’ll only present the coroutine/await functions that we’ve changed

  1. coroutine fn request(i: usize) {
  2. let path = format!("/{}/HelloWorld{i}", i * 1000);
  3. let txt = Http::get(&path).wait;
  4. println!("{txt}");
  5. }
  6. coroutine fn async_main() {
  7. println!("Program starting");
  8. let mut futures = vec![];
  9. for i in 0..5 {
  10. futures.push(request(i));
  11. }
  12. future::join_all(futures).wait;
  13. }

Now we have two coroutine/wait functions。async_main stores a set of coroutines created by read_request in a Vec。Then it creates a JoinAll future and calls wait on it。下一个 coroutine/wait 函数是 read_requests,它接受一个整数作为输入,并使用它来创建 GET 请求。这个协程将依次等待响应并在响应到达时打印出结果。由于我们创建的请求延迟为 0、1、2、3、4 秒,我们应该期望整个程序在4秒多一点的时间内完成,因为所有任务都将并发执行。延迟时间短的任务将在延迟4秒的任务完成时完成。We can now transform our coroutine/await functions into state machines by making sure we’re in the folder ch07/c-async-await and writing corofy ./src/main.rs。You should now see a file called main_corofied.rs in the src folder. Copy its contents and replace what’s in main.rs with it。如果通过编写 cargo run 来运行该程序,应该会得到以下输出

  1. Program starting
  2. FIRST POLL - START OPERATION
  3. FIRST POLL - START OPERATION
  4. FIRST POLL - START OPERATION
  5. FIRST POLL - START OPERATION
  6. FIRST POLL - START OPERATION
  7. HTTP/1.1 200 OK
  8. content-length: 11
  9. connection: close
  10. content-type: text/plain; charset=utf-8
  11. date: Tue, xx xxx xxxx 21:11:36 GMT
  12. HelloWorld0
  13. HTTP/1.1 200 OK
  14. content-length: 11
  15. connection: close
  16. content-type: text/plain; charset=utf-8
  17. date: Tue, xx xxx xxxx 21:11:37 GMT
  18. HelloWorld1
  19. HTTP/1.1 200 OK
  20. content-length: 11
  21. connection: close
  22. content-type: text/plain; charset=utf-8
  23. date: Tue, xx xxx xxxx 21:11:38 GMT
  24. HelloWorld2
  25. HTTP/1.1 200 OK
  26. content-length: 11
  27. connection: close
  28. content-type: text/plain; charset=utf-8
  29. date: Tue, xx xxx xxxx 21:11:39 GMT
  30. HelloWorld3
  31. HTTP/1.1 200 OK
  32. content-length: 11
  33. connection: close
  34. content-type: text/plain; charset=utf-8
  35. date: Tue, xx xxx xxxx 21:11:40 GMT
  36. HelloWorld4
  37. ELAPSED TIME: 4.0084987

这里需要注意的是经过的时间。现在只有4秒多一点,就像我们预期的那样,我们的 futures 并发运行。如果我们看看 coroutine/await 如何从程序员的角度改变了编写协程的体验,我们会看到我们现在离目标更近了

  • Efficient(高效的)状态机不需要上下文切换,只 保存/恢复(save/restore)与特定任务相关的数据。我们没有 growing vs segmented stack issues,因为它们都使用相同的操作系统提供的栈
  • Expressive(表达力):我们可以像在“普通正常”Rust 中一样编写代码,并且在编译器的支持下,我们可以得到相同的错误消息并使用相同的工具
  • Easy to use and hard to misuse(易于使用和不易误用):在这一点上,我们可能与典型的 fiber/green线程 实现略有差距,因为我们的程序被编译器“在背后”大量转换,这可能会导致一些粗糙的边缘。具体来说,你不能从一个普通函数调用 async 函数并期望发生任何有意义的事情。你必须以某种方式主动轮询它完成,当我们开始添加运行时时,这变得更加复杂。然而,在大多数情况下,我们可以按照我们习惯的方式编写程序

最后的思考

在本章结束之前,我想指出的是,现在我们应该清楚为什么协程是不可抢占的。如果你还记得前面我们说过一个堆栈协程(例如 fibers/green threads 的例子)可以被抢占,并且它的执行可以在任何地方暂停。这是因为它们有一个栈,暂停一个任务很简单,只需将当前执行状态存储到栈中,然后跳转到另一个任务。这在这里是不可能的。我们唯一可以停止和恢复执行的地方是我们手动标记为等待的预定义暂停点。理论上,如果您有一个紧密集成的系统,可以在其中控制编译器、协程定义、调度器和I/O原语,则可以向状态机添加额外的状态,并创建可以暂停/恢复任务的额外点。这些悬挂点对用户来说可能是不透明的,并且与普通的等待/挂起(wait/suspension)点处理方式不同。例如,每次遇到正常的函数调用时,您可以添加一个暂停点(状态机中的一个新状态),如果当前任务已耗尽其时间预算或类似的情况,您可以在此向调度器提交。如果有,您可以安排另一个任务在稍后运行并恢复该任务,即使这不是以协作方式发生的。然而,尽管这对用户来说是不可见的,但这与能够从代码中的任何点停止/恢复(stop/resume)执行是不一样的。这也违背了协程通常隐含的合作性质。我们只是创建了可以暂停和恢复的任务。目前还没有事件循环、调度或类似的东西