开箱即用,Lagom使用Play JSON序列化请求和响应消息。您也可以定义自定义的序列化器用于自己的类型,使用任何可以织入的协议,从JSON到protobufs,再到XML。

Lagom如何选择消息序列化器

当您声明一个服务描述时,call, namedCall, pathCall, restCalltopic 方法全都使用**MessageSerializer**隐式参数来处理服务调用的消息。在Scala中全部使用隐式参数是可能的,您可以让Scala编译器处理这些隐式参数,或者您也可以明确显式地使用这些参数。
下面的例子展示了如何显式地使用Lagom默认的String序列化器。

  1. import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer
  2. trait HelloService extends Service {
  3. def sayHello: ServiceCall[String, String]
  4. override def descriptor = {
  5. import Service._
  6. named("hello").withCalls(
  7. call(sayHello)(MessageSerializer.StringMessageSerializer, MessageSerializer.StringMessageSerializer)
  8. )
  9. }
  10. }

service descriptors 文档中我们已经了解到,在一个样例类的伴生对象中如何声明一个隐式的Play JSON Format ,Lagom将使用Format处理消息类型。这种方式可以生效,是因为Lagom提供了一种隐式的MessageSerializer,其中包装了一个Play JSONFormatMessageSerializer伴生对象中的jsValueFormatMessageSerializer方法实现了这一点。
MessageSerializer伴生对象中也提供了其他可能用到的非JSON格式的隐式方法。例如, NotUsed, Done 或者 String 这些请求响应类型将使用默认的序列化器。Lagom还附带了对字节串序列化器(akka noop)的支持,因此可以直接访问装配对象。
JSON消息序列化器格式化对象也可以被显式地使用。我们假设您有一个带有id属性的消息对象,当您想要发起一次服务调用,由Play JSON指令集提供的默认格式化对象来完成格式化工作,但是当在JSON中id属性被命名为identifier时,您可能想要额外不同的格式化方式,这种情况您需要提供两种不同的格式化方式。

  1. import play.api.libs.json._
  2. import play.api.libs.functional.syntax._
  3. case class MyMessage(id: String)
  4. object MyMessage {
  5. implicit val format: Format[MyMessage] = Json.format
  6. val alternateFormat: Format[MyMessage] = {
  7. (__ \ "identifier")
  8. .format[String]
  9. .inmap(MyMessage.apply, _.id)
  10. }
  11. }

您可以看到我们已经把其中一个设定为隐式的,所以如果我们让隐式处理生效,它就会被选中。然后,非隐式调用可以在服务调用描述符中显式传递:

  1. trait MyService extends Service {
  2. def getMessage: ServiceCall[NotUsed, MyMessage]
  3. def getMessageAlternate: ServiceCall[NotUsed, MyMessage]
  4. override def descriptor = {
  5. import Service._
  6. named("my-service").withCalls(
  7. call(getMessage),
  8. call(getMessageAlternate)(
  9. implicitly[MessageSerializer[NotUsed, ByteString]],
  10. MessageSerializer.jsValueFormatMessageSerializer(
  11. implicitly[MessageSerializer[JsValue, ByteString]],
  12. MyMessage.alternateFormat
  13. )
  14. )
  15. )
  16. }
  17. }

自定义序列化器

JSON可能不是您想要使用的唯一一种织入格式。可以使用LagomsMessageSerializer特性来实现定制的序列化器。
正如我们已经看到的那样,Lagom中有两种类型的消息,严格消息和流消息。对于这两种类型的消息,Lagom提供了MessageSerializer的两个子接口,StrictMessageSerializerStreamedMessageSerializer,它们主要不同于序列化和反序列化的装配格式。严格的消息序列化器序列化和反序列化使用ByteString,也就是说,它们严格地在内存中工作,而流消息序列化器使用流,即Source[ByteString, _]
在研究如何实现序列化程序之前,需要介绍几个基本概念。

消息协议

Lagom有一个消息协议的概念。消息协议使用**MessageProtocol**类型表示,它们有三个属性:内容类型、字符集和版本。所有这些属性都是可选的,消息序列化程序可能使用也可能不使用。
消息协议大致转换为HTTP的 Content-TypeAccept头信息,如果使用编码版本的mime类型方案,则可能从中提取版本,或者也可能从URL中提取版本,这取决于服务的配置方式。

