Handling Disconnections

目前,我们只对 map 有,添加新 peers 的操作。这显然是错误的:如果一个 peer 关闭了与 chat 的连接,我们就不应尝试向其发送更多消息。

处理断开连接的一个细微之处是,我们可以检测到,它是在 reader 的 task 中,还是在 writer 的 task 中。这里最显而易见的解决方案是:在两种情况下,试着仅将 peer 从peers map 中移除,但这是错误的。如果读取和写入(read and write)两个都失败,我们将会移除 peer 两次,但有种情况是,在两种错误之间, peer 重新连接上了!解决上面问题的思考是,我们仅在 write 端完结后,才移除 peer 。如果 read 端完结,我们将通知 write 端也应停止。也就是说,我们需要添加一项手段,发出信号,以关闭 writer task。

解决这个问题的一种方法是shutdown: Receiver<()> channel。但是,有一个更小的解决方案,它可以巧妙地使用 RAII。 shutdown channel 是一个同步事件,因此我们不需要发送一个 shutdown 消息,只需丢弃 sender 即可。这样,即使我们的?或 panics 提前返回,我们也能坐享其成地确定,发出 shutdown 仅一次。

首先,让我们向connection_loop,添加一个 shutdown channel:

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::net::TcpStream;
  4. # use futures::channel::mpsc;
  5. # use futures::SinkExt;
  6. # use std::sync::Arc;
  7. #
  8. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  9. # type Sender<T> = mpsc::UnboundedSender<T>;
  10. # type Receiver<T> = mpsc::UnboundedReceiver<T>;
  11. #
  12. #[derive(Debug)]
  13. enum Void {} // 1
  14. #[derive(Debug)]
  15. enum Event {
  16. NewPeer {
  17. name: String,
  18. stream: Arc<TcpStream>,
  19. shutdown: Receiver<Void>, // 2
  20. },
  21. Message {
  22. from: String,
  23. to: Vec<String>,
  24. msg: String,
  25. },
  26. }
  27. async fn connection_loop(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
  28. // ...
  29. # let name: String = unimplemented!();
  30. let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); // 3
  31. broker.send(Event::NewPeer {
  32. name: name.clone(),
  33. stream: Arc::clone(&stream),
  34. shutdown: shutdown_receiver,
  35. }).await.unwrap();
  36. // ...
  37. # unimplemented!()
  38. }
  1. 为了强制信息不通过 shutdown channel 发送,我们使用了仅作标记的类型。
  2. 我们将 shutdown channel 传递给 writer task
  3. 在 reader 中,我们创建了一个_shutdown_sender,其唯一的目的就是 get dropped。

connection_writer_loop里面,我们现在需要在 shutdown 和 message channel 之间进行选择。为此,我们使用select宏:

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::{net::TcpStream, prelude::*};
  4. use futures::channel::mpsc;
  5. use futures::{select, FutureExt};
  6. # use std::sync::Arc;
  7. # type Receiver<T> = mpsc::UnboundedReceiver<T>;
  8. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  9. # type Sender<T> = mpsc::UnboundedSender<T>;
  10. # #[derive(Debug)]
  11. # enum Void {} // 1
  12. async fn connection_writer_loop(
  13. messages: &mut Receiver<String>,
  14. stream: Arc<TcpStream>,
  15. shutdown: Receiver<Void>, // 1
  16. ) -> Result<()> {
  17. let mut stream = &*stream;
  18. let mut messages = messages.fuse();
  19. let mut shutdown = shutdown.fuse();
  20. loop { // 2
  21. select! {
  22. msg = messages.next().fuse() => match msg {
  23. Some(msg) => stream.write_all(msg.as_bytes()).await?,
  24. None => break,
  25. },
  26. void = shutdown.next().fuse() => match void {
  27. Some(void) => match void {}, // 3
  28. None => break,
  29. }
  30. }
  31. }
  32. Ok(())
  33. }
  1. 我们添加 shutdown channel 作为参数。
  2. 因为select,我们不能使用while let循环,所以我们将其 desugar(解语法糖) 到 loop
  3. 在 shutdown 案例下,我们使用match void {}作为静态检查版的unreachable!()

另一个问题是,新的信息可能会被推进 peer 的 channel,这问题发生的时机是,在我们在connection_writer_loop检测到 disconnection(断连),和我们实际上从peers map 中移除这个 peer 之间。为了不完全失去这些消息,我们将 message channel 返回给 broker。这也使我们能够建立一个有用的不变式,message channel 在peers map 中,是严格要求’长命’过这个 peer 的,并使 broker 本身处于无法失败的状态。

Final Code

