示例:聊天服务器

我们将使用到目前为止已经涵盖的内容来构建聊天服务器。 这是一个非平凡的Tokio服务器应用程序。

服务器将使用基于行的协议。 行以\ r \ n结束。 这与telnet兼容,因此我们只使用telnet作为客户端。 当客户端连接时,它必须通过发送包含其“缺口”的行来标识自己(即,用于在其 Peer中标识客户端的某个名称)。

识别出客户端后,所有发送的行都以[nick]:为前缀,并广播给所有其他连接的客户端。

完整的代码可以在这里找到。 请注意,Tokio提供了一些尚未涵盖的额外抽象,这些抽象将使聊天服务器能够用更少的代码编写。

首先,生成一个新的箱子。

  1. $ cargo new --bin line-chat
  2. cd line-chat

接下来,添加必要的依赖项:

  1. [dependencies]
  2. tokio = "0.1"
  3. tokio-io = "0.1"
  4. futures = "0.1"
  5. bytes = "0.4"
  1. extern crate tokio;
  2. #[macro_use]
  3. extern crate futures;
  4. extern crate bytes;
  5. use tokio::io;
  6. use tokio::net::{TcpListener, TcpStream};
  7. use tokio::prelude::*;
  8. use futures::sync::mpsc;
  9. use futures::future::{self, Either};
  10. use bytes::{BytesMut, Bytes, BufMut};
  11. use std::collections::HashMap;
  12. use std::net::SocketAddr;
  13. use std::sync::{Arc, Mutex};
  14. /// Shorthand for the transmit half of the message channel.
  15. type Tx = mpsc::UnboundedSender<Bytes>;
  16. /// Shorthand for the receive half of the message channel.
  17. type Rx = mpsc::UnboundedReceiver<Bytes>;

现在,我们为服务器设置必要的结构。 这些步骤与Hello World中使用的步骤相同! 例:

  • 将TcpListener绑定到本地端口。
  • 定义接受入站连接并处理它们的任务。
  • 启动Tokio运行时
  • 产生服务器任务。

同样,在执行程序上生成服务器任务之前,实际上不会发生任何工作。

  1. fn main() {
  2. let addr = "127.0.0.1:6142".parse().unwrap();
  3. let listener = TcpListener::bind(&addr).unwrap();
  4. let server = listener.incoming().for_each(move |socket| {
  5. // TODO: Process socket
  6. Ok(())
  7. })
  8. .map_err(|err| {
  9. // Handle error by printing to STDOUT.
  10. println!("accept error = {:?}", err);
  11. });
  12. println!("server running on localhost:6142");
  13. // Start the server
  14. //
  15. // This does a few things:
  16. //
  17. // * Start the Tokio runtime (reactor, threadpool, etc...)
  18. // * Spawns the `server` task onto the runtime.
  19. // * Blocks the current thread until the runtime becomes idle, i.e. all
  20. // spawned tasks have completed.
  21. tokio::run(server);
  22. }

Chat State

聊天服务器要求从一个客户端接收的消息被广播到所有其他连接的客户端。 这将使用通过mpsc通道传递的消息来完成。

每个客户端套接字都将由任务管理。 每个任务都有一个关联的mpsc通道,用于接收来自其他客户端的消息。 所有这些通道的发送一半存储在Rc单元中以使它们可访问。

在这个例子中,我们将使用无界通道。 理想情况下,渠道永远不应该是无限制的,但在这种情况下处理背压有点棘手。 我们将把通道限制在后面专门用于处理背压的部分。

以下是共享状态的定义方式(上面已完成Tx类型别名):

  1. struct Shared {
  2. peers: HashMap<SocketAddr, Tx>,
  3. }

然后,在main函数的最顶部,创建状态实例。 此状态实例将移动到接受传入连接的任务中。

  1. let state = Arc::new(Mutex::new(Shared::new()));

现在我们可以处理传入的连接。 服务器任务更新为:

  1. listener.incoming().for_each(move |socket| {
  2. process(socket, state.clone());
  3. Ok(())
  4. })

服务器任务将所有套接字以及服务器状态的克隆传递给进程函数。 我们来定义那个功能。 它将具有这样的结构:

  1. fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
  2. // Define the task that processes the connection.
  3. let task = unimplemented!();
  4. // Spawn the task
  5. tokio::spawn(task);
  6. }

