运行时模型

现在我们将介绍Tokio /future运行时模型。 Tokio构建在future箱顶部并使用其运行时模型。 这允许它也使用future箱与其他图书馆互操作。

注意:此运行时模型与其他语言中的异步库非常不同。 虽然在较高的层面上,API看起来很相似,但代码执行方式却有所不同。

同步模型

首先,让我们简要谈谈同步(或阻塞)模型。 这是Rust标准库使用的模型。

  1. // let socket = ...;
  2. let mut buf = [0; 1024];
  3. let n = socket.read(&mut buf).unwrap();
  4. // Do something with &buf[..n];

调用socket.read时,无论套接字在其缓冲区中是否有待处理数据, 如果有待处理的数据,则读取的调用将立即返回,并且buf将填充该数据。 如果没有未决数据,则read函数将阻止当前线程,直到收到数据。 此时,buf将填充此新接收的数据,并且将返回read函数

为了同时在许多不同的套接字上并发执行读取,每个套接字需要一个线程。 每个套接字使用一个线程不能很好地扩展到大量的套接字。 这被称为c10k问题。

非阻塞套接字

在执行像read这样的操作时避免阻塞线程的方法是不阻塞线程! 当套接字在其接收缓冲区中没有未决数据时,read函数立即返回,表明套接字“未准备好”以执行读取操作。

使用Tokio TcpStream时,如果没有要读取的待处理数据,则对read的调用将返回类型ErrorKind :: WouldBlock的错误。 此时,调用者负责稍后再次调用read。 诀窍是知道“晚些时候”的时间。

考虑非阻塞读取的另一种方法是“轮询”套接字以读取数据。

轮询模型

轮询套接字数据的策略可以推广到任何操作。 例如,在轮询模型中获取“小部件”的函数看起来像这样:

  1. fn poll_widget() -> Async<Widget> { ... }

此函数返回Async <Widget>,其中Async是Ready(Widget)或NotReady的枚举。 Async枚举由future箱提供,是轮询模型的构建块之一。

现在,让我们定义一个没有使用此poll_widget函数的组合器的异步任务。 该任务将执行以下操作:

  1. 获取小部件。
  2. 将小部件打印到STDOUT。
  3. 终止任务。

为了定义任务,我们实现了Future trait。

  1. ///轮询单个小部件并将其写入STDOUT的任务。
  2. pub struct MyTask;
  3. impl Future for MyTask {
  4. type Item = ();
  5. type Error = ();
  6. fn poll(&mut self) -> Result<Async<()>, ()> {
  7. match poll_widget() {
  8. Async::Ready(widget) => {
  9. println!("widget={:?}", widget);
  10. Ok(Async::Ready(()))
  11. }
  12. Async::NotReady => {
  13. return Ok(Async::NotReady);
  14. }
  15. }
  16. }
  17. }

重要提示: 返回Async :: NotReady具有特殊含义。 有关详细信息,请参阅下一节。

需要注意的关键是,当调用MyTask :: poll时,它会立即尝试获取小部件。 如果对poll_widget的调用返回NotReady,则该任务无法继续进行。 然后任务返回NotReady,表明它尚未准备好完成处理。

任务实现不会阻止。 相反,“将来的某个时间”,执行者将再次调用MyTask :: poll。 将再次调用poll_widget。 如果poll_widget已准备好返回窗口小部件,则该任务又可以打印窗口小部件。 然后,可以通过返回Ready来完成任务。

执行者(Executors)

为了使任务取得进展,必须调用MyTask :: poll。 这就是执行者的工作。

执行程序负责反复调用任务轮询,直到返回Ready。 有很多不同的方法可以做到这一点。 例如,CurrentThread执行者将阻止当前线程并遍历所有生成的任务,并对它们调用poll。 ThreadPool在线程池中调度任务。 这也是运行时使用的默认执行者。

必须在执行者上生成所有任务,否则不会执行任何工作。

在最简单的情况下,执行者可能看起来像这样:

  1. pub struct SpinExecutor {
  2. tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
  3. }
  4. impl SpinExecutor {
  5. pub fn spawn<T>(&mut self, task: T)
  6. where T: Future<Item = (), Error = ()> + 'static
  7. {
  8. self.tasks.push_back(Box::new(task));
  9. }
  10. pub fn run(&mut self) {
  11. while let Some(mut task) = self.tasks.pop_front() {
  12. match task.poll().unwrap() {
  13. Async::Ready(_) => {}
  14. Async::NotReady => {
  15. self.tasks.push_back(task);
  16. }
  17. }
  18. }
  19. }
  20. }

当然,这不会非常有效。 执行程序在一个繁忙的循环中运转并尝试轮询所有任务,即使任务将再次返回NotReady。

理想情况下,执行者可以通过某种方式知道任务的“准备就绪”状态何时被改变,即当轮询调用返回Ready时。 执行者看起来像这样:

  1. pub fn run(&mut self) {
  2. loop {
  3. while let Some(mut task) = self.ready_tasks.pop_front() {
  4. match task.poll().unwrap() {
  5. Async::Ready(_) => {}
  6. Async::NotReady => {
  7. self.not_ready_tasks.push_back(task);
  8. }
  9. }
  10. }
  11. if self.not_ready_tasks.is_empty() {
  12. return;
  13. }
  14. // Put the thread to sleep until there is work to do
  15. self.sleep_until_tasks_are_ready();
  16. }
  17. }

当任务从“未准备好”变为“准备好”时能够得到通知是future任务模型的核心。 我们将很快进一步深入研究。