异步历史

回调函数 -> promise -> async/await

async/await

async 是一个修饰符,它可以应用在函数上,在调用时立即返回一个 Future 对象。这个 Future 对象最终将给出这个函数的实际返回结果。
async 将一个代码块转化为实现了 future 特征的状态机。

await 用在执行一个 async 的函数。

  1. let user = db.get_user("withoutboats").await;
  2. impl Database {
  3. async fn get_user(&mut self, user: &str) -> User {
  4. let sql = format!("select FROM users WHERE username = {}", user);
  5. let db_response = self.query(&sql).await;
  6. User::from(db_response)
  7. }
  8. }

设计原理

零成本抽象

零成本抽象是 Rust 比较独特的一项准则。

零成本抽象意味着你不使用的东西,你不用为它付出任何代价,进一步讲,你使用的东西,你无法写出比这更好的代码。

绿色线程方案

基于语言运行时,该模型的典型应用是 golang ,它的绿色线程被称为 goroutine。创建成本很低,可以同时运行成千上万个。

优点:避免线程的上下文切换,开销低。

缺点:集成于语言中,不得不使用该绿色线程,引入调度 goroutine 的运行时成本。

future 方案

基于库的解决方案,为异步 I/O 提供良好的抽象,它不是语言的一部分,也不是每个程序附带的运行时的一部分,只是可选的并按需使用的库。

Future 表示一个尚未得出的值,可以在它被解决(resolved)以得出那个值之前对它进行各种操作。
Future 可以避免空的轮询。

基于轮询的 future

future 结构
  1. pub enum Poll<T> {
  2. Ready(T),
  3. Pending,
  4. }
  5. pub trait Future {
  6. type Output;
  7. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  8. }

Runtime

Runtime 由两部分组成, ExecutorReactor

Executor 为执行器,没有「任何阻塞的等待」,循环执行一系列就绪的 Future ,当 Future 返回 Pending 时,会将 Future 转移到 Reactor 上等待进一步唤醒。

Reactor 为反应器,轮询并唤醒挂载的事件,执行对应的 wake 方法,同时将 Future 放到 Executor 的队列中等待执行。

一句话概括 RuntimeFuture不能马上返回值的时候,会被交给 ReactorFuture 的值准备就绪后,调用 wake 传递给 Executor 执行,反复执行,直至整个 Future 返回Ready

2. 异步浅谈 - 图1

2. 异步浅谈 - 图2

rust 异步使用

  1. async fn foo() -> {
  2. ...
  3. }

将函数的原型返回一个 Futrue,大致相当于

  1. fn foo() -> impl Future<Output = ()> {
  2. async { ... }
  3. }

async/await 通过一个状态机来控制代码的流程,配合executor完成协程的切换。

示例

  1. use async_std::task;
  2. use futures::executor::block_on;
  3. use std::{fmt::Debug, time};
  4. async fn hello_world() {
  5. println!("hello, world!");
  6. }
  7. #[derive(Debug)]
  8. struct Song;
  9. async fn learn_song() -> Song {
  10. println!("学习唱歌");
  11. // 模拟IO阻塞等
  12. // task::sleep(time::Duration::from_secs(1)).await; // 不能使用 thread::sleep
  13. Song
  14. }
  15. async fn sing_song(song: Song) {
  16. task::sleep(time::Duration::from_secs(5)).await;
  17. println!("唱歌中... {:?}", song);
  18. }
  19. async fn dance() {
  20. println!("跳舞中...")
  21. }
  22. // 并行任务1:
  23. async fn learn_and_sing() {
  24. // 在唱歌之前等待学歌完成
  25. // 这里我们使用 `.await` 而不是 `block_on` 来防止阻塞线程,这样就可以同时执行 `dance` 了。
  26. let song = learn_song().await;
  27. sing_song(song).await;
  28. }
  29. async fn async_main() {
  30. let f1 = learn_and_sing();
  31. let f2 = dance();
  32. // `join!` 类似于 `.await` ,但是可以等待多个 future 并发完成
  33. // 如果学歌的时候有了短暂的阻塞,跳舞将会接管当前的线程,如果跳舞变成了阻塞
  34. // 学歌将会返回来接管线程。如果两个futures都是阻塞的,
  35. // 这个‘async_main'函数就会变成阻塞状态,并生成一个执行器
  36. futures::join!(f1, f2); // f1, f2 并行完成
  37. }
  38. fn main() {
  39. let future = hello_world(); // Nothing is printed
  40. println!("调用了异步函数 hello_world,此句应该先打印");
  41. block_on(future);
  42. // 测试二
  43. println!("===顺序执行===");
  44. let song = block_on(learn_song());
  45. block_on(sing_song(song));
  46. block_on(dance());
  47. println!("===并行执行===");
  48. block_on(async_main());
  49. }

参考

  1. 零成本异步 I/O https://mp.weixin.qq.com/s/5KBPwcv8jCrJoYprOyLUDQ
  2. https://mp.weixin.qq.com/s/ZMeZaNwT-xJwav0zZpolyg
  3. https://www.rectcircle.cn/posts/rust%E5%BC%82%E6%AD%A5%E7%BC%96%E7%A8%8B/
  4. https://rustcc.cn/article?id=e6d50145-4bc2-4f1e-84da-c39c8217640b