1 单线程Web服务器

1.1 启动监听、接受连接

  1. use std::net::TcpListener;
  2. pub fn step1(){
  3. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  4. for stream in listener.incoming(){
  5. let stream = stream.unwrap();
  6. println!("接受一个连接");
  7. }
  8. }

1.2 读取HTTP请求

  1. use std::io::prelude::*;
  2. use std::net::TcpListener;
  3. use std::net::TcpStream;
  4. pub fn step1(){
  5. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  6. for stream in listener.incoming(){
  7. handle_connection(stream.unwrap());
  8. }
  9. }
  10. fn handle_connection(mut stream: TcpStream){
  11. let mut buf = [0;512];
  12. let size = stream.read(&mut buf).unwrap();
  13. println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));
  14. }

1.3 发送回应

  1. fn handle_connection(mut stream: TcpStream){
  2. let mut buf = [0;512];
  3. let size = stream.read(&mut buf).unwrap();
  4. println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));
  5. let ack = "HTTP/1.1 200 OK\r\n";
  6. stream.write(ack.as_bytes()).unwrap();
  7. stream.flush().unwrap();
  8. }

1.4 从文件中读取HTML

  1. fn handle_connection(mut stream : TcpStream){
  2. let mut buf = [0;512];
  3. let size = stream.read(&mut buf).unwrap();
  4. println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));
  5. let mut file = File::open("./src/index.html").unwrap();
  6. let mut cont = String::new();
  7. file.read_to_string(&mut cont).unwrap();
  8. let mut file = File::open("./src/index.html").unwrap();
  9. let mut cont = String::new();
  10. file.read_to_string(&mut cont).unwrap();
  11. let ack = format!("HTTP/1.1 200 OK\r\n{}",cont);
  12. stream.write(ack.as_bytes()).unwrap();
  13. stream.flush().unwrap();
  14. }

1.5 区分请求路径

  1. fn handle_connection(mut stream : TcpStream){
  2. let mut buf = [0;512];
  3. let size = stream.read(&mut buf).unwrap();
  4. println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));
  5. let root = b"GET / HTTP/1.1";
  6. if buf.starts_with(root) {
  7. let mut file = File::open("./src/index.html").unwrap();
  8. let mut cont = String::new();
  9. file.read_to_string(&mut cont).unwrap();
  10. let ack = format!("HTTP/1.1 200 OK\r\n{}", cont);
  11. stream.write(ack.as_bytes()).unwrap();
  12. stream.flush().unwrap();
  13. }else{
  14. let mut file = File::open("./src/404.html").unwrap();
  15. let mut cont = String::new();
  16. file.read_to_string(&mut cont).unwrap();
  17. let ack = format!("HTTP/1.1 400 NOT FOUND\r\n{}", cont);
  18. stream.write(ack.as_bytes()).unwrap();
  19. stream.flush().unwrap();
  20. }
  21. }

1.6 提取函数

  1. fn handle_connection(mut stream : TcpStream){
  2. let mut buf = [0;512];
  3. let size = stream.read(&mut buf).unwrap();
  4. println!("读取到HTTP请求: {}字节\n{}\n",size,String::from_utf8_lossy(&buf[..size]));
  5. let root = b"GET / HTTP/1.1";
  6. if buf.starts_with(root) {
  7. return_page(&mut stream,"HTTP/1.1 200 OK","./src/index.html");
  8. }else{
  9. return_page(&mut stream,"HTTP/1.1 400 NOT FOUND","./src/404.html");
  10. }
  11. }
  12. fn return_page(stream : &mut TcpStream,status:&str,file:&str){
  13. let mut file = File::open(file).unwrap();
  14. let mut cont = String::new();
  15. file.read_to_string(&mut cont).unwrap();
  16. let ack = format!("{}\r\n{}",status, cont);
  17. stream.write(ack.as_bytes()).unwrap();
  18. stream.flush().unwrap();
  19. }

2 扩展成多线程

