Future和异步
Futures
trait Future {
type Output;
// For now, read `Pin<&mut Self>` as `&mut Self`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
基本就是js的Promise。如果完成了在poll的时候返回Ready,否则Pending。如果一个Future在返回Ready之后就认为不会再被调用了,如果还被poll有的可能返回Pending有的可能panic,但永远不会造成内存或线程安全问题。fuse适配器方法可以让Future永远返回pending。
async function
async fn在被调用时会马上返回一个Future。声明async fn时不用把返回类型定义成Future,Rust会自动包装。返回的Future里包装了该函数运行需要的参数和本地变量。Rust会根据函数的body和参数生成具体的Future的类型,是个匿名类型,你需要知道是它实现了Future
一个async function在被poll的时候才会开始执行。poll会按函数体里的子async function的顺序来检查进度,其中前一个的await如果没有返回Ready不会去检查下一个。
能中断然后继续执行是async函数特有的。一般函数返回之后其stack的数据就全部丢失了,所以想await这样能够返回之后继续执行的能力也只有在async函数中可以。
现在Rust还不支持在trait里定义async函数,只有自由函数可以。async-trait这个crate支持用宏来定义异步trait方法。
在一个同步的函数等待异步函数可以用async_std::task::block_on。在异步函数里不能用,因为会阻塞整个线程。block_on不是无限循环的poll去检查异步函数的结果,而是在poll的时候会传入一个Context,这个context在子Future有结果的时候会被唤起。
async 代码块
async放在一个代码块前,可以将这个代码块变成异步的,里面可以使用异步函数和await,代码块会返回future,也是在poll时才会真的开始运行。
let res = async {
let r = af().await;
if r > 0 {
return r
}
return false
};
async代码块表现的像一个函数,如果里面的?
操作符返回错误会返回给代码块的值(上面是给res),而不是让其所在的函数返回,代码块里如果有return,也是代码块的值(上面是返回给res)也不是让函数返回。而且向上面这样如果两种情况返回的类型不一样是会报错的。因为代码块没有指定返回类型,所以?无法推断出要转换的错误类型,可以在结尾的Ok里写明类型Ok::<(), std::io::Error>(())
async代码块同样可以捕捉上文的变量,想closure一样,可以用move:async move {}
。一般在同步代码里调用异步函数时,如果传递引用,需要引用有static lifetime,因为编译器不知道你会把这个Future传递给谁来await,也许到时候传递的引用所指的变量已经不在了。所以可以放到一个async move代码块里,这样编译器会把变量转移到这个代码块里。
async块还可以放到同步函数里,这样可以让这个函数在不用poll的情况下能先做一些事,然后返回一个Future让调用者来控制如何并行。
spawn
block_on会阻塞线程等异步结果,所以不能用于并行。想要并行需要用spawn或spawn_local(需要用unstable的async-std)。spawn有可能会用其他线程,spawn_local只在本线程。spawn_local的任务还是只有在poll,也就是block_on时才会启动。
不同的异步程序之间的切换只在await的时候发生,所以要想并行,必须有await。
spawn在有空闲线程就会启动,而且在一个Future在被多次poll的过程中,每次去poll的可能是不同的线程,这样这个Future的不同部分可能在不同的线程上执行。
因为spawn把Future放到不同的线程执行,所以这个Future需要是Send的,只有Future里捕捉到的所有变量,包括异步函数自己的本地变量都是Send这个Future才是Send。
除了让本地变量都是Send意外,还可以控制变量的生存范围,只要不覆盖await就可以。
其他语言很多都是在调用异步函数的时候就开始执行,这需要在语言本身就实现事件轮训。Rust这样让程序员自己控制异步函数的启动和切换就不需要语言自己的overhead了。
yield_now和spawn_blocking
如果一个async函数中有比较长的非async的计算过程,这个函数会占用很多这个线程的时间,导致其他异步函数没机会运行。
可以加yield_now在这个长计算中间可以给其他函数跑的机会async_std::task::yield_now().await;
yield_now在第一次被poll的时候返回Pending,这时候调用者可以去poll其他异步函数,回来再poll就会返回Ready继续进行。
对于无法拆分的计算,比如调用C/C++,可以用async_std::task::spawn_blocking,把计算放到一个closure中传递给另外的线程。
原理
poll
当一个Future第一次被poll的时候如果还没有完成,会传给Poll一个waker,当Future完成的时候用这个waker去唤醒Poll
use std::task::Waker;
struct MyPrimitiveFuture {
...
waker: Option<Waker>,
}
impl Future for MyPrimitiveFuture {
type Output = ...;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<...> {
...
if ... future is ready ... {
return Poll::Ready(final_value);
}
// Save the waker for later.
self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
// If we have a waker, invoke it, and clear `self.waker`.
if let Some(waker) = self.waker.take() {
waker.wake();
}
一般来说一个异步函数不会自己处理waker,而是传递给函数体里面的await。
pin
future因为会在执行中间被打断和继续,每个能被打断的地方都类似一个作用域的起止点,变量的生命周期也受到影响,在断点之后的引用有可能会变成空指针。所以future有的生命周期两个阶段。第一个阶段是在函数体开始执行之前,所有的变量引用还没有生成,这时候future是可以被转移或drop的。第二个阶段是被poll之后,引用已经生成的话,如果这时候future被转移或drop了,这些指针就变成悬垂指针了。所以这时候future就不能随便移动了,需要被pin。
pin的定义
pub struct Pin<P> {
pointer: P,
}
pointer是一个私有字段,所以只能通过一些特定的方法获取,这样保证了P不被随意移动。
pin的创建可以通过future-lite的pin!宏。也可以通过标准库里的的Pin的Pin::from(boxed)。这些方法都是获取原指针的所有权,然后返回一个Pin指针,确保你没有权利再移动它。
block_on和await这些操作都是获取了future的所有权然后生成一个Pin然后在poll它。
Unpin Trait
除了异步函数和异步代码块意外,其他的类型其实用不着使用Pin的,所以大多数Rust的类型都实现了Unpin这个marker trait。这类的类型即使使用了Pin也可以随便移动,可以使用into_inner方法重新获取里面的值。