在本章中,我们将从改进协程开始,增加在状态改变时存储变量的能力pinning 的整个概念对许多开发人员来说是陌生的,就像 Rust 的所有权系统一样,需要一些时间来获得一个良好的和有效的心理模型。幸运的是,pinning 的概念并不难理解,但它在语言中的实现方式以及它如何与 Rust 的类型系统交互是抽象的,很难掌握

Improving our example 1 – variables

那么,让我们继续上一章的内容,回顾一下这一点。我们有以下内容

  • A Future trait
  • A coroutine implementation using coroutine/await syntax and a preprocessor
  • A reactor based on mio::Poll
  • An executor that allows us to spawn as many top-level tasks as we want and schedules the ones that are ready to run
  • An HTTP client that only makes HTTP GET requests to our local delayserver instance

这并没有那么糟糕。我们可能会说我们的 HTTP 客户端有一些限制,但这不是本书的重点,所以我们可以接受这一点。然而,我们的协程实现受到严重限制。让我们看看如何让我们的协程更有用。我们当前实现的最大缺点是没有任何东西,我的意思是没有任何东西,可以跨越等待点。首先解决这个问题是有意义的。我们将在第8章(这个示例的最后一个版本)中使用 d-multiple-threads 的“库”代码,但我们将修改 main.rs 文件通过添加一个更短和更简单的示例

  1. Create a folder called a-coroutines-variables
  2. Enter the folder and run cargo init
  3. Delete the default main.rs file and copy everything from the ch08/d-multiplethreads/src folder into the ch10/a-coroutines-variables/src folder
  4. Open Cargo.toml and add the dependency on mio to the dependencies section
  1. mio = {version = "0.8", features = ["net", "os-poll"]}
  1. src
  2. |-- runtime
  3. |-- executor.rs
  4. |-- reactor.rs
  5. |-- future.rs
  6. |-- http.rs
  7. |-- main.rs
  8. |-- runtime.rs

我们将最后一次使用 corofy 来为我们生成样板状态机。将以下代码复制到 main.rs

  1. mod future;
  2. mod http;
  3. mod runtime;
  4. use crate::http::Http;
  5. use future::{Future, PollState};
  6. use runtime::Waker;
  7. fn main() {
  8. let mut executor = runtime::init();
  9. executor.block_on(async_main());
  10. }
  11. coroutine fn async_main() {
  12. println!("Program starting");
  13. let txt = Http::get("/600/HelloAsyncAwait").wait;
  14. println!("{txt}");
  15. let txt = Http::get("/400/HelloAsyncAwait").wait;
  16. println!("{txt}");
  17. }
  1. corofy ./src/main.rs ./src/main.rs

最后一步是解决 corofy 不知道 Waker 的问题。你可以通过编写 cargo check 让编译器指引你在哪里需要进行更改,但为了帮助你完成这个过程,有三个小的更改需要进行(注意,重写之前编写的相同代码所报告的行号)

  1. 64: fn poll(&mut self, waker: &Waker)
  2. 82: match f1.poll(waker)
  3. 102: match f2.poll(waker)

现在,通过编写 cargo run 来检查一切是否按预期工作

Improving our base example

我们想看看如何改进我们的状态机,使它允许我们跨等待点保存变量。为此,我们需要将它们存储在某处,并在状态机中进入每个状态时恢复所需的变量

  1. coroutine fn async_main() {
  2. println!("Program starting");
  3. let txt = Http::get("/600/HelloAsyncAwait").wait;
  4. println!("{txt}");
  5. let txt = Http::get("/400/HelloAsyncAwait").wait;
  6. println!("{txt}");
  7. }
  1. coroutine fn async_main() {
  2. let mut counter = 0;
  3. println!("Program starting");
  4. let txt = http::Http::get("/600/HelloAsyncAwait").wait;
  5. println!("{txt}");
  6. counter += 1;
  7. let txt = http::Http::get("/400/HelloAsyncAwait").wait;
  8. println!("{txt}");
  9. counter += 1;
  10. println!("Received {} responses.", counter);
  11. }

在这个版本中,我们只是在 async_main 函数的顶部创建了一个计数器变量,并为我们从服务器接收到的每个响应增加计数器。最后,打印出收到的响应数。我们实现它的方式是向 Coroutine0 结构体添加一个名为 stack 的新字段

  1. struct Coroutine0 {
  2. stack: Stack0,
  3. state: State0,
  4. }

stack 字段保存了一个我们也需要定义的 Stack0 结构体

  1. struct Coroutine0 {
  2. stack: Stack0,
  3. state: State0,
  4. }
  5. #[derive(Default)]
  6. struct Stack0 {
  7. counter: Option<usize>,
  8. }

