WebSockets

WebSocket 是一种可以在浏览器内使用的 socket,它基于一种支持全双工通信的协议。只要服务器端和客户端之间存在一个活跃的 WebSocket 连接,它们之间就可以在任意时刻收发信息。

兼容 HTML5 的现代 web 浏览器通过 JavaScript WebSocket API 可以原生支持 WebSocket。然而,WebSocket 并不只局限于在浏览器中使用,有许多 WebSocket 客户端库可以用于服务器间通信,或者提供给原生的移动应用来使用。在这些情况下使用 WebSocket 有一个很大的优势,就是复用 Play 服务器已经使用的 TCP 端口。

处理 WebSocket

到目前为止,我们都是用 Action 实例来处理标准的 HTTP 请求,然后返回标准的 HTTP 响应。WebSocket 则完全不同,它无法通过标准的 Action 来处理。

Play 提供了两种不同的内建机制来处理 WebSocket。第一种使用 actor,第二种使用 iteratee。这两种机制都可以通过 Play 为 WebSocket 提供的构建器来使用。

用 actor 处理 WebSocket

想用 actor 来处理 WebSocket,我们需要一个 akka.actor.Props 对象来描述 actor,当 WebSocket 连接建立起来后,Play 应该创建这个 actor。Play 会提供一个 akka.actor.ActorRef 用于向它发送消息,因此我们可以用它(即下面的 out)来创建 Props 对象:

  1. import play.api.mvc._
  2. import play.api.Play.current
  3. def socket = WebSocket.acceptWithActor[String, String] { request => out =>
  4. MyWebSocketActor.props(out)
  5. }

在这个例子中,我们向它发送消息的 actor 定义如下:

  1. import akka.actor._
  2. object MyWebSocketActor {
  3. def props(out: ActorRef) = Props(new MyWebSocketActor(out))
  4. }
  5. class MyWebSocketActor(out: ActorRef) extends Actor {
  6. def receive = {
  7. case msg: String =>
  8. out ! ("I received your message: " + msg)
  9. }
  10. }

任何从客户端收到的信息都会被发送到 actor(即 out),任何发送给 Play 提供的 actor 的信息都会被发送到客户端。上面的 actor 就简单地将它收到的信息在前面加上 I reveived your message:,然后返回给客户端。

检测 WebSocket 何时关闭

如果 WebSocket 已经关闭,Play 会自动停掉 actor。这意味着你可以通过实现 postStop 方法来清理 WebSocket 可能使用的资源。例如:

  1. override def postStop() = {
  2. someResource.close()
  3. }

关闭 WebSocket

当处理 WebSocket 的 actor 终止时,Play 会自动关闭 WebSocket。因此,你可以通过发送一个 PoisonPill 给你自己的 actor 来关闭 WebSocket:

  1. import akka.actor.PoisonPill
  2. self ! PoisonPill

拒绝 WebSocket 请求

有时候你可能想拒绝一个 WebSocket 请求,例如,必须是授权用户才能连接 WebSocket,或者 WebSocket 所关联到的资源不存在。Play 提供了 tryAcceptWithActor 来处理这种情况,允许你返回一个结果(如 forbidden 或 not found),或是返回一个处理 WebSocket 的 actor:

  1. import scala.concurrent.Future
  2. import play.api.mvc._
  3. import play.api.Play.current
  4. def socket = WebSocket.tryAcceptWithActor[String, String] { request =>
  5. Future.successful(request.session.get("user") match {
  6. case None => Left(Forbidden)
  7. case Some(_) => Right(MyWebSocketActor.props)
  8. })
  9. }

处理不同类型的信息

到目前为止,我们看到的都是在处理 String 信息。其实,Play 也提供了内建的方式来处理 Array[Byte]JsValue 信息。你可以把这些作为类型参数传递给 WebSocket 构建方法,例如:

  1. import play.api.mvc._
  2. import play.api.libs.json._
  3. import play.api.Play.current
  4. def socket = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
  5. MyWebSocketActor.props(out)
  6. }

