使用TCP

这一节的代码片段都假设有下面的导入:

  1. import akka.actor.{ Actor, ActorRef, Props }
  2. import akka.io.{ IO, Tcp }
  3. import akka.util.ByteString
  4. import java.net.InetSocketAddress

所有的AKKA IO API都通过manager对象访问。当使用一个IO API时,第一步是获得一个适当manager的引用。下面的代码显示了如何获得一个TCP manager引用。

  1. import akka.io.{ IO, Tcp }
  2. import context.system // implicitly used by IO(Tcp)
  3. val manager = IO(Tcp)

这个manager是一个actor,它为特殊的任务(如监听输入的连接)处理潜在的底层的IO资源(selector和channels)以及实例化workers。

1 连接

  1. object Client {
  2. def props(remote: InetSocketAddress, replies: ActorRef) =
  3. Props(classOf[Client], remote, replies)
  4. }
  5. class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
  6. import Tcp._
  7. import context.system
  8. IO(Tcp) ! Connect(remote)
  9. def receive = {
  10. case CommandFailed(_: Connect) =>
  11. listener ! "connect failed"
  12. context stop self
  13. case c @ Connected(remote, local) =>
  14. listener ! c
  15. val connection = sender()
  16. connection ! Register(self)
  17. context become {
  18. case data: ByteString =>
  19. connection ! Write(data)
  20. case CommandFailed(w: Write) =>
  21. // O/S buffer was full
  22. listener ! "write failed"
  23. case Received(data) =>
  24. listener ! data
  25. case "close" =>
  26. connection ! Close
  27. case _: ConnectionClosed =>
  28. listener ! "connection closed"
  29. context stop self
  30. } }
  31. }

连接到远程地址的第一步是发送Connect消息给TCP manager,除了上面所示的简单的形式存在,也有可能指定一个特定的本地InetSocketAddress去绑定,一个socket选项列表去应用。

SO_NODELAY(在windows中是TCP_NODELAY)这个socket选项在AKKA中默认是为true的。这个设置使Nagle的算法失效,使大部分程序的潜能得到了相当的提高。这个设置可以通过在Connect消息的socket选项列表中传递SO.TcpNoDelay(false)来覆盖。

TCP manager然后要么回复一个CommandFailed,要么回复一个代表新连接的内部actor。这个新的actor然后发送一个Connected消息到Connect消息的原始发送方。

为了激活这个新的连接,一个Register消息必须发送给连接actor。通知这个actor谁将从socket中接收数据。在这个过程开始之前,连接不可以使用。并且,有一个内部的超时时间,如果没有收到Register消息,连接actor将会关闭它自己。

连接actor将观察注册的handler,当某一个终止时关闭连接,然后清除这个连接相关的所有内部资源。

在上面的例子中,这个actor使用become从未连接的操作交换到连接的操作,表明在该状态观察到的命令和事件。CommandFailed将在下面讨论。ConnectionClosed是一个trait,标记不同的连接关闭事件。最后一行用相同的方式处理所有的连接关闭事件。可以在下面了解更多细粒度的连接关闭事件。

2 接收连接

  1. class Server extends Actor {
  2. import Tcp._
  3. import context.system
  4. IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
  5. def receive = {
  6. case b @ Bound(localAddress) =>
  7. // do some logging or setup ...
  8. case CommandFailed(_: Bind) => context stop self
  9. case c @ Connected(remote, local) =>
  10. val handler = context.actorOf(Props[SimplisticHandler])
  11. val connection = sender()
  12. connection ! Register(handler)
  13. } }

为了创建一个TCP服务器以及监听入站(inbound)连接,一个Bind命令必须发送给TCP manager。这将会通知TCP manager监听在特定InetSocketAddress上的TCP连接。为了绑定任意的接口,接口可以指定为0。

发送Bind消息的actor将接收一个Bound消息表明服务器已经准备接收输入连接。这个消息也包含InetSocketAddress,表明socket确实有限制(分解为IP地址和端口号)。