这个结构体只保存一个字段,因为我们只有一个变量。该字段将是 Option 类型。我们还派生了这个结构体的 Default trait,以便我们可以轻松初始化它

async/await 在 Rust 中创建的 future 以一种稍微高效的方式存储这些数据。在我们的例子中,我们将每个变量存储在一个单独的结构体中,因为我认为这更容易理解,但这也意味着我们需要存储的变量越多,协程需要的空间就越大。它将随着状态变化之间需要 存储/恢复 的不同变量的数量线性增长。这可能是很多数据。例如,如果我们有100个状态变化,每个状态都需要一个不同的 i64 大小的变量来存储到下一个状态,这将需要一个结构体占用 100 * 8b = 800 字节的内存

Rust 通过将协程实现为枚举来优化这一点,其中每个状态只保存它需要在下一个状态中恢复的数据。这样,协程的大小不依赖于变量的总数,它只取决于需要保存/恢复的最大状态的大小。在前面的例子中,大小将减少到8字节,因为任何单个状态变化所需的最大空间足以容纳一个 i64 大小的变量。相同的空间会被反复使用。这种设计允许这种优化的事实是重要的,这是无堆栈协程比堆栈协程在内存效率方面的优势

  1. struct Coroutine0 {
  2. stack: Stack0,
  3. state: State0,
  4. }
  5. #[derive(Default)]
  6. struct Stack0 {
  7. counter: Option<usize>,
  8. }
  9. impl Coroutine0 {
  10. fn new() -> Self {
  11. Self {
  12. state: State0::Start,
  13. stack: Stack0::default(),
  14. }
  15. }
  16. }

stack的默认值与我们无关,因为无论如何我们都会重写它。接下来的几个步骤是我们最感兴趣的。在 Coroutine0 的 Future implementation 中,我们将假设 corofy 添加了以下代码来为我们初始化、存储和恢复堆栈变量。让我们看看第一次调用 poll 时发生了什么

  1. State0::Start => {
  2. // initialize stack (hoist variables)
  3. self.stack.counter = Some(0);
  4. // ---- Code you actually wrote ----
  5. println!("Program starting");
  6. // ---------------------------------
  7. let fut1 = Box::new(http::Http::get("/600/HelloAsyncAwait"));
  8. self.state = State0::Wait1(fut1);
  9. // save stack
  10. }
  • 当我们处于开始状态(Start state)时,我们做的第一件事是在我们初始化堆栈的顶部添加一个段。我们要做的一件事是将相关代码段(在本例中是在第一个等待点之前)的所有变量声明提升到函数的顶部
  • 在我们的例子中,我们还将变量初始化为其初始值,在本例中为 0
  • 我们还添加了一条注释,指出应该保存堆栈,但由于在第一个等待点之前发生的只是 counter 的初始化,因此这里没有什么可存储的

让我们来看看在第一个等待点之后发生了什么

  1. State0::Wait1(ref mut f1) => {
  2. match f1.poll(waker) {
  3. PollState::Ready(txt) => {
  4. // Restore stack
  5. let mut counter = self.stack.counter.take().unwrap();
  6. // ---- Code you actually wrote ----
  7. println!("{txt}");
  8. counter += 1;
  9. // ---------------------------------
  10. let fut2 = Box::new(http::Http::get("/400/HelloAsyncAwait"));
  11. self.state = State0::Wait2(fut2);
  12. // save stack
  13. self.stack.counter = Some(counter);
  14. },
  15. PollState::NotReady => break PollState::NotReady,
  16. }
  17. }

我们要做的第一件事是通过获取计数器的所有权来恢复栈(在本例中,take() 将当前存储在 self.stack.counter 中的值替换为 None),并将其写入代码段中使用的同名变量(counter)。在这种情况下,获得所有权并稍后将值放回不是问题,它模仿了我们在 协程/等待 示例中编写的代码。接下来要做的改动是将第一个等待点之后的所有代码粘贴进去。在这个例子中,唯一的变化是 counter 变量加了1。最后,将栈状态保存回来,以便在等待点之间保持更新后的状态。在第5章中,我们看到了如何在 fibers存储/恢复 寄存器状态。因为第5章展示了一个stackful 协程实现的例子,所以我们根本不需要关心栈状态,因为所有需要的状态都存储在我们创建的栈中。由于我们的协程是无堆栈的我们不存储每个协程的整个调用栈,但我们需要 存储/恢复 将跨等待点使用的堆栈部分无堆栈协程仍然需要从栈中保存一些信息,就像我们在这里所做的那样。当我们进入State0::Wait2** **状态时,我们以同样的方式开始

  1. State0::Wait2(ref mut f2) => {
  2. match f2.poll(waker) {
  3. PollState::Ready(txt) => {
  4. // Restore stack
  5. let mut counter = self.stack.counter.take().unwrap();
  6. // ---- Code you actually wrote ----
  7. println!("{txt}");
  8. counter += 1;
  9. println!(«Received {} responses.», counter);
  10. // ---------------------------------
  11. self.state = State0::Resolved;
  12. // Save stack (all variables set to None already)
  13. break PollState::Ready(String::new());
  14. },
  15. PollState::NotReady => break PollState::NotReady,
  16. }
  17. }

