任务

任务是应用程序的“逻辑单元”。 它们类似于Go的goroutine和Erlang的进程,但是异步。 换句话说,任务是异步绿色线程。

鉴于任务运行异步逻辑位,它们由Future特征表示。 任务完成处理后,任务的future实现将以()值完成。

任务被传递给执行程序,执行程序处理任务的调度。 执行程序通常在一组或一组线程中调度许多任务。 任务不得执行计算繁重的逻辑,否则将阻止其他任务执行 。 因此,不要尝试将斐波那契序列计算为任务。

任务通过直接实施Future特征或通过使用future和tokio crate中可用的各种组合函数构建future来实现。

下面是一个使用HTTP get从URI获取值并缓存结果的示例。

  1. 检查缓存以查看是否存在URI条目。
  2. 如果没有条目,请执行HTTP get。
  3. 将响应存储在缓存中。
  4. 返回response。

整个事件序列也包含超时,以防止无限制的执行时间。

  1. // The functions here all return `Box<Future<...>>`. This is one
  2. // of a number of ways to return futures. For more details on
  3. // returning futures, see the "Returning futures" section in
  4. // "Going deeper: Futures".
  5. /// Get a URI from some remote cache.
  6. fn cache_get(uri: &str)
  7. -> Box<Future<Item = Option<String>, Error = Error>>
  8. { ... }
  9. fn cache_put(uri: &str, val: String)
  10. -> Box<Future<Item = (), Error = Error>>
  11. { ... }
  12. /// Do a full HTTP get to a remote URL
  13. fn http_get(uri: &str)
  14. -> Box<Future<Item = String, Error = Error>>
  15. { ... }
  16. fn fetch_and_cache(url: &str)
  17. -> Box<Future<Item = String, Error = Error>>
  18. {
  19. // The URL has to be converted to a string so that it can be
  20. // moved into the closure. Given futures are asynchronous,
  21. // the stack is not around anymore by the time the closure is called.
  22. let url = url.to_string();
  23. let response = http_get(&url)
  24. .and_then(move |response| {
  25. cache_put(&url, response.clone())
  26. .map(|_| response)
  27. });
  28. Box::new(response)
  29. }
  30. let url = "https://example.com";
  31. let response = cache_get(url)
  32. .and_then(|resp| {
  33. // `Either` is a utility provided by the `futures` crate
  34. // that enables returning different futures from a single
  35. // closure without boxing.
  36. match resp {
  37. Some(resp) => Either::A(future::ok(resp)),
  38. None => {
  39. Either::B(fetch_and_cache(url))
  40. }
  41. }
  42. });
  43. // Only let the task run for up to 20 seconds.
  44. //
  45. // This uses a fictional timer API. Use the `tokio-timer` crate for
  46. // all your actual timer needs.
  47. let task = Timeout::new(response, Duration::from_secs(20));
  48. my_executor.spawn(task);

由于这些步骤对于完成任务都是必需的,因此将它们全部分组到同一任务中是有意义的。

但是,如果不是在缓存未命中时更新缓存,而是希望在一个时间间隔内更新缓存值,那么将其拆分为多个任务是有意义的,因为这些步骤不再直接相关。

  1. let url = "https://example.com";
  2. // An Interval is a stream that yields `()` on a fixed interval.
  3. let update_cache = Interval::new(Duration::from_secs(60))
  4. // On each tick of the interval, update the cache. This is done
  5. // by using the same function from the previous snippet.
  6. .for_each(|_| {
  7. fetch_and_cache(url)
  8. .map(|resp| println!("updated cache with {}", resp))
  9. });
  10. // Spawn the cache update task so that it runs in the background
  11. my_executor.spawn(update_cache);
  12. // Now, only get from the cache.
  13. // (NB: see next section about ensuring the cache is up to date.)
  14. let response = cache_get(url);
  15. let task = Timeout::new(response, Duration::from_secs(20));
  16. my_executor.spawn(task);