对tokio :: spawn的调用将在当前的Tokio运行时生成一个新任务。 所有工作线程都保留对存储在线程局部变量中的当前运行时的引用。 注意,尝试从Tokio运行时外部调用tokio :: spawn将导致恐慌。

所有连接处理逻辑必须能够理解协议。 该协议是基于行的,由\ r \ n终止。 它不是在字节流级别工作,而是更容易在帧级工作,即使用表示原子消息的值。

我们实现了一个包含套接字的编解码器,并公开了一个采用和消耗行的API。

线性编解码器

对于采用字节流类型(AsyncRead + AsyncWrite)并在帧级别公开读写API的类型,编解码器是一个松散术语。 tokio-io crate为编写编解码器提供了额外的帮助,在这个例子中,我们将手动完成。

Lines编解码器定义如下:

  1. struct Lines {
  2. socket: TcpStream,
  3. rd: BytesMut,
  4. wr: BytesMut,
  5. }
  6. impl Lines {
  7. /// Create a new `Lines` codec backed by the socket
  8. fn new(socket: TcpStream) -> Self {
  9. Lines {
  10. socket,
  11. rd: BytesMut::new(),
  12. wr: BytesMut::new(),
  13. }
  14. }
  15. }

从套接字读取的数据缓冲到rd中。 读取完整行后,将返回给调用者。 调用者提交以写入套接字的行被缓冲到wr中,然后刷新。

这是读取一半的实现方式:

  1. impl Stream for Lines {
  2. type Item = BytesMut;
  3. type Error = io::Error;
  4. fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
  5. // First, read any new data that might have been received
  6. // off the socket
  7. //
  8. // We track if the socket is closed here and will be used
  9. // to inform the return value below.
  10. let sock_closed = self.fill_read_buf()?.is_ready();
  11. // Now, try finding lines
  12. let pos = self.rd.windows(2)
  13. .position(|bytes| bytes == b"\r\n");
  14. if let Some(pos) = pos {
  15. // Remove the line from the read buffer and set it
  16. // to `line`.
  17. let mut line = self.rd.split_to(pos + 2);
  18. // Drop the trailing \r\n
  19. line.split_off(pos);
  20. // Return the line
  21. return Ok(Async::Ready(Some(line)));
  22. }
  23. if sock_closed {
  24. Ok(Async::Ready(None))
  25. } else {
  26. Ok(Async::NotReady)
  27. }
  28. }
  29. }
  30. impl Lines {
  31. fn fill_read_buf(&mut self) -> Result<Async<()>, io::Error> {
  32. loop {
  33. // Ensure the read buffer has capacity.
  34. //
  35. // This might result in an internal allocation.
  36. self.rd.reserve(1024);
  37. // Read data into the buffer.
  38. //
  39. // The `read_buf` fn is provided by `AsyncRead`.
  40. let n = try_ready!(self.socket.read_buf(&mut self.rd));
  41. if n == 0 {
  42. return Ok(Async::Ready(()));
  43. }
  44. }
  45. }
  46. }

该示例使用字节包中的BytesMut。 这为在网络环境中处理字节序列提供了一些很好的实用程序。 Stream实现产生的BytesMut值只包含一行。

与往常一样,实现返回Async的函数的关键是永远不会返回Async :: NotReady,除非函数实现收到Async :: NotReady本身。 在此示例中,仅当fill_read_buf返回NotReady时才返回NotReady,如果TcpStream :: read_buf返回NotReady,则fill_read_buf仅返回NotReady。

  1. struct Lines {
  2. socket: TcpStream,
  3. rd: BytesMut,
  4. wr: BytesMut,
  5. }
  6. impl Lines {
  7. fn buffer(&mut self, line: &[u8]) {
  8. // Push the line onto the end of the write buffer.
  9. //
  10. // The `put` function is from the `BufMut` trait.
  11. self.wr.put(line);
  12. }
  13. fn poll_flush(&mut self) -> Poll<(), io::Error> {
  14. // As long as there is buffered data to write, try to write it.
  15. while !self.wr.is_empty() {
  16. // Try to write some bytes to the socket
  17. let n = try_ready!(self.socket.poll_write(&self.wr));
  18. // As long as the wr is not empty, a successful write should
  19. // never write 0 bytes.
  20. assert!(n > 0);
  21. // This discards the first `n` bytes of the buffer.
  22. let _ = self.wr.split_to(n);
  23. }
  24. Ok(Async::Ready(()))
  25. }
  26. }
  27. fn main() {}