由于在我们的程序中没有更多的等待点,其余的代码进入这个段,并且由于我们在这里已经完成了 counter,我们可以简单地通过让它超出作用域来删除它。如果我们的变量持有任何资源,它们也会在这里被释放。这样,我们就赋予了协程跨等待点保存变量的能力。让我们试着通过 cargo run 来运行它。现在,让我们看一个需要跨等待点存储**引用的示例,因为这是使我们的 coroutine/wait** 函数表现得像“正常”函数的一个重要方面

Improving our example 2 – references

让我们为这个示例的下一个版本做好一切准备

  • Create a new folder called b-coroutines-references and copy everything from a-coroutines-variables over to it
  • 通过更改 Cargo 中的 package 部分中的 name 属性,您可以更改项目的名称,使其与文件夹相对应。但这个例子不需要这样做才能正常运行。You can find this example in this book’s GitHub repository in the ch10/b-coroutinesreferences folder

这一次,我们将通过使用下面的 coroutine/wait 示例程序学习如何在协程中存储对变量的引用

  1. use std::fmt::Write;
  2. coroutine fn async_main() {
  3. let mut buffer = String::from("\nBUFFER:\n----\n");
  4. let writer = &mut buffer;
  5. println!("Program starting");
  6. let txt = http::Http::get("/600/HelloAsyncAwait").wait;
  7. writeln!(writer, "{txt}").unwrap();
  8. let txt = http::Http::get("/400/HelloAsyncAwait").wait;
  9. writeln!(writer, "{txt}").unwrap();
  10. println!("{}", buffer);
  11. }

因此,在这个例子中,我们创建了一个 String 类型的 buffer 变量,我们用一些文本初始化它,我们获取一个 &mut 引用,并将它存储在一个 writer 变量中。每次我们收到响应时,我们通过 writer 中保存的 &mut 引用将响应写入缓冲区,然后在程序结束时将缓冲区打印到终端。让我们看看我们需要做什么来让它工作。我们要做的第一件事是引入 fmt::Write 特征,以便我们可以使用 writeln! 宏写入缓冲区

  1. use std::fmt::Write;
  2. #[derive(Default)]
  3. struct Stack0 {
  4. buffer: Option<String>,
  5. writer: Option<*mut String>,
  6. }

这里需要注意的重要一点是,writer 不能是 Option<&mut String>,因为我们知道它将引用同一个结构体中的 buffer 字段。一个字段在&self 上接受引用的结构体称为自引用结构体,在 Rust 中无法表示这一点,因为自引用的生命周期无法表达。解决方案是将 &mut 自引用转换为指针,并确保我们自己正确地管理生命周期。The only other thing we need to change is the Future::poll implementation

  1. State0::Start => {
  2. // initialize stack (hoist variables)
  3. self.stack.buffer = Some(String::from("\nBUFFER:\n----\n"));
  4. self.stack.writer = Some(self.stack.buffer.as_mut().unwrap());
  5. // ---- Code you actually wrote ----
  6. println!("Program starting");
  7. // ---------------------------------
  8. let fut1 = Box::new(http::Http::get("/600/HelloAsyncAwait"));
  9. self.state = State0::Wait1(fut1);
  10. // save stack
  11. }

好吧,这看起来有点奇怪。我们修改的第一行代码非常简单。我们将 buffer 变量初始化为一个新的字符串类型,就像我们在协程/等待程序的顶部所做的那样。然而,下一行代码看起来有点危险。我们将缓冲区的 &mut 引用转换为 *mut 指针

是的,我知道我们可以选择另一种方法,因为我们可以在需要的地方获取 buffer 的引用,而不是将其存储在它的变量中,但这只是因为我们的示例非常简单。假设我们使用一个库,该库需要借用 async 函数本地的数据,并且我们必须以某种方式手动管理生命周期,就像我们在这里做的那样,但在一个更复杂的场景中

