An async function is really just a function that returns a Future:

```rust
use std::future::Future;

fn foo2() -> impl Future<Output = ()> {
    async {}
}

async fn foo() {}
```

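foo and foo2 are equivalent. Calling either one only builds a future, and its body does not run until the future is polled (for example, with .await).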
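Here is a small std-only sketch (no Tokio) that shows this laziness directly. The static flag `RAN`, the `NoopWaker` and the helper `body_ran` are illustrative names, not part of any library. The helper calls the function, checks whether the body has run, and then polls the future once by hand.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Set by the body of foo/foo2, so we can observe when the body actually runs.
static RAN: AtomicBool = AtomicBool::new(false);

async fn foo() {
    RAN.store(true, Ordering::SeqCst);
}

// The same thing written as a plain fn returning an async block.
fn foo2() -> impl Future<Output = ()> {
    async {
        RAN.store(true, Ordering::SeqCst);
    }
}

// A waker that does nothing: enough to poll a future once by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Calls `make` to create the future, then reports whether the body had run
/// (before any poll, after one poll).
fn body_ran<F: Future<Output = ()>>(make: impl Fn() -> F) -> (bool, bool) {
    RAN.store(false, Ordering::SeqCst);
    let mut fut = Box::pin(make()); // only builds the state machine
    let before = RAN.load(Ordering::SeqCst);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(()));
    (before, RAN.load(Ordering::SeqCst))
}
```

Both `body_ran(foo)` and `body_ran(foo2)` report `(false, true)`. Nothing happens at call time, and the body runs on the first poll.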

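select! waits on several futures at once and runs the branch of whichever one completes first:

```rust
// `network`, `terminal` and `foo` are placeholders for futures created (and
// pinned with tokio::pin!) before the loop. select! takes the futures
// themselves, not `.await` expressions.
loop {
    tokio::select! {
        stream = &mut network => { /* handle the connection */ }
        line = &mut terminal => { /* handle user input */ }
        output = &mut foo => { /* foo has completed */ }
    }
}
```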
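The core of a two-branch select can be sketched with the standard library alone. `select2` below is a simplified, hypothetical helper, not Tokio's implementation. It polls both futures whenever it is polled and completes with whichever one is ready. The losing future is dropped, which cancels it. tokio::select! picks the branch to poll first at random by default, but this sketch always polls the first future first.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Which branch finished first.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

/// Races two futures: completes with the first output; the other future is
/// dropped (cancelled) together with the Select2 value.
struct Select2<A, B> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
}

fn select2<A: Future, B: Future>(a: A, b: B) -> Select2<A, B> {
    Select2 { a: Box::pin(a), b: Box::pin(b) }
}

impl<A: Future, B: Future> Future for Select2<A, B> {
    type Output = Either<A::Output, B::Output>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Always tries `a` first (a "biased" select).
        if let Poll::Ready(v) = self.a.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(v));
        }
        if let Poll::Ready(v) = self.b.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(v));
        }
        Poll::Pending // neither is ready; each was handed cx's waker
    }
}

/// Waker that does nothing, for polling by hand in the example.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once, returning its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Box::pin(fut).as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```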

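The executor drives futures to completion. Runtime::block_on runs the given future on the current thread and returns its output once that future has resolved. Tasks spawned onto the runtime that are still pending when the runtime is dropped are cancelled.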

```rust
fn main() {
    // The executor: a multi-threaded Tokio runtime (Runtime::new returns a Result).
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        // ... async code runs here ...
    });
}
```
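An executor's core job fits in a few lines of std-only Rust. This sketch assumes a single future and a waker that unparks the blocked thread, the same idea as the example in the std `Wake` trait documentation. `block_on`, `ThreadWaker` and the test future `CountDown` are illustrative names. Tokio's runtime is far more elaborate, with task queues, an I/O driver and timers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waking the task unparks the thread sleeping inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor for one future: poll it, sleep until woken, poll again,
/// and return the output once the future resolves.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // A spurious wake-up only costs one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Test future: returns Pending `n` times, waking itself each time.
struct CountDown(u32);

impl Future for CountDown {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}
```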
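tokio::task::spawn_blocking

Calling a blocking (non-async) function inside an async function blocks that async function and the worker thread running it, so other tasks on that thread cannot make progress. spawn_blocking tells the executor that a piece of work is going to block. Tokio runs the closure on a separate pool of threads reserved for blocking work and returns a JoinHandle that can be awaited, so the other tasks keep running.

```rust
// `heavy_computation` stands for any blocking, synchronous function.
let result = tokio::task::spawn_blocking(|| heavy_computation()).await.unwrap();
```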
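The idea behind spawn_blocking can be sketched without Tokio. Run the blocking closure on another OS thread, and give the async side a future that is woken when the result is ready. `run_blocking` and `BlockingHandle` are hypothetical names. Tokio reuses a pool of blocking threads, whereas this sketch starts a fresh thread per call.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared by the worker thread and the future.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Future that resolves to the closure's return value.
struct BlockingHandle<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Hypothetical stand-in for spawn_blocking: runs `f` on a new OS thread, so
/// the thread polling the async code is never blocked by it.
fn run_blocking<T, F>(f: F) -> BlockingHandle<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let worker = Arc::clone(&shared);
    thread::spawn(move || {
        let value = f(); // the blocking work happens here
        let mut s = worker.lock().unwrap();
        s.result = Some(value);
        if let Some(w) = s.waker.take() {
            w.wake(); // tell the executor this task can make progress
        }
    });
    BlockingHandle { shared }
}

impl<T> Future for BlockingHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut s = self.shared.lock().unwrap();
        match s.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Checked and stored under the same lock the worker uses,
                // so a completion cannot slip by unnoticed.
                s.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Minimal executor to drive the example: park until woken.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```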

fuse future

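Fusing a future makes it safe to poll (await) it again even if it has already completed. Polling an ordinary future after it has returned Ready is a misuse and may panic. A fused future instead keeps returning Pending. This matters in a select! loop, where the same futures are polled on every iteration.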
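A fused wrapper can be sketched in std-only Rust, modelled loosely on `futures::future::Fuse` from the futures crate. This is an illustrative reimplementation, not that crate's code. Once the inner future has completed it is dropped, and later polls return `Pending`. By contrast, polling a finished `std::future::Ready` again would panic.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Fused wrapper: once the inner future completes it is dropped, and any
/// further poll returns Pending instead of polling a finished future.
struct Fuse<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fuse<F> {
    fn new(fut: F) -> Self {
        Fuse { inner: Some(Box::pin(fut)) }
    }

    /// Same idea as `FusedFuture::is_terminated` in the futures crate.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fuse<F> {
    type Output = F::Output;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let Some(fut) = self.inner.as_mut() else {
            return Poll::Pending; // already completed: never ready again
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready(out) => {
                self.inner = None; // drop the finished future
                Poll::Ready(out)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing, for polling by hand in the example.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```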

join

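join awaits several futures concurrently and returns all of their outputs once every one of them has completed:

```rust
let files: Vec<_> = (0..3).map(|i| tokio::fs::read_to_string(format!("file{}", i))).collect();
// join_all (from the futures crate) awaits every future in the Vec concurrently.
let contents: Vec<_> = futures::future::join_all(files).await;
```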
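Joining two futures can be sketched with std only. `join2` below is a hypothetical helper. It polls each unfinished future, stores its output, and completes only when both have finished. Both futures are driven by the same task, so they make progress concurrently but not in parallel.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Drives two futures on the same task and completes with both outputs.
struct Join2<A: Future, B: Future> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
    a_out: Option<A::Output>,
    b_out: Option<B::Output>,
}

// Sound: the inner futures are pinned in their own boxes and the outputs are
// never pinned, so Join2 itself may move freely.
impl<A: Future, B: Future> Unpin for Join2<A, B> {}

fn join2<A: Future, B: Future>(a: A, b: B) -> Join2<A, B> {
    Join2 { a: Box::pin(a), b: Box::pin(b), a_out: None, b_out: None }
}

impl<A: Future, B: Future> Future for Join2<A, B> {
    type Output = (A::Output, B::Output);
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // Poll only the futures that have not finished yet.
        if this.a_out.is_none() {
            if let Poll::Ready(v) = this.a.as_mut().poll(cx) {
                this.a_out = Some(v);
            }
        }
        if this.b_out.is_none() {
            if let Poll::Ready(v) = this.b.as_mut().poll(cx) {
                this.b_out = Some(v);
            }
        }
        if this.a_out.is_some() && this.b_out.is_some() {
            Poll::Ready((this.a_out.take().unwrap(), this.b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    }
}

/// Waker that does nothing, for polling by hand in the example.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once, returning its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Box::pin(fut).as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```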

tokio::spawn

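An async block is handled by only one executor thread at a time. Everything inside a single future runs sequentially, so even on a multi-core machine one future cannot use more than one core. Calling tokio::spawn inside an async function turns the new future into a separate task, which another worker thread can pick up and run in parallel.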
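This deliberately naive std-only sketch shows what spawning buys. The hypothetical `spawn` below gives each future its own OS thread with a tiny `block_on`, so spawned futures really do run in parallel with the caller. Tokio instead schedules tasks on a fixed pool of worker threads. The `Send + 'static` bounds, however, are the same ones tokio::spawn requires.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical, naive `spawn`: each future becomes its own task on a fresh
/// OS thread, so it can run in parallel with the caller and other tasks.
fn spawn<F>(fut: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}
```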

tokio runtime

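There can be more than one runtime. Tokio relies on a thread-local to find the current runtime. Some environments, such as certain IoT targets, have no thread-locals. There, the runtime (for example a tokio::runtime::Handle) has to be passed down to the code that spawns tasks, which then calls handle.spawn(...) instead of tokio::spawn().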
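The two styles can be contrasted in a small std-only sketch. `Handle`, `CURRENT`, `enter` and `spawn` are hypothetical stand-ins, not Tokio's API, and this toy "runtime" only records the names of spawned tasks. The implicit `spawn` looks up the thread-local, so it fails on a thread that never entered the runtime. `Handle::spawn` works anywhere the handle has been passed.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex};

/// Hypothetical runtime handle; this toy "runtime" only records task names.
#[derive(Clone, Default)]
struct Handle {
    spawned: Arc<Mutex<Vec<String>>>,
}

thread_local! {
    /// The runtime an implicit `spawn` uses on this thread, if any.
    static CURRENT: RefCell<Option<Handle>> = RefCell::new(None);
}

impl Handle {
    /// Explicit style: works on any thread the handle has been passed to.
    fn spawn(&self, task: &str) {
        self.spawned.lock().unwrap().push(task.to_string());
    }

    /// Makes this handle the thread's current runtime while `f` runs.
    /// (Simplified: does not restore a previous handle and is not panic-safe.)
    fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
        CURRENT.with(|c| *c.borrow_mut() = Some(self.clone()));
        let out = f();
        CURRENT.with(|c| *c.borrow_mut() = None);
        out
    }
}

/// Implicit style, in the spirit of tokio::spawn: find the runtime through the
/// thread-local. (tokio::spawn panics when there is none; this returns Err.)
fn spawn(task: &str) -> Result<(), &'static str> {
    CURRENT.with(|c| match c.borrow().as_ref() {
        Some(handle) => {
            handle.spawn(task);
            Ok(())
        }
        None => Err("no runtime on this thread"),
    })
}
```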

what a future actually is

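```rust
// `read_into` is a hypothetical async function that fills the buffer and
// resolves to the number of bytes read.
async fn foo() {
    // stage 1: runs on the first poll, up to the await point
    let mut x = [0u8; 1024];
    let fut = read_into("file.data", &mut x[..]);
    // await point: foo returns Pending here until fut is ready
    let n = fut.await;
    // stage 2: resumes after the await. Because x and fut live across the
    // await point, they are saved in the state machine (effectively self.x
    // and self.fut) rather than on the stack.
    println!("{:?}", &x[..n]);
}
// ...which the compiler turns into roughly:
// fn foo() -> impl Future<Output = ()> /* the state machine */
```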

The question is: where is x stored?

The compiler generates something like an enum, with one variant per stage:

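```rust
// Rough sketch, not actual compiler output. `ReadIntoFuture` is a placeholder
// for the type of the future returned by `read_into`.
enum StateMachine {
    // suspended at the await: everything that lives across it is stored here
    Stage1 {
        x: [u8; 1024],
        fut: ReadIntoFuture, // borrows x, which is why futures must be pinned
    },
    // completed
    Stage2 {},
}
```

Any local variable that needs to be kept across an await point is saved in the internal state of the state machine.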
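To make the enum concrete, here is a hand-written state machine for a small async fn in std-only Rust. `FooFuture`, `YieldOnce` (a leaf future that returns Pending once) and `block_on` are illustrative names. The compiler's real output differs in detail, but the idea is the same: one variant per stage, with the variables that live across the await stored in the variant.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Leaf future: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// What we would write with async/await.
async fn foo(seed: u8) -> usize {
    let x = [seed; 1024];
    YieldOnce(false).await;
    x.iter().map(|&b| b as usize).sum()
}

/// Hand-written state machine for `foo`: one variant per stage.
enum FooFuture {
    Start { seed: u8 },
    // Suspended at the await: `x` lives across it, so it is stored here.
    Waiting { x: [u8; 1024], fut: YieldOnce },
    Done,
}

impl Future for FooFuture {
    type Output = usize;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        let this = self.get_mut(); // every field is Unpin, so no pin projection needed
        loop {
            match *this {
                // Stage 1: code before the await, then switch state.
                FooFuture::Start { seed } => {
                    *this = FooFuture::Waiting { x: [seed; 1024], fut: YieldOnce(false) };
                }
                FooFuture::Waiting { ref x, ref mut fut } => {
                    if Pin::new(fut).poll(cx).is_pending() {
                        return Poll::Pending; // x and fut stay inside self
                    }
                    // Stage 2: code after the await.
                    let n: usize = x.iter().map(|&b| b as usize).sum();
                    *this = FooFuture::Done;
                    return Poll::Ready(n);
                }
                FooFuture::Done => panic!("polled after completion"),
            }
        }
    }
}

struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```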

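Because a future keeps such local variables inside its state machine, a future can get very large. Every time a large future is moved, for example when it is passed as an argument, the whole state machine is memcpy'd. It is therefore often better to put the future on the heap with Box::pin(future), after which only the pointer is moved.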
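The size difference can be measured with `std::mem::size_of_val`. In this sketch, `big` is a made-up example. Its 1 KiB array is used after the await, so the array must be part of the future. Boxing the future leaves the caller holding a single pointer.

```rust
use std::mem::size_of_val;

/// `x` is still needed after the await, so the 1 KiB array becomes part of
/// the future's state machine.
async fn big(seed: u8) -> usize {
    let x = [seed; 1024];
    std::future::ready(()).await;
    x.iter().map(|&b| b as usize).sum()
}

/// Returns (size of the future itself, size of the same future after Box::pin).
fn sizes() -> (usize, usize) {
    let fut = big(1);
    let inline = size_of_val(&fut);
    let boxed = Box::pin(fut); // moved to the heap once; afterwards only the pointer moves
    (inline, size_of_val(&boxed))
}
```

`sizes()` gives an inline size of at least 1024 bytes and a boxed size of one `usize`.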

what is async_trait

```rust
trait Service {
    async fn call(&mut self, req: Request) -> Response;
}
```

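At the time of writing, async fn was not allowed in traits. The reason is that the compiler does not know the size of the return type: an async fn returns a Future whose concrete type, and therefore size, depends on the implementation. async_trait works around this by turning the return type into a heap-allocated (boxed) future, whose size is known. (Rust 1.75 has since stabilized async fn in traits, but a trait that uses it cannot be used as a dyn trait object, so the problem described below still applies to dyn Service.)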

Desugared into the Future it returns, this trait is really:

```rust
trait Service {
    fn call(&mut self, req: Request) -> impl Future<Output = Response>;
}
```

Now suppose a function foo calls Service::call through a trait object:

```rust
fn foo(x: &mut dyn Service) {
    let fut = x.call(Request); // what is the size of fut?
}
```

Suppose a struct X implements this trait:

```rust
struct X;
impl Service for X {
    async fn call(&mut self, req: Request) -> Response {
        let z = [0; 1024];
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        drop(z);
        Response
    }
}
```

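An async fn returns a Future, which is a compiler-generated state machine. So async call does not return a Response; it returns a Future that eventually produces one. Here z is used across an await point, so the stack variable z becomes a field of that Future and is returned to the caller along with it. The size of call's return value therefore depends on which local variables of call live across await points (z here). That size differs from one implementation to another, so when calling through &mut dyn Service the compiler cannot know the size of fut.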
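#[async_trait] rewrites each async fn in the trait so that it returns Pin<Box<dyn Future<Output = ...> + Send>> (plus lifetimes). It also rewrites each implementation's body as Box::pin(async move { ... }).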

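```rust
trait Service {
    // roughly what #[async_trait] generates (lifetime names simplified)
    fn call<'a>(&'a mut self, req: Request) -> Pin<Box<dyn Future<Output = Response> + Send + 'a>>;
}
```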

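Because Box::pin puts the future on the heap, the caller only gets a pointer back, and that pointer has a fixed size. For a Box<dyn Future>, it consists of a data pointer plus a vtable pointer.
async_trait makes async traits easy to write, but the trade-off is a heap allocation on every call. If the trait method is called very frequently, this can noticeably hurt performance.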
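A hand-written version of what the macro produces makes the trade-off visible. The sketch below uses std only. `Request`, `Response`, `BoxFuture`, `Echo` and `Buffered` are illustrative names, and unlike async_trait's default output there is no `Send` bound. The two implementations have state machines of very different sizes. Yet through `&mut dyn Service` both return the same two-word boxed pointer, at the cost of one allocation per call.

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Request(u32);
#[derive(Debug, PartialEq)]
struct Response(u32);

/// Boxed, type-erased future (the shape #[async_trait] generates, minus the
/// `Send` bound it adds by default).
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

trait Service {
    fn call(&mut self, req: Request) -> BoxFuture<'_, Response>;
}

/// Holds nothing across an await: small state machine.
struct Echo;
impl Service for Echo {
    fn call(&mut self, req: Request) -> BoxFuture<'_, Response> {
        Box::pin(async move { Response(req.0) })
    }
}

/// Keeps a 1 KiB buffer alive across an await: much larger state machine.
struct Buffered;
impl Service for Buffered {
    fn call(&mut self, req: Request) -> BoxFuture<'_, Response> {
        Box::pin(async move {
            let z = [1u8; 1024];
            std::future::ready(()).await;
            Response(req.0 + z.len() as u32)
        })
    }
}

/// Size of the value the caller receives from `call`, whatever the implementation.
fn returned_size(svc: &mut dyn Service) -> usize {
    size_of_val(&svc.call(Request(0)))
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once, returning its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Box::pin(fut).as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```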

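An alternative that does not force an allocation is to make the future type an associated type, so each implementation chooses its own:

```rust
trait Service {
    type CallFuture: Future<Output = Response>;
    fn call(&mut self, req: Request) -> Self::CallFuture;
}
// A nameable future type: no heap allocation per call.
struct Y;
impl Service for Y {
    type CallFuture = std::future::Ready<Response>;
    fn call(&mut self, _req: Request) -> Self::CallFuture {
        std::future::ready(Response)
    }
}
// An async block's type cannot be named on stable Rust, so it has to be boxed.
struct Z;
impl Service for Z {
    type CallFuture = Pin<Box<dyn Future<Output = Response>>>;
    fn call(&mut self, _req: Request) -> Self::CallFuture {
        Box::pin(async { Response })
    }
}
```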

std mutex or tokio mutex

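```rust
use std::sync::{Arc, Mutex};

// Note: with tokio::spawn, rustc actually rejects this code, because
// std::sync::MutexGuard is not Send and tokio::spawn requires a Send future.
// With a spawn that accepts !Send futures (e.g. tokio::task::spawn_local),
// the same pattern compiles and can deadlock as described below.
#[tokio::main]
async fn main() {
    let x = Arc::new(Mutex::new(0));
    let x1 = Arc::clone(&x);
    tokio::spawn(async move {
        loop {
            let mut x = x1.lock().unwrap();
            // suspended here while still holding the std guard
            let _ = tokio::fs::read_to_string("file").await;
            *x += 1;
        }
    });
    let x2 = Arc::clone(&x);
    tokio::spawn(async move {
        loop {
            *x2.lock().unwrap() -= 1;
        }
    });
}
```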

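If the runtime has only one thread, this deadlocks. When the first task reaches the .await, it yields and the second task starts running. The first task still holds the lock, so x2.lock() blocks. Because there is only one thread, the first task never runs again to release the lock. The fix is an async mutex (tokio::sync::Mutex): when it is already locked, lock().await yields to the executor instead of blocking the thread. The price is that an async mutex is slower than std::sync::Mutex.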
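The core of the problem is a std guard that is still held while its task is suspended. That can be observed without actually deadlocking. This std-only sketch plays the role of a single-threaded executor. It polls task 1 once, so the task locks and then suspends at the await. It then lets "task 2" try the lock with `try_lock`, which reports `WouldBlock` instead of hanging. `YieldOnce` and `interleave` are illustrative names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, TryLockError};
use std::task::{Context, Poll, Wake, Waker};

/// Leaf future that returns Pending once, like an await on I/O that is not ready yet.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hand-interleaves two "tasks" on one thread, as a single-threaded executor would.
/// Returns (could task 2 take the lock while task 1 was suspended?, final value).
fn interleave() -> (bool, i32) {
    let m = Mutex::new(0);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // Task 1: lock, then await while still holding the std guard.
    let mut task1 = Box::pin(async {
        let mut guard = m.lock().unwrap();
        YieldOnce(false).await;
        *guard += 1;
    });
    assert!(task1.as_mut().poll(&mut cx).is_pending()); // suspended, lock held

    // Task 2 now runs. A blocking lock() would never return on a single thread,
    // so try_lock is used to observe that the mutex is still taken.
    let task2_could_lock = !matches!(m.try_lock(), Err(TryLockError::WouldBlock));

    // Task 1 resumes, finishes, and only then drops its guard.
    assert!(task1.as_mut().poll(&mut cx).is_ready());
    let value = *m.lock().unwrap();
    (task2_could_lock, value)
}
```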

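So when the locked section contains no await point, std::sync::Mutex is the better choice. Once the guard has to be held across an await point, an async mutex is required. An async mutex is also recommended when the critical section is long.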
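Here is a std-only sketch of the recommended pattern: keep the std::sync::Mutex guard inside a block that ends before the await. `other_io` is a hypothetical stand-in for real async I/O. `assert_send` is a compile-time check that the resulting future is Send, which is what tokio::spawn requires.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for real async I/O; here it completes immediately.
async fn other_io() {}

async fn bump_then_io(counter: Arc<Mutex<i32>>) -> i32 {
    let snapshot = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // the std guard is dropped here, before any await
    other_io().await;
    snapshot
}

/// Compiles only if `T: Send`, i.e. only if no guard is held across an await.
fn assert_send<T: Send>(value: T) -> T {
    value
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once, returning its output if it was ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Box::pin(fut).as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```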

thread

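After an .await, the Tokio executor tries to resume the task on the same thread, but this is not guaranteed. On the multi-threaded runtime a task can be moved to another worker thread, which is why futures passed to tokio::spawn must be Send.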