All Together
此时,我们只需要启动 broker,就可以得到一个功能齐全的聊天室(在一切顺利的情况下!):
# extern crate async_std;# extern crate futures;use async_std::{io::{self, BufReader},net::{TcpListener, TcpStream, ToSocketAddrs},prelude::*,task,};use futures::channel::mpsc;use futures::SinkExt;use std::{collections::hash_map::{HashMap, Entry},sync::Arc,};type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;type Sender<T> = mpsc::UnboundedSender<T>;type Receiver<T> = mpsc::UnboundedReceiver<T>;// mainfn run() -> Result<()> {task::block_on(accept_loop("127.0.0.1:8080"))}fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>whereF: Future<Output = Result<()>> + Send + 'static,{task::spawn(async move {if let Err(e) = fut.await {eprintln!("{}", e)}})}async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {let listener = TcpListener::bind(addr).await?;let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1let _broker_handle = task::spawn(broker_loop(broker_receiver));let mut incoming = listener.incoming();while let Some(stream) = incoming.next().await {let stream = stream?;println!("Accepting from: {}", stream.peer_addr()?);spawn_and_log_error(connection_loop(broker_sender.clone(), stream));}Ok(())}async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {let stream = Arc::new(stream); // 2let reader = BufReader::new(&*stream);let mut lines = reader.lines();let name = match lines.next().await {None => Err("peer disconnected immediately")?,Some(line) => line?,};broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3.unwrap();while let Some(line) = lines.next().await {let line = line?;let (dest, msg) = match line.find(':') {None => continue,Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),};let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();let msg: String = msg.to_string();broker.send(Event::Message { // 4from: name.clone(),to: dest,msg,}).await.unwrap();}Ok(())}async fn connection_writer_loop(mut messages: 
Receiver<String>,stream: Arc<TcpStream>,) -> Result<()> {let mut stream = &*stream;while let Some(msg) = messages.next().await {stream.write_all(msg.as_bytes()).await?;}Ok(())}#[derive(Debug)]enum Event {NewPeer {name: String,stream: Arc<TcpStream>,},Message {from: String,to: Vec<String>,msg: String,},}async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {let mut peers: HashMap<String, Sender<String>> = HashMap::new();while let Some(event) = events.next().await {match event {Event::Message { from, to, msg } => {for addr in to {if let Some(peer) = peers.get_mut(&addr) {let msg = format!("from {}: {}\n", from, msg);peer.send(msg).await?}}}Event::NewPeer { name, stream} => {match peers.entry(name) {Entry::Occupied(..) => (),Entry::Vacant(entry) => {let (client_sender, client_receiver) = mpsc::unbounded();entry.insert(client_sender); // 4spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5}}}}}Ok(())}
1. 在 `accept_loop` 里面,我们创建了 broker 的 channel 和 task。
2. 在 `connection_loop` 里面,我们需要将 `TcpStream` 包进一个 `Arc`,以便能够与 `connection_writer_loop` 共享它。
3. 登录后,我们会通知 broker。注意我们在 send 上调用了 `.unwrap`:broker 应该比所有客户端都活得久;如果不是这种情况,说明 broker 很可能已经 panic 了,所以我们也可以让这个 panic 继续向上传播。
4. 同样,我们将已解析的消息转发给 broker,并假设 broker 仍处于活动状态。
