序列化

注:本节未经校验,如有问题欢迎提issue

Akka 提供了内置的支持序列化的扩展, 你可以使用内置的序列化功能,也可以自己写一个.

内置的序列化功能被Akka内部用来序列化消息,你也可以用它做其它的序列化工作。

用法

配置

为了让 Akka 知道对什么任务使用哪个Serializer, 你需要编辑你的 配置文件, 在 “akka.actor.serializers”一节将名称绑定为akka.serialization.Serializer的实现,像这样:

  1. akka {
  2. actor {
  3. serializers {
  4. java = "akka.serialization.JavaSerializer"
  5. proto = "akka.remote.serialization.ProtobufSerializer"
  6. myown = "docs.serialization.MyOwnSerializer"
  7. }
  8. }
  9. }

在将名称与Serializer的不同实现绑定后,你需要指定哪些类的序列化使用哪种Serializer, 这部分配置写在“akka.actor.serialization-bindings”一节中:

  1. akka {
  2. actor {
  3. serializers {
  4. java = "akka.serialization.JavaSerializer"
  5. proto = "akka.remote.serialization.ProtobufSerializer"
  6. myown = "docs.serialization.MyOwnSerializer"
  7. }
  8. serialization-bindings {
  9. "java.lang.String" = java
  10. "docs.serialization.Customer" = java
  11. "com.google.protobuf.Message" = proto
  12. "docs.serialization.MyOwnSerializable" = myown
  13. "java.lang.Boolean" = myown
  14. }
  15. }
  16. }

你只需要指定消息的接口或抽象基类. 当消息实现了配置中多个类时,为避免歧义, 将使用最具体的类, 即对其它所有配置指定类,它都是子类的那一个. 如果这个条件不满足, 例如因为同时配置java.io.SerializableMyOwnSerializable,而彼此都不是对方的子类型,将生成一个警告。

Akka 缺省提供使用 java.io.Serializableprotobuf com.google.protobuf.GeneratedMessage 的序列化工具 (后者仅当定义了对akka-remote模块的依赖时才有), 所以通常你不需要添加这两种配置; 由于 com.google.protobuf.GeneratedMessage 实现了 java.io.Serializable, 在不特别指定的情况下,protobuf 消息将总是使用 protobuf 协议来做序列化. 要禁止缺省的序列化工具,将其对应的类型设为 “none”:

  1. akka.actor.serialization-bindings {
  2. "java.io.Serializable" = none
  3. }

确认

如果你希望确认你的消息是可以被序列化的,你可以打开这个配置项:

  1. akka {
  2. actor {
  3. serialize-messages = on
  4. }
  5. }

警告

我们推荐只在运行测试代码的时候才打开这个选项. 在其它的场景打开它完全没有道理.

如果你希望确认你的 Props 可以被序列化, 你可以打开这个配置项:

  1. akka {
  2. actor {
  3. serialize-creators = on
  4. }
  5. }

警告

我们推荐只在运行测试代码的时候才打开这个选项. 在其它的场景打开它完全没有道理.

通过代码

如果你希望通过代码使用 Akka Serialization来进行序列化/反序列化, 以下是一些例子:

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4. val system = ActorSystem("example")
  5. // Get the Serialization Extension
  6. val serialization = SerializationExtension(system)
  7. // Have something to serialize
  8. val original = "woohoo"
  9. // Find the Serializer for it
  10. val serializer = serialization.findSerializerFor(original)
  11. // Turn it into bytes
  12. val bytes = serializer.toBinary(original)
  13. // Turn it back into an object
  14. val back = serializer.fromBinary(bytes, manifest = None)
  15. // Voilá!
  16. back should be(original)

更多信息请见 akka.serialization._ScalaDoc

自定义

如果你希望创建自己的 Serializer, 应该已经看到配置例子中的 docs.serialization.MyOwnSerializer 了吧?

创建新的 Serializer

首先你需要为你的 Serializer 写一个类定义,像这样:

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4. class MyOwnSerializer extends Serializer {
  5. // This is whether "fromBinary" requires a "clazz" or not
  6. def includeManifest: Boolean = false
  7. // Pick a unique identifier for your Serializer,
  8. // you've got a couple of billions to choose from,
  9. // 0 - 16 is reserved by Akka itself
  10. def identifier = 1234567
  11. // "toBinary" serializes the given object to an Array of Bytes
  12. def toBinary(obj: AnyRef): Array[Byte] = {
  13. // Put the code that serializes the object here
  14. // ... ...
  15. }
  16. // "fromBinary" deserializes the given array,
  17. // using the type hint (if any, see "includeManifest" above)
  18. // into the optionally provided classLoader.
  19. def fromBinary(bytes: Array[Byte],
  20. clazz: Option[Class[_]]): AnyRef = {
  21. // Put your code that deserializes here
  22. // ... ...
  23. }
  24. }

