一个玩具任务执行器

来造一个任务执行器吧! 我们的目标是使任意数量的任务能够协调运行在单个OS线程上 成为可能. 为了保持示例简单, 我们挑选了最原始的数据结构来实现. 以下是我们需要 导入的数据结构:

  1. use std::mem;
  2. use std::collections::{HashMap, HashSet};
  3. use std::sync::{Arc, Mutex, MutexGuard};
  4. use std::thread::{self, Thread};

首先, 我们定义一个保持执行器状态的结构. 该执行器拥有任务本身, 并分配 给每个任务 一个usizeID, 使得我们能够从外部指向这些任务. 特殊的, 执行器保持一个ready 集合, 存储应该要被唤醒的任务id(因为这些任务在等待的事件已经发生了):

  1. struct ExecState {
  2. // The next available task ID.
  3. next_id: usize,
  4. // The complete list of tasks, keyed by ID.
  5. tasks: HashMap<usize, TaskEntry>,
  6. // The set of IDs for ready-to-run tasks.
  7. ready: HashSet<usize>,
  8. // The actual OS thread running the executor.
  9. thread: Thread,
  10. }

执行器本身只是将这个状态用ArcMutex包装起来, 允许它能够被其他线程使用 (即使所有任务都可以局部线程中运行):

  1. #[derive(Clone)]
  2. pub struct ToyExec {
  3. state: Arc<Mutex<ExecState>>,
  4. }

现在, ExecStatetasks字段提供了TaskEntry实例, 这些实例包装了一个具体任务, 和对应的WakeHandle:

  1. struct TaskEntry {
  2. task: Box<ToyTask + Send>,
  3. wake: Arc<WakeHandle>,
  4. }
  5. struct ToyWake {
  6. // A link back to the executor that owns the task we want to wake up.
  7. exec: ToyExec,
  8. // The ID for the task we want to wake up.
  9. id: usize,
  10. }

最后, 我们需要一些创建执行器并修改执行器状态的模板:

  1. impl ToyExec {
  2. pub fn new() -> Self {
  3. ToyExec {
  4. state: Arc::new(Mutex::new(ExecState {
  5. next_id: 0,
  6. tasks: HashMap::new(),
  7. ready: HashSet::new(),
  8. thread: thread::current(),
  9. })),
  10. }
  11. }
  12. // a convenience method for getting our hands on the executor state
  13. fn state_mut(&self) -> MutexGuard<ExecState> {
  14. self.state.lock().unwrap()
  15. }
  16. }

有了这些模板代码, 我们可以开始关注于执行器的内部工作原理. 我们最好从核心执行器 循环开始. 简便起见, 这个循环从来不退出; 它的职责仅仅是继续跑完所有分出线程的 新任务:

  1. impl ToyExec {
  2. pub fn run(&self) {
  3. loop {
  4. // each time around, we grab the *entire* set of ready-to-run task IDs:
  5. let mut ready = mem::replace(&mut self.state_mut().ready, HashSet::new());
  6. // now try to `complete` each ready task:
  7. for id in ready.drain() {
  8. // note that we take *full ownership* of the task; if it completes,
  9. // it will be dropped.
  10. let entry = self.state_mut().tasks.remove(&id);
  11. if let Some(mut entry) = entry {
  12. if let Async::WillWake = entry.task.complete(entry.wake.clone()) {
  13. // the task hasn't completed, so put it back in the table.
  14. self.state_mut().tasks.insert(id, entry);
  15. }
  16. }
  17. }
  18. // we'd processed all work we acquired on entry; block until more work
  19. // is available
  20. thread::park();
  21. }
  22. }
  23. }

这里精妙的地方是, 在每一轮循环, 我们在开始的时候tick所有准备好的东西, 然后 “park”线程. std库的park/unpark让处理阻塞和唤醒OS线程变得很容易. 在 这个例子里, 我们想要的是执行器底层的OS线程阻塞, 直到一些额外的任务已经就绪. 这样 的话, 即使我们无法唤醒线程, 也不会有任何风险: 如果另外的线程在我们初次读取 ready集与调用park方法之间调用了unpark方法, park方法就会马上返回.

另一方面, 一个任务会像这样被唤醒:

  1. impl ExecState {
  2. fn wake_task(&mut self, id: usize) {
  3. self.ready.insert(id);
  4. // *after* inserting in the ready set, ensure the executor OS
  5. // thread is woken up if it's not already running.
  6. self.thread.unpark();
  7. }
  8. }
  9. impl Wake for ToyWake {
  10. fn wake(&self) {
  11. self.exec.state_mut().wake_task(self.id);
  12. }
  13. }

剩下的部分就很直接了. spawn方法负责将任务包装成TaskEntry并执行它:

  1. impl ToyExec {
  2. pub fn spawn<T>(&self, task: T)
  3. where T: Task + Send + 'static
  4. {
  5. let mut state = self.state_mut();
  6. let id = state.next_id;
  7. state.next_id += 1;
  8. let wake = ToyWake { id, exec: self.clone() };
  9. let entry = TaskEntry { wake: Arc::new(wake), task: Box::new(task) };
  10. state.tasks.insert(id, entry);
  11. // A newly-added task is considered immediately ready to run
  12. state.wake_task(id);
  13. }
  14. }

最后, 一个任务也有可能没有被完成, 但所有唤醒它的句柄也被丢弃了, 而它也没有准备好 运行. 对于这种情况, 我们希望把该任务也丢弃, 因为它其实是不可达的(unreachable):

  1. impl Drop for ToyWake {
  2. fn drop(&mut self) {
  3. let mut state = self.exec.state_mut();
  4. if !state.ready.contains(&self.id) {
  5. state.tasks.remove(&self.id);
  6. }
  7. }
  8. }

好了, 我们造了个任务调度器了! 现在, 让我们造个事件源, 让任务模型等待去处理.