你可以已经注意到了,上面有两个类型参数(都是 JsValue),这允许我们将接收进来的信息与发送出去的信息定义为不同的类型。这通常对于低级类型来说是没什么用的,但当你想把信息转换成高级类型时,它就非常有用了。

例如,我们想接收 JSON 类型的信息,然后把它解析成 InEvent 类型,再把返回的信息转成 OutEvent 类型。我们要做的第一件事是为 InEventOutEvent 创建 JSON 格式(用于隐式转换):

  1. import play.api.libs.json._
  2. implicit val inEventFormat = Json.format[InEvent]
  3. implicit val outEventFormat = Json.format[OutEvent]

接着,我们为这两个类型创建 WebSocket FrameFormatter

  1. import play.api.mvc.WebSocket.FrameFormatter
  2. implicit val inEventFrameFormatter = FrameFormatter.jsonFrame[InEvent]
  3. implicit val outEventFrameFormatter = FrameFormatter.jsonFrame[OutEvent]

最终,我们可以在 WebSocket 中使用这两个类型:

  1. import play.api.mvc._
  2. import play.api.Play.current
  3. def socket = WebSocket.acceptWithActor[InEvent, OutEvent] { request => out =>
  4. MyWebSocketActor.props(out)
  5. }

现在,在我们的 actor 中,我们会接收到类型为 InEvent 的信息,而发送出去的信息类型为 OutEvent

用 iteratee 处理 WebSocket

actor 是一种更好的抽象来处理离散信息,而 iteratee 是一种更好的抽象来处理流。

处理 WebSocket 请求,使用 WebSocket 而不是 Action

  1. import play.api.mvc._
  2. import play.api.libs.iteratee._
  3. import play.api.libs.concurrent.Execution.Implicits.defaultContext
  4. def socket = WebSocket.using[String] { request =>
  5. // Log events to the console
  6. val in = Iteratee.foreach[String](println).map { _ =>
  7. println("Disconnected")
  8. }
  9. // Send a single 'Hello!' message
  10. val out = Enumerator("Hello!")
  11. (in, out)
  12. }

WebSocket 可以访问初始化 WebSocket 连接的 HTTP 请求的报头,允许你获取标准报头和会话数据。然而,它无权访问请求体或是 HTTP 响应。

当你通过这种方式来构建 WebSocket 时,你必须同时返回 inout 两个通道。

  • in 通道的类型是 Iteratee[A,Unit](其中 A 是消息类型,这里我们用的是 String),对于每条传来的消息,它都会被通知到。当客户端的 socket 关闭后,它会收到 EOF
  • out 通道的类型是 Enumerator[A],它会产生发送给 Web 客户端的消息。它也可以通过发送 EOF 在服务器端关闭连接。

在这个例子中,我们创建了一个简单的 iteratee,它会在控制台打印每条消息。同时,我们创建了一个简单的枚举器(enumerator)来发送一个 Hello! 消息。

小贴士:你可以在这里测试 WebSocket,只需要把 location 设置为 ws://localhost:9000 即可。

下面的例子直接忽略输入数据,在发送完 Hello! 消息后就关闭 socket:

  1. import play.api.mvc._
  2. import play.api.libs.iteratee._
  3. def socket = WebSocket.using[String] { request =>
  4. // Just ignore the input
  5. val in = Iteratee.ignore[String]
  6. // Send a single 'Hello!' message and close
  7. val out = Enumerator("Hello!").andThen(Enumerator.eof)
  8. (in, out)
  9. }

下面是另外一个例子,其中输入数据被打印到标准输出,然后利用 Concurrent.broadcast 广播到客户端:

  1. import play.api.mvc._
  2. import play.api.libs.iteratee._
  3. import play.api.libs.concurrent.Execution.Implicits.defaultContext
  4. def socket = WebSocket.using[String] { request =>
  5. // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  6. val (out, channel) = Concurrent.broadcast[String]
  7. // log the message to stdout and send response back to client
  8. val in = Iteratee.foreach[String] {
  9. msg => println(msg)
  10. // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
  11. // receive the pushed messages
  12. channel push("I received your message: " + msg)
  13. }
  14. (in,out)
  15. }