Sending Messages

现在该实现另一半了 —— 发送消息。实现发送的最明显方法是,给每个connection_loop访问,平分彼此客户端TcpStream的 write。这样,一个客户可以直接.write_all,给到收件人消息。但是,这会是错误的:如果 Alice 发送bob: foo,然后 Charley 发送bob: bar,Bob 实际上可能会收到fobaor。在 socket 上,发送消息可能需要几个 syscalls,因此两个并发.write_all可能会互相干扰!

根据经验,每个 task 应该只写入一个TcpStream。因此,让我们创建一个connection_writer_loop task,它能在一个通道(channel)上接收消息,并将其写入 socket。该 task 将是消息序列化的重点。如果 Alice 和 Charley 同时向 Bob 发送了两条消息,则 Bob 会以到达消息的顺序,来查看消息。

  1. # extern crate async_std;
  2. # extern crate futures;
  3. # use async_std::{
  4. # net::TcpStream,
  5. # prelude::*,
  6. # };
  7. use futures::channel::mpsc; // 1
  8. use futures::sink::SinkExt;
  9. use std::sync::Arc;
  10. # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
  11. type Sender<T> = mpsc::UnboundedSender<T>; // 2
  12. type Receiver<T> = mpsc::UnboundedReceiver<T>;
  13. async fn connection_writer_loop(
  14. mut messages: Receiver<String>,
  15. stream: Arc<TcpStream>, // 3
  16. ) -> Result<()> {
  17. let mut stream = &*stream;
  18. while let Some(msg) = messages.next().await {
  19. stream.write_all(msg.as_bytes()).await?;
  20. }
  21. Ok(())
  22. }
  1. 我们将使用futures箱子的 channels。
  2. 简单起见,我们将使用unbounded channels,并且在本教程中不会讨论 backpressure。
  3. connection_loop一样,connection_writer_loop分享相同的TcpStream,我们需要将其放入Arc。注意,因为client只能读取 stream,和connection_writer_loop只写入 stream,我们在这里没有竞态问题。