1 单线程Web服务器
1.1 启动监听、接受连接
use std::net::TcpListener;pub fn step1(){ let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming(){ let stream = stream.unwrap(); println!("接受一个连接"); }}
1.2 读取HTTP请求
use std::io::prelude::*;use std::net::TcpListener;use std::net::TcpStream;pub fn step1(){ let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming(){ handle_connection(stream.unwrap()); }}fn handle_connection(mut stream: TcpStream){ let mut buf = [0;512]; let size = stream.read(&mut buf).unwrap(); println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));}
1.3 发送回应
fn handle_connection(mut stream: TcpStream){ let mut buf = [0;512]; let size = stream.read(&mut buf).unwrap(); println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size])); let ack = "HTTP/1.1 200 OK\r\n"; stream.write(ack.as_bytes()).unwrap(); stream.flush().unwrap();}
1.4 从文件中读取HTML
fn handle_connection(mut stream : TcpStream){ let mut buf = [0;512]; let size = stream.read(&mut buf).unwrap(); println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size])); let mut file = File::open("./src/index.html").unwrap(); let mut cont = String::new(); file.read_to_string(&mut cont).unwrap(); let mut file = File::open("./src/index.html").unwrap(); let mut cont = String::new(); file.read_to_string(&mut cont).unwrap(); let ack = format!("HTTP/1.1 200 OK\r\n{}",cont); stream.write(ack.as_bytes()).unwrap(); stream.flush().unwrap();}
1.5 区分请求路径
fn handle_connection(mut stream : TcpStream){ let mut buf = [0;512]; let size = stream.read(&mut buf).unwrap(); println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size])); let root = b"GET / HTTP/1.1"; if buf.starts_with(root) { let mut file = File::open("./src/index.html").unwrap(); let mut cont = String::new(); file.read_to_string(&mut cont).unwrap(); let ack = format!("HTTP/1.1 200 OK\r\n{}", cont); stream.write(ack.as_bytes()).unwrap(); stream.flush().unwrap(); }else{ let mut file = File::open("./src/404.html").unwrap(); let mut cont = String::new(); file.read_to_string(&mut cont).unwrap(); let ack = format!("HTTP/1.1 400 NOT FOUND\r\n{}", cont); stream.write(ack.as_bytes()).unwrap(); stream.flush().unwrap(); }}
1.6 提取函数
fn handle_connection(mut stream : TcpStream){ let mut buf = [0;512]; let size = stream.read(&mut buf).unwrap(); println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size])); let root = b"GET / HTTP/1.1"; if buf.starts_with(root) { return_page(&mut stream,"HTTP/1.1 200 OK","./src/index.html"); }else{ return_page(&mut stream,"HTTP/1.1 400 NOT FOUND","./src/404.html"); }}fn return_page(stream : &mut TcpStream,status:&str,file:&str){ let mut file = File::open(file).unwrap(); let mut cont = String::new(); file.read_to_string(&mut cont).unwrap(); let ack = format!("{}\r\n{}",status, cont); stream.write(ack.as_bytes()).unwrap(); stream.flush().unwrap();}
2 扩展成多线程
2.1 线程池
use std::thread;pub struct ThreadPool{ size : usize, workers : Vec<Worker>,}struct Worker{ id: usize, handle: thread::JoinHandle<()>,}impl Worker{ pub fn new(id: usize) -> Worker{ let thread = thread::spawn(|| {});//什么也不做的空线程 Worker{ id:id, handle:thread, } }}impl ThreadPool{ pub fn new(max : usize) -> ThreadPool{ assert!(max > 0); // 创建多个工作者 let mut list = Vec::with_capacity(max); for id in 0..max{ list.push(Worker::new(id)); } ThreadPool{ size: max, workers: list, } } // 后续需要将f传递到Worker中 pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static { }}
std::thread::spawn()的签名如下:
pub fn spawn<F,T>(f: F) -> JoinHandle<T> where F:FnOnce() -> T + Send + 'static{},T: Send + 'static
- 分析如下
- 不关心返回值的类型
T - 参数类型
F的特性限定FnOnce()在execute()中也是需要的:因为参数需要最终传递到execute()中 - 参数类型
F的特性限定Send是必须的:Send允许将参数从一个线程传递到另一个线程(新创建的线程) - 参数类型
F的生命周期限定'static是必须的:不知道线程会运行多长时间
2.2 使用通道传递工作函数
use std::sync;use std::sync::mpsc;use std::thread;// 要传递的工作函数由特性限定表示;但是不能直接传递大小不确定的特性类型,要用Box封装type Job = Box<FnOnce() + Send + 'static>;pub struct ThreadPool { size: usize, workers: Vec<Worker>, sender: mpsc::Sender<Job>,}struct Worker { id: usize, handle: thread::JoinHandle<()>,}impl Worker { pub fn new(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop {// 需要将receiver的所有权移动到闭包中,所以使用move let job = receiver.lock().unwrap().recv().unwrap(); // 这里无法通过编译: 被封装的 FnOnce() 特性在作为函数进行调用时,需要获取资源所有权 // 但是Box不允许移出资源的所有权:资源是属于Box的,不能移出 // 解决方案见下一节 (*job)(); }); Worker { id: id, handle: thread, } }}impl ThreadPool { pub fn new(max: usize) -> ThreadPool { assert!(max > 0); let (tx, rx) = mpsc::channel(); let receiver = sync::Arc::new(sync::Mutex::new(rx)); let mut list = Vec::with_capacity(max); for id in 0..max { // 不能直接使用receiver,因为所有权被转移走,后续循环中就不能再使用了 // 使用Arc和Mutex进行封装 // Arc适用于资源需要被多线程、多处只读使用、无法确定哪部分最后结束使用(然后释放资源)的情况 // Mutex保证资源的互斥使用 list.push(Worker::new(id, receiver.clone())); } ThreadPool { size: max, workers: list, sender: tx, } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { // 用Box封装工作函数,然后发送到通道中 let job = Box::new(f); self.sender.send(job); }}
2.3 使用Box<Self>
use std::sync;use std::sync::mpsc;use std::thread;pub trait FnBox { fn call_box(self: Box<Self>);}// 总括实现: 用Box<Self>可以取得Box中的值的所有权impl<F: FnOnce()> FnBox for F { fn call_box(self: Box<Self>) { (*self)(); }}type Job = Box<FnBox + Send + 'static>;pub struct ThreadPool { sender: mpsc::Sender<Job>,}struct Worker;impl Worker { fn run(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Job>>>) { thread::spawn(move || loop { // lock()的返回值为LockResult<MutexGuard<T>>类型,其中的MutexGuard<T>失效时,锁被自动释放 // let 语句中,对MutexGuard<T>调用recv().unwrap()方法后,将结果赋值给job后,临时的MutexGuard<T>就失效了,锁被释放 let job = receiver.lock().unwrap().recv().unwrap(); println!("线程{}执行任务", id); job.call_box();//通过Box<Self>类型获取Box中的值的所有权,然后将值作为函数进行调用 } /* // while 表达式中的值在整个while块中一直有效,MutexGuard<T>一直有效,锁一直被持有:在调用job.call_box()期间锁也被持有 // 无法实现多个请求的并行执行 while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {} got a job; executing.", id); job.call_box(); } */ ); }}impl ThreadPool { pub fn new(max: usize) -> ThreadPool { assert!(max > 0); let (tx, rx) = mpsc::channel(); let receiver = sync::Arc::new(sync::Mutex::new(rx)); for id in 0..max { Worker::run(id, receiver.clone()); } ThreadPool { sender: tx } } pub fn execute<F>(&self, f: F) where F: FnBox + Send + 'static // 这里FnOnce() 改成 FnBox { let job = Box::new(f); self.sender.send(job).unwrap(); }}
- 将第一节的单线程服务器改成多线程时,仅需要修改1.1节的step1()函数
pub fn step1(){ let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); // 创建线程池 let pool = super::section2::ThreadPool::new(4); for stream in listener.incoming(){ // 使用线程池执行函数 pool.execute(||handle_connection(stream.unwrap())); }}
3 优雅停机与清理
use std::sync;use std::sync::mpsc;use std::thread;pub trait FnBox { fn call_box(self: Box<Self>);}impl<F: FnOnce()> FnBox for F { // 使用Box<Self>可以从Box<T>中取出T的值,获得所有权 fn call_box(self: Box<Self>) { (*self)(); }}type Job = Box<FnBox + Send + 'static>;// 使用枚举类型粘结异构类型enum Msg { NewJob(Job), Exit,}pub struct ThreadPool { sender: mpsc::Sender<Msg>, list: Vec<Worker>,}struct Worker { thread: Option<thread::JoinHandle<()>>,}impl Worker { fn run(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Msg>>>) -> Worker { let t = thread::spawn(move || loop { // 使用if let if let Msg::NewJob(job) = receiver.lock().unwrap().recv().unwrap() { println!("线程{}执行任务", id); job.call_box(); } else { println!("线程{}收到退出请求", id); break; } // 使用match /* let msg = receiver.lock().unwrap().recv().unwrap(); match msg { Msg::NewJob(job) => { println!("线程{}执行任务", id); job.call_box(); } Msg::Exit => { println!("线程{}收到退出请求", id); break; } } */ }); Worker { thread: Some(t) } }}impl ThreadPool { pub fn new(max: usize) -> ThreadPool { assert!(max > 0); let (tx, rx) = mpsc::channel(); // 使用Arc和Mutex封装类型,使得类型可以安全地在多线程中使用 let receiver = sync::Arc::new(sync::Mutex::new(rx)); let mut list = Vec::with_capacity(max); for id in 0..max { list.push(Worker::run(id, receiver.clone())); } ThreadPool { sender: tx, list: list, } } pub fn execute<F>(&self, f: F) where F: FnBox + Send + 'static, { let job = Box::new(f); self.sender.send(Msg::NewJob(job)).unwrap(); }}impl Drop for ThreadPool { fn drop(&mut self) { // 发送多个退出请求,让每个线程收到一个 for _ in &self.list { self.sender.send(Msg::Exit).unwrap(); } // 等待每个线程退出 for item in &mut self.list { if let Some(t) = item.thread.take() { match t.join() { Ok(_) => 0, Err(_) => 1, }; } } }}