self.stack.buffer.as_mut().unwrap() 行返回一个指向 buffer 字段的 &mut 引用。由于 self.stack.writerOption<*mut String> 类型,该引用将被强制转换为一个指针(这意味着 Rust 通过从上下文推断它来隐式地进行转换)。我们在这里使用 *mut String,因为我们故意不想要字符串切片(&str),这通常是我们在 Rust 中使用字符串类型引用时得到的(和想要的)

  1. // Let’s take a look at what happens after the first wait point
  2. State0::Wait1(ref mut f1) => {
  3. match f1.poll(waker) {
  4. PollState::Ready(txt) => {
  5. // Restore stack
  6. let writer = unsafe { &mut *self.stack.writer.take().unwrap() };
  7. // ---- Code you actually wrote ----
  8. writeln!(writer, «{txt}»).unwrap();
  9. // ---------------------------------
  10. let fut2 = Box::new(http::Http::get("/400/HelloAsyncAwait"));
  11. self.state = State0::Wait2(fut2);
  12. // save stack
  13. self.stack.writer = Some(writer);
  14. },
  15. PollState::NotReady => break PollState::NotReady,
  16. }
  17. }

我们所做的第一个更改是关于如何恢复我们的堆栈。我们需要恢复我们的 writer 变量,以便它保存一个指向 buffer 的 &mut 字符串类型。要做到这一点,我们必须编写一些不安全的代码来解引我们的指针,并让我们使用 &mut 引用我们的缓冲区。指向指针的引用强制转换是安全的,不安全的部分是解指针引用。接下来,我们添加编写响应的代码行。我们可以保持与我们在 coroutine/wait 函数中编写它的方式相同。最后,我们将栈状态保存回来,因为我们需要两个变量在等待点期间存在

由于可以简单地复制 writer 字段中的指针,因此在使用它时不必拥有它的所有权,但为了保持一致性,我们将拥有它,就像第一个例子中所做的那样。这也是有意义的,因为如果没有必要为下一个 await 点存储指针,我们可以简单地通过不存储它来让它离开作用域

最后一步是到达 Wait2, future 返回 PollState::Ready

  1. State0::Wait2(ref mut f2) => {
  2. match f2.poll(waker) {
  3. PollState::Ready(txt) => {
  4. // Restore stack
  5. let buffer = self.stack.buffer.as_ref().take().unwrap();
  6. let writer = unsafe { &mut *self.stack.writer.take().unwrap() };
  7. // ---- Code you actually wrote ----
  8. writeln!(writer, «{txt}»).unwrap();
  9. println!("{}", buffer);
  10. // ---------------------------------
  11. self.state = State0::Resolved;
  12. // Save stack / free resources
  13. let _ = self.stack.buffer.take();
  14. break PollState::Ready(String::new());
  15. },
  16. PollState::NotReady => break PollState::NotReady,
  17. }
  18. }

在这个段中,我们恢复两个变量,因为我们通过 writer 变量写入最后的响应,然后将存储在缓冲区中的所有内容打印到终端。我想指出的是println!(“{}”, buffer); line在原始的 协程/等待 示例中接受一个引用,即使它看起来可能像我们传递了一个自有的字符串。因此,我们将缓冲区恢复为 &String 类型,而不是自己的版本是有意义的。转移所有权也会使 writer 变量中的指针无效。我们要做的最后一件事是丢弃不再需要的数据。我们的 self.stack.writer 字段已经被设置为 None,因为我们在一开始恢复栈时获得了它的所有权,但我们还需要获得 self.stack.buffer 保存的字符串类型的所有权,以便它也在这个作用域的末尾被删除。如果我们不这样做,我们将保留分配给字符串的内存,直到整个协程被删除(这可能要晚得多)。现在,我们已经完成了所有的更改。如果我们之前所做的重写在 corofy 中实现,我们的 协程/等待 实现理论上可以支持更复杂的用例。让我们通过 cargo run 来看看运行程序时会发生什么

自引用结构体 和 Pinning - 图1

Puh, great. All that dangerous unsafe turned out to work just fine, didn’t it? Good job. Let’s make one small improvement before we finish

Improving our example 3 – this is… not… good…

假设你没有阅读本节的标题,并喜欢我们前面的示例编译并显示正确的结果。我认为我们的协程实现非常好,现在我们可以看看一些优化。在我们的执行器中有一个优化是我想立即执行的。在我们开始之前,让我们把一切都准备好

  • Create a new folder called c-coroutines-problem and copy everything from b-coroutines-references over to it
  • 通过更改 Cargo 中的 package 部分中的 name 属性,您可以更改项目的名称,使其与文件夹相对应。但这个例子不需要这样做才能正常运行。This example is located in this book’s GitHub repository in the ch09/c-coroutinesproblem folder