调用者通过调用缓冲区对所有行进行排队。 这会将该行附加到内部wr缓冲区。 然后,一旦所有数据排队,调用者就会调用poll_flush,它会对套接字进行实际写入操作。 poll_flush仅在所有排队数据成功写入套接字后才返回Ready。

与读取半部分类似,仅在函数实现收到NotReady本身时返回NotReady。

Lines编解码器在进程函数中使用如下:

  1. fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
  2. // Wrap the socket with the `Lines` codec that we wrote above.
  3. let lines = Lines::new(socket);
  4. // The first line is treated as the client's name. The client
  5. // is not added to the set of connected peers until this line
  6. // is received.
  7. //
  8. // We use the `into_future` combinator to extract the first
  9. // item from the lines stream. `into_future` takes a `Stream`
  10. // and converts it to a future of `(first, rest)` where `rest`
  11. // is the original stream instance.
  12. let connection = lines.into_future()
  13. // `into_future` doesn't have the right error type, so map
  14. // the error to make it work.
  15. .map_err(|(e, _)| e)
  16. // Process the first received line as the client's name.
  17. .and_then(|(name, lines)| {
  18. let name = match name {
  19. Some(name) => name,
  20. None => {
  21. // TODO: Handle a client that disconnects
  22. // early.
  23. unimplemented!();
  24. }
  25. };
  26. // TODO: Rest of the process function
  27. });
  28. }

广播消息

下一步是实现处理实际聊天功能的连接处理逻辑,即从一个客户端向所有其他客户端广播消息。

为了实现这一点,我们将明确地实现一个Future,它接受Lines编解码器实例并处理广播逻辑。 这个逻辑处理:

  • 在其消息通道上接收消息并将其写入套接字。
  • 从套接字接收消息并将其广播给所有 Peer

完全使用组合器实现此逻辑也是可能的,但需要使用拆分,但尚未涉及。 此外,这提供了一个机会,可以看到如何手动实现一个非平凡的 future

以下是处理连接的广播逻辑的 future定义:

  1. struct Peer {
  2. /// Name of the peer. This is the first line received from the client.
  3. name: BytesMut,
  4. /// The TCP socket wrapped with the `Lines` codec.
  5. lines: Lines,
  6. /// Handle to the shared chat state.
  7. state: Arc<Mutex<Shared>>,
  8. /// Receive half of the message channel.
  9. ///
  10. /// This is used to receive messages from peers. When a message is received
  11. /// off of this `Rx`, it will be written to the socket.
  12. rx: Rx,
  13. /// Client socket address.
  14. ///
  15. /// The socket address is used as the key in the `peers` HashMap. The
  16. /// address is saved so that the `Peer` drop implementation can clean up its
  17. /// entry.
  18. addr: SocketAddr,
  19. }

并且创建如下Peer实例:

  1. impl Peer {
  2. fn new(name: BytesMut,
  3. state: Arc<Mutex<Shared>>,
  4. lines: Lines) -> Peer
  5. {
  6. // Get the client socket address
  7. let addr = lines.socket.peer_addr().unwrap();
  8. // Create a channel for this peer
  9. let (tx, rx) = mpsc::unbounded();
  10. // Add an entry for this `Peer` in the shared state map.
  11. state.lock().unwrap()
  12. .peers.insert(addr, tx);
  13. Peer {
  14. name,
  15. lines,
  16. state,
  17. rx,
  18. addr,
  19. }
  20. }
  21. }

为其他 Peer创建mpsc通道,以将其消息发送到此新创建的 Peer。 在创建信道之后,将发送半部分插入 Peer映射中。 此条目在Peer的drop实现中删除。

  1. impl Drop for Peer {
  2. fn drop(&mut self) {
  3. self.state.lock().unwrap().peers
  4. .remove(&self.addr);
  5. }
  6. }

