Futures

在指南早期暗示的future是用于管理异步逻辑的构建块。 它们是Tokio使用的底层异步抽象。

future的实施由future crate提供。 但是,为方便起见,Tokio重新导出了许多类型。

Futures是什么

future是表示异步计算完成的值。通常,由于系统中某处发生的事件使future完成。虽然我们从基本I / O的角度看待事物,但您可以使用future来表示各种事件,例如:

  • 在线程池中执行的数据库查询。当数据库查询完成时,future 完成,其值是查询的结果。

  • 对服务器的RPC 调用。当服务器回复时,future 完成,其值是服务器的响应。

  • 超时事件。当时间到了,future就完成了,它的值是()。

  • 在线程池上运行的长时间运行的CPU密集型任务。任务完成后,future 完成,其值为任务的返回值。

  • 从套接字读取字节。当字节准备就绪时,future就完成了 - 根据缓冲策略,字节可能直接返回,或作为副作用写入某个现有缓冲区。

future抽象的整个要点是允许异步函数,即不能立即返回值的函数,将会返回一些东西。

例如,异步HTTP客户端可以提供如下所示的get函数:

  1. pub fn get(&self, uri: &str) -> ResponseFuture { ... }

然后,库的用户将使用该函数:

  1. let response_future = client.get("https://www.example.com");

现在,response_future不是实际响应。这是个一旦收到响应,就会完成的future。 但是,调用者要具有一个具体的东西(future)使他们可以开始使用它。 例如,它们可以链式计算在接收到响应时执行,或者它们可能将future传递给函数。

  1. let response_is_ok = response_future
  2. .map(|response| {
  3. response.status().is_ok()
  4. });
  5. track_response_success(response_is_ok);

所有与future一起采取的行动都不会立即执行任何工作。他们不能,因为他们没有实际的HTTP响应。相反,他们定义了响应future完成时要完成的工作。

future crate和Tokio都有一系列组合功能,可以用来处理future

实现future

使用Tokio时,实现Future是很常见的,因此适应它是很重要的。

如前一节所述,Rustfuture是基于轮询的。这是Rustfuture库的一个独特方面。其他编程语言的大多数future库使用基于推送的模型,其中回调被提供给future,并且计算立即调用计算结果回调。

使用基于轮询的模型提供了许多优点,包括作为零成本抽象,即,与手动编写异步代码相比,使用Rustfuture没有额外的开销。

future的特点如下:

  1. trait Future {
  2. /// The type of the value returned when the future completes.
  3. type Item;
  4. /// The type representing errors that occured while processing the
  5. /// computation.
  6. type Error;
  7. fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
  8. }

您可能会注意到这与用于实现异步任务的trait完全相同。 这是因为一旦计算完成,异步任务就是“正好”的future,其值为()。

通常,当您实现Future时,您将定义一个由子(或内部)future组成的计算。 在这种情况下,future的实现会尝试调用内部future,如果内部future未准备好,则返回NotReady。

以下示例是由另一个返回usize并将使该值加倍的future组成的future

  1. pub struct Doubler<T> {
  2. inner: T,
  3. }
  4. pub fn double<T>(inner: T) -> Doubler<T> {
  5. Doubler { inner }
  6. }
  7. impl<T> Future for Doubler<T>
  8. where T: Future<Item = usize>
  9. {
  10. type Item = usize;
  11. type Error = T::Error;
  12. fn poll(&mut self) -> Result<Async<usize>, T::Error> {
  13. match self.inner.poll()? {
  14. Async::Ready(v) => Ok(Async::Ready(v * 2)),
  15. Async::NotReady => Ok(Async::NotReady),
  16. }
  17. }
  18. }

当Doublerfuture被轮询时,它会调查其内在的future。 如果内部future尚未准备好,Doubler future将返回NotReady。 如果内心的future已经准备就绪,那么Doubler的future会使返回值加倍并返回Ready。

因为上面的匹配模式很常见,所以future crate提供了一个宏:try_ready!。 它类似于try!,但它也返回NotReady。 上面的poll函数可以使用try_ready重写! 如下:

  1. fn poll(&mut self) -> Result<Async<usize>, T::Error> {
  2. let v = try_ready!(self.inner.poll());
  3. Ok(Async::Ready(v * 2))
  4. }

返回NotReady

当一个任务返回NotReady时,一旦它转换到就绪状态,执行者就会被通知。这使执行者能够有效地调度任务。

当函数返回Async :: NotReady时,在状态转换为“就绪”时通知执行程序至关重要。否则,任务将无限挂起,永远不会再次运行。

对于大多数future的实现,这是可传递的。当future实施是子future的组合时,当至少一个内部future返回NotReady时,外部future仅返回NotReady。因此,一旦内部future转变为就绪状态,外部future将转变为就绪状态。在这种情况下,NotReady合约已经满足,因为内部future将在准备就绪时通知执行者。

最内层的future,有时也被称为“资源”,是负责通知执行人的人。这是通过对task :: current()返回的任务调用notify来完成的。

在执行者调用任务轮询之前,它将任务上下文设置为线程局部变量。然后,最内部的future从线程本地访问上下文,以便一旦其准备状态改变就能够通知任务。

我们将在后面的部分中更深入地探索实施资源和任务系统。除非你从内部的future获得NotReady,否则这里的关键是不要返回NotReady

一个更复杂的future

让我们看一下稍微复杂的future实现。 在这种情况下,我们将实现一个取得主机名,进行DNS解析,然后建立与远程主机的连接的future。 我们假设存在一个如下所示的resolve函数:

  1. pub fn resolve(host: &str) -> ResolveFuture;

其中ResolveFuture是一个返回SocketAddrfuture

实现future的步骤是:

  1. 调用resolve以获取ResolveFuture实例。
  2. 调用ResolveFuture :: poll直到它返回一个SocketAddr
  3. SocketAddr传递给TcpStream :: connect
  4. 调用ConnectFuture :: poll直到它返回TcpStream
  5. 使用TcpStream完成外部future

我们将使用枚举来跟踪future的状态.

  1. enum State {
  2. // Currently resolving the host name
  3. Resolving(ResolveFuture),
  4. // Establishing a TCP connection to the remote host
  5. Connecting(ConnectFuture),
  6. }

ResolveAndConnectfuture定义为:

  1. pub struct ResolveAndConnect {
  2. state: State,
  3. }
  1. pub fn resolve_and_connect(host: &str) -> ResolveAndConnect {
  2. let state = State::Resolving(resolve(host));
  3. ResolveAndConnect { state }
  4. }
  5. impl Future for ResolveAndConnect {
  6. type Item = TcpStream;
  7. type Error = io::Error;
  8. fn poll(&mut self) -> Result<Async<TcpStream>, io::Error> {
  9. use self::State::*;
  10. loop {
  11. let addr = match self.state {
  12. Resolving(ref mut fut) => {
  13. try_ready!(fut.poll())
  14. }
  15. Connecting(ref mut fut) => {
  16. return fut.poll();
  17. }
  18. };
  19. let connecting = TcpStream::connect(&addr);
  20. self.state = Connecting(connecting);
  21. }
  22. }
  23. }

这解释了Future如何实现状态机。 这个future可以是两种状态中的任何一种:

  1. Resolving
  2. Connecting

每次调用poll时,我们都会尝试将状态机推进到下一个状态。

现在,我们刚刚实现的future基本上是AndThen,所以我们可能只是使用该组合器而不是重新实现它。

  1. resolve(my_host)
  2. .and_then(|addr| TcpStream::connect(&addr))