回到优化。您看,对于我们的运行时在现实生活中将要处理的工作负载的新见解表明,大多数 future 将在第一次轮询时返回。因此,理论上,我们只需轮询一次我们在 block_on 中收到的 future,大多数情况下它都会立即解决。让我们导航到 src/runtime/executor.rs,看看我们如何通过添加几行代码来利用这一点。切换到函数 Executor::block_on,你会发现我们要做的第一件事就是在轮询之前生成 future 对象。生成 future 意味着我们在堆中为其分配空间,并将指向其位置的指针存储在 HashMap 变量中。因为 future 很可能在第一次轮询时就已经准备好了,所以这是可以避免的不必要的工作。让我们在 block_on 函数的开头添加这个小优化来利用这一点

  1. pub fn block_on<F>(&mut self, future: F)
  2. where
  3. F: Future<Output = String> + 'static,
  4. {
  5. // ===== OPTIMIZATION, ASSUME READY
  6. let waker = self.get_waker(usize::MAX);
  7. let mut future = future;
  8. match future.poll(&waker) {
  9. PollState::NotReady => (),
  10. PollState::Ready(_) => return,
  11. }
  12. // ===== END
  13. spawn(future);
  14. loop {
  15. ...
  16. }
  17. }

在,我们只需立即轮询 future,如果 future 在第一次轮询时解决,则返回,因为我们都完成了。这样,我们只有在需要等待的时候才会生成future。是的,这里假设我们的 id 永远不会达到 usize::MAX,但让我们假设这只是一个概念验证。无论如何,如果 future 生成(spawn)并再次轮询,我们的唤醒器将被丢弃并替换为一个新的唤醒器,因此这应该不是问题。让我们试着运行这个程序,看看会得到什么

自引用结构体 和 Pinning - 图2

That doesn’t sound good! Okay, that’s probably a kernel bug in Linux, so let’s try it on Windows instead

自引用结构体 和 Pinning - 图3

让我们仔细看看当我们进行小的优化时,异步系统到底发生了什么

Discovering self-referential structs

我们创建了一个自引用结构体,对其进行初始化,使其接收一个指向自身的指针,然后移动它。让我们仔细看看

  1. 首先,我们接收了一个 future 对象作为 block_on 的参数。这不是问题,因为 future 还不是自引用的,所以我们可以把它移动到任何我们想要的地方,没有问题(这也是为什么使用适当的 async/await 在轮询之前移动 future 是完全可以的)
  2. 然后,我们对 future 进行了一次轮询。我们所做的优化做了一个重要的改变。当我们第一次轮询 future 时,它位于栈中(在我们的 block_on 函数的栈帧内)
  3. 当我们第一次轮询 future 时,我们将变量初始化为初始状态。我们的 writer 变量接受一个指向 buffer 变量的指针(存储为协程的一部分),并在此时使其成为自引用
  4. The first time we polled the future, it returned NotReady
  5. Since it returned NotReady, we spawned the future, which moves it into the tasks collection with the HashMap>> type in our Executor. The future is now placed in Box, which moves it to the heap
  6. 下次轮询 future 时,我们通过解引用 writer 变量的指针来恢复栈。然而,有一个大问题:指针现在指向栈上的旧位置,即第一次轮询时 future 所在的位置

现在你已经亲眼看到了自引用结构体的问题,如何将其应用于 future,以及为什么我们需要一些东西来防止这种情况发生。自引用结构体是一种接受 **self 的引用**并将其存储在字段中的结构体。现在,这里的“引用”一词有点不精确,因为在 Rust 中无法获取 self 的引用并将该引用存储在 self 中。要在安全的 Rust 中做到这一点,您必须将引用转换为指针(请记住,引用只是编程语言中具有特殊含义的指针)。当该值被移动到内存中的另一个位置时,指针并没有更新,而是指向“旧”位置。如果我们看一下从栈上的一个位置移动到另一个位置的过程,它看起来像这样

自引用结构体 和 Pinning - 图4

自引用结构体 和 Pinning - 图5

现在我们已经很好地理解了问题是什么,让我们仔细看看 Rust 如何通过使用其类型系统来解决这个问题,以防止我们移动依赖于内存中稳定位置才能正确工作的结构体

Pinning in Rust

下图展示了一个稍微复杂一些的自引用结构,这样我们就有了一些直观的东西来帮助理解

自引用结构体 和 Pinning - 图6

