基于单线程模拟慢请求

  1. use std::{
  2. fs,
  3. io::{prelude::*, BufReader},
  4. net::{TcpListener, TcpStream},
  5. thread,
  6. time::Duration,
  7. };
  8. fn main() {
  9. // 监听地址: 127.0.0.1:7878
  10. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  11. // 建立连接
  12. for stream in listener.incoming() {
  13. let stream = stream.unwrap();
  14. handle_connection(stream);
  15. }
  16. }
  17. fn handle_connection(mut stream: TcpStream) {
  18. let buf_reader = BufReader::new(&mut stream);
  19. let request_line = buf_reader.lines().next().unwrap().unwrap();
  20. // let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
  21. // ("HTTP/1.1 200 OK", "hello.html")
  22. // } else {
  23. // ("HTTP/1.1 404 NOT FOUND", "404.html")
  24. // };
  25. let (status_line, filename) = match &request_line[..] {
  26. "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
  27. "GET /sleep HTTP/1.1" => {
  28. thread::sleep(Duration::from_secs(5));
  29. ("HTTP/1.1 200 OK", "hello.html")
  30. }
  31. _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
  32. };
  33. let contents = fs::read_to_string(filename).unwrap();
  34. let length = contents.len();
  35. let response =
  36. format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
  37. stream.write_all(response.as_bytes()).unwrap();
  38. }
如果我们连续访问 / 路径,那效果跟之前一样:立刻看到请求的页面。但假如先访问 /sleep ,接着在另一个页面访问 /,就会看到 / 的页面直到 5 秒后才会刷出来,验证了请求排队这个糟糕的事实。 至于如何解决,其实办法不少,本章我们来看看一个经典解决方案:线程池。

使用线程池改善吞吐

为每个请求生成一个线程

  1. fn main() {
  2. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  3. for stream in listener.incoming() {
  4. let stream = stream.unwrap();
  5. thread::spawn(|| {
  6. handle_connection(stream);
  7. });
  8. }
  9. }
这种实现下,依次访问 /sleep/ 就无需再等待,不错的开始。这显然不是我们的最终方案,原因在于它会生成无上限的线程数,最终导致资源耗尽。但它确实是一个好的起点。

限制创建线程的数量

我们需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出1000万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是拒绝服务名称的来源。架构设计首先是设定最大线程数的上限,其次维护一个请求队列。池中的线程去队列中依次弹出请求并处理。这样就可以同时并发处理 N 个请求,其中 N 是线程数。
  1. use std::{
  2. fs,
  3. io::{prelude::*, BufReader},
  4. net::{TcpListener, TcpStream},
  5. thread,
  6. time::Duration,
  7. };
  8. use multi_thread::ThreadPool;
  9. fn main() {
  10. // 监听地址: 127.0.0.1:7878
  11. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  12. // 创建一个包含 4 个线程的线程池
  13. let pool = ThreadPool::new(4);
  14. // 建立连接
  15. for stream in listener.incoming() {
  16. let stream = stream.unwrap();
  17. // 分发执行请求
  18. pool.execute(|| {
  19. handle_connection(stream);
  20. });
  21. }
  22. }
  23. fn handle_connection(mut stream: TcpStream) {
  24. let buf_reader = BufReader::new(&mut stream);
  25. let request_line = buf_reader.lines().next().unwrap().unwrap();
  26. // let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
  27. // ("HTTP/1.1 200 OK", "hello.html")
  28. // } else {
  29. // ("HTTP/1.1 404 NOT FOUND", "404.html")
  30. // };
  31. let (status_line, filename) = match &request_line[..] {
  32. "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
  33. "GET /sleep HTTP/1.1" => {
  34. thread::sleep(Duration::from_secs(5));
  35. ("HTTP/1.1 200 OK", "hello.html")
  36. }
  37. _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
  38. };
  39. let contents = fs::read_to_string(filename).unwrap();
  40. let length = contents.len();
  41. let response =
  42. format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
  43. stream.write_all(response.as_bytes()).unwrap();
  44. }
创建 src/lib.rs 文件并写入如下代码:
  1. use std::{
  2. sync::{mpsc, Arc, Mutex},
  3. thread,
  4. };
  5. type Job = Box<dyn FnOnce() + Send + 'static>;
  6. pub struct ThreadPool {
  7. workers: Vec<Worker>,
  8. sender: mpsc::Sender<Job>,
  9. }
  10. impl ThreadPool {
  11. // 初始化实例
  12. pub fn new(size: usize) -> ThreadPool {
  13. assert!(size > 0);
  14. let (sender, receiver) = mpsc::channel();
  15. let receiver = Arc::new(Mutex::new(receiver));
  16. let mut workers = Vec::with_capacity(size);
  17. for id in 0..size {
  18. workers.push(Worker::new(id, Arc::clone(&receiver)));
  19. }
  20. ThreadPool { workers, sender }
  21. }
  22. pub fn execute<F>(&self, f: F)
  23. where
  24. F: FnOnce() + Send + 'static,
  25. {
  26. let job = Box::new(f);
  27. self.sender.send(job).unwrap();
  28. }
  29. }
  30. // 创建一个 Worker 结构体,作为 ThreadPool 和任务线程联系的桥梁,它的任务是获得将要执行的代码,然后在具体的线程中去执行。想象一个场景:一个餐馆,Worker 等待顾客的点餐,然后将具体的点餐信息传递给厨房,感觉类似服务员,
  31. // 由于外部调用者无需知道 Worker 的存在,因此这里使用了私有的声明。
  32. struct Worker {
  33. id: usize,
  34. thread: thread::JoinHandle<()>,
  35. }
  36. impl Worker {
  37. fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
  38. let thread = thread::spawn(move || loop {
  39. let job = receiver.lock().unwrap().recv().unwrap();
  40. println!("Worker {id} got a job; executing.");
  41. job();
  42. });
  43. Worker { id, thread }
  44. }
  45. }