参考:https://kaisery.github.io/trpl-zh-cn/ch20-03-graceful-shutdown-and-cleanup.html
项目成果:
项目 2:构建多线程 web server - 图1
如下是我们将怎样构建此 web server 的计划:

  1. 学习一些 TCP 与 HTTP 知识
  2. 在套接字(socket)上监听 TCP 请求
  3. 解析少量的 HTTP 请求
  4. 创建一个合适的 HTTP 响应
  5. 通过线程池改善 server 的吞吐量

这里使用的方法并不是使用 Rust 构建 web server 最好的方法。crates.io 上有很多可用于生产环境的 crate,它们提供了比我们所要编写的更为完整的 web server 和线程池实现。
然而,本章的目的在于学习,而不是走捷径。因为 Rust 是一个系统编程语言,我们能够选择处理什么层次的抽象,并能够选择比其他语言可能或可用的层次更低的层次。因此我们将自己编写一个基础的 HTTP server 和线程池,以便学习将来可能用到的 crate 背后的通用理念和技术。

  1. cargo new web-server
  2. cd web_server

单线程 server

一个小而简单的 server,它对一个请求返回页面内容而对所有其他请求返回 404 响应:

  1. use std::fs;
  2. use std::io::prelude::*;
  3. use std::net::TcpListener;
  4. use std::net::TcpStream;
  5. fn main() {
  6. // 绑定端口,如果端口被占用、或者权限不够,则直接 panic
  7. let listener = TcpListener::bind("0.0.0.0:7878").unwrap();
  8. // let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  9. for stream in listener.incoming() {
  10. let stream = stream.unwrap();
  11. println!("成功连接!");
  12. handle_connection(stream);
  13. }
  14. }
  15. fn handle_connection(mut stream: TcpStream) {
  16. let mut buffer = [0; 1024];
  17. stream.read(&mut buffer).unwrap();
  18. println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  19. // 响应成功,但浏览器访问时返回 空白页面
  20. // let response = "HTTP/1.1 200 OK\r\n\r\n";
  21. let get = b"GET / HTTP/1.1\r\n";
  22. let (status_line, filename) = if buffer.starts_with(get) {
  23. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  24. } else {
  25. ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  26. };
  27. let contents = fs::read_to_string(filename).unwrap();
  28. let response = format!("{}{}", status_line, contents);
  29. stream.write(response.as_bytes()).unwrap();
  30. stream.flush().unwrap();
  31. }

目前 server 运行于单线程中,它一次只能处理一个请求。意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。
让我们模拟一些慢请求来看看这如何会成为一个问题,并进行修复以便 server 可以一次处理多个请求。

  1. use std::fs;
  2. use std::io::prelude::*;
  3. use std::net::TcpListener;
  4. use std::net::TcpStream;
  5. use std::thread;
  6. use std::time::Duration;
  7. fn main() {
  8. // 绑定端口,如果端口被占用、或者权限不够,则直接 panic
  9. let listener = TcpListener::bind("0.0.0.0:7878").unwrap();
  10. // let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  11. for stream in listener.incoming() {
  12. let stream = stream.unwrap();
  13. println!("成功连接!");
  14. handle_connection(stream);
  15. }
  16. }
  17. fn handle_connection(mut stream: TcpStream) {
  18. let mut buffer = [0; 1024];
  19. stream.read(&mut buffer).unwrap();
  20. println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  21. // 响应成功,但浏览器访问时返回 空白页面
  22. // let response = "HTTP/1.1 200 OK\r\n\r\n";
  23. let get = b"GET / HTTP/1.1\r\n";
  24. let sleep = b"GET /sleep HTTP/1.1\r\n";
  25. let (status_line, filename) = if buffer.starts_with(get) {
  26. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  27. } else if buffer.starts_with(sleep) {
  28. thread::sleep(Duration::from_secs(3));
  29. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  30. } else {
  31. ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  32. };
  33. let contents = fs::read_to_string(filename).unwrap();
  34. let response = format!("{}{}", status_line, contents);
  35. stream.write(response.as_bytes()).unwrap();
  36. stream.flush().unwrap();
  37. }