At a very high level, pinning makes it possible to rely on data that has a stable memory address by disallowing any operation that might move it

自引用结构体 和 Pinning - 图7

pinning 的概念非常简单。复杂的部分是如何在语言中实现以及如何使用它

Pinning in theory

钉扎(Pinning) 是 Rust 标准库的一部分,由两部分组成:类型(type) Pin标记特征(marker trait) Unpin。钉扎只是一种语言结构。没有特殊的位置或内存可以让你把值移动到那里,让它们固定。没有系统调用要求操作系统确保一个值在内存中的位置不变。这只是类型系统的一部分,目的是阻止我们移动值。Pin 并没有消除对 unsafe 的需求,它只是为 unsafe 的用户提供了一个保证,即该值在内存中具有稳定的位置,只要钉扎该值的用户只使用了安全的 Rust。这允许我们编写安全的自引用类型。它确保所有可能导致问题的操作都必须使用不安全的操作。回到我们的协程示例,如果我们要移动结构体,我们必须编写不安全的 Rust。这就是 Rust 如何维护它的安全保障。如果你知道你创建的 future 从来不使用自我引用,你可以选择使用 unsafe 来移动它,但是如果你做错了,责任就落在你身上。在我们深入研究 钉扎(Pinning) 之前,我们需要定义几个我们需要继续研究的术语

  • Pin is the type it’s all about. You’ll find this as a part of Rust’s standard library under the std::pin module. Pin wrap types that implement the Deref trait, which in practical terms means that it wraps references and smart pointers
  • Unpin is a marker trait. If a type implements Unpin, pinning will have no effect on that type. The type will still be wrapped in Pin but you can simply take it out again. The impressive thing is that almost everything implements Unpin by default, and if you manually want to mark a type as !Unpin, you have to add a marker trait called PhantomPinned to your type. Having a type, T, implement !Unpin is the only way for something such as Pin<&mut T> to have any effect
  • Pinning a type that’s !Unpin will guarantee that the value remains at the same location in memory until it gets dropped, so long as you stay in safe Rust
  • Pin projections are helper methods on a type that’s pinned. The syntax often gets a little weird since they’re only valid on pinned instances of self. For example, they often look like fn foo(self: Pin<&mut self>)
  • Structural pinning is connected to pin projections in the sense that, if you have Pin<&mut T> where T has one field, a, that can be moved freely and one that can’t be moved, b, you can do the following
    • Write a pin projection for a with the fn a(self: Pin<&mut self>) -> &A signature. In this case, we say that pinning is not structural
    • Write a projection for b that looks like fn b(self: Pin<&mut self>) -> Pin<&mut B>, in which case we say that pinning is structural for b since it’s pinned when the struct, T, is pinned

在了解了最重要的定义之后,让我们来看看固定(pin)值的两种方法

Pinning to the heap

自引用结构体 和 Pinning - 图8

  1. use std::{marker::PhantomPinned, pin::Pin};
  2. #[derive(Default)]
  3. struct MaybeSelfRef {
  4. a: usize,
  5. b: Option<*const usize>,
  6. _pin: PhantomPinned,
  7. }

因此,我们希望能够使用 MaybeSelfRef::default() 创建一个实例,我们可以根据需要移动它,然后在某个时候将它初始化为一个它引用自己的状态,移动它会引起问题。这就像我们在前面的例子中看到的那样,future 在轮询之前是不会自我引用的。让我们为 MaybeSelfRef 编写 impl 代码块并看一看代码

  1. use std::{marker::PhantomPinned, pin::Pin};
  2. #[derive(Default)]
  3. struct MaybeSelfRef {
  4. a: usize,
  5. b: Option<*const usize>,
  6. _pin: PhantomPinned,
  7. }
  8. impl MaybeSelfRef {
  9. fn init(self: Pin<&mut Self>) {
  10. unsafe {
  11. let Self { a, b, .. } = self.get_unchecked_mut();
  12. *b = Some(a);
  13. }
  14. }
  15. fn b(self: Pin<&mut Self>) -> Option<&mut usize> {
  16. unsafe { self.get_unchecked_mut().b.map(|b| &mut *b) }
  17. }
  18. }

