导读

Rust 1.39中异步API进入稳定阶段, Rust又增一利器。但对于笔者来说,Rust异步编程还是迷雾重重,在学习过程中遇到了很多问题,其中最主要的是:

  • Rust的协程设计
  • Rust的底层异步机制是如何设计的?
  • 目前的异步功能库有tokio, async_std, smol,它们之间有什么相似和差异?
  • Rust的异步API有哪些?
  • 目前还有什么不足之处,roadmap是什么?
  • Rust中使用异步能带来哪些好处?

笔者在知乎上发现一个讲解Rust异步机制的专栏,通过阅读该系列的文章,构建Rust异步的世界观。

基本概念 - 图1

Rust的协程

在讨论异步机制的要素之前,我们先讨论并发问题。
多线程是处理并发的重要手段,它最基本的功能是提供了多个独立的逻辑流,并且在这些逻辑流之间不断切换从而在同一段时间内可以处理若干个任务。

但是切换是有开销的,线程开销包括:

  • 用户态与内核态的两次切换,内核态进行相关处理和调度
  • 寄存器堆栈的保存和恢复
  • TLB,cache刷新带来的cache miss

在Linux的实现中,线程和进程并同用task_struct去描述, 在os层面不区分线程和进程,因此它们的切换开销相近。

协程模型就是把逻辑流的调度机制从内核态搬运到用户态,因此同一线程中的协程切换不需要涉及内核\用户态切换,因此协程的调度开销要比线程切换低。另一方面,给每一个协程提供更细粒度的空间分配,可以提高内存的利用率。

上面提到的调度模型在计算机各个领域都很常见,简单来说就是抽象为Task,-Scheduler, Resource. Task代表某种将被执行的任务,Task需要获取\绑定某个Resource才能被执行。根据Task的需求,Scheduler为Task匹配某个Resource。

因此,协程(Task)需要被调度器调度到某个内核线程(Resource)上才能执行,调度器在用户态工作,没有内核态用户态的切换开销。

协程有两种实现:无栈协程(stackless coroutine)和有栈协程(green \ stackful coroutine)。Rust是无栈协程,Golang的goroutine是有栈协程。有栈协程的结构与内核线程类似,拥有自己的栈结构存储局部变量。

有栈协程写起来类似原生线程,但有一些缺点:

  • 栈扩容,有额外开销。
  • CPU上下文要保存到栈里面,跨平台比较复杂。

那么Rust中无栈协程又是如何实现状态保存的呢?使用状态机:

  1. async fn miao(){
  2. let a = 10;
  3. func().await;
  4. let (b, c) = (10usize, 0usize);
  5. func().await;
  6. }
  7. // compiler为miao生成的匿名状态机
  8. enum CoroutineState{
  9. State0,
  10. State1(i32), // 保存变量a line 1-2
  11. State2(usize, uszie); // 保存变量b, c line 3-5
  12. Terminal,
  13. }

在上面的代码中, 当miao()被调用时, 处于初始状态State0, 运行到第2行时处于State1, 3-5行间处于State2。简单地说,每一次 .await的调用完成,会使协程转变到新状态

在Future 0.1版本中使用基于组合回调函数的方式实现无栈协程,这一设计后来被丢弃了,主要原因是组合子之间不能传递引用,只能返回值,导致每个回调(闭包)都要存储必须的状态,带来额外的内存分配和复制开销,不符合Rust的零成本抽象哲学。

std中的异步

Rust的std库中只定义了以下关键内容:

  • 定义了 Future trait
  • 通过 async\await 语法糖来方便创建\启动异步任务,也就是根据异步任务实现状态机。
  • 定义了 Waker 类型

只有接口定义还远不能使用。

组成

Task

执行任务的代码,承载task的叫做workload。workload多种多样,从小到大有:协程,线程,进程,容器,虚拟机,物理机,分布式集群等等等。

Future

Future是对某个任务执行结果的抽象, 有两种状态 Poll::PendingPoll::Ready<T> ,分别表示任务未完成和完成状态。

  1. pub trait Future {
  2. type Output;
  3. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  4. }

从上面的定义可以看出, Future只有一个poll方法,该方法会不断检查Future的状态。每次调用poll会返回Pending或者Ready。 Context<_> 中保存了对应的Waker和有关信息。

关于Future的特点:

  • Future是惰性的,它必须要调用 .await才会执行。
  • 只能在 async block , async fn中使用 .await
  • Rust编译器会自动为async block, async fn实现Future。
  • Future API被统一到futures这个库中。

Executor

Rust中负责调度和执行Future的机制称为Executor。
常见的executor:

  • block_on: 阻塞当前线程直到coroutine返回。一般处于最外层。
  • spawn: 类似于fork, 开启一个并发执行的task。
  • select:类似golang的select, 阻塞当前coroutine直到多个coroutine中的某一个返回。
  • join_all: 等待所有coroutine返回。

Reactor & Waker

Future的poll是用来询问和更新Future的状态,那么谁来调用这个方法? 如果用户手动调用,那就变成轮询了。如果是Executor不断调用它,那也成了轮询了。

实际上在多数的实现中,会做如下处理:

  1. impl Future for T{
  2. fn poll(..){
  3. if !pred(){
  4. // 在reactor中注册waker
  5. Pending
  6. }else{
  7. Ready(..)
  8. }
  9. }
  10. }