使用 cargo run 启动 server,并接着打开两个浏览器窗口:一个请求 http://127.0.0.1:7878/ 而另一个请求 http://127.0.0.1:7878/sleep 。如果像之前一样多次请求 / ,会发现响应的比较快速。不过如果请求 /sleep 之后在请求 / ,就会看到 / 会等待直到 sleep 休眠完五秒之后才出现。

使用线程池改善吞吐量

线程池thread pool )是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配任务,这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,增加 server 的吞吐量。
当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向对队列索取另一个请求。通过这种设计,则可以并发处理 N 个请求,其中 N 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和单线程异步 I/O 模型。
另外可以使用编译器驱动开发(compiler-driven development):我们将编写调用所期望的函数的代码,接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。就像测试驱动开发那样。

  1. // src/main.rs
  2. use std::fs;
  3. use std::io::prelude::*;
  4. use std::net::{TcpListener, TcpStream};
  5. use std::thread;
  6. use std::time::Duration;
  7. use web_server::ThreadPool;
  8. fn main() {
  9. let listener = TcpListener::bind("0.0.0.0:7878").unwrap();
  10. let pool = ThreadPool::new(4);
  11. for stream in listener.incoming() {
  12. let stream = stream.unwrap();
  13. pool.execute(|| {
  14. handle_connection(stream);
  15. });
  16. }
  17. }
  18. fn handle_connection(mut stream: TcpStream) {
  19. let mut buffer = [0; 1024];
  20. stream.read(&mut buffer).unwrap();
  21. println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  22. let get = b"GET / HTTP/1.1\r\n";
  23. let sleep = b"GET /sleep HTTP/1.1\r\n";
  24. let (status_line, filename) = if buffer.starts_with(get) {
  25. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  26. } else if buffer.starts_with(sleep) {
  27. thread::sleep(Duration::from_secs(3));
  28. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  29. } else {
  30. ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  31. };
  32. let contents = fs::read_to_string(filename).unwrap();
  33. let response = format!("{}{}", status_line, contents);
  34. stream.write(response.as_bytes()).unwrap();
  35. stream.flush().unwrap();
  36. }
  1. // src/lib.rs
  2. // #![allow(unused)]
  3. use std::sync::mpsc;
  4. use std::sync::{Arc, Mutex};
  5. use std::thread;
  6. pub struct ThreadPool {
  7. workers: Vec<Worker>,
  8. sender: mpsc::Sender<Job>,
  9. }
  10. type Job = Box<dyn FnOnce() + Send + 'static>;
  11. impl ThreadPool {
  12. /// 创建线程池。
  13. ///
  14. /// 线程池中线程的数量。
  15. ///
  16. /// # Panics
  17. ///
  18. /// `new` 函数在 size 为 0 时会 panic。
  19. pub fn new(size: usize) -> Self {
  20. assert!(size > 0);
  21. let (sender, receiver) = mpsc::channel();
  22. let receiver = Arc::new(Mutex::new(receiver));
  23. let mut workers = Vec::with_capacity(size);
  24. for id in 0..size {
  25. workers.push(Worker::new(id, Arc::clone(&receiver)));
  26. }
  27. Self { workers, sender }
  28. }
  29. /// 执行线程,需要传入闭包。
  30. ///
  31. /// 使用 Sender 对线程进行通信。
  32. pub fn execute<F>(&self, f: F)
  33. where F: FnOnce() + Send + 'static {
  34. let job = Box::new(f);
  35. self.sender.send(job).unwrap();
  36. }
  37. }
  38. struct Worker {
  39. id: usize,
  40. thread: thread::JoinHandle<()>,
  41. }
  42. impl Worker {
  43. fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Self {
  44. let thread = thread::spawn(move || loop {
  45. let job = receiver.lock().unwrap().recv().unwrap();
  46. println!("Worker {} got a job; executing.", id);
  47. job(); // 闭包最终被调用
  48. });
  49. Self { id, thread }
  50. }
  51. }