然后你只需要做填空,在 配置文件 中将它绑定到一个名称, 然后列出需要用它来做序列化的类即可.

Actor引用的序列化

所有的 ActorRef 都是用 JavaSerializer进行序列化的, 但如果你写了自己的serializer, 你可能想知道如何正确对它们进行序列化和反序列化。在一般情况下,要使用的本地地址取决于作为序列化信息收件人的远程地址的类型。像这样使用Serialization.serializedActorPath(actorRef)

  1. import akka.actor.{ ActorRef, ActorSystem }
  2. import akka.serialization._
  3. import com.typesafe.config.ConfigFactory
  4. // Serialize
  5. // (beneath toBinary)
  6. val identifier: String = Serialization.serializedActorPath(theActorRef)
  7. // Then just serialize the identifier however you like
  8. // Deserialize
  9. // (beneath fromBinary)
  10. val deserializedActorRef = extendedSystem.provider.resolveActorRef(identifier)
  11. // Then just use the ActorRef

这是假定序列化发生在通过远程传输发送消息的上下文中。不过有一些序列化有其他用途,例如在actor应用程序之外存储actor引用(数据库等)。在这种情况下,需要牢记的重要一点是,actor路径的地址部分决定actor如何被通信连通。存储一个本地actor路径可能是正确的选择,如果回取发生在相同的逻辑上下文,但当在不同的网络主机上反序列化时它还不够: 为此,它将需要包括系统的远程传输地址。一个actor系统并不局限于只有一个远程运输,使得这个问题变得更有意思。当向remoteAddr发送消息时,要找出恰当的地址,你可以像下面这样使用ActorRefProvider.getExternalAddressFor(remoteAddr)

  1. object ExternalAddress extends ExtensionKey[ExternalAddressExt]
  2. class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
  3. def addressFor(remoteAddr: Address): Address =
  4. system.provider.getExternalAddressFor(remoteAddr) getOrElse
  5. (throw new UnsupportedOperationException("cannot send to " + remoteAddr))
  6. }
  7. def serializeTo(ref: ActorRef, remote: Address): String =
  8. ref.path.toSerializationFormatWithAddress(ExternalAddress(extendedSystem).
  9. addressFor(remote))

注意

如果地址并没有hostport组件时ActorPath.toSerializationFormatWithAddress不同于toString,即它只为本地地址插入地址信息。

toSerializationFormatWithAddress 还为actor添加了唯一 id,当actor终止,然后又按照相同名称重新创建时,该id将改变。将消息发送到指向那个老actor的引用,将不会传递给新的actor。如果你不想要这种行为,例如在长期存储引用的情况下,你可以使用不包括的唯一id的 toStringWithAddress

这就要求你至少知道哪种类型的地址将被反序列化生成的actor引用的系统支持;如果你手头没有具体的地址,你可以创建一个虚拟使用正确协议的地址 Address(protocol, "", "", 0)(假定所用的实际传输与Akka的RemoteActorRefProvider是相兼容的)。

也有一个用来支持集群的默认远程地址(并且典型系统只有这一个);你可以像这样获取它:

  1. object ExternalAddress extends ExtensionKey[ExternalAddressExt]
  2. class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
  3. def addressForAkka: Address = system.provider.getDefaultAddress
  4. }
  5. def serializeAkkaDefault(ref: ActorRef): String =
  6. ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).
  7. addressForAkka)
Actor的深度序列化

做内部actor状态深度序列化,推荐的方法是使用Akka持久化

关于 Java 序列化

如果在做Java序列化时没有使用JavaSerializer, 你必须保证在动态变量JavaSerializer.currentSystem中提供一个有效的 ExtendedActorSystem . 它用在读取ActorRef表示时将其字符串表示转换成实际的引用. 动态变量DynamicVariable 是一个 thread-local变量,所以在反序列化任何可能包含actor引用的数据时要保证这个变量有值.

外部 Akka Serializers

Akka-protostuff by Roman Levenstein

Akka-quickser by Roman Levenstein

Akka-kryo by Roman Levenstein