最终代码如下所示:

  1. # extern crate async_std;
  2. # extern crate futures;
  3. use async_std::{
  4. io::BufReader,
  5. net::{TcpListener, TcpStream, ToSocketAddrs},
  6. prelude::*,
  7. task,
  8. };
  9. use futures::channel::mpsc;
  10. use futures::{select, FutureExt, SinkExt};
  11. use std::{
  12. collections::hash_map::{Entry, HashMap},
  13. future::Future,
  14. sync::Arc,
  15. };
  16. type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  17. type Sender<T> = mpsc::UnboundedSender<T>;
  18. type Receiver<T> = mpsc::UnboundedReceiver<T>;
  19. #[derive(Debug)]
  20. enum Void {}
  21. // main
  22. fn run() -> Result<()> {
  23. task::block_on(accept_loop("127.0.0.1:8080"))
  24. }
  25. async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
  26. let listener = TcpListener::bind(addr).await?;
  27. let (broker_sender, broker_receiver) = mpsc::unbounded();
  28. let broker_handle = task::spawn(broker_loop(broker_receiver));
  29. let mut incoming = listener.incoming();
  30. while let Some(stream) = incoming.next().await {
  31. let stream = stream?;
  32. println!("Accepting from: {}", stream.peer_addr()?);
  33. spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
  34. }
  35. drop(broker_sender);
  36. broker_handle.await;
  37. Ok(())
  38. }
  39. async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
  40. let stream = Arc::new(stream);
  41. let reader = BufReader::new(&*stream);
  42. let mut lines = reader.lines();
  43. let name = match lines.next().await {
  44. None => Err("peer disconnected immediately")?,
  45. Some(line) => line?,
  46. };
  47. let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
  48. broker.send(Event::NewPeer {
  49. name: name.clone(),
  50. stream: Arc::clone(&stream),
  51. shutdown: shutdown_receiver,
  52. }).await.unwrap();
  53. while let Some(line) = lines.next().await {
  54. let line = line?;
  55. let (dest, msg) = match line.find(':') {
  56. None => continue,
  57. Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
  58. };
  59. let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
  60. let msg: String = msg.trim().to_string();
  61. broker.send(Event::Message {
  62. from: name.clone(),
  63. to: dest,
  64. msg,
  65. }).await.unwrap();
  66. }
  67. Ok(())
  68. }
  69. async fn connection_writer_loop(
  70. messages: &mut Receiver<String>,
  71. stream: Arc<TcpStream>,
  72. shutdown: Receiver<Void>,
  73. ) -> Result<()> {
  74. let mut stream = &*stream;
  75. let mut messages = messages.fuse();
  76. let mut shutdown = shutdown.fuse();
  77. loop {
  78. select! {
  79. msg = messages.next().fuse() => match msg {
  80. Some(msg) => stream.write_all(msg.as_bytes()).await?,
  81. None => break,
  82. },
  83. void = shutdown.next().fuse() => match void {
  84. Some(void) => match void {},
  85. None => break,
  86. }
  87. }
  88. }
  89. Ok(())
  90. }
  91. #[derive(Debug)]
  92. enum Event {
  93. NewPeer {
  94. name: String,
  95. stream: Arc<TcpStream>,
  96. shutdown: Receiver<Void>,
  97. },
  98. Message {
  99. from: String,
  100. to: Vec<String>,
  101. msg: String,
  102. },
  103. }
  104. async fn broker_loop(events: Receiver<Event>) {
  105. let (disconnect_sender, mut disconnect_receiver) = // 1
  106. mpsc::unbounded::<(String, Receiver<String>)>();
  107. let mut peers: HashMap<String, Sender<String>> = HashMap::new();
  108. let mut events = events.fuse();
  109. loop {
  110. let event = select! {
  111. event = events.next().fuse() => match event {
  112. None => break, // 2
  113. Some(event) => event,
  114. },
  115. disconnect = disconnect_receiver.next().fuse() => {
  116. let (name, _pending_messages) = disconnect.unwrap(); // 3
  117. assert!(peers.remove(&name).is_some());
  118. continue;
  119. },
  120. };
  121. match event {
  122. Event::Message { from, to, msg } => {
  123. for addr in to {
  124. if let Some(peer) = peers.get_mut(&addr) {
  125. let msg = format!("from {}: {}\n", from, msg);
  126. peer.send(msg).await
  127. .unwrap() // 6
  128. }
  129. }
  130. }
  131. Event::NewPeer { name, stream, shutdown } => {
  132. match peers.entry(name.clone()) {
  133. Entry::Occupied(..) => (),
  134. Entry::Vacant(entry) => {
  135. let (client_sender, mut client_receiver) = mpsc::unbounded();
  136. entry.insert(client_sender);
  137. let mut disconnect_sender = disconnect_sender.clone();
  138. spawn_and_log_error(async move {
  139. let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
  140. disconnect_sender.send((name, client_receiver)).await // 4
  141. .unwrap();
  142. res
  143. });
  144. }
  145. }
  146. }
  147. }
  148. }
  149. drop(peers); // 5
  150. drop(disconnect_sender); // 6
  151. while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
  152. }
  153. }
  154. fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  155. where
  156. F: Future<Output = Result<()>> + Send + 'static,
  157. {
  158. task::spawn(async move {
  159. if let Err(e) = fut.await {
  160. eprintln!("{}", e)
  161. }
  162. })
  163. }
  1. 在 broker 中,我们创建一个 channel,来获取断开连接的 peer,及其未传递的消息。
  2. 当输入事件 channel 耗尽时(即,所有 readers 退出时), broker 的 main 循环退出。
  3. 因为 broker 本身持有一个disconnect_sender,我们就知道在 main 循环中的 disconnections channel,无法完全排掉。
  4. 我们将 peer 的 name 和待处理消息(pending messages),发送到 disconnections channel ,这里有两条路径,一条快乐,一条不是太快乐(in both the happy and the not-so-happy path)。 同样,我们可以安全地 unwrap,因为 broker 的生命周期是超过 writers 的。
  5. 我们 drop peers map ,以关闭 writers 的 message channel ,并确定关闭 writers。 在当前设置中, broker 等待 reader 的关闭,其实并不是绝对必要的。但是,如果我们要添加一个 server-initiated shutdown(例如,kbd:[ctrl+c] 处理),这将是 broker 关闭 writers 的一种方式。
  6. 最后,我们关闭,并排干 disconnections channel 。