an async function is actually a function return a Future
fn foo2() -> impl Future<Output=()> {}async foo() {}
foo and foo2 are identical
loop {select! {stream <- network.await => {}line <- terminal.await => {}foo <- foo.await => {}}}
The executor keep running until all future are resolved
fn main() {// executorlet runtime = tokio::runtime::Runtime::new();runtime.block_on(async {...});}
tokio::task::spawn_blocking// tells the executor that it is going to be blocked, make sure other task is running// 当在async函数中使用none async function会导致async函数被block,这个时候可以通过 spawn_blocking 来告诉executor, 当前那执行的函数是blocking的
fuse future
fuse is a way to say that it’s safe to pull a future it’s safe to await a future, even if that future has already completed in the past
join
let files: Vec<_> = (0..3).map(|i| tokio::fs::read_to_string(format!("file{}", i))).collect();let (f1, f2, f3) = join!(files[0], files[1], files[2]);
tokio::spawn
一个async block只有一个executor会处理它,也就是说,即使在多核环境中,也无法充分利用,可以在async函数中,使用tokio::spawn, 这样新的线程可以来处理这个新的future
tokio runtime
可以存在多个runtime, tokio依赖thread local来获取runtime,如果没有threadlocal,比如在一些iot环境,在调用tokio::spawn()时,就需要将runtime传递到下层
what a future actually is
async fn foo() {// stage 1{let mut x = [0; 1024];let fut = tokio::fs::read_into("file.data", &mut x[..]);}// fut.await// stage 2{// because fut and x are cross await point, it is saved in the statemachine,// so it is actually self.fut and self.xlet n = fut.output();println!("{:?}", x[..n]);}}fn foo() -> impl Future<Output=()> /* Statemachine */
the question is where x store?
the compile generate some kind of enum
enum StateMachine {Stage1 {x: [u8; 1024],fut: tokio::fs::Future},Stage2 {}}any local variable that needs to be kept across await points are save in the internal state of statemachine
因为future将localvariable 保存在statemachine上,所以,future有可能变得很大,当把future作为参数传递时,会有大量的memcpy,所以最好是将future 放到堆上,Box::pin(future)
what is async_trait
trait Service {async fn call(&mut self, req: Request) -> Response;}
currently, async fn is not allowed in trait, because, compiler don’t know the size of the return type, as the return time of async function is Future, what async_trait does is transform the return type to Heap allocated, so that the size is reconginzed.
这个trait实际上转换成Future就是
trait Service {fn call(&mut self, req: Request) -> impl Future<Output=Response>;}
如果函数foo调用 Service call的函数
fn foo(x: &mut dyn Service) {let fut = x.call(Request); // 这里fut的size是多少?}
我们指到
假设有个struct X 要实现这个trait
struct X;impl Service for X {async fn call(&mut self, req: Request) -> Response {let z = [0;1024];tokio::time::sleep(100).await;drop(z);Response}}
我们指到,async函数返回的是一个Future结构体,async call的返回值并不是Response,而是Future
#[async_trait] 做的就是将trait中的async函数重重写为 PinBox::pin(async move {...})
trait Service {fn call(req: Request) -> Pin<Box<dyn Future<Output=Response>>>;}
因为Box::pin 将Future放在了堆上,它的大小就是在栈上的指针大小,这个大小即是固定的。
async_trait 让我们很容易的实现 async trait,但trade off是,每次都会将Future分配在堆上,这是expensive的。如果你的trait函数调用频率很高,那这将对性能有很大的影响。
trait Service {type CallFuture: Future<Output = Response>;fn call(&mut self, req: Request) -> Self::CallFuture;}struct Y;impl Service for Y {type CallFuture = Pin<Box<dyn Future<Output=Response>>>;fn call(&mut self, req: Request) -> Self::CallFuture {async { Response }}}
std mutex or tokio mutex
async fn main() {let x = Arc::new(Mutext::new(0));let x1 = Arc::clone(&x);tokio::spawn(async move {loop {let x = x1.lock();tokio::fs::read_to_string("file").await;*x += 1;}});let x2 = Arc::clone(&x);tokio::spawn(async move {loop {*x2.lock() -= 1;}});}
如果这里的runtime只一个1个线程,这里会导致死锁,当第一个spawn执行到await时,会yield出来,这时候第二个spawn开始执行,但是x1还未unlock,这时候x2被lock,但是由于只有1个线程,所以x1永远无法unlock, 这里要使用async mutex,因为当async mutex lock时,它会yield, 但是async mutex性能不好
所以当lock不包含 await point时,使用std mutex更好,但是,一旦lock包含了await point,就必须使用async mutex,或者当lock section比较大时,也是建议用async mutex
thread
tokio executor 会尽量让await唤醒后在同一个线程上执行,但不保证会调度到其他线程
