All Together

此时,我们只需要开始 broker,就可获得功能齐全的聊天室(在愉快的情况下!):

  1. # extern crate async_std;
  2. # extern crate futures;
  3. use async_std::{
  4. io::{self, BufReader},
  5. net::{TcpListener, TcpStream, ToSocketAddrs},
  6. prelude::*,
  7. task,
  8. };
  9. use futures::channel::mpsc;
  10. use futures::SinkExt;
  11. use std::{
  12. collections::hash_map::{HashMap, Entry},
  13. sync::Arc,
  14. };
  15. type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  16. type Sender<T> = mpsc::UnboundedSender<T>;
  17. type Receiver<T> = mpsc::UnboundedReceiver<T>;
  18. // main
  19. fn run() -> Result<()> {
  20. task::block_on(accept_loop("127.0.0.1:8080"))
  21. }
  22. fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  23. where
  24. F: Future<Output = Result<()>> + Send + 'static,
  25. {
  26. task::spawn(async move {
  27. if let Err(e) = fut.await {
  28. eprintln!("{}", e)
  29. }
  30. })
  31. }
  32. async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
  33. let listener = TcpListener::bind(addr).await?;
  34. let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1
  35. let _broker_handle = task::spawn(broker_loop(broker_receiver));
  36. let mut incoming = listener.incoming();
  37. while let Some(stream) = incoming.next().await {
  38. let stream = stream?;
  39. println!("Accepting from: {}", stream.peer_addr()?);
  40. spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
  41. }
  42. Ok(())
  43. }
  44. async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
  45. let stream = Arc::new(stream); // 2
  46. let reader = BufReader::new(&*stream);
  47. let mut lines = reader.lines();
  48. let name = match lines.next().await {
  49. None => Err("peer disconnected immediately")?,
  50. Some(line) => line?,
  51. };
  52. broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
  53. .unwrap();
  54. while let Some(line) = lines.next().await {
  55. let line = line?;
  56. let (dest, msg) = match line.find(':') {
  57. None => continue,
  58. Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
  59. };
  60. let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
  61. let msg: String = msg.to_string();
  62. broker.send(Event::Message { // 4
  63. from: name.clone(),
  64. to: dest,
  65. msg,
  66. }).await.unwrap();
  67. }
  68. Ok(())
  69. }
  70. async fn connection_writer_loop(
  71. mut messages: Receiver<String>,
  72. stream: Arc<TcpStream>,
  73. ) -> Result<()> {
  74. let mut stream = &*stream;
  75. while let Some(msg) = messages.next().await {
  76. stream.write_all(msg.as_bytes()).await?;
  77. }
  78. Ok(())
  79. }
  80. #[derive(Debug)]
  81. enum Event {
  82. NewPeer {
  83. name: String,
  84. stream: Arc<TcpStream>,
  85. },
  86. Message {
  87. from: String,
  88. to: Vec<String>,
  89. msg: String,
  90. },
  91. }
  92. async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
  93. let mut peers: HashMap<String, Sender<String>> = HashMap::new();
  94. while let Some(event) = events.next().await {
  95. match event {
  96. Event::Message { from, to, msg } => {
  97. for addr in to {
  98. if let Some(peer) = peers.get_mut(&addr) {
  99. let msg = format!("from {}: {}\n", from, msg);
  100. peer.send(msg).await?
  101. }
  102. }
  103. }
  104. Event::NewPeer { name, stream} => {
  105. match peers.entry(name) {
  106. Entry::Occupied(..) => (),
  107. Entry::Vacant(entry) => {
  108. let (client_sender, client_receiver) = mpsc::unbounded();
  109. entry.insert(client_sender); // 4
  110. spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
  111. }
  112. }
  113. }
  114. }
  115. }
  116. Ok(())
  117. }
  1. accept_loop里面,我们创建了 broker 的 channel,和task
  2. connection_loop里面,我们需要将TcpStream包进一个Arc,以便能够与connection_writer_loop共享它。
  3. 登录后,我们会通知 broker。注意我们.unwrap on send: broker 应该比所有客户都长命,如果不是这种情况, broker 应该 panics,所以这个 panics 应该是致命的。
  4. 同样,我们将已解析的消息转发给 broker,并假设该消息仍处于活动状态。