优雅清理

  1. // web-server/src/main.rs
  2. use std::fs;
  3. use std::io::prelude::*;
  4. use std::net::{TcpListener, TcpStream};
  5. use std::thread;
  6. use std::time::Duration;
  7. use web_server::ThreadPool;
  8. fn main() {
  9. let listener = TcpListener::bind("0.0.0.0:7878").unwrap();
  10. let pool = ThreadPool::new(4);
  11. for stream in listener.incoming().take(2) {
  12. let stream = stream.unwrap();
  13. pool.execute(|| {
  14. handle_connection(stream);
  15. });
  16. }
  17. println!("Shutting down.");
  18. }
  19. fn handle_connection(mut stream: TcpStream) {
  20. let mut buffer = [0; 1024];
  21. stream.read(&mut buffer).unwrap();
  22. println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
  23. let get = b"GET / HTTP/1.1\r\n";
  24. let sleep = b"GET /sleep HTTP/1.1\r\n";
  25. let (status_line, filename) = if buffer.starts_with(get) {
  26. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  27. } else if buffer.starts_with(sleep) {
  28. thread::sleep(Duration::from_secs(3));
  29. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  30. } else {
  31. ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  32. };
  33. let contents = fs::read_to_string(filename).unwrap();
  34. let response = format!("{}{}", status_line, contents);
  35. stream.write(response.as_bytes()).unwrap();
  36. stream.flush().unwrap();
  37. }
  1. // web-server/src/lib.rs
  2. use std::sync::mpsc;
  3. use std::sync::{Arc, Mutex};
  4. use std::thread;
  5. type Job = Box<dyn FnOnce() + Send + 'static>;
  6. enum Message {
  7. NewJob(Job),
  8. Terminate,
  9. }
  10. pub struct ThreadPool {
  11. workers: Vec<Worker>,
  12. sender: mpsc::Sender<Message>,
  13. }
  14. impl ThreadPool {
  15. /// 创建线程池。
  16. ///
  17. /// 线程池中线程的数量。
  18. ///
  19. /// # Panics
  20. ///
  21. /// `new` 函数在 size 为 0 时会 panic。
  22. pub fn new(size: usize) -> Self {
  23. assert!(size > 0);
  24. let (sender, receiver) = mpsc::channel();
  25. let receiver = Arc::new(Mutex::new(receiver));
  26. let mut workers = Vec::with_capacity(size);
  27. for id in 0..size {
  28. workers.push(Worker::new(id, Arc::clone(&receiver)));
  29. }
  30. Self { workers, sender }
  31. }
  32. /// 执行线程,需要传入闭包。
  33. ///
  34. /// 使用 Sender 对线程进行通信。
  35. pub fn execute<F>(&self, f: F)
  36. where F: FnOnce() + Send + 'static {
  37. let job = Box::new(f);
  38. self.sender.send(Message::NewJob(job)).unwrap();
  39. }
  40. }
  41. impl Drop for ThreadPool {
  42. fn drop(&mut self) {
  43. println!("Sending terminate message to all workers.");
  44. for _ in &mut self.workers {
  45. self.sender.send(Message::Terminate).unwrap();
  46. }
  47. for worker in &mut self.workers {
  48. println!("Shutting down worker {}", worker.id);
  49. if let Some(thread) = worker.thread.take() {
  50. // 将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消耗掉这个线程
  51. thread.join().unwrap();
  52. }
  53. }
  54. }
  55. }
  56. struct Worker {
  57. id: usize,
  58. thread: Option<thread::JoinHandle<()>>,
  59. }
  60. impl Worker {
  61. fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Self {
  62. let thread = thread::spawn(move || loop {
  63. let message = receiver.lock().unwrap().recv().unwrap();
  64. match message {
  65. Message::NewJob(job) => {
  66. println!("Worker {} got a job; executing.", id);
  67. job(); // 闭包最终被调用
  68. }
  69. Message::Terminate => {
  70. println!("Worker {} was told to terminate.", id);
  71. break;
  72. }
  73. }
  74. });
  75. Self { id, thread: Some(thread) }
  76. }
  77. }

todo:web-server# 这里还有很多可以做的事!如下是一些点子:

  • ThreadPool 和其公有方法增加更多文档
  • 为库的功能增加测试
  • unwrap 调用改为更健壮的错误处理
  • 使用 ThreadPool 进行其他不同于处理网络请求的任务
  • crates.io 上寻找一个线程池 crate 并使用它实现一个类似的 web server,将其 API 和鲁棒性与我们的实现做对比