内容协商

Lagom消息序列化器能够使用内容协商来决定要使用的正确协议来相互通信。这可以用来指定不同的装配格式,比如JSON和XML,以及不同的版本。
Lagom的内容协商功能与HTTP相同。对于请求消息,客户端将选择它想要使用的任何协议,因此不需要进行协商。然后,服务器使用客户端发送的消息协议来决定如何反序列化请求。
对于响应,客户机发送一个它将接受的消息协议列表,服务器应该从该列表中选择一个协议来响应。然后,客户端将读取服务器选择的协议,并使用该协议反序列化响应。

协商序列化器

作为内容协商的结果,Lagom的MessageSerializer并不直接序列化和反序列化消息,而是提供了协商消息协议的方法,返回**NegotiatedSerializer****NegotiatedDeserializer**。实际上,正是这些协商好的类负责执行序列化和反序列化。
让我们看一个内容协商的示例。假设我们想实现一个自定义字符串MessageSerializer,它可以序列化为纯文本,也可以序列化为JSON,这取决于客户端请求的内容。如果您有一些客户端以JSON的形式发送文本主体,而另一些客户端以纯文本的形式发送文本主体,这可能会很有用,也许其中一个客户端是用一种方式完成工作的传统客户端,但现在您想用新客户端来完成另一种方式。
首先,我们将实现NegotiatedSerializer序列化为纯文本字符串:

  1. import akka.util.ByteString
  2. import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
  3. import com.lightbend.lagom.scaladsl.api.transport.DeserializationException
  4. import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol
  5. import com.lightbend.lagom.scaladsl.api.transport.NotAcceptable
  6. import com.lightbend.lagom.scaladsl.api.transport.UnsupportedMediaType
  7. class PlainTextSerializer(val charset: String) extends NegotiatedSerializer[String, ByteString] {
  8. override val protocol = MessageProtocol(Some("text/plain"), Some(charset))
  9. def serialize(s: String) = ByteString.fromString(s, charset)
  10. }

protocol方法返回这个序列化器序列化到的协议,您可以看到,我们正在传递这个序列化器将在构造函数中使用的字符集。serialize方法是从StringByteString的直接转换。
接下来,我们将实现同样的事情,这次将序列化为JSON:

  1. import play.api.libs.json.Json
  2. import play.api.libs.json.JsString
  3. class JsonTextSerializer extends NegotiatedSerializer[String, ByteString] {
  4. override val protocol = MessageProtocol(Some("application/json"))
  5. def serialize(s: String) =
  6. ByteString.fromString(Json.stringify(JsString(s)))
  7. }

这里我们使用Play JSON将String转换为JSON字符串。
现在让我们实现纯文本反序列化器:

  1. import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer
  2. class PlainTextDeserializer(val charset: String) extends NegotiatedDeserializer[String, ByteString] {
  3. def deserialize(bytes: ByteString) =
  4. bytes.decodeString(charset)
  5. }

同样,我们将字符集作为构造函数参数,并直接从ByteString转换为String
类似地,我们有一个JSON文本反序列化器:

  1. import scala.util.control.NonFatal
  2. class JsonTextDeserializer extends NegotiatedDeserializer[String, ByteString] {
  3. def deserialize(bytes: ByteString) = {
  4. try {
  5. Json.parse(bytes.iterator.asInputStream).as[String]
  6. } catch {
  7. case NonFatal(e) => throw DeserializationException(e)
  8. }
  9. }
  10. }