2.1 线程池

  1. use std::thread;
  2. pub struct ThreadPool{
  3. size : usize,
  4. workers : Vec<Worker>,
  5. }
  6. struct Worker{
  7. id: usize,
  8. handle: thread::JoinHandle<()>,
  9. }
  10. impl Worker{
  11. pub fn new(id: usize) -> Worker{
  12. let thread = thread::spawn(|| {});//什么也不做的空线程
  13. Worker{
  14. id:id,
  15. handle:thread,
  16. }
  17. }
  18. }
  19. impl ThreadPool{
  20. pub fn new(max : usize) -> ThreadPool{
  21. assert!(max > 0);
  22. // 创建多个工作者
  23. let mut list = Vec::with_capacity(max);
  24. for id in 0..max{
  25. list.push(Worker::new(id));
  26. }
  27. ThreadPool{
  28. size: max,
  29. workers: list,
  30. }
  31. }
  32. // 后续需要将f传递到Worker中
  33. pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static {
  34. }
  35. }
  • std::thread::spawn()的签名如下:
  1. pub fn spawn<F,T>(f: F) -> JoinHandle<T> where F:FnOnce() -> T + Send + 'static{},T: Send + 'static
  • 分析如下
    • 不关心返回值的类型T
    • 参数类型F的特性限定FnOnce()execute()中也是需要的:因为参数需要最终传递到execute()
    • 参数类型F的特性限定Send是必须的:Send允许将参数从一个线程传递到另一个线程(新创建的线程)
    • 参数类型F的生命周期限定'static是必须的:不知道线程会运行多长时间

2.2 使用通道传递工作函数

  1. use std::sync;
  2. use std::sync::mpsc;
  3. use std::thread;
  4. // 要传递的工作函数由特性限定表示;但是不能直接传递大小不确定的特性类型,要用Box封装
  5. type Job = Box<FnOnce() + Send + 'static>;
  6. pub struct ThreadPool {
  7. size: usize,
  8. workers: Vec<Worker>,
  9. sender: mpsc::Sender<Job>,
  10. }
  11. struct Worker {
  12. id: usize,
  13. handle: thread::JoinHandle<()>,
  14. }
  15. impl Worker {
  16. pub fn new(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Job>>>) -> Worker {
  17. let thread = thread::spawn(move || loop {// 需要将receiver的所有权移动到闭包中,所以使用move
  18. let job = receiver.lock().unwrap().recv().unwrap();
  19. // 这里无法通过编译: 被封装的 FnOnce() 特性在作为函数进行调用时,需要获取资源所有权
  20. // 但是Box不允许移出资源的所有权:资源是属于Box的,不能移出
  21. // 解决方案见下一节
  22. (*job)();
  23. });
  24. Worker {
  25. id: id,
  26. handle: thread,
  27. }
  28. }
  29. }
  30. impl ThreadPool {
  31. pub fn new(max: usize) -> ThreadPool {
  32. assert!(max > 0);
  33. let (tx, rx) = mpsc::channel();
  34. let receiver = sync::Arc::new(sync::Mutex::new(rx));
  35. let mut list = Vec::with_capacity(max);
  36. for id in 0..max {
  37. // 不能直接使用receiver,因为所有权被转移走,后续循环中就不能再使用了
  38. // 使用Arc和Mutex进行封装
  39. // Arc适用于资源需要被多线程、多处只读使用、无法确定哪部分最后结束使用(然后释放资源)的情况
  40. // Mutex保证资源的互斥使用
  41. list.push(Worker::new(id, receiver.clone()));
  42. }
  43. ThreadPool {
  44. size: max,
  45. workers: list,
  46. sender: tx,
  47. }
  48. }
  49. pub fn execute<F>(&self, f: F)
  50. where
  51. F: FnOnce() + Send + 'static,
  52. {
  53. // 用Box封装工作函数,然后发送到通道中
  54. let job = Box::new(f);
  55. self.sender.send(job);
  56. }
  57. }