如你所见,MaybeStelfRef 只有在我们调用 init 之后才会是自引用。We also define one more method that casts the pointer stored in b to Option<&mut usize>, which is a mutable reference to a。需要注意的是,我们的两个函数都需要 unsafe。如果没有 Pin,唯一需要不安全的方法将是 b,因为我们在那里解引用指针。获取对固定(pined)值的可变引用总是不安全的,因为没有什么可以阻止我们在那一点移动固定(pined)值。Pinning to the heap is usually done by pinning a Box. There is even a convenient method on Box that allows us to get Pin>. Let’s look at a short example

  1. fn main() {
  2. let mut x = Box::pin(MaybeSelfRef::default());
  3. x.as_mut().init();
  4. println!("{}", x.as_ref().a);
  5. *x.as_mut().b().unwrap() = 2;
  6. println!("{}", x.as_ref().a);
  7. }

Here, we pin MaybeSelfRef to the heap and initialize it. We print out the value of a and then mutate the data through the self-reference in b, and set its value to 2. If we look at the output, we’ll see that everything looks as expected

自引用结构体 和 Pinning - 图9

Heap pinning being safe is not so surprising since, in contrast to(与…相比之下) the stack, a heap allocation will be stable throughout the program, regardless of where we create it

Pinning to the stack

Pinning to the stack 可能有些困难。我们了解了栈的工作原理,知道栈会随着值的弹出和压入而增大和缩小。So, if we’re going to pin to the stack, we have to pin it somewhere “high” on the stack. This means that if we pin a value to the stack inside a function call, we can’t return from that function(在函数调用里 pin 了一个值,该值存在于该函数栈帧中,所以不能直接返回该值了,因为被 pin 住了,所以是矛盾的), and expect the value to still be pinned there. That would be impossible。Pinning to the stack is hard since we pin by taking &mut T, and we have to guarantee that we won’t move T until it’s dropped. If we’re not careful, this is easy to get wrong. Rust can’t help us here, so it’s up to us to uphold that guarantee. This is why stack pinning is unsafe。Let’s look at the same example using stack pinning

  1. fn stack_pinning_manual() {
  2. let mut x = MaybeSelfRef::default();
  3. let mut x = unsafe { Pin::new_unchecked(&mut x) };
  4. x.as_mut().init();
  5. println!("{}", x.as_ref().a);
  6. *x.as_mut().b().unwrap() = 2;
  7. println!("{}", x.as_ref().a);
  8. }

The noticeable difference here is that it’s unsafe to pin to the stack, so now, we need unsafe both as users of MaybeSelfRef and as implementors. If we run the example with cargo run, the output will be the same as in our first example

自引用结构体 和 Pinning - 图10

