Receiving messages

让我们实现协议的接收部分。我们要:

  1. 将传入的TcpStream\n拆开,并将字节解码为 utf-8
  2. 将第一行解释为登录名
  3. 将其余的行解析为login: message
  1. # extern crate async_std;
  2. # use async_std::{
  3. # io::BufReader,
  4. # net::{TcpListener, TcpStream, ToSocketAddrs},
  5. # prelude::*,
  6. # task,
  7. # };
  8. #
  9. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  10. #
  11. async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
  12. let listener = TcpListener::bind(addr).await?;
  13. let mut incoming = listener.incoming();
  14. while let Some(stream) = incoming.next().await {
  15. let stream = stream?;
  16. println!("Accepting from: {}", stream.peer_addr()?);
  17. let _handle = task::spawn(connection_loop(stream)); // 1
  18. }
  19. Ok(())
  20. }
  21. async fn connection_loop(stream: TcpStream) -> Result<()> {
  22. let reader = BufReader::new(&stream); // 2
  23. let mut lines = reader.lines();
  24. let name = match lines.next().await { // 3
  25. None => Err("peer disconnected immediately")?,
  26. Some(line) => line?,
  27. };
  28. println!("name = {}", name);
  29. while let Some(line) = lines.next().await { // 4
  30. let line = line?;
  31. let (dest, msg) = match line.find(':') { // 5
  32. None => continue,
  33. Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
  34. };
  35. let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
  36. let msg: String = msg.trim().to_string();
  37. }
  38. Ok(())
  39. }
  1. 我们用task::spawn函数 孵 一个独立的任务,让它与每个客户端一起工作。也就是说,在接受客户之后accept_loop能立即开始等待下一个。这是事件驱动的体系结构的核心优势:我们同时为许多客户端提供服务,而无需花费很多硬件线程。

  2. 幸运的是,“将字节流分成几行”的功能已经实现。.lines()调用会传回String的数据流(stream)。

  3. 我们得到第一行 — login

  4. 而且,我们再次手动实现了一个异步 for 循环。

  5. 最后,我们将每一行解析为目标登录名和消息本身的列表。

Managing Errors

上述解决方案中的一个严重问题是,尽管我们在connection_loop正确传播了 errors,之后我们却将错误扔到了地上!原因是,task::spawn是不会立即返回错误(它做不到,它需要先让 future 完成运行)。而我们可以通过,等待 task 能可以加入了,来“fix”它,如下所示:

  1. # #![feature(async_closure)]
  2. # extern crate async_std;
  3. # use async_std::{
  4. # io::BufReader,
  5. # net::{TcpListener, TcpStream, ToSocketAddrs},
  6. # prelude::*,
  7. # task,
  8. # };
  9. #
  10. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  11. #
  12. # async fn connection_loop(stream: TcpStream) -> Result<()> {
  13. # let reader = BufReader::new(&stream); // 2
  14. # let mut lines = reader.lines();
  15. #
  16. # let name = match lines.next().await { // 3
  17. # None => Err("peer disconnected immediately")?,
  18. # Some(line) => line?,
  19. # };
  20. # println!("name = {}", name);
  21. #
  22. # while let Some(line) = lines.next().await { // 4
  23. # let line = line?;
  24. # let (dest, msg) = match line.find(':') { // 5
  25. # None => continue,
  26. # Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
  27. # };
  28. # let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
  29. # let msg: String = msg.trim().to_string();
  30. # }
  31. # Ok(())
  32. # }
  33. #
  34. # async move |stream| {
  35. let handle = task::spawn(connection_loop(stream));
  36. handle.await
  37. # };

.await等到客户端完成,然后?传播结果。

但是,此解决方案有两个问题!第一,因为我们立即等待客户端,所以我们一次只能处理一个客户端,这完全违反了异步的目的!第二,如果客户端遇到 IO 错误,则整个服务器都会立即退出。也就是说,仅一个端点的不稳定互联网连接,就可以让整个聊天室瘫痪了!

在这种情况下,处理客户端错误的正确方法是记录它们,并继续为其他客户端提供服务。为此,我们使用一个辅助函数:

  1. # extern crate async_std;
  2. # use async_std::{
  3. # io,
  4. # prelude::*,
  5. # task,
  6. # };
  7. fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  8. where
  9. F: Future<Output = io::Result<()>> + Send + 'static,
  10. {
  11. task::spawn(async move {
  12. if let Err(e) = fut.await {
  13. eprintln!("{}", e)
  14. }
  15. })
  16. }