Executor把唤醒Future的责任抛给reactor。Waker关联了某个future,是reactor用于通知executor的中间层。

Waker在 std::task::Waker 中定义,它的设计与众不同,具体可见rfc-2592, 总结如下:

  • Rust并没有提供trait来抽象Waker,而是提供了一个具体类型 std::task::Waker
  • Waker与具体executor相关联,因此为了支持定制行为,Waker封装了一个 RawWaker 对象,RawWaker是个胖指针对象,通过定制其内部的 wakable obejctvtable 来提供定制行为)。
  • Waker必须可以复制。最直观的实现是 Arc<T> , Executor和Reactor各持有一个arc。

如下:

  1. pub fn wake(&self) {
  2. // The actual wakeup call is delegated through a virtual function call
  3. // to the implementation which is defined by the executor.
  4. unsafe { (self.waker.vtable.wake)(self.waker.data) }
  5. }

《books-futures-explained》给出了一个future例子及其逻辑运行过程:https://cfsamson.github.io/books-futures-explained/2_a_mental_model_for_futures.html

一些executor底层使用线程池,要求task本身是Send+Sync的。另一些,如Actix的executor则使用单线程。
Task在基于多线程executor上执行,要注意线程互斥锁可能导致的特殊死锁:task A获得了std::sync::Mutex然后通过.await出让执行权,其他task都在等待A释放锁。需要使用tokio::sync::Mutex这种基于future的互斥锁。如下图:
image.png

Pin

前面提到, poll方法的第一个参数不是&mut self 而是 self: Pin<&mut Self>, 那么为什么要设计Pin这样一个结构,它解决了什么问题?这要结合异步机制设计、Rust自身的语言特色来理解。

采用基于状态机方法的协程模型中, Rust编译器会为Task自动生成匿名结构用于存储状态。如果是类似如下的代码:

  1. async fn example(){
  2. let mut b = 20;
  3. let a = &mut b;
  4. func1().await;
  5. a += 1;
  6. }

上述代码中有一个跨越 .await 的引用, 因此要把a, b 都存起来。生成的代码如下:

  1. enum ExampleState<'a>{
  2. state1(i32, &'a mut i32),
  3. state2(i32, &'a mut i32),
  4. }

其中b是被结构体 ExampleState所拥有,a指向b,也就是ExampleState自身的成员a指向了另一个成员b,产生了自引用。如果此时该结构体被移动(move)了, 那么引用仍然指向先前的内存地址,会导致悬垂指针。

一个解决思路是:把被引用的对象通过类似Box的指针分配在堆内存上。这样move的是智能指针本身,而不是被引用的对象。

  1. //使用了堆分配,看起来不是zero-cost,
  2. //但是由于Future的惰性,只有在执行Future之时才会进行内存分配,因此这个开销也不大。
  3. enum ExampleState<'a>{
  4. state1(Box<i32>),
  5. state2(Box<i32>),
  6. }

自引用担心出现move,我们就让它不能Move,这就是Pin这个智能指针表达的语义。

与Pin相关的trait是 Unpin , 默认情况下类型T是 Unpin , 意味着这个类型T即使在被固定了也可以安全地move。对于 T: Unpin , Pin<'a, T> 完全等价于 &'a mut TUnpin 还有很多细节,可参考这里

例子:200行实现的单线程异步运行时

《books-futures-explained》一书带有一个200行的单线程运行时实现,带有详细注释的代码在:https://github.com/cfsamson/examples-futures/blob/master/src/main.rs

代码的几个关键点:

  • 单线程executor block_on 调用 park() 使当前线程休眠\唤醒。 block_on 内部为每个 Future 对象创建一个 MyWaker
  • 每个 MyWaker 都记录了当前线程(也就是 block_on 所在线程),通过 vtable 调用 wake() \ wake_ref() 方法时会用 thread::unpark() 将当前线程唤醒。
  • Task 主要记录了必须的数据、id(用于唯一标记)和Reactor的一个Arc引用(不是必须,为了简化实现)。
  • Taskpoll 方法中会修改 Reactor 中对应的 状态改为 TaskState::Finished 并且返回ready;或者通过注册自己,这会向Reactor的channel发送一个事件。
  • Task被注册到Reactor中,会设置一个计时器,模拟一个长时间等待的IO任务。定时器超时会把对应Task状态更新为 TaskState::Ready

以上是基本的流程,但是还有一个细节:传入 block_on 的future是一个async block,而不是 Task

  1. let mainfut = async {
  2. fut1.await; // type=Task
  3. fut2.await;
  4. };
  5. block_on(mainfut);

block_on 驱动的是 mainfut 而不是其内部的两个Task, 那么从 mainfut 这个Non-leaf future到 fut1, fut2 这两个Leaf future, 发生了什么?

换句话说, async\await 做了什么?

async\await

async语法糖自动把函数、代码块转换为实现了Future的状态机(本质是匿名状态机)。
await则

异步生态

目前的异步运行时有:tokio, async-std, smol, fuchsia-async。
大多数异步代码都是运行时无关的,但是涉及异步io、计时器,很可能要跟运行时绑定。tokio使用mio来实现reactor,定义了自己的AsyncWrite\AsyncRead trait,这根smol和async-std冲突。后者的使用了async-executor和futures定义的AsyncWrite\AsyncRead。

有一些用于兼容不同runtime的库,如async_compat.

参考文献