基于单线程模拟慢请求
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
    // Bind to the listen address 127.0.0.1:7878.
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // Accept connections one at a time. Each request is handled to
    // completion before the next is accepted — this is the single-threaded
    // baseline, so one slow request blocks everyone behind it.
    for stream in listener.incoming() {
        handle_connection(stream.unwrap());
    }
}
// Serve a single HTTP connection: read the request line, pick a status
// and a file to serve, then write the response back on the stream.
fn handle_connection(mut stream: TcpStream) {
    // Only the first line of the request (the request line) matters here.
    let request_line = BufReader::new(&mut stream)
        .lines()
        .next()
        .unwrap()
        .unwrap();

    // Route on the exact request line; /sleep simulates a slow endpoint.
    let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if request_line == "GET /sleep HTTP/1.1" {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // Response = status line + Content-Length header + blank line + body.
    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();
    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
    stream.write_all(response.as_bytes()).unwrap();
}
如果我们连续访问 / 路径,那效果跟之前一样:立刻看到请求的页面。但假如先访问 /sleep ,接着在另一个页面访问 /,就会看到 / 的页面直到 5 秒后才会刷出来,验证了请求排队这个糟糕的事实。
至于如何解决,其实办法不少,本章我们来看看一个经典解决方案:线程池。
使用线程池改善吞吐
为每个请求生成一个线程
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // Spawn one OS thread per incoming connection. Requests no longer
    // queue behind a slow one, but thread creation is unbounded — a flood
    // of connections can exhaust system resources.
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(|| handle_connection(stream));
    }
}
这种实现下,依次访问 /sleep 和 / 就无需再等待,不错的开始。这显然不是我们的最终方案,原因在于它会生成无上限的线程数,最终导致资源耗尽。但它确实是一个好的起点。
限制创建线程的数量
我们需要限制线程池中的线程数量,以保护服务器免受拒绝服务攻击(DoS)的影响:如果针对每个请求创建一个新线程,那么一个人向我们的服务器发出 1000 万个请求,会直接耗尽资源,导致后续用户的请求无法被处理,这也是"拒绝服务"名称的来源。该架构的设计思路是:首先设定最大线程数的上限,其次维护一个请求队列,池中的线程依次从队列中弹出请求并处理。这样就可以同时并发处理 N 个请求,其中 N 是线程数。
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
use multi_thread::ThreadPool;
fn main() {
    // Bind to the listen address 127.0.0.1:7878.
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // A fixed-size pool of 4 worker threads bounds resource usage while
    // still letting up to 4 requests run concurrently.
    let pool = ThreadPool::new(4);

    // Hand each accepted connection off to the pool for execution.
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        pool.execute(|| handle_connection(stream));
    }
}
// Handle one HTTP connection: route by request line, then send the
// corresponding file with a minimal HTTP/1.1 response.
fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    // Only the request line is needed for routing.
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    // Map the request line to a status line plus the file to serve.
    let (status_line, filename) = match request_line.as_str() {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            // Artificial delay so the pool's concurrency is observable.
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let body = fs::read_to_string(filename).unwrap();
    let response = format!(
        "{status_line}\r\nContent-Length: {}\r\n\r\n{body}",
        body.len()
    );
    stream.write_all(response.as_bytes()).unwrap();
}
创建 src/lib.rs 文件并写入如下代码:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
/// A unit of work: any one-shot closure that can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
/// A fixed-size pool of worker threads that execute submitted jobs.
pub struct ThreadPool {
// Handles to the worker threads; the pool owns them for its lifetime.
workers: Vec<Worker>,
// Sending half of the channel used to dispatch jobs to the workers.
sender: mpsc::Sender<Job>,
}
impl ThreadPool {
    /// Creates a pool running `size` worker threads.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // All workers share the receiving end of a single job channel.
        // The Mutex ensures only one worker pops a given job; the Arc lets
        // every worker thread hold a reference to the same receiver.
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        ThreadPool { workers, sender }
    }

    /// Submits a closure to be executed by one of the pool's threads.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Box the closure into a Job and push it onto the channel;
        // whichever worker acquires the receiver next will run it.
        self.sender.send(Box::new(f)).unwrap();
    }
}
// Worker is the bridge between the ThreadPool and a concrete OS thread: it
// receives the code to run and executes it on its dedicated thread. Picture
// a restaurant: the Worker is like a waiter who takes the customer's order
// and passes it along to the kitchen.
// External callers never need to know Worker exists, so it stays private.
struct Worker {
// Identifier used in log output to tell workers apart.
id: usize,
// Handle to this worker's spawned OS thread.
thread: thread::JoinHandle<()>,
}
impl Worker {
    /// Spawns a thread that loops, pulling jobs off the shared channel and
    /// running them until the channel is closed.
    ///
    /// `id` labels the worker in log output; `receiver` is the shared,
    /// mutex-guarded receiving end of the job channel.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The MutexGuard is a temporary inside this expression, so the
            // lock is released as soon as a job has been received — before
            // the job runs. (A `while let Ok(job) = ...` loop would keep the
            // lock held for the whole job and serialize the pool.)
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");
                    job();
                }
                // recv() only errs once the sending half has been dropped,
                // i.e. the pool is going away. Exit the loop cleanly instead
                // of panicking the worker thread (the original unwrap()
                // would panic here).
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker { id, thread }
    }
}