消息传递

就像Go和Erlang一样,任务可以使用消息传递进行通信。 实际上,使用消息传递来协调多个任务是很常见的。 这允许独立任务仍然相互作用。

future包提供了一个同步模块,其中包含一些适合跨任务传递消息的通道类型。

  • oneshot是一个用于发送一个值的通道。
  • mpsc是用于发送许多(零个或多个)值的通道。

前面的例子并不完全正确。 鉴于任务同时执行,无法保证缓存更新任务在其他任务尝试从缓存中读取时将第一个值写入缓存。

这是使用消息传递的完美情况。 高速缓存更新任务可以发送消息,通知其他任务它已使用初始值启动了高速缓存。

  1. let url = "https://example.com";
  2. let (primed_tx, primed_rx) = oneshot::channel();
  3. let update_cache = fetch_and_cache(url)
  4. // Now, notify the other task that the cache is primed
  5. .then(|_| primed_tx.send(()))
  6. // Then we can start refreshing the cache on an interval
  7. .then(|_| {
  8. Interval::new(Duration::from_secs(60))
  9. .for_each(|_| {
  10. fetch_and_cache(url)
  11. .map(|resp| println!("updated cache with {}", resp))
  12. })
  13. });
  14. // Spawn the cache update task so that it runs in the background
  15. my_executor.spawn(update_cache);
  16. // First, wait for the cache to primed
  17. let response = primed_rx
  18. .then(|_| cache_get(url));
  19. let task = Timeout::new(response, Duration::from_secs(20));
  20. my_executor.spawn(task);

任务通知

使用Tokio构建的应用程序被构造为一组并发运行的任务。 这是服务器的基本结构:

  1. let server = listener.incoming().for_each(|socket| {
  2. // Spawn a task to process the connection
  3. tokio::spawn(process(socket));
  4. Ok(())
  5. })
  6. .map_err(|_| ()); // Just drop the error
  7. tokio::run(server);

在这种情况下,我们为每个入站服务器套接字生成一个任务。 但是,也可以实现处理同一套接字上所有入站连接的服务器future:

  1. pub struct Server {
  2. listener: TcpListener,
  3. connections: Vec<Box<Future<Item = (), Error = io::Error> + Send>>,
  4. }
  5. impl Future for Server {
  6. type Item = ();
  7. type Error = io::Error;
  8. fn poll(&mut self) -> Result<Async<()>, io::Error> {
  9. // First, accept all new connections
  10. loop {
  11. match self.listener.poll_accept()? {
  12. Async::Ready((socket, _)) => {
  13. let connection = process(socket);
  14. self.connections.push(connection);
  15. }
  16. Async::NotReady => break,
  17. }
  18. }
  19. // Now, poll all connection futures.
  20. let len = self.connections.len();
  21. for i in (0..len).rev() {
  22. match self.connections[i].poll()? {
  23. Async::Ready(_) => {
  24. self.connections.remove(i);
  25. }
  26. Async::NotReady => {}
  27. }
  28. }
  29. // `NotReady` is returned here because the future never actually
  30. // completes. The server runs until it is dropped.
  31. Ok(Async::NotReady)
  32. }
  33. }

这两种策略在功能上是等效的,但具有明显不同的运行时特性。

通知发生在任务级别。 该任务不知道哪个子future触发了通知。 因此,无论何时轮询任务,都必须尝试轮询所有子future

task

在此任务中,有三个子future可以进行轮询。 如果其中一个子future所包含的资源转为“就绪”,则任务本身会收到通知,并会尝试轮询其所有三个子future。 其中一个将推进,这反过来推进任务的内部状态。

关键是尽量减少任务,尽可能少地完成每项任务。 这就是为什么服务器为每个连接生成新任务而不是在与侦听器相同的任务中处理连接的原因。

好吧,实际上有一种方法可以让任务知道哪个子future使用FuturesUnordered触发了通知,但通常正确的做法是生成一个新任务。