一个玩具任务执行器
来造一个任务执行器吧! 我们的目标是使任意数量的任务能够协调运行在单个OS线程上 成为可能. 为了保持示例简单, 我们挑选了最原始的数据结构来实现. 以下是我们需要 导入的数据结构:
use std::mem;use std::collections::{HashMap, HashSet};use std::sync::{Arc, Mutex, MutexGuard};use std::thread::{self, Thread};
首先, 我们定义一个保持执行器状态的结构. 该执行器拥有任务本身, 并分配 给每个任务
一个usizeID, 使得我们能够从外部指向这些任务. 特殊的, 执行器保持一个ready
集合, 存储应该要被唤醒的任务id(因为这些任务在等待的事件已经发生了):
struct ExecState {// The next available task ID.next_id: usize,// The complete list of tasks, keyed by ID.tasks: HashMap<usize, TaskEntry>,// The set of IDs for ready-to-run tasks.ready: HashSet<usize>,// The actual OS thread running the executor.thread: Thread,}
执行器本身只是将这个状态用Arc和Mutex包装起来, 允许它能够被其他线程使用
(即使所有任务都可以局部线程中运行):
#[derive(Clone)]pub struct ToyExec {state: Arc<Mutex<ExecState>>,}
现在, ExecState的tasks字段提供了TaskEntry实例, 这些实例包装了一个具体任务,
和对应的WakeHandle:
struct TaskEntry {task: Box<ToyTask + Send>,wake: Arc<WakeHandle>,}struct ToyWake {// A link back to the executor that owns the task we want to wake up.exec: ToyExec,// The ID for the task we want to wake up.id: usize,}
最后, 我们需要一些创建执行器并修改执行器状态的模板:
impl ToyExec {pub fn new() -> Self {ToyExec {state: Arc::new(Mutex::new(ExecState {next_id: 0,tasks: HashMap::new(),ready: HashSet::new(),thread: thread::current(),})),}}// a convenience method for getting our hands on the executor statefn state_mut(&self) -> MutexGuard<ExecState> {self.state.lock().unwrap()}}
有了这些模板代码, 我们可以开始关注于执行器的内部工作原理. 我们最好从核心执行器 循环开始. 简便起见, 这个循环从来不退出; 它的职责仅仅是继续跑完所有分出线程的 新任务:
impl ToyExec {pub fn run(&self) {loop {// each time around, we grab the *entire* set of ready-to-run task IDs:let mut ready = mem::replace(&mut self.state_mut().ready, HashSet::new());// now try to `complete` each ready task:for id in ready.drain() {// note that we take *full ownership* of the task; if it completes,// it will be dropped.let entry = self.state_mut().tasks.remove(&id);if let Some(mut entry) = entry {if let Async::WillWake = entry.task.complete(entry.wake.clone()) {// the task hasn't completed, so put it back in the table.self.state_mut().tasks.insert(id, entry);}}}// we'd processed all work we acquired on entry; block until more work// is availablethread::park();}}}
这里精妙的地方是, 在每一轮循环, 我们在开始的时候tick所有准备好的东西, 然后
“park”线程. std库的park/unpark让处理阻塞和唤醒OS线程变得很容易. 在
这个例子里, 我们想要的是执行器底层的OS线程阻塞, 直到一些额外的任务已经就绪. 这样
的话, 即使我们无法唤醒线程, 也不会有任何风险: 如果另外的线程在我们初次读取
ready集与调用park方法之间调用了unpark方法, park方法就会马上返回.
另一方面, 一个任务会像这样被唤醒:
impl ExecState {fn wake_task(&mut self, id: usize) {self.ready.insert(id);// *after* inserting in the ready set, ensure the executor OS// thread is woken up if it's not already running.self.thread.unpark();}}impl Wake for ToyWake {fn wake(&self) {self.exec.state_mut().wake_task(self.id);}}
剩下的部分就很直接了. spawn方法负责将任务包装成TaskEntry并执行它:
impl ToyExec {pub fn spawn<T>(&self, task: T)where T: Task + Send + 'static{let mut state = self.state_mut();let id = state.next_id;state.next_id += 1;let wake = ToyWake { id, exec: self.clone() };let entry = TaskEntry { wake: Arc::new(wake), task: Box::new(task) };state.tasks.insert(id, entry);// A newly-added task is considered immediately ready to runstate.wake_task(id);}}
最后, 一个任务也有可能没有被完成, 但所有唤醒它的句柄也被丢弃了, 而它也没有准备好 运行. 对于这种情况, 我们希望把该任务也丢弃, 因为它其实是不可达的(unreachable):
impl Drop for ToyWake {fn drop(&mut self) {let mut state = self.exec.state_mut();if !state.ready.contains(&self.id) {state.tasks.remove(&self.id);}}}
好了, 我们造了个任务调度器了! 现在, 让我们造个事件源, 让任务模型等待去处理.