现在我们已经实现了协商的序列化器和反序列化器,现在是实现MessageSerializer来进行实际的协议协商的时候了。我们的类将继承StrictMessageSerializer:

  1. import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
  2. class TextMessageSerializer extends StrictMessageSerializer[String] {

我们需要做的下一件事是定义我们能接受的协议。这将被客户端用来设置Accept头文件:

  1. override def acceptResponseProtocols = List(
  2. MessageProtocol(Some("text/plain")),
  3. MessageProtocol(Some("application/json"))
  4. )

您可以看到,这个序列化器既支持文本协议,也支持json协议。需要注意的一点是,我们没有在文本协议中设置字符集,这是因为我们不需要对它进行特定的设置,我们可以接受服务器选择的任何字符集。
现在让我们实现serializerForRequest方法。客户端使用它来确定对请求使用哪个序列化器。因为在这个阶段,服务器和客户端之间没有通信,所以不能进行协商,所以客户端只是选择一个默认的序列化器,在这种情况下,一个utf-8纯文本序列化器如下:

  1. def serializerForRequest = new PlainTextSerializer("utf-8")

接下来我们将实现deserializer方法。这可以由服务器用于为请求选择反序列化器,也可以由客户机用于为响应选择反序列化器。在MessageProtocol中传递的是与请求或响应一起发送的内容类型,我们需要检查它,看看它是否是可以反序列化的内容类型,并返回适当的内容类型:

  1. def deserializer(protocol: MessageProtocol) = {
  2. protocol.contentType match {
  3. case Some("text/plain") | None =>
  4. new PlainTextDeserializer(protocol.charset.getOrElse("utf-8"))
  5. case Some("application/json") =>
  6. new JsonTextDeserializer
  7. case _ =>
  8. throw UnsupportedMediaType(protocol, MessageProtocol(Some("text/plain")))
  9. }
  10. }

注意,如果没有指定内容类型,我们将返回一个默认的反序列化器。在这里,我们也可能通过抛出异常而快速失败,但最好不要这样做,因为一些底层传输不允许将内容类型与消息一起传递。例如,如果这用于WebSocket请求,web浏览器不允许你为WebSocket请求设置内容类型。如果没有设置内容类型,则返回默认值,这样可以确保最大的可移植性。
接下来,我们将实现serializerForResponse方法。它接受客户端发送的已接受协议的列表,并选择一个用于序列化响应。如果它找不到它支持的对象,它就会抛出异常。请注意,任何属性的空值都意味着客户端将接受任何内容,就像客户端没有指定任何可接受的协议一样。

  1. import scala.collection.immutable
  2. def serializerForResponse(accepted: immutable.Seq[MessageProtocol]) = {
  3. accepted match {
  4. case Nil => new PlainTextSerializer("utf-8")
  5. case protocols =>
  6. protocols
  7. .collectFirst {
  8. case MessageProtocol(Some("text/plain" | "text/*" | "*/*" | "*"), charset, _) =>
  9. new PlainTextSerializer(charset.getOrElse("utf-8"))
  10. case MessageProtocol(Some("application/json"), _, _) =>
  11. new JsonTextSerializer
  12. }
  13. .getOrElse {
  14. throw NotAcceptable(accepted, MessageProtocol(Some("text/plain")))
  15. }
  16. }
  17. }

示例

Protocol buffer 序列化器

Protocol buffers是一种与JSON无关的高性能语言替代方案,对于服务之间的内部通信来说,它是一个非常好的选择。下面是一个如何为protoc生成的Order类编写MessageSerializer的例子:

  1. import akka.util.ByteString
  2. import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedDeserializer
  3. import com.lightbend.lagom.scaladsl.api.deser.MessageSerializer.NegotiatedSerializer
  4. import com.lightbend.lagom.scaladsl.api.deser.StrictMessageSerializer
  5. import com.lightbend.lagom.scaladsl.api.transport.MessageProtocol
  6. import scala.collection.immutable
  7. class ProtobufSerializer extends StrictMessageSerializer[Order] {
  8. private final val serializer = {
  9. new NegotiatedSerializer[Order, ByteString]() {
  10. override def protocol: MessageProtocol =
  11. MessageProtocol(Some("application/octet-stream"))
  12. def serialize(order: Order) = {
  13. val builder = ByteString.createBuilder
  14. order.writeTo(builder.asOutputStream)
  15. builder.result
  16. }
  17. }
  18. }
  19. private final val deserializer = {
  20. new NegotiatedDeserializer[Order, ByteString] {
  21. override def deserialize(bytes: ByteString) =
  22. Order.parseFrom(bytes.iterator.asInputStream)
  23. }
  24. }
  25. override def serializerForRequest =
  26. serializer
  27. override def deserializer(protocol: MessageProtocol) =
  28. deserializer
  29. override def serializerForResponse(
  30. acceptedMessageProtocols: immutable.Seq[MessageProtocol]
  31. ) = serializer
  32. }

注意,这个MessageSerializer不会做任何内容协商。许多情况下,内容协商是一种过分的手段,如果你不需要它,不必去实现它。