出站连接(outgoing)的处理过程与以上的处理过程是一样的。例子证明,当发送Register消息时,从一个特定的连接处理读可以通过命名另外一个actor为处理者来代理给这个actor。来源于系统的任何actor的写都可以发送到连接actor(发送Connected消息的actor)。最简化的处理如下:

  1. class SimplisticHandler extends Actor {
  2. import Tcp._
  3. def receive = {
  4. case Received(data) => sender() ! Write(data)
  5. case PeerClosed => context stop self
  6. }

出站actor的唯一的不同是内部actor管理监听端口-Bound消息的发送者-为Bind消息中的Connected消息观察命名为接收者的actor。当这个actor终止,监听端口也会关闭,所有与之相关的资源也会被释放;这时,存在的连接将不会被终止。

3 关闭连接

可以通过发送CloseConfirmedClose或者Abort给连接actor来关闭连接。

Close将会发送一个FIN请求来关闭连接,但是不会等待来自远程端点的确认。待处理的写入将会被清。如果关闭成功了,监听器将会被通知一个Closed

ConfirmedClose将通过发送一个FIN消息关闭连接的发送方向。但是数据将会继续被接收到直到远程的端点也关闭了连接。待处理的写入将会被清。如果关闭成功了,监听器将会被通知一个ConfirmedClosed

Abort将会发送一个RST消息给远程端点立即终止连接。待处理的写入将不会被清。如果关闭成功了,监听器将会被通知一个Aborted

如果连接已经被远程端点关闭了,PeerClosed将会被发送到监听者。缺省情况下,从这个端点而来的连接将会自动关闭。为了支持半关闭的连接,设置Register消息的keepOpenOnPeerClosed成员为true。在这种情况下,连接保持打开状态直到它接收到以上的关闭命令。

不管什么时候一个错误发生了,ErrorClosed都将会发送给监听者,强制关闭连接。

所有的关闭通知都是ConnectionClosed的子类型,所以不需要细粒度关闭事件的监听者可能用相同的方式处理所有的关闭事件。

4 写入连接

一旦一个连接已经建立了,可以从任意actor发送数据给它。数据的格式是Tcp.WriteCommand. Tcp.WriteCommand,它有三种不同的实现。

  • Tcp.Write。最简单的WriteCommand实现,它包裹一个ByteString实例和一个“ack”事件。这个ByteString拥有一个或者多个块,不可变的内存大小最大为2G。
  • Tcp.WriteFile。如果你想从一个文件发送raw数据,你用Tcp.WriteFile命令会非常有效。它允许你指定一个在磁盘上的字节块通过连接发送而不需要首先加载它们到JVM内存中。Tcp.WriteFile可以保持超过2G的数据以及一个ack事件(如果需要)。
  • Tcp.CompoundWrite。有时你可能想集合(或者交叉)几个Tcp.Write或者Tcp.WriteFile命令到一个原子的写命令,这个命令一次性完成写到连接。Tcp.CompoundWrite允许你这样做并且提供了三个好处。

    • 1 如下面章节介绍的,TCP连接actor在某一时间仅仅能处理一个单个的写命令。通过合并多个写到一个CompoundWrite,你可以以最小的开销传递,并且不需要通过一个ACK-based的消息协议一个个的回复它们。
    • 2 因为一个WriteCommand是原子的,你可以肯定其它actor不可能“注入”其它的写命令到你组合到一个CompoundWrite的写命令中去。几个actor写入相同的连接是一个重要的特征,这在某些情况下很难获得。
    • 3 一个CompoundWrite的“子写入(sub writes)”是普通的WriteWriteFile命令,它们请求“ack”事件。这些ACKs的发出与相应地“子写入”的完成是同时。这允许你附加超过一个Write或者WriteFile。或者通过在任意的点上发送临时的ACKs去让连接actor知道传递CompoundWrite的进度。

5 限制(Throttling)写和读

TCP连接actor的基本模型没有内部缓冲(它在同一时刻只能处理一个写,意味着它可以缓冲一个写直到它完全传递到OS内核)。需要在用户层为读和写处理拥挤问题。

对于back-pressuring写,有三种操作模型

  • ACK-based:每一个Write命令携带着一个任意的命令,如果这个对象不是Tcp.NoAck