2.3 使用Box<Self>

  1. use std::sync;
  2. use std::sync::mpsc;
  3. use std::thread;
  4. pub trait FnBox {
  5. fn call_box(self: Box<Self>);
  6. }
  7. // 总括实现: 用Box<Self>可以取得Box中的值的所有权
  8. impl<F: FnOnce()> FnBox for F {
  9. fn call_box(self: Box<Self>) {
  10. (*self)();
  11. }
  12. }
  13. type Job = Box<FnBox + Send + 'static>;
  14. pub struct ThreadPool {
  15. sender: mpsc::Sender<Job>,
  16. }
  17. struct Worker;
  18. impl Worker {
  19. fn run(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Job>>>) {
  20. thread::spawn(move ||
  21. loop {
  22. // lock()的返回值为LockResult<MutexGuard<T>>类型,其中的MutexGuard<T>失效时,锁被自动释放
  23. // let 语句中,对MutexGuard<T>调用recv().unwrap()方法后,将结果赋值给job后,临时的MutexGuard<T>就失效了,锁被释放
  24. let job = receiver.lock().unwrap().recv().unwrap();
  25. println!("线程{}执行任务", id);
  26. job.call_box();//通过Box<Self>类型获取Box中的值的所有权,然后将值作为函数进行调用
  27. }
  28. /*
  29. // while 表达式中的值在整个while块中一直有效,MutexGuard<T>一直有效,锁一直被持有:在调用job.call_box()期间锁也被持有
  30. // 无法实现多个请求的并行执行
  31. while let Ok(job) = receiver.lock().unwrap().recv() {
  32. println!("Worker {} got a job; executing.", id);
  33. job.call_box();
  34. }
  35. */
  36. );
  37. }
  38. }
  39. impl ThreadPool {
  40. pub fn new(max: usize) -> ThreadPool {
  41. assert!(max > 0);
  42. let (tx, rx) = mpsc::channel();
  43. let receiver = sync::Arc::new(sync::Mutex::new(rx));
  44. for id in 0..max {
  45. Worker::run(id, receiver.clone());
  46. }
  47. ThreadPool { sender: tx }
  48. }
  49. pub fn execute<F>(&self, f: F)
  50. where
  51. F: FnBox + Send + 'static // 这里FnOnce() 改成 FnBox
  52. {
  53. let job = Box::new(f);
  54. self.sender.send(job).unwrap();
  55. }
  56. }
  • 将第一节的单线程服务器改成多线程时,仅需要修改1.1节的step1()函数
  1. pub fn step1(){
  2. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  3. // 创建线程池
  4. let pool = super::section2::ThreadPool::new(4);
  5. for stream in listener.incoming(){
  6. // 使用线程池执行函数
  7. pool.execute(||handle_connection(stream.unwrap()));
  8. }
  9. }

3 优雅停机与清理

  1. use std::sync;
  2. use std::sync::mpsc;
  3. use std::thread;
  4. pub trait FnBox {
  5. fn call_box(self: Box<Self>);
  6. }
  7. impl<F: FnOnce()> FnBox for F {
  8. // 使用Box<Self>可以从Box<T>中取出T的值,获得所有权
  9. fn call_box(self: Box<Self>) {
  10. (*self)();
  11. }
  12. }
  13. type Job = Box<FnBox + Send + 'static>;
  14. // 使用枚举类型粘结异构类型
  15. enum Msg {
  16. NewJob(Job),
  17. Exit,
  18. }
  19. pub struct ThreadPool {
  20. sender: mpsc::Sender<Msg>,
  21. list: Vec<Worker>,
  22. }
  23. struct Worker {
  24. thread: Option<thread::JoinHandle<()>>,
  25. }
  26. impl Worker {
  27. fn run(id: usize, receiver: sync::Arc<sync::Mutex<mpsc::Receiver<Msg>>>) -> Worker {
  28. let t = thread::spawn(move || loop {
  29. // 使用if let
  30. if let Msg::NewJob(job) = receiver.lock().unwrap().recv().unwrap() {
  31. println!("线程{}执行任务", id);
  32. job.call_box();
  33. } else {
  34. println!("线程{}收到退出请求", id);
  35. break;
  36. }
  37. // 使用match
  38. /*
  39. let msg = receiver.lock().unwrap().recv().unwrap();
  40. match msg {
  41. Msg::NewJob(job) => {
  42. println!("线程{}执行任务", id);
  43. job.call_box();
  44. }
  45. Msg::Exit => {
  46. println!("线程{}收到退出请求", id);
  47. break;
  48. }
  49. }
  50. */ });
  51. Worker { thread: Some(t) }
  52. }
  53. }
  54. impl ThreadPool {
  55. pub fn new(max: usize) -> ThreadPool {
  56. assert!(max > 0);
  57. let (tx, rx) = mpsc::channel();
  58. // 使用Arc和Mutex封装类型,使得类型可以安全地在多线程中使用
  59. let receiver = sync::Arc::new(sync::Mutex::new(rx));
  60. let mut list = Vec::with_capacity(max);
  61. for id in 0..max {
  62. list.push(Worker::run(id, receiver.clone()));
  63. }
  64. ThreadPool {
  65. sender: tx,
  66. list: list,
  67. }
  68. }
  69. pub fn execute<F>(&self, f: F)
  70. where
  71. F: FnBox + Send + 'static,
  72. {
  73. let job = Box::new(f);
  74. self.sender.send(Msg::NewJob(job)).unwrap();
  75. }
  76. }
  77. impl Drop for ThreadPool {
  78. fn drop(&mut self) {
  79. // 发送多个退出请求,让每个线程收到一个
  80. for _ in &self.list {
  81. self.sender.send(Msg::Exit).unwrap();
  82. }
  83. // 等待每个线程退出
  84. for item in &mut self.list {
  85. if let Some(t) = item.thread.take() {
  86. match t.join() {
  87. Ok(_) => 0,
  88. Err(_) => 1,
  89. };
  90. }
  91. }
  92. }
  93. }