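An async function is really just a function that returns a Future:
use std::future::Future;
fn foo2() -> impl Future<Output = ()> {
    async {}
}
async fn foo() {
}
foo and foo2 are equivalent. Calling either one only creates the future; the body runs when the future is polled (awaited).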
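A minimal sketch of that equivalence, using only the standard library. The block_on function is a hypothetical toy executor (not tokio's), written just to drive the futures. The two functions return u32 instead of () so their results can be compared.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Toy executor (hypothetical): the waker unparks the thread blocked in block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// async fn form.
async fn foo() -> u32 {
    42
}

// Desugared form: a plain fn whose body is an async block.
fn foo2() -> impl Future<Output = u32> {
    async { 42 }
}

fn main() {
    // Calling either function only builds a future; no body code runs yet.
    let a = foo();
    let b = foo2();
    // The bodies run when the executor polls the futures.
    assert_eq!(block_on(a), block_on(b));
    println!("foo and foo2 both produced {}", block_on(foo()));
}
```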
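select! waits on several futures at once and runs the handler of whichever completes first:
// pseudo-code: network, terminal and foo are futures created before the loop;
// borrowing them with &mut keeps them alive across iterations (they must be Unpin, e.g. via tokio::pin!)
loop {
    tokio::select! {
        stream = &mut network => {
            // handle a new connection
        }
        line = &mut terminal => {
            // handle a line typed at the terminal
        }
        res = &mut foo => {
            // foo completed
        }
    }
}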
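A std-only sketch of what select! does underneath, assuming two Unpin futures. The names select2 and Either, and the toy block_on executor, are hypothetical helpers written for illustration; they are not tokio APIs. Every time the task is polled, both branches are polled, and the first Ready output wins. tokio::select! additionally picks a random branch to poll first (unless biased; is given) and supports any number of branches.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Which branch completed first.
#[derive(Debug, PartialEq)]
enum Either<A, B> {
    Left(A),
    Right(B),
}

// Poll both futures on every wake-up and return the first ready output;
// the losing future is dropped together with the select future.
fn select2<A, B>(mut a: A, mut b: B) -> impl Future<Output = Either<A::Output, B::Output>>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    future::poll_fn(move |cx: &mut Context<'_>| {
        if let Poll::Ready(x) = Pin::new(&mut a).poll(cx) {
            return Poll::Ready(Either::Left(x));
        }
        if let Poll::Ready(y) = Pin::new(&mut b).poll(cx) {
            return Poll::Ready(Either::Right(y));
        }
        Poll::Pending
    })
}

fn main() {
    // pending() never completes, so the ready branch must win.
    let winner = block_on(select2(future::pending::<u32>(), future::ready("terminal line")));
    println!("{:?}", winner);
}
```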
An executor drives futures to completion: block_on keeps polling the future it is given until that future resolves, then returns its output.
fn main() {
    // the executor
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        // ...
    });
}
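To make the executor's job concrete, here is a toy, single-threaded stand-in for Runtime::block_on that uses only the standard library. It is not how tokio is implemented. It polls one future, parks the thread whenever the future returns Pending, and polls again once the future's waker unparks it. YieldOnce is a hypothetical future that returns Pending once, so the park/wake path is actually exercised.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking the task means unparking the thread that is blocked in block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some waker calls unpark, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

// Pending on the first poll (after scheduling its own wake-up), Ready afterwards.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask the executor to poll us again
        Poll::Pending
    }
}

fn main() {
    let out = block_on(async {
        YieldOnce(false).await;
        40 + 2
    });
    println!("{}", out);
}
```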
tokio::task::spawn_blocking
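// tells the executor that this code is going to block, so it can keep running other tasks
// Calling a blocking (non-async) function inside an async function blocks that function, and with it the executor thread it runs on. spawn_blocking tells the executor that the work is blocking, so it is run on a separate thread instead.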
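The idea behind spawn_blocking can be sketched with only the standard library. The blocking closure runs on a separate OS thread, and its result is handed back through a future that is woken when the thread finishes. The names spawn_blocking_sketch and BlockingTask, and the toy block_on, are hypothetical. tokio actually runs such closures on a dedicated pool of blocking threads rather than spawning one thread per call.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// State shared between the blocking thread and the future waiting for it.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

struct BlockingTask<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

// Run f on its own OS thread so the executor thread is never blocked by it.
fn spawn_blocking_sketch<T, F>(f: F) -> BlockingTask<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let shared2 = Arc::clone(&shared);
    thread::spawn(move || {
        let value = f();
        let mut s = shared2.lock().unwrap();
        s.result = Some(value);
        // Wake the waiting task if it has already polled us.
        if let Some(w) = s.waker.take() {
            w.wake();
        }
    });
    BlockingTask { shared }
}

impl<T> Future for BlockingTask<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Checking the result and storing the waker under one lock avoids lost wake-ups.
        let mut s = self.shared.lock().unwrap();
        match s.result.take() {
            Some(v) => Poll::Ready(v),
            None => {
                s.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

fn main() {
    let answer = block_on(async {
        // The sleep stands in for a blocking call such as synchronous file I/O.
        spawn_blocking_sketch(|| {
            thread::sleep(Duration::from_millis(50));
            6 * 7
        })
        .await
    });
    println!("{}", answer);
}
```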
fuse future
Fusing a future makes it safe to poll (await) it again, even if it has already completed in the past.
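A minimal sketch of fusing, loosely modeled on FutureExt::fuse from the futures crate. This Fuse type is a hypothetical simplification that requires Unpin. Once the inner future has produced its output, the inner future is dropped, and every later poll returns Pending instead of polling a finished future. Polling a finished future directly is not allowed; for example, std::future::Ready panics if it is polled after completion.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Some(inner) while running, None once the inner future has finished.
struct Fuse<F> {
    inner: Option<F>,
}

impl<F: Future + Unpin> Fuse<F> {
    fn new(inner: F) -> Self {
        Fuse { inner: Some(inner) }
    }

    // True once the inner future has produced its output.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future + Unpin> Future for Fuse<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let result = match self.inner.as_mut() {
            Some(inner) => Pin::new(inner).poll(cx),
            // Already completed: never touch the finished future again.
            None => return Poll::Pending,
        };
        if result.is_ready() {
            self.inner = None; // drop the finished inner future
        }
        result
    }
}

// A waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Fuse::new(future::ready(5));
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(5));
    // Polling again is safe: it just stays Pending.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
    println!("terminated: {}", fut.is_terminated());
}
```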
join
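let files: Vec<_> = (0..3).map(|i| tokio::fs::read_to_string(format!("file{}", i))).collect();
// join! cannot move futures out of a Vec by index; futures::future::join_all awaits them all concurrently
let contents: Vec<_> = futures::future::join_all(files).await;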
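Underneath, joining means polling every unfinished future each time the task is woken, and finishing only when all of them are done. All of this happens on one thread, inside one task. The std-only sketch below joins two futures. The names join2 and YieldOnce, and the toy block_on, are hypothetical helpers, not the tokio or futures implementations.

```rust
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Pending on the first poll (after requesting a wake-up), Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Poll every unfinished future on each wake-up; complete once both are done.
fn join2<A, B>(a: A, b: B) -> impl Future<Output = (A::Output, B::Output)>
where
    A: Future,
    B: Future,
{
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let (mut out_a, mut out_b) = (None, None);
    future::poll_fn(move |cx: &mut Context<'_>| {
        // A future that already finished is never polled again.
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                out_b = Some(v);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
}

fn main() {
    let (a, b) = block_on(join2(
        async {
            YieldOnce(false).await; // this branch needs two polls
            "file0 contents"
        },
        async { "file1 contents" },
    ));
    println!("{} / {}", a, b);
}
```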
tokio::spawn
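A single async block is driven by only one executor thread at a time, so even on a multi-core machine it cannot use more than one core. Calling tokio::spawn inside an async function turns the new future into a separate task, which another worker thread can then run in parallel.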
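The following is a deliberately crude sketch of why spawning helps. This hypothetical spawn gives each future its own OS thread and drives it with a toy block_on, so separate tasks can run on separate cores. tokio::spawn instead schedules tasks onto a fixed pool of worker threads. It does have the same Send + 'static bounds, because a spawned task may run on another thread and outlive its caller.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical thread-per-task spawn: each future becomes an independent task
// driven on its own OS thread.
fn spawn<F>(fut: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static, // same bounds as tokio::spawn
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

fn main() {
    // Four CPU-bound tasks; polled by a single task they would all share one thread.
    let handles: Vec<_> = (1..=4u64)
        .map(|i| spawn(async move { (1..=1_000u64).sum::<u64>() * i }))
        .collect();
    for h in handles {
        println!("{}", h.join().unwrap());
    }
}
```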
tokio runtime
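There can be more than one runtime. tokio finds the current runtime through a thread-local. Where thread-locals are not available (for example in some IoT environments), the runtime (or a tokio::runtime::Handle to it) has to be passed down explicitly to the code that calls tokio::spawn().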
what a future actually is
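// pseudo-code: how the compiler conceptually splits foo at its await point
// (tokio::fs::read_into and fut.output() are illustrative, not real tokio APIs)
async fn foo() {
    // stage 1: runs on the first poll
    {
        let mut x = [0u8; 1024];
        let fut = tokio::fs::read_into("file.data", &mut x[..]);
    }
    // fut.await  <- foo returns Poll::Pending here until fut is ready
    // stage 2: runs on a later poll
    {
        // fut and x live across the await point, so they are saved in the
        // state machine: here they are really self.fut and self.x
        let n = fut.output();
        println!("{:?}", &x[..n]);
    }
}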
fn foo() -> impl Future<Output=()> /* Statemachine */
So the question is: where is x stored?
The compiler generates a kind of enum, roughly:
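enum StateMachine {
    // suspended at the await point: holds everything stage 2 still needs
    Stage1 {
        x: [u8; 1024],
        fut: ReadIntoFuture, // illustrative name for the read future's type; it borrows x
    },
    // completed: nothing left to keep
    Stage2 {},
}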
Any local variable that must be kept across an await point is saved in the internal state of the state machine.
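To make the enum concrete, here is a hand-written state machine in the spirit of what the compiler generates for foo, using only the standard library. FakeRead is a hypothetical stand-in for the read future: it returns Pending on the first poll, then Ready(n). The sketch sidesteps the fact that the real fut borrows x. Stage 2 here sums the first n bytes instead of printing them, so the result can be checked.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical read future: Pending once, then Ready with the byte count.
struct FakeRead {
    polled: bool,
    n: usize,
}

impl Future for FakeRead {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        if self.polled {
            return Poll::Ready(self.n);
        }
        self.polled = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hand-written analogue of the compiler's enum: the variant holds every
// value that must survive the await point.
enum FooStateMachine {
    Stage1 { x: [u8; 1024], fut: FakeRead },
    Done,
}

impl Future for FooStateMachine {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        let sum = match &mut *self {
            FooStateMachine::Stage1 { x, fut } => {
                // Suspended here until the inner future is ready.
                let n = match Pin::new(fut).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(n) => n,
                };
                // Stage 2: the code after the await reads the saved x.
                x[..n].iter().map(|&b| usize::from(b)).sum::<usize>()
            }
            FooStateMachine::Done => panic!("polled after completion"),
        };
        *self = FooStateMachine::Done; // nothing needs to be kept any more
        Poll::Ready(sum)
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = FooStateMachine::Stage1 { x: [1; 1024], fut: FakeRead { polled: false, n: 10 } };
    // First poll: suspended at the await point, still in Stage1.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);
    // Second poll: the read is done and stage 2 runs.
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(10));
    println!("state machine size: {} bytes", std::mem::size_of::<FooStateMachine>());
}
```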
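Because a future stores its local variables in the state machine, it can become very large. Passing such a future around by value (for example as a function argument) then means large memcpys, so it is often better to put the future on the heap with Box::pin(future).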
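A quick std-only way to observe this is std::mem::size_of_val, which reports how many bytes a future occupies. The names holds_buffer and YieldOnce are hypothetical, and the future is only measured, never run. Exact sizes depend on the compiler version. Still, the unboxed future must contain the 1024-byte array, while Box::pin leaves only a single pointer on the stack.

```rust
use std::future::Future;
use std::mem::{size_of, size_of_val};
use std::pin::Pin;
use std::task::{Context, Poll};

// Pending once, so that holds_buffer has a real await point.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

async fn holds_buffer() -> u8 {
    let x = [7u8; 1024];
    YieldOnce(false).await; // x is alive across this await point...
    x[0] // ...because it is used afterwards, so it is stored in the future
}

fn main() {
    let fut = holds_buffer();
    // The array is stored inline: moving fut copies all of it.
    println!("unboxed future: {} bytes", size_of_val(&fut));

    // Boxed: only a pointer lives on the stack; moving it copies one word.
    let boxed = Box::pin(holds_buffer());
    println!("Box::pin(future): {} bytes", size_of_val(&boxed));
    assert_eq!(size_of_val(&boxed), size_of::<usize>());
}
```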
what is async_trait
trait Service {
async fn call(&mut self, req: Request) -> Response;
}
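When these notes were written, async fn was not allowed in traits. The compiler cannot know the size of the return type, because an async fn returns a Future whose size depends on each implementation. What async_trait does is turn the return type into a heap-allocated (boxed) future, whose size is known. (Since Rust 1.75, async fn in traits is stable for static dispatch, but such traits still cannot be used as dyn Service, which is the case async_trait covers.)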
Desugared into its Future form, this trait is:
trait Service {
fn call(&mut self, req: Request) -> impl Future<Output=Response>;
}
Now suppose a function foo calls Service::call through a trait object:
fn foo(x: &mut dyn Service) {
    let fut = x.call(Request); // what is the size of fut here?
}
To see why this is a problem, suppose a struct X implements the trait:
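struct X;
impl Service for X {
    async fn call(&mut self, req: Request) -> Response {
        let z = [0; 1024];
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        // z is used after the await, so it is stored inside this future
        drop(z);
        Response
    }
}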
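Recall that an async function returns a Future struct: the return value of call is not a Response but a Future. X's future has to hold the 1024-element array z across the await, while another implementation would produce a future of a different size. Through &mut dyn Service, the caller cannot know which size it will get.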
#[async_trait] rewrites each async fn in the trait into a regular fn whose body is Box::pin(async move {...}), so the trait becomes roughly:
trait Service {
    // roughly; the real expansion also adds + Send and lifetime bounds
    fn call(&mut self, req: Request) -> Pin<Box<dyn Future<Output = Response>>>;
}
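Because Box::pin puts the future on the heap, only a pointer to it lives on the stack. That pointer has a fixed size, no matter which implementation produced the future.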
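A std-only sketch of the shape #[async_trait] produces, written by hand. Here call returns a pinned, boxed trait-object future, so the trait can be used through &mut dyn Service. Request, Response, X and the toy block_on are hypothetical. The real macro expansion uses explicit lifetime parameters instead of '_.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

#[derive(Debug, PartialEq)]
struct Request(u32);

#[derive(Debug, PartialEq)]
struct Response(u32);

// Every implementation returns the same type: a pinned, boxed trait-object future.
trait Service {
    fn call(&mut self, req: Request) -> Pin<Box<dyn Future<Output = Response> + Send + '_>>;
}

// Hypothetical implementation that counts how often it was called.
struct X {
    calls: u32,
}

impl Service for X {
    fn call(&mut self, req: Request) -> Pin<Box<dyn Future<Output = Response> + Send + '_>> {
        // The async block is moved to the heap; '_ lets it borrow self.
        Box::pin(async move {
            self.calls += 1;
            Response(req.0 * 2)
        })
    }
}

// Works through a trait object because the return type's size is fixed.
fn handle(svc: &mut dyn Service, req: Request) -> Response {
    block_on(svc.call(req))
}

fn main() {
    let mut x = X { calls: 0 };
    println!("{:?}", handle(&mut x, Request(21)));
    // A Box<dyn Future> is a fat pointer: data pointer plus vtable pointer.
    println!(
        "boxed future handle: {} bytes",
        std::mem::size_of::<Pin<Box<dyn Future<Output = Response> + Send>>>()
    );
}
```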
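async_trait makes async traits easy to write, but the trade-off is a heap allocation on every call, which is expensive. If the trait method is called very frequently, this can hurt performance noticeably. An alternative is to let each implementation name its own future type through an associated type: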
trait Service {
type CallFuture: Future<Output = Response>;
fn call(&mut self, req: Request) -> Self::CallFuture;
}
struct Y;
impl Service for Y {
    // an async block's type cannot be named, so Y still has to box it here
    type CallFuture = Pin<Box<dyn Future<Output = Response>>>;
    fn call(&mut self, req: Request) -> Self::CallFuture {
        Box::pin(async { Response })
    }
}
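With the associated-type design, an implementation whose future type can be named avoids the allocation entirely. The sketch below assumes hypothetical Request and Response types. Echo uses std::future::Ready<Response> as its CallFuture, so the future is stored inline, and generic callers are compiled for that concrete type (static dispatch). The trade-off is that async blocks have unnameable types. On stable Rust they therefore cannot be used as CallFuture without boxing, which is what the Y example does.

```rust
use std::future::{self, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

#[derive(Debug, PartialEq)]
struct Request(u32);

#[derive(Debug, PartialEq)]
struct Response(u32);

trait Service {
    // Each implementation names its own future type; nothing forces a Box.
    type CallFuture: Future<Output = Response>;
    fn call(&mut self, req: Request) -> Self::CallFuture;
}

// Hypothetical service that answers immediately.
struct Echo;

impl Service for Echo {
    // A concrete, nameable future: stored inline, no heap allocation per call.
    type CallFuture = Ready<Response>;

    fn call(&mut self, req: Request) -> Self::CallFuture {
        future::ready(Response(req.0))
    }
}

// Generic caller: compiled separately for each S, so S::CallFuture is known statically.
fn call_twice<S: Service>(svc: &mut S) -> (Response, Response) {
    let first = block_on(svc.call(Request(1)));
    let second = block_on(svc.call(Request(2)));
    (first, second)
}

fn main() {
    println!("{:?}", call_twice(&mut Echo));
}
```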
std mutex or tokio mutex
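// deliberately problematic: a std::sync::Mutex guard is held across an .await
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let x = Arc::new(Mutex::new(0));
    let x1 = Arc::clone(&x);
    tokio::spawn(async move {
        loop {
            let mut x = x1.lock().unwrap();
            // the guard is still held while this task yields at the await
            let _ = tokio::fs::read_to_string("file").await;
            *x += 1;
        }
    });
    let x2 = Arc::clone(&x);
    tokio::spawn(async move {
        loop {
            // blocks the whole worker thread until the lock is free
            *x2.lock().unwrap() -= 1;
        }
    });
}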
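If the runtime has only one thread, this deadlocks. When the first task reaches the await, it yields and the second task starts running. The guard from x1.lock() has not been released yet, so x2.lock() blocks. With only one thread, the first task never runs again to release the lock. The fix is an async mutex (tokio::sync::Mutex): when the lock is taken, its lock().await yields to the executor instead of blocking the thread. However, an async mutex is slower than std::sync::Mutex. (In practice tokio::spawn rejects the first task at compile time, because a std::sync::MutexGuard is not Send and this future holds one across an await. The same deadlock can still happen with tasks that do not require Send, such as those started with tokio::task::spawn_local.)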
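So when the critical section contains no await point, std::sync::Mutex is the better choice. Once the lock is held across an await point, you must use an async mutex. An async mutex is also recommended when the critical section is long.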
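A std-only sketch of the recommended pattern: keep the std::sync::Mutex guard inside its own scope, so it is dropped before the await point. The names bump, YieldOnce and assert_send, and the toy block_on, are hypothetical helpers. YieldOnce stands in for a real await such as a file read. The assert_send call compiles only because no guard is held across the await, which is the same condition tokio::spawn enforces on its tasks.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Toy executor: poll, park the thread until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Pending on the first poll (after requesting a wake-up), Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Compile-time check: only accepts values whose type is Send.
fn assert_send<T: Send>(_: &T) {}

async fn bump(counter: Arc<Mutex<u32>>) {
    {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    } // guard dropped here, before the await point
    YieldOnce(false).await; // stands in for e.g. tokio::fs::read_to_string(...).await
    // Short critical section with no await inside: a std Mutex is fine.
    *counter.lock().unwrap() += 1;
}

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let fut = bump(Arc::clone(&counter));
    assert_send(&fut); // compiles because no MutexGuard lives across the await
    block_on(fut);
    println!("counter = {}", *counter.lock().unwrap());
}
```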
thread
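The tokio executor tries to resume a task on the same thread after its await is woken, but this is not guaranteed. On the multi-threaded runtime the task may be scheduled on another worker thread, which is why tokio::spawn requires Send.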