Future和异步

Futures

  1. trait Future {
  2. type Output;
  3. // For now, read `Pin<&mut Self>` as `&mut Self`.
  4. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  5. }
  6. enum Poll<T> {
  7. Ready(T),
  8. Pending,
  9. }

基本就是js的Promise。如果完成了在poll的时候返回Ready,否则Pending。如果一个Future在返回Ready之后就认为不会再被调用了,如果还被poll有的可能返回Pending有的可能panic,但永远不会造成内存或线程安全问题。fuse适配器方法可以让Future永远返回pending。

async function

async fn在被调用时会马上返回一个Future。声明async fn时不用把返回类型定义成Future,Rust会自动包装。返回的Future里包装了该函数运行需要的参数和本地变量。Rust会根据函数的body和参数生成具体的Future的类型,是个匿名类型,你需要知道是它实现了Future,就像closure。

一个async function在被poll的时候才会开始执行。poll会按函数体里的子async function的顺序来检查进度,其中前一个的await如果没有返回Ready不会去检查下一个。
能中断然后继续执行是async函数特有的。一般函数返回之后其stack的数据就全部丢失了,所以想await这样能够返回之后继续执行的能力也只有在async函数中可以。
现在Rust还不支持在trait里定义async函数,只有自由函数可以。async-trait这个crate支持用宏来定义异步trait方法。

在一个同步的函数等待异步函数可以用async_std::task::block_on。在异步函数里不能用,因为会阻塞整个线程。block_on不是无限循环的poll去检查异步函数的结果,而是在poll的时候会传入一个Context,这个context在子Future有结果的时候会被唤起。

async 代码块

async放在一个代码块前,可以将这个代码块变成异步的,里面可以使用异步函数和await,代码块会返回future,也是在poll时才会真的开始运行。

  1. let res = async {
  2. let r = af().await;
  3. if r > 0 {
  4. return r
  5. }
  6. return false
  7. };

async代码块表现的像一个函数,如果里面的?操作符返回错误会返回给代码块的值(上面是给res),而不是让其所在的函数返回,代码块里如果有return,也是代码块的值(上面是返回给res)也不是让函数返回。而且向上面这样如果两种情况返回的类型不一样是会报错的。因为代码块没有指定返回类型,所以?无法推断出要转换的错误类型,可以在结尾的Ok里写明类型Ok::<(), std::io::Error>(())

async代码块同样可以捕捉上文的变量,想closure一样,可以用move:async move {}。一般在同步代码里调用异步函数时,如果传递引用,需要引用有static lifetime,因为编译器不知道你会把这个Future传递给谁来await,也许到时候传递的引用所指的变量已经不在了。所以可以放到一个async move代码块里,这样编译器会把变量转移到这个代码块里。

async块还可以放到同步函数里,这样可以让这个函数在不用poll的情况下能先做一些事,然后返回一个Future让调用者来控制如何并行。

spawn

block_on会阻塞线程等异步结果,所以不能用于并行。想要并行需要用spawn或spawn_local(需要用unstable的async-std)。spawn有可能会用其他线程,spawn_local只在本线程。spawn_local的任务还是只有在poll,也就是block_on时才会启动。
不同的异步程序之间的切换只在await的时候发生,所以要想并行,必须有await。

spawn在有空闲线程就会启动,而且在一个Future在被多次poll的过程中,每次去poll的可能是不同的线程,这样这个Future的不同部分可能在不同的线程上执行。
因为spawn把Future放到不同的线程执行,所以这个Future需要是Send的,只有Future里捕捉到的所有变量,包括异步函数自己的本地变量都是Send这个Future才是Send。
除了让本地变量都是Send意外,还可以控制变量的生存范围,只要不覆盖await就可以。

其他语言很多都是在调用异步函数的时候就开始执行,这需要在语言本身就实现事件轮训。Rust这样让程序员自己控制异步函数的启动和切换就不需要语言自己的overhead了。

yield_now和spawn_blocking

如果一个async函数中有比较长的非async的计算过程,这个函数会占用很多这个线程的时间,导致其他异步函数没机会运行。
可以加yield_now在这个长计算中间可以给其他函数跑的机会async_std::task::yield_now().await;yield_now在第一次被poll的时候返回Pending,这时候调用者可以去poll其他异步函数,回来再poll就会返回Ready继续进行。
对于无法拆分的计算,比如调用C/C++,可以用async_std::task::spawn_blocking,把计算放到一个closure中传递给另外的线程。

原理

poll

当一个Future第一次被poll的时候如果还没有完成,会传给Poll一个waker,当Future完成的时候用这个waker去唤醒Poll

  1. use std::task::Waker;
  2. struct MyPrimitiveFuture {
  3. ...
  4. waker: Option<Waker>,
  5. }
  6. impl Future for MyPrimitiveFuture {
  7. type Output = ...;
  8. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<...> {
  9. ...
  10. if ... future is ready ... {
  11. return Poll::Ready(final_value);
  12. }
  13. // Save the waker for later.
  14. self.waker = Some(cx.waker().clone());
  15. Poll::Pending
  16. }
  17. }
  18. // If we have a waker, invoke it, and clear `self.waker`.
  19. if let Some(waker) = self.waker.take() {
  20. waker.wake();
  21. }

一般来说一个异步函数不会自己处理waker,而是传递给函数体里面的await。

pin

future因为会在执行中间被打断和继续,每个能被打断的地方都类似一个作用域的起止点,变量的生命周期也受到影响,在断点之后的引用有可能会变成空指针。所以future有的生命周期两个阶段。第一个阶段是在函数体开始执行之前,所有的变量引用还没有生成,这时候future是可以被转移或drop的。第二个阶段是被poll之后,引用已经生成的话,如果这时候future被转移或drop了,这些指针就变成悬垂指针了。所以这时候future就不能随便移动了,需要被pin。

pin的定义

  1. pub struct Pin<P> {
  2. pointer: P,
  3. }

pointer是一个私有字段,所以只能通过一些特定的方法获取,这样保证了P不被随意移动。
pin的创建可以通过future-lite的pin!宏。也可以通过标准库里的的Pin的Pin::from(boxed)。这些方法都是获取原指针的所有权,然后返回一个Pin指针,确保你没有权利再移动它。

block_on和await这些操作都是获取了future的所有权然后生成一个Pin然后在poll它。

Unpin Trait

除了异步函数和异步代码块意外,其他的类型其实用不着使用Pin的,所以大多数Rust的类型都实现了Unpin这个marker trait。这类的类型即使使用了Pin也可以随便移动,可以使用into_inner方法重新获取里面的值。