需求
- 在socket上监听TCP连接
- 解析少量的HTTP请求
- 创建一个合适的HTTP响应
- 使用线程池改进服务器的吞吐量
- 优雅的停机和清理
main.rs
```rust use std::fs; use std::io::prelude::*; // read是stream上的一个trait 正常情况下不能直接使用,但是把这个引入就可以使用了。。 use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; use webserver_macro::ThreadPool;
fn main() { //监听制定端口 let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let pool = ThreadPool::new(4); //在线程池中对最大线程数进行约束 for stream in listener.incoming() { let stream = stream.unwrap(); // thread::spawn(|| { // //为每一个连接创建一个线程 但是没有对最大线程数进行限制 // handle_connect(stream) //处理连接 // }); pool.execute(|| handle_connect(stream)); //再通过线程池中的方法对函数进行分配线程去执行 } } //因为这个stream的状态可能会改变 所以标记为mut fn handle_connect(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); //请求 //Method Request-URI HTTP-Version CRLF //headers CRLF //message-body
//响应//HTTP-Version Status-Code Reason-Phrase CRLF//headers CRLF//message-bodylet get = b"GET / HTTP/1.1\r\n"; //将后面字符串的内容转换为字节流// if buffer.starts_with(get) {// let content = fs::read_to_string("hello.html").unwrap();// let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", content);// stream.write(response.as_bytes()).unwrap();// stream.flush().unwrap(); //flush会阻止程序的运行,直到所有字节都写入连接中;// } else {// let content = fs::read_to_string("404.html").unwrap();// let response = format!("HTTP/1.1 400 NOT FOUND\r\n\r\n{}", content);// stream.write(response.as_bytes()).unwrap();// stream.flush().unwrap();// }//重构一下let sleep = b"GET /sleep HTTP/1.1\r\n";let (status_line, filename) = if buffer.starts_with(get) {("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else if buffer.starts_with(sleep) {//这里我们虚构一个响应时间为5秒的请求thread::sleep(Duration::from_secs(5));("HTTP/1.1 200 OK\r\n\r\n", "hello.html")} else {("HTTP/1.1 400 NOT FOUND\r\n\r\n", "404.html")};let content = fs::read_to_string(filename).unwrap();let response = format!("{}{}", status_line, content);stream.write(response.as_bytes()).unwrap();stream.flush().unwrap();//flush会阻止程序的运行,直到所有字节都写入连接中;
}
<a name="h6lTs"></a>
## lib.rs
```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
enum Message {
NewJob(Job),
Terminate
}
pub struct ThreadPool {
//JoinHandle是thread_spawn的返回值 JoinHandle<T>
//这个T是指传进去闭包的返回值,我们这里传进去的是 handle_connect这个函数
//没有返回值,所以我们这里就用()来代替这个值。
workers: Vec<Worker>, //如何让worker从线程池里面接受任务然后执行 channel
sender: mpsc::Sender<Message>,
}
//struct Job;//通道中传输的任务的类型
type Job = Box<dyn FnOnce() + Send + 'static>;
//因为要执行的任务是在闭包中 所以Job的类型就是这个闭包而因为job是在执行的时候才确定的
//所以要用Box<dyn >
impl ThreadPool {
/// Create a new ThreadPool
///
/// The size is the number of threads in the pool
///
/// #Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(max_thread_num: usize) -> ThreadPool {
assert!(max_thread_num > 0);
let (sender,receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
//发送者应该是线程池,而接受者应该是每一个线程 所以要改进worker的new函数
let mut workers = Vec::with_capacity(max_thread_num);
for id in 0..max_thread_num {
//creat thread in vec
//我们看thread_spawn这个函数 它把闭包接收后就会立即执行
//但着不是我们想要的 我们需要的是创建一个线程后就进入等待
//等代码传给他们的时候 他们在执行 所以我们创建一种新的数据结构 叫worker 来代替threads
workers.push(Worker::new(id,Arc::clone(&receiver)));
//但是由于通道的限制,可以是多个发送者 但只能是一个接受者
//我们希望所有线程都共享一个receiver 从而能在线程间分发任务
//此外从通道队列中取出任务也意味着接受者是可变的 所以我们要用Arc 和 Mutex
}
ThreadPool { workers ,sender}
}
pub fn execute<F>(&self, f: F)
where
// 闭包的类型限制这样写 where用来修饰trait
// 这个是根据thread::spawn 的类型约束写的 因为它是代替它的函数
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);//新建 Box把闭包包起来
self.sender.send(Message::NewJob(job)).unwrap();//把job发送给线程
}
}
impl Drop for ThreadPool {
fn drop(&mut self){
println!("Sending terminate message to all worker");
//先给线程发送终止消息
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers");
//再将每一个worker关闭
//不能将两次循环合并 因为发送消息和接收到的线程不一定是同一个
for worker in &mut self.workers {
//因为join方法需要获得worker的所有权但是这里我们只是引用 所以需要对其进行重构
//我们可以使用Option中的take方法来取得所有权
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
//但是如果直接调用join方法是不会直接停止 而是继续等待任务
//所以我们要结束的时候 要给线程发送一个结束的信号 把接收任务的循环给终止掉
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
//所以这里 receiver的类型就需要用Arc和Mutex包起来
fn new(id: usize,receiver:Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
//在闭包中就可以直接执行receiver接收到的代码了
let message = receiver.lock().unwrap().recv().unwrap();
//再通过match表达式进行任务的区分
match message {
Message::NewJob(job) => {
println!("Worker {} got the job",id);
job();
},
Message::Terminate =>{
println!("Worker {} told end",id);
break;
}
}
});
Worker { id, thread:Some(thread) }
}
}
