Executors and System IO

The Future Trait的上一章节中,我们讨论了这个 Future 在套接字上,执行异步读取的示例:

  1. {{#include ../../examples/02_02_future_trait/src/lib.rs:socket_read}}

这个 Future 将读取套接字上的可用数据,如果没有可用数据,它将交还给 executor,要求在套接字再次变得可读时,唤醒这个任务。但是,根据此示例尚不清楚,这个Socket类型是怎么实现的,尤其是set_readable_callback函数是如何工作的。我们如何安排lw.wake(),在一旦套接字变得可读时,就被调用?一种选择是,让一个线程不断检查socket是否可读,在适当的时候调用wake()。但是,这将是非常低效的,需要为每个阻塞的 IO Future 使用一个单独的线程。这将大大降低我们异步代码的效率。

实际上,此问题是通过与 IO-感知系统阻塞原语交互来解决。例如,epoll在 Linux 上,kqueue在 FreeBSD 和 Mac OS 上,在 Windows 上为 IOCP,以及 Fuchsia 的port(所有这些都通过跨平台的 Rust 箱子mio揭露)。这些原语都允许一个线程,在多个异步 IO 事件上阻塞,并在事件的其中一个完成后返回。实际上,这些 API 通常如下所示:

  1. struct IoBlocker {
  2. ...
  3. }
  4. struct Event {
  5. // An ID uniquely identifying the event that occurred and was listened for.
  6. id: usize,
  7. // A set of signals to wait for, or which occurred.
  8. signals: Signals,
  9. }
  10. impl IoBlocker {
  11. /// Create a new collection of asynchronous IO events to block on.
  12. fn new() -> Self { ... }
  13. /// Express an interest in a particular IO event.
  14. fn add_io_event_interest(
  15. &self,
  16. /// The object on which the event will occur
  17. io_object: &IoObject,
  18. /// A set of signals that may appear on the `io_object` for
  19. /// which an event should be triggered, paired with
  20. /// an ID to give to events that result from this interest.
  21. event: Event,
  22. ) { ... }
  23. /// Block until one of the events occurs.
  24. fn block(&self) -> Event { ... }
  25. }
  26. let mut io_blocker = IoBlocker::new();
  27. io_blocker.add_io_event_interest(
  28. &socket_1,
  29. Event { id: 1, signals: READABLE },
  30. );
  31. io_blocker.add_io_event_interest(
  32. &socket_2,
  33. Event { id: 2, signals: READABLE | WRITABLE },
  34. );
  35. let event = io_blocker.block();
  36. // prints e.g. "Socket 1 is now READABLE" if socket one became readable.
  37. println!("Socket {:?} is now {:?}", event.id, event.signals);

Future executor 可以使用这些原语来提供异步 IO 对象(例如套接字),这些对象可以配置,在发生特定 IO 事件时,运行的回调。在我们上面例子的SocketRead情况下Socket::set_readable_callback函数可能类似于以下伪代码:

  1. impl Socket {
  2. fn set_readable_callback(&self, waker: Waker) {
  3. // `local_executor` is a reference to the local executor.
  4. // this could be provided at creation of the socket, but in practice
  5. // many executor implementations pass it down through thread local
  6. // storage for convenience.
  7. let local_executor = self.local_executor;
  8. // Unique ID for this IO object.
  9. let id = self.id;
  10. // Store the local waker in the executor's map so that it can be called
  11. // once the IO event arrives.
  12. local_executor.event_map.insert(id, waker);
  13. local_executor.add_io_event_interest(
  14. &self.socket_file_descriptor,
  15. Event { id, signals: READABLE },
  16. );
  17. }
  18. }

现在,我们只有一个 executor 线程,该线程可以接收任何 IO 事件,并将 IO 事件分配给相应的Waker,这将唤醒相应的任务,从而使 executor 在返回以检查更多 IO 事件之前,可以驱使更多任务驶向完成,(且该循环会继续…)。