The reason stack pinning requires unsafe is that it’s rather easy to accidentally break the guarantees that Pin is supposed to provide. Let’s take a look at this example

  1. use std::mem::swap;
  2. fn stack_pinning_manual_problem() {
  3. let mut x = MaybeSelfRef::default();
  4. let mut y = MaybeSelfRef::default();
  5. {
  6. let mut x = unsafe { Pin::new_unchecked(&mut x) };
  7. x.as_mut().init();
  8. *x.as_mut().b().unwrap() = 2;
  9. }
  10. swap(&mut x, &mut y);
  11. println!("
  12. x: {{
  13. +----->a: {:p},
  14. | b: {:?},
  15. | }}
  16. |
  17. | y: {{
  18. | a: {:p},
  19. +-----|b: {:?},
  20. }}", &x.a, x.b, &y.a, y.b);
  21. }

In this example, we create two instances of MaybeSelfRef called x and y. Then, we create a scope where we pin x and set the value of x.a to 2 by dereferencing the self-reference in b, as we did previously. Now, when we exit the scope, x isn’t pinned anymore, which means we can take a mutable reference to it without needing unsafe. Since this is safe Rust and we should be able to do what we want, we swap x and y。输出打印出两个结构体的a字段的指针地址,以及存储在b中的指针值

自引用结构体 和 Pinning - 图11

自引用结构体 和 Pinning - 图12

现在我们已经看到了栈钉扎的所有陷阱,我明确的建议是避免它,除非你需要使用它。如果你必须使用它,那就使用 pin! 宏,以避免我们在这里描述的问题

自引用结构体 和 Pinning - 图13

Pin projections and structural pinning

Before we leave the topic of pinning, we’ll quickly explain what pin projections and structural pinning are. Both sound complex, but they are very simple in practice. The following diagram shows how these terms are connected

自引用结构体 和 Pinning - 图14

Structural pinning means that if a struct is pinned, so is the field. We expose this through pin projections, as we’ll see in the following code example. If we continue with our example and create a struct called Foo that holds both MaybeSelfRef (field a) and a String type (field b), we could write two projections that return a pinned version of a and a regular mutable reference to b

  1. use std::{marker::PhantomPinned, pin::Pin};
  2. #[derive(Default)]
  3. struct MaybeSelfRef {
  4. a: usize,
  5. b: Option<*const usize>,
  6. _pin: PhantomPinned,
  7. }
  8. #[derive(Default)]
  9. struct Foo {
  10. a: MaybeSelfRef,
  11. b: String,
  12. }
  13. impl Foo {
  14. fn a(self: Pin<&mut Self>) -> Pin<&mut MaybeSelfRef> {
  15. unsafe {
  16. self.map_unchecked_mut(|s| &mut s.a)
  17. }
  18. }
  19. fn b(self: Pin<&mut Self>) -> &mut String {
  20. unsafe {
  21. &mut self.get_unchecked_mut().b
  22. }
  23. }
  24. }

Note that these methods will only be callable when Foo is pinned. You won’t be able to call either of these methods on a regular instance of Foo. Pin projections do have a few subtleties that you should be aware of, but they’re explained in quite some detail in the official documentation (https://doc.rust-lang.org/stable/std/pin/index.html), so I’ll refer you there for more information about the precautions you must take when writing projections

自引用结构体 和 Pinning - 图15

至此,我们几乎涵盖了 async Rust 中所有的高级主题。然而,在进入最后一章之前,让我们看看钉扎如何防止我们犯上一遍协程例子中犯的大错误

Improving our example 4 – using pinning

  • Copy the entire c-coroutines-problem folder and name the new copy e-coroutines-pin
  • Open Cargo.toml and rename the name of the package e-coroutines-pin

自引用结构体 和 Pinning - 图16

  1. use std::pin::Pin;
  2. // The only other change we need to make is in the definition of poll in our Future trait
  3. fn poll(self: Pin<&mut Self>, waker: &Waker) -> PollState<Self::Output>;

However, the implications of this change are noticeable pretty much everywhere poll is called, so we need to fix that as well

  1. use crate::{future::PollState, runtime::{self, reactor, Waker}, Future};
  2. use mio::Interest;
  3. use std::{io::{ErrorKind, Read, Write}, pin::Pin};
  4. // The only other place we need to make some changes is in the Future implementation for HttpGetFuture
  5. fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> PollState<Self::Output>
  1. let id = self.id;
  2. if self.stream.is_none() {
  3. println!("FIRST POLL - START OPERATION");
  4. self.write_request();
  5. let stream = (&mut self).stream.as_mut().unwrap();
  6. runtime::reactor().register(stream, Interest::READABLE, id);
  7. runtime::reactor().set_waker(waker, self.id);
  8. }
  9. let s = String::from_utf8_lossy(&self.buffer).to_string();
  10. runtime::reactor().deregister(self.stream.as_mut().unwrap(), id);
  11. break PollState::Ready(s);

自引用结构体 和 Pinning - 图17

  1. mod future;
  2. mod http;
  3. mod runtime;
  4. use future::{Future, PollState};
  5. use runtime::Waker;
  6. use std::{fmt::Write, marker::PhantomPinned, pin::Pin};
  7. Wait1(Pin<Box<dyn Future<Output = String>>>),
  8. Wait2(Pin<Box<dyn Future<Output = String>>>),
  9. struct Coroutine0 {
  10. stack: Stack0,
  11. state: State0,
  12. _pin: PhantomPinned,
  13. }
  14. impl Coroutine0 {
  15. fn new() -> Self {
  16. Self {
  17. state: State0::Start,
  18. stack: Stack0::default(),
  19. _pin: PhantomPinned,
  20. }
  21. }
  22. }
  23. fn poll(self: Pin<&mut Self>, waker: &Waker) -> PollState<Self::Output>
  24. let this = unsafe { self.get_unchecked_mut() };
  25. loop {
  26. match this.state {
  27. State0::Start => {
  28. // initialize stack (hoist declarations - no stack yet)
  29. this.stack.buffer = Some(String::from("\nBUFFER:\n----\n"));
  30. this.stack.writer = Some(this.stack.buffer.as_mut().unwrap());
  31. // ---- Code you actually wrote ----
  32. println!("Program starting");
  33. ...
  34. }
  35. }
  36. ...
  37. }
  38. let fut1 = Box::pin(http::Http::get("/600/HelloAsyncAwait"));
  39. let fut2 = Box::pin(http::Http::get("/400/HelloAsyncAwait"));
  40. match f1.as_mut().poll(waker)
  41. match f2.as_mut().poll(waker)
  1. ...
  2. thread::{self, Thread}, pin::Pin,
  3. };
  4. type Task = Pin<Box<dyn Future<Output = String>>>;
  5. e.tasks.borrow_mut().insert(id, Box::pin(future));
  6. match future.as_mut().poll(&waker) {