这是实现

  1. impl Future for Peer {
  2. type Item = ();
  3. type Error = io::Error;
  4. fn poll(&mut self) -> Poll<(), io::Error> {
  5. // Receive all messages from peers.
  6. loop {
  7. // Polling an `UnboundedReceiver` cannot fail, so `unwrap`
  8. // here is safe.
  9. match self.rx.poll().unwrap() {
  10. Async::Ready(Some(v)) => {
  11. // Buffer the line. Once all lines are buffered,
  12. // they will be flushed to the socket (right
  13. // below).
  14. self.lines.buffer(&v);
  15. }
  16. _ => break,
  17. }
  18. }
  19. // Flush the write buffer to the socket
  20. let _ = self.lines.poll_flush()?;
  21. // Read new lines from the socket
  22. while let Async::Ready(line) = self.lines.poll()? {
  23. println!("Received line ({:?}) : {:?}", self.name, line);
  24. if let Some(message) = line {
  25. // Append the peer's name to the front of the line:
  26. let mut line = self.name.clone();
  27. line.put(": ");
  28. line.put(&message);
  29. line.put("\r\n");
  30. // We're using `Bytes`, which allows zero-copy clones
  31. // (by storing the data in an Arc internally).
  32. //
  33. // However, before cloning, we must freeze the data.
  34. // This converts it from mutable -> immutable,
  35. // allowing zero copy cloning.
  36. let line = line.freeze();
  37. // Now, send the line to all other peers
  38. for (addr, tx) in &self.state.lock().unwrap().peers {
  39. // Don't send the message to ourselves
  40. if *addr != self.addr {
  41. // The send only fails if the rx half has been
  42. // dropped, however this is impossible as the
  43. // `tx` half will be removed from the map
  44. // before the `rx` is dropped.
  45. tx.unbounded_send(line.clone()).unwrap();
  46. }
  47. }
  48. } else {
  49. // EOF was reached. The remote client has disconnected.
  50. // There is nothing more to do.
  51. return Ok(Async::Ready(()));
  52. }
  53. }
  54. // As always, it is important to not just return `NotReady`
  55. // without ensuring an inner future also returned `NotReady`.
  56. //
  57. // We know we got a `NotReady` from either `self.rx` or
  58. // `self.lines`, so the contract is respected.
  59. Ok(Async::NotReady)
  60. }
  61. }

剩下的就是连接刚刚实施的Peer future。 为此,将客户端连接任务(在process函数中定义)扩展为使用Peer。

  1. let connection = lines.into_future()
  2. .map_err(|(e, _)| e)
  3. .and_then(|(name, lines)| {
  4. // If `name` is `None`, then the client disconnected without
  5. // actually sending a line of data.
  6. //
  7. // Since the connection is closed, there is no further work
  8. // that we need to do. So, we just terminate processing by
  9. // returning `future::ok()`.
  10. //
  11. // The problem is that only a single future type can be
  12. // returned from a combinator closure, but we want to
  13. // return both `future::ok()` and `Peer` (below).
  14. //
  15. // This is a common problem, so the `futures` crate solves
  16. // this by providing the `Either` helper enum that allows
  17. // creating a single return type that covers two concrete
  18. // future types.
  19. let name = match name {
  20. Some(name) => name,
  21. None => {
  22. // The remote client closed the connection without
  23. // sending any data.
  24. return Either::A(future::ok(()));
  25. }
  26. };
  27. println!("`{:?}` is joining the chat", name);
  28. // Create the peer.
  29. //
  30. // This is also a future that processes the connection, only
  31. // completing when the socket closes.
  32. let peer = Peer::new(
  33. name,
  34. state,
  35. lines);
  36. // Wrap `peer` with `Either::B` to make the return type fit.
  37. Either::B(peer)
  38. })
  39. // Task futures have an error of type `()`, this ensures we handle
  40. // the error. We do this by printing the error to STDOUT.
  41. .map_err(|e| {
  42. println!("connection error = {:?}", e);
  43. });

除了添加Peer之外,还会处理name == None。 在这种情况下,远程客户端在识别自身之前终止。

返回多个 futurename == None handler和 Peer)通过将返回的 future包装在Either中来处理。 要么是枚举,要为每个变体接受不同的 future类型。 这允许返回多个 future类型而不到达trait对象。