需求

  • 在socket上监听TCP连接
  • 解析少量的HTTP请求
  • 创建一个合适的HTTP响应
  • 使用线程池改进服务器的吞吐量
  • 优雅的停机和清理

    main.rs

    ```rust use std::fs; use std::io::prelude::*; // read是stream上的一个trait 正常情况下不能直接使用,但是把这个引入就可以使用了。。 use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; use webserver_macro::ThreadPool;

fn main() { //监听制定端口 let listener = TcpListener::bind(“127.0.0.1:7878”).unwrap(); let pool = ThreadPool::new(4); //在线程池中对最大线程数进行约束 for stream in listener.incoming() { let stream = stream.unwrap(); // thread::spawn(|| { // //为每一个连接创建一个线程 但是没有对最大线程数进行限制 // handle_connect(stream) //处理连接 // }); pool.execute(|| handle_connect(stream)); //再通过线程池中的方法对函数进行分配线程去执行 } } //因为这个stream的状态可能会改变 所以标记为mut fn handle_connect(mut stream: TcpStream) { let mut buffer = [0; 512]; stream.read(&mut buffer).unwrap(); //请求 //Method Request-URI HTTP-Version CRLF //headers CRLF //message-body

  1. //响应
  2. //HTTP-Version Status-Code Reason-Phrase CRLF
  3. //headers CRLF
  4. //message-body
  5. let get = b"GET / HTTP/1.1\r\n"; //将后面字符串的内容转换为字节流
  6. // if buffer.starts_with(get) {
  7. // let content = fs::read_to_string("hello.html").unwrap();
  8. // let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", content);
  9. // stream.write(response.as_bytes()).unwrap();
  10. // stream.flush().unwrap(); //flush会阻止程序的运行,直到所有字节都写入连接中;
  11. // } else {
  12. // let content = fs::read_to_string("404.html").unwrap();
  13. // let response = format!("HTTP/1.1 400 NOT FOUND\r\n\r\n{}", content);
  14. // stream.write(response.as_bytes()).unwrap();
  15. // stream.flush().unwrap();
  16. // }
  17. //重构一下
  18. let sleep = b"GET /sleep HTTP/1.1\r\n";
  19. let (status_line, filename) = if buffer.starts_with(get) {
  20. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  21. } else if buffer.starts_with(sleep) {//这里我们虚构一个响应时间为5秒的请求
  22. thread::sleep(Duration::from_secs(5));
  23. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  24. } else {
  25. ("HTTP/1.1 400 NOT FOUND\r\n\r\n", "404.html")
  26. };
  27. let content = fs::read_to_string(filename).unwrap();
  28. let response = format!("{}{}", status_line, content);
  29. stream.write(response.as_bytes()).unwrap();
  30. stream.flush().unwrap();//flush会阻止程序的运行,直到所有字节都写入连接中;

}

<a name="h6lTs"></a>
## lib.rs
```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

enum Message {
    NewJob(Job),
    Terminate
}

pub struct ThreadPool {
    //JoinHandle是thread_spawn的返回值 JoinHandle<T>
    //这个T是指传进去闭包的返回值,我们这里传进去的是 handle_connect这个函数
    //没有返回值,所以我们这里就用()来代替这个值。
    workers: Vec<Worker>, //如何让worker从线程池里面接受任务然后执行 channel
    sender: mpsc::Sender<Message>,
}

//struct Job;//通道中传输的任务的类型
type Job = Box<dyn FnOnce() + Send + 'static>;
//因为要执行的任务是在闭包中 所以Job的类型就是这个闭包而因为job是在执行的时候才确定的
//所以要用Box<dyn >
impl ThreadPool {
    /// Create a new ThreadPool
    ///
    /// The size is the number of threads in the pool
    ///
    /// #Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(max_thread_num: usize) -> ThreadPool {
        assert!(max_thread_num > 0);
        let (sender,receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        //发送者应该是线程池,而接受者应该是每一个线程 所以要改进worker的new函数
        let mut workers = Vec::with_capacity(max_thread_num);
        for id in 0..max_thread_num {
            //creat thread in vec
            //我们看thread_spawn这个函数 它把闭包接收后就会立即执行
            //但着不是我们想要的 我们需要的是创建一个线程后就进入等待
            //等代码传给他们的时候 他们在执行 所以我们创建一种新的数据结构 叫worker 来代替threads
            workers.push(Worker::new(id,Arc::clone(&receiver)));
            //但是由于通道的限制,可以是多个发送者 但只能是一个接受者
            //我们希望所有线程都共享一个receiver 从而能在线程间分发任务
            //此外从通道队列中取出任务也意味着接受者是可变的 所以我们要用Arc 和 Mutex
        }
        ThreadPool { workers ,sender}
    }

    pub fn execute<F>(&self, f: F)
    where
        // 闭包的类型限制这样写 where用来修饰trait
        // 这个是根据thread::spawn 的类型约束写的 因为它是代替它的函数
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);//新建 Box把闭包包起来
        self.sender.send(Message::NewJob(job)).unwrap();//把job发送给线程
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self){
        println!("Sending terminate message to all worker");
        //先给线程发送终止消息
        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        println!("Shutting down all workers");
        //再将每一个worker关闭
        //不能将两次循环合并 因为发送消息和接收到的线程不一定是同一个
        for worker in &mut self.workers {
            //因为join方法需要获得worker的所有权但是这里我们只是引用 所以需要对其进行重构
            //我们可以使用Option中的take方法来取得所有权
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
                //但是如果直接调用join方法是不会直接停止 而是继续等待任务
                //所以我们要结束的时候 要给线程发送一个结束的信号 把接收任务的循环给终止掉
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    //所以这里 receiver的类型就需要用Arc和Mutex包起来

    fn new(id: usize,receiver:Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            //在闭包中就可以直接执行receiver接收到的代码了
            let message = receiver.lock().unwrap().recv().unwrap();
            //再通过match表达式进行任务的区分
            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got the job",id);
                    job();
                },
                Message::Terminate =>{
                    println!("Worker {} told end",id);
                    break;
                }
            }
        });
        Worker { id, thread:Some(thread) }
    }
}