参考博客:https://hegdenu.net/posts/understanding-async-await-3/

现在我们回到那个 await 的话题。将研究为什么有些事情不应该跨 await point 进行。通常,新编写异步代码的人要做的第一件事就是共享状态。通常,共享状态由互斥量保护(或者互斥量的变体,比如读写锁)。所以今天将重点关注互斥锁

why shouldn’t I hold a mutex guard across an await point

假设我们想在异步代码中共享状态。这通常不是一个好主意(实际上,这通常是不必要的)。但有时这是必要的。无论如何,它都值得一看。因为我们在这里感兴趣的是理解 async/await。我们会经常讨论互斥量。所以可能有必要快速复习一下基础知识。以确保我们意见一致。Mutex 是“互斥”的缩写。它是一个并发编程原语。它确保在给定时间,程序只有一部分在做特定的事情。通常这是访问跨线程共享的对象(如果你没有多线程,那么你不需要这种保护)。传统的互斥量有两个方法:Lock 和 Unlock。代码锁定互斥量,做一些事情,然后解锁互斥量。如果程序的其他部分已经持有互斥量,那么代码会阻塞lock方法。我们可以在Rust中通过创建自己的类型来想象这种流程

  1. // These are NOT real Rust types, especially `MyMutex`
  2. fn exclusive_access(mutex: &MyMutex, protected: &MyObject) {
  3. // Blocks until a lock can be obtained
  4. mutex.lock();
  5. // This "something" is what is protected by the mutex.
  6. protected.do_something();
  7. // Once unlocked, other threads can lock the mutex. Don't forget this bit!
  8. mutex.unlock();
  9. }

这里的问题是 MyObject 只受约定的保护。我们必须相信,在任何访问 MyObject 的地方,都锁定了相同的互斥量(如果你锁定了一个不同的互斥量,它真的没有任何帮助)。当你完成时,你可能忘记解锁互斥量。在这个玩具示例中,这似乎不太可能。但是假设我们使用 ? 操作符,如果 do_something() 返回错误,则提前返回。现在我们再也不能锁定互斥量了


相反,Rust 将互斥量和它保护的对象粘在一起。这就是 std::sync::Mutex。受保护对象实际上在互斥量内部。当你锁定互斥量时,你会得到一个 MutexGuard。您可以通过解除引用来通过 guard 访问受保护的对象。当 guard 超出范围时,互斥量会自动解锁。这种行为称为 RAII,这代表“资源获取即初始化”

  1. // This is now the std library mutex
  2. fn exclusive_access(mutex: &std::sync::Mutex<MyObject>) {
  3. // Blocks until a lock can be obtained (same as before)
  4. let guard = mutex
  5. .lock()
  6. .expect("the mutex is poisoned, program cannot continue");
  7. // The guard gets automatically dereferenced so we can call
  8. // `MyObject`'s methods on it directly.
  9. guard.do_something();
  10. // That's it, once the guard goes out of scope, the lock is released.
  11. }

mutex sequence diagram

让我们试着想象两个线程访问我们受保护的对象。我简化了一些东西。例如,我们不能在创建两个线程时传递互斥量。我们需要把它封装在一个智能指针中。但我们稍后会看到如何做到这一点。这对这个例子来说并不重要

异步理解之路三 - 图1

在这里,我们可以看到两个线程如何锁定互斥量。只有线程1成功。线程2停好了(停止执行)。一旦线程1放弃互斥锁保护,互斥锁就解锁了。然后线程2可以获得锁并完成它的工作。现在,让我们回到使用异步上下文的 MutexGuard

hold-mutex-guard async function

So let’s imagine that we’ve got a use case where we want a mutex held across an await point.This could be
  1. Read shared counter
  2. Access async shared resource (a database handle perhaps?)
  3. Write new value to shared counter
