Connecting Readers and Writers

那么我们如何确保,connection_loop读到的信息流入对应的connection_writer_loop?我们应该以某种方式保有一个peers: HashMap<String, Sender<String>> map,一个允许客户查找目标频道的’地图’。但是,此 map 会有些共享的可变状态,因此我们必须裹入一个RwLock,并回答个棘手的问题,即如果客户端在收到消息的同时,加入该怎么办。

使状态的推理更简单的一个技巧,来自参与者模型(actor model)。我们可以创建一个专用的代理人(dedicated broker)任务,该任务拥有peers map ,并通过 channels 与其他任务进行通信。通过将peers隐藏在,如一个 “actor” 任务内,我们无需使用 mutxes,并且还明确了序列化点。两件事件,“Bob 将消息发送给 Alice”和“Alice 加入”的顺序,由代理人的事件队列中,相应事件的顺序确定。

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::{
  4. # net::TcpStream,
  5. # prelude::*,
  6. # task,
  7. # };
  8. # use futures::channel::mpsc;
  9. # use futures::sink::SinkExt;
  10. # use std::sync::Arc;
  11. #
  12. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  13. # type Sender<T> = mpsc::UnboundedSender<T>;
  14. # type Receiver<T> = mpsc::UnboundedReceiver<T>;
  15. #
  16. # async fn connection_writer_loop(
  17. # mut messages: Receiver<String>,
  18. # stream: Arc<TcpStream>,
  19. # ) -> Result<()> {
  20. # let mut stream = &*stream;
  21. # while let Some(msg) = messages.next().await {
  22. # stream.write_all(msg.as_bytes()).await?;
  23. # }
  24. # Ok(())
  25. # }
  26. #
  27. # fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
  28. # where
  29. # F: Future<Output = Result<()>> + Send + 'static,
  30. # {
  31. # task::spawn(async move {
  32. # if let Err(e) = fut.await {
  33. # eprintln!("{}", e)
  34. # }
  35. # })
  36. # }
  37. #
  38. use std::collections::hash_map::{Entry, HashMap};
  39. #[derive(Debug)]
  40. enum Event { // 1
  41. NewPeer {
  42. name: String,
  43. stream: Arc<TcpStream>,
  44. },
  45. Message {
  46. from: String,
  47. to: Vec<String>,
  48. msg: String,
  49. },
  50. }
  51. async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
  52. let mut peers: HashMap<String, Sender<String>> = HashMap::new(); // 2
  53. while let Some(event) = events.next().await {
  54. match event {
  55. Event::Message { from, to, msg } => { // 3
  56. for addr in to {
  57. if let Some(peer) = peers.get_mut(&addr) {
  58. let msg = format!("from {}: {}\n", from, msg);
  59. peer.send(msg).await?
  60. }
  61. }
  62. }
  63. Event::NewPeer { name, stream } => {
  64. match peers.entry(name) {
  65. Entry::Occupied(..) => (),
  66. Entry::Vacant(entry) => {
  67. let (client_sender, client_receiver) = mpsc::unbounded();
  68. entry.insert(client_sender); // 4
  69. spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
  70. }
  71. }
  72. }
  73. }
  74. }
  75. Ok(())
  76. }
  1. 代理人(Broker)应处理两种类型的事件:一个消息,或一个新端点的到来。
  2. 代理人的内部状态是HashMap。请留意,在这里我们不需要Mutex,且可以自信地讲出,在代理人循环的每次迭代中,当前的端点集合(set of peers)是什么
  3. 为了处理消息,我们通过 channel,将其发送到每个目的地
  4. 为了处理新的 peer,我们首先在 peer 的 map 中,注册它…
  5. … 然后 spawn(孵) 一个专用任务(dedicated task),将消息实际写入 socket。