现在,让我们把异步共享的资源替换成运行时使用的资源。因为我们并不关心它是什么(这将使后面的事情更简单)。这是我们的async函数
  1. use std::sync::{Arc, Mutex};
  2. async fn hold_mutex_guard(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {
  3. let mut guard = data.lock().map_err(|_| DataAccessError {})?;
  4. println!("existing value: {}", *guard);
  5. tokio::task::yield_now().await;
  6. *guard = *guard + 1;
  7. println!("new value: {}", *guard);
  8. Ok(())
  9. }
  1. Our future takes some data. Actually it’s an <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Arc<Mutex<u64>></font>. Our value is a <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">u64</font>. We want to access and modify our value from multiple tasks though. So it’s wrapped in a <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Mutex</font>
  2. Finally, we need to access our mutex from multiple tasks. So it’s wrapped in a std::sync::Arc. An <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Arc</font> is actually an acronym: ARC(Atomically Reference Counted). It’s a shared pointer,It can be cloned and passed around between tasks
  3. We lock the mutex(now only we have access to it). We print out the value of our shared data. Now we “access our async resource”(actually we just yield back to the runtime). Then update the value of the shared data. And print that out

异步理解之路三 - 图2

running the hold-mutex-guard async function

异步理解之路三 - 图3

We create our data(initial value of 0). And then we await our future. Remember: this is a naughty async function. It’s holding a mutex guard across an await point! So let’s see what happens when we await it

异步理解之路三 - 图4

It works (that is just a little disappointing). Clearly, we’re going to have to try harder to do bad things. 我们不能连续两次 await async函数。因为这样它就会连续运行(一次又一次)。然而,有一种方法可以在异步运行时并发运行多个 future。它被称为spawn,类似于创建新线程的函数。以Tokio为例,它是Tokio::spawn (严格来说,应该是tokio::task::spawn,但它的别名是tokio::spawn,这是它通常的使用方式)。使用tokio::spawn的一个巨大区别是你不需要 await。future将被设置为在新任务中立即执行。事实上,这就是我们如何完成多个任务的原因。但是,新任务可能不会立即轮询。这取决于async运行时的worker的占用情况。让我们创建一个简单的例子。为了简洁,我们将使用async/await语法

异步理解之路三 - 图5

spawn onto current-thread:An async runtime may only have one worker. For example the current-thread scheduler in Tokio. Then we could spawn a task from within another task. But it wouldn’t get polled until the current task yields to the scheduler(or maybe later if other tasks are waiting). This is how it would look as a sequence diagram

异步理解之路三 - 图6

spawn onto multi-thread:Instead, a runtime may have multiple workers(which means multiple threads). Like the multi-thread scheduler in Tokio. Then there can be as many tasks being polled in parallel as there are workers. Let’s take a runtime with 2 workers and see how that would look as a sequence diagram

异步理解之路三 - 图7

这张图包含了一些关于 Tokio 如何工作的谎言。任务实际上是在运行新任务的同一个worker上生成的。如果另一个工作进程处于空闲状态,它可能会从第一个工作进程的队列中窃取任务(但所有这些都超出了我们的范围,所以我们继续)

wait for me to finish:Spawn returns a join handle: tokio::task::JoinHandle. The join handle can be used to wait for the completion of the task(the join handle implements <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Future</font> so it can be <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">.await</font>ed just like any future!). It can also be used to abort the spawned task

spawning multiple async functions:Let’s spawn a couple of instances of our async function!

异步理解之路三 - 图8

  1. error: future cannot be sent between threads safely
  2. --> resources/understanding-async-await/src/bin/mutex_guard_async.rs:5:18
  3. |
  4. 5 | tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
  5. | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `hold_mutex_guard` is not `Send`
  6. |
  7. = help: within `impl Future<Output = Result<(), DataAccessError>>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, u64>`
  8. note: future is not `Send` as this value is used across an await
  9. --> resources/understanding-async-await/src/bin/mutex_guard_async.rs:15:29
  10. |
  11. 12 | let mut guard = data.lock().map_err(|_| DataAccessError {})?;
  12. | --------- has type `std::sync::MutexGuard<'_, u64>` which is not `Send`
  13. ...
  14. 15 | tokio::task::yield_now().await;
  15. | ^^^^^^ await occurs here, with `mut guard` maybe used later
  16. ...
  17. 21 | }
  18. | - `mut guard` is later dropped here
  19. note: required by a bound in `tokio::spawn`
  20. --> /Users/stainsby/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.27.0/src/task/spawn.rs:163:21
  21. |
  22. 163 | T: Future + Send + 'static,
  23. | ^^^^ required by this bound in `spawn`
  • error: future cannot be sent between threads safely:派生任务可以在任何worker(线程)上启动。即使是当前线程运行时,也可能有另一个线程产生的任务(这意味着任务是从运行时外部派生的,这很好)。因此,future需要能够在线程之间发送是有道理的。但为什么不能呢?
  • note: future is not Send as this value is used across an await:然后它向我们指出mut guard。告诉我们它没有发送。然后指向 .await,我们将其作为违规的await点(rust错误太惊人了!)。最后,错误继续告诉我们,这都是spawn的错!
  • note: required by a bound in tokio::spawn:这个 send 东西并不神奇。tokio的作者在tokio::spawn中明确指定了它。让我们来看看它的代码

异步理解之路三 - 图9

  • Rust的类型系统阻碍了我们的计划。我们用互斥保护和等待点做坏事的计划。但我们可能还是会惹些坏事。We don’t need to run our not <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Send</font> async function twice concurrently. We just need to try to lock that mutex from somewhere else. So let’s create another async function that we can spawn. It’s the same as the previous one, but without the yield (and therefore, without an await point,so it’s not really async)

异步理解之路三 - 图10

  • We’re not holding the guard across an await point. So this async function is <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Send</font>! We need to make one more change. To ensure that this breaks. We’re going to use a current-thread runtime. This means that tasks won’t run in parallel

异步理解之路三 - 图11

Here we spawn our <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Send</font> async function <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">yieldless_mutex_access()</font>. And then immediately await our bad function <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">hold_mutex_guard()</font>. Let’s check the output. And then it just hangs there. We’ve created a deadlock! Now, it’s time to understand why. So we’ll go back to our old tricks. And write a custom <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Future</font> for our async function.

异步理解之路三 - 图12

hold-mutex-guard future

We’re going to manually implement a future that does the same. I almost didn’t manage this one. Now, on to that future. Futures are generally implemented as state machines. We’ll need an initial state. And we like to have an explicit completed state. And in the middle, a state after having yielded once

异步理解之路三 - 图13

Our initial state needs the parameters that the async function receives. The yielded state is going to have our guard stored in it. We also need the Arc containing our data. This matches what our async function would have had generated. The <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font> requires a lifetime generic parameter. That means that our future will also need a lifetime generic parameter

异步理解之路三 - 图14

take a look at the state machine for <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">HoldMutexGuard</font> 异步理解之路三 - 图15 1. The future starts in the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Init</font> state:When polled the first time, it returns <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Poll::Pending</font> 2. And moves to the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Yielded</font> state. When polled the second time, it returns <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Poll::Ready</font> 3. And moves to the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Done</font> state. The implementation is a little more complex though

implementing the hold-mutex-guard future:

异步理解之路三 - 图16

It’s not as bad as it looks! Our <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Output</font> associated type is the same as our function return parameter. That’s easy. So let’s look at the implementation for <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">poll()</font>. What is this beast?

异步理解之路三 - 图17

The borrow checker has lots of fun with anything to do with <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Pin</font>. We need to modify <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">self</font>. But it’s pinned. And we need to reference parts of it as well. So we dereference our pinned self. Then take a mutable reference. The first time we get polled, we’ll be in the state <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Init</font>. So we’ll do everything up to the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">yield_now</font> call in our async function

holding onto that guard:

  1. Once we have our <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font>, we print the value. We’re now going to yield back to the runtime. So just like in our <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">YieldNow</font> future, we need to wake our waker first. Otherwise our future will never be polled again. Then we set the next state: <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Yielded</font>. And return <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Poll::Pending</font>
  2. The next time our future gets polled, we are already in state <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Yielded</font>. We will print the value from the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font>. Then move to state <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Done</font> and return <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Poll::Ready</font>. At that point, the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font> will get dropped. That’s the end of the implementation
  3. The important bit here is that in the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Yielded</font> state, we hold on to the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font> and return. This is what our async function is doing too. But we don’t see it so clearly. We just see <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">.await</font>. But every time your async function contains an await point, that is the future returning. And before returning, it has to store all the in-scope local variables in itself

hanging around again:Let’s reproduce that hanging program again with our future. We’re going to spawn the same async function to help provoke the hang as we did before. And we’ll unwrap async main() straight away. This leaves us with an unwrapped version of the same code we used before

异步理解之路三 - 图18

We’re creating a current-thread runtime. Same as before. Let’s have a look at the sequence diagram!

异步理解之路三 - 图19

  1. The important point is the two futures. <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">yieldless_mutex_access()</font> gets spawned first. Then <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">HoldMutexGuard</font> gets awaited. As we saw when we introduced spawn, the new task has to wait. The runtime is single threaded. So the new task created with <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">yieldless_mutex_access()</font> must wait until the current task yields to the runtime. This means that the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">HoldMutexGuard</font> future is run first. It locks the mutex and receives a <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font>. It wakes it’s waker
  2. Then changes state to <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Yielded</font>, storing the <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">MutexGuard</font> in itself. And then returns <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">Poll::Pending</font>. Yielding to the runtime. Now the runtime can poll the next task. The one spawned with <font style="color:rgb(51, 51, 51);background-color:rgb(238, 238, 238);">yieldless_mutex_access()</font>. This task locks the mutex. Well, it tries. But the mutex is already locked, so it blocks until it gets unlocked. Since the runtime only has one thread, this blocks the entire runtime. And causes a deadlock. We saw this before with our async function. And now we understand why!

now what?

那么,如果我们想控制对某些共享异步资源的访问,应该怎么做呢?显而易见的答案是在tokio中使用异步互斥量。它叫做tokio::sync::Mutex。在等待点之间保持这个互斥量的守卫是安全的。这是因为它的lock()方法是异步的。所以它不会在等待锁的时候阻塞线程。因此,其他一些保持锁的任务可以取得进展( release the lock )。然而,通常最好根本不使用互斥量。相反,将共享资源的全部所有权交给单个任务。并通过消息传递与该任务进行通信