AKKA 2.3.6 Scala 文档

持久化

Akka持久化使有状态的actor能留存其内部状态,以便在因JVM崩溃、监管者引起,或在集群中迁移导致的actor启动、重启时恢复它。Akka持久化背后的关键概念是持久化的只是一个actor的内部状态的的变化,而不是直接持久化其当前状态 (除了可选的快照)。这些更改永远只能被附加到存储,没什么是可变的,这使得高事务处理率和高效复制成为可能。有状态actor通过重放保存的变化来恢复,从而使它们可以重建其内部状态。重放的可以是完整历史记录,或着从某一个快照开始从而可以大大减少恢复时间。Akka持久化也提供了“至少一次消息传递语义”的点对点通信。

注意

本模块被标记为“experimental”直到Akka 2.3.0引入它。我们将基于用户的反馈继续改善此API,这就意味着我们对维护版本的二进制不兼容性降到最低的保证不适用于akka.persistence包的内容。

Akka持久化受eventsourced启发,并且是其正式的替代者。它遵循eventsourced相同的概念和体系结构,但在API和实现层上则显著不同。又见《迁移指南:从Eventsourced到Akka Persistence 2.3》

Akka 2.3.4的变化

在Akka 2.3.4中,较早版本中的几个概念被推倒和简化。大体上讲;ProcessorEventsourcedProcessor被替换为PersistentActorChannelPersistentChannel被替换为AtLeastOnceDeliveryView被替换为PersistentView

更改的全部细节请参阅《迁移指南:从Akka Persistence (experimental) 2.3.3到Akka Persistence 2.3.4 (和2.4.x)》。老的类在一段时间内仍被包含并标记为废弃,以便用户顺利过渡。如果你需要的旧的文档,可以参考这里

依赖

Akka持久化是一个单独的jar文件。请确保你的项目中有以下依赖关系:

  1. "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.6"

体系结构

  • PersistentActor:是一个持久的、有状态的actor。它能够持久化消息到一个日志,并以线程安全的方式对它们作出响应。它是可被用于执行命令[command]和事件来源[event sourced]的actor。当一个持久化的actor被启动或重新启动时,该actor会被重播日志消息,从而可以从这些消息恢复内部状态。
  • PersistentView:一个视图是一个持久的、有状态的actor,来接收已经由另一个持久化actor写下的日志消息。视图本身并没有新的日志消息,相反,它只能从一个持久化actor复制消息流来更新内部状态。
  • AtLeastOnceDelivery:使用至少一次的传递语义将消息发送到目的地,以防发送者和接收者 JVM崩溃。
  • Journal:日志存储发送到一个持久化actor的消息序列。应用程序可以控制actor接收的消息中,哪些需要在日记中记录,哪些不需要。日志的存储后端是可插拔的。默认日志存储插件是写入本地文件系统,复制日志在社区插件中可以获得。
  • Snapshot store:快照存储区持久化一个持久化actor或一个视图的内部状态的快照。快照可用于优化恢复时间。快照存储区的存储后端是可插拔的。默认快照存储插件写入本地文件系统。

事件来源 Event sourcing

事件来源背后的基本思想很简单。一个持久化actor接收一个 (非持久化) 命令,它首先会被验证是否可以被应用到当前状态。在这里,验证可以意味着任何东西,例如从对命令消息字段的简单检查,到引用若干外部服务。如果验证成功,从该命令生成事件,表示命令的效果。然后这些事件被持久化,在成功的持久化后,用于改变actor的状态。当持久化actor需要恢复时,仅重播持久化的事件,因为我们知道他们可以被成功地应用。换句话说,与命令不同,被重播到一个持久化actor的事件不能失败。事件来源的actor当然也可以处理不改变应用程序状态的命令,例如查询命令。

Akka持久化通过PersistentActor特质支持事件来源。一个actor可以扩展这个特质来使用persist方法持久化和处理事件。PersistentActor的行为是通过实现receiveRecoverreceiveCommand定义的。下面的示例演示了这一点。

  1. import akka.actor._
  2. import akka.persistence._
  3. case class Cmd(data: String)
  4. case class Evt(data: String)
  5. case class ExampleState(events: List[String] = Nil) {
  6. def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  7. def size: Int = events.length
  8. override def toString: String = events.reverse.toString
  9. }
  10. class ExamplePersistentActor extends PersistentActor {
  11. override def persistenceId = "sample-id-1"
  12. var state = ExampleState()
  13. def updateState(event: Evt): Unit =
  14. state = state.updated(event)
  15. def numEvents =
  16. state.size
  17. val receiveRecover: Receive = {
  18. case evt: Evt => updateState(evt)
  19. case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
  20. }
  21. val receiveCommand: Receive = {
  22. case Cmd(data) =>
  23. persist(Evt(s"${data}-${numEvents}"))(updateState)
  24. persist(Evt(s"${data}-${numEvents + 1}")) { event =>
  25. updateState(event)
  26. context.system.eventStream.publish(event)
  27. }
  28. case "snap" => saveSnapshot(state)
  29. case "print" => println(state)
  30. }
  31. }

该示例定义了两种数据类型,CmdEvt 分别代表命令和事件。ExamplePersistentActorstate 包含在 ExampleState中的持久化的事件数据的列表。

持久化actor的receiveRecover方法定义如何通过在恢复过程中处理 EvtSnapshotOffer 消息来更新state。持久化actor的receiveCommand 方法是一个命令处理程序。在此示例中,命令处理是通过生成两个事件,然后被持久化和处理的。事件通过调用persist方法持久化,该方法第一个参数是事件(或一系列事件),第二个参数是事件处理程序。

persist方法以异步方式持久化事件,而事件处理程序对成功持久化的事件进行处理。成功持久化的事件在内部作为独立消息发送回给持久化actor,来触发事件处理执行。事件处理程序可能会包含持久化actor的状态并修改它。持久化事件的发送者也是相应命令的发送者。这使事件处理程序可以回复命令的发送者(未显示)。

事件处理程序的主要任务是:使用事件数据更改持久化actor状态,并通过发布事件通知其他人成功的状态变化。

当使用persist持久化事件的时候,可以保证持久化actor在persist调用和相应的事件处理程序的(多次)执行之间不会进一步收到命令。这在单个命令的上下文中多次调用persist的情况下也成立。

运行该示例最简单的方法是下载Typesafe Activator,并打开Akka Persistence Samples with Scala这个教程。它包含如何运行PersistentActorExample的说明。

注意

还有可能在正常处理过程中使用不同的命令处理程序,并使用context.become()context.unbecome()来恢复。恢复后使actor进入相同的状态,你需要特别谨慎地使用receiveRecover方法中的becomeunbecome进行相同的状态转换,就像你会在命令处理程序中做的一样。

标识符

一个持久化actor必须具有跨不同actor化身而不改变的标识符。必须使用persistenceId方法定义该标识符。

  1. override def persistenceId = "my-stable-persistence-id"
恢复

默认情况下,一个持久化actor通过在启动和重启时重放日志消息实现自动恢复。恢复过程中发送给持久化actor的新消息不会干扰重放消息。新消息只会在持久化actor恢复完成后被收到。

自定义恢复

通过使用空实现重写preStart,可以禁用启动时的自动恢复。

  1. override def preStart() = ()

在这种情况下,必须显式地通过Recover()消息的发送恢复一个持久化actor。

  1. processor ! Recover()

如果没有重写,preStart将发送一个Recover()消息到self。应用程序还可能重写preStart来定义进一步的Recover() 参数如序列号范围上界,例如。

  1. override def preStart() {
  2. self ! Recover(toSequenceNr = 457L)
  3. }

序列号范围上界可以用来恢复持久化actor到过去的某个状态,而不是当前状态。通过使用空实现重写preRestart,可以禁用重新启动时的自动恢复。

  1. override def preRestart(reason: Throwable, message: Option[Any]) = ()
恢复状态

一个持久化actor可以通过以下方法查询自身的恢复状态

  1. def recoveryRunning: Boolean
  2. def recoveryFinished: Boolean

有时持久化actor在恢复完成时,处理任意其他消息之前,需要执行额外的初始化。持久化actor会在恢复完成后,处理任意其他消息之前,收到一个特别的RecoveryCompleted消息。

如果该actor在从日志中恢复状态出现问题,该actor将发送RecoveryFailure消息并可以选择在receiveRecover中处理。如果该actor不处理RecoveryFailure消息,它将被停止。

  1. def receiveRecover: Receive = {
  2. case RecoveryCompleted => recoveryCompleted()
  3. case evt => //...
  4. }
  5. def receiveCommand: Receive = {
  6. case msg => //...
  7. }
  8. def recoveryCompleted(): Unit = {
  9. // perform init after recovery, before any other messages
  10. // ...
  11. }
放宽的局部一致性要求和高吞吐量的用例

如果面临放宽的局部一致性要求和高吞吐量,有时PersistentActor及其persist在处理大量涌入的命令时可能会不够,因为它必须等待知道给定命令相关的所有事件都处理完成后,才开始处理下一条命令。虽然这种抽象在大多数的情况下非常有用,有时你可能会放宽一致性要求——例如你会想要尽可能快速地处理命令,假设事件最终会持久化并在后台恰当处理,并在需要时追溯性地回应持久性故障。

persistAsync方法提供了一个工具,用于实现高吞吐量的持久化actor。在日志仍在致力于持久化和(或) 执行用户事件回调代码时,它会贮藏传入的命令。

在下面的示例中,事件回调可能在"任何时候"被调用,甚至在处理下一条命令之后。两个事件之间的顺序仍能得到保证("evt-b-1"将在"evt-a-2"后发送,而它又在"evt-a-1"后发送,以此类推)。

  1. class MyPersistentActor extends PersistentActor {
  2. override def persistenceId = "my-stable-persistence-id"
  3. def receiveRecover: Receive = {
  4. case _ => // handle recovery here
  5. }
  6. def receiveCommand: Receive = {
  7. case c: String => {
  8. sender() ! c
  9. persistAsync(s"evt-$c-1") { e => sender() ! e }
  10. persistAsync(s"evt-$c-2") { e => sender() ! e }
  11. }
  12. }
  13. }
  14. // usage
  15. processor ! "a"
  16. processor ! "b"
  17. // possible order of received messages:
  18. // a
  19. // b
  20. // evt-a-1
  21. // evt-a-2
  22. // evt-b-1
  23. // evt-b-2

注意

为了实现"命令源"模式,只需对所有传入消息马上调用persistAsync(cmd)(…),并在回调中处理它们。

警告

如果在调用persistAsync和日志确定写操作之间,actor被重启(或停止)时,将不会调用回调。

推迟行动,直到持久化处理程序已执行

使用persistAsync时,有时你会发现定义一些''在persistAsync处理程序调用之后发生''的行动是很好的。PersistentActor提供了一个工具方法defer,它类似于persistAsync,可是并不持久化过去的事件。推荐它用于读取的操作,和在你的域模型中没有相应事件的行动。

使用这种方法和持久化系列方法的使用是非常相似的,但它不会持久化过去的事件。它将保留在内存中,并在调用处理程序时使用。

  1. class MyPersistentActor extends PersistentActor {
  2. override def persistenceId = "my-stable-persistence-id"
  3. def receiveRecover: Receive = {
  4. case _ => // handle recovery here
  5. }
  6. def receiveCommand: Receive = {
  7. case c: String => {
  8. sender() ! c
  9. persistAsync(s"evt-$c-1") { e => sender() ! e }
  10. persistAsync(s"evt-$c-2") { e => sender() ! e }
  11. defer(s"evt-$c-3") { e => sender() ! e }
  12. }
  13. }
  14. }

注意sender()是可以在处理程序回调中安全访问的,并将指向defer处理程序被调用的命令的原始发送者。

调用方将以这样的顺序(保证)获得响应:

  1. processor ! "a"
  2. processor ! "b"
  3. // order of received messages:
  4. // a
  5. // b
  6. // evt-a-1
  7. // evt-a-2
  8. // evt-a-3
  9. // evt-b-1
  10. // evt-b-2
  11. // evt-b-3

警告

如果该actor在调用defer和日志处理与确认所有写入之间的回调,将不会在actor重启(或停止)时调用。

批处理写操作

为了优化吞吐量,一个持久化actor在高负荷下,会内部将一批事件先储存,然后再(作为一个批处理)写到日志中。批处理大小可以调整,从低和中等载荷作用下的1,动态增长到高负荷下可配置的最大大小(默认为200)。在使用persistAsync时,这极大地增加了最大吞吐量。

  1. akka.persistence.journal.max-message-batch-size = 200

只要一个batch达到最大大小或日志完成前一批写操作,就会立即触发持久化actor新的批处理写操作。批处理写操作永远不会是基于计时器的,从而将延迟保持在最低限度。

批处理也在内部使用确保事件写操作的原子性。在单个命令上下文中的所有事件将作为单个批处理写入到日志中(即使在一个命令中多次调用persist)。因此,PersistentActor的恢复将永远不会部分完成 (只持久化单个命令中事件的一个子集)。

删除邮件

若要删除所有消息(由一个持久化actor记录) 到指定的序列号,持久化actor可以调用deleteMessages方法。

一个可选的permanent参数指定是否应从日志中永久删除消息,或仅标记为已删除。在这两种情况下,消息都不会重播。Akka持久化以后的扩展将允许重播标记为已删除的消息,例如可用于调试。

持久化视图

持久化视图可以通过扩展PersistentView特质以及实现receivepersistenceId方法实现。

  1. class MyView extends PersistentView {
  2. override def persistenceId: String = "some-persistence-id"
  3. override def viewId: String = "some-persistence-id-view"
  4. def receive: Actor.Receive = {
  5. case payload if isPersistent =>
  6. // handle message from journal...
  7. case payload =>
  8. // handle message from user-land...
  9. }
  10. }

PersistenceId标识从视图中接收的日志消息来自的持久化actor。该引用持久化actor实际并非必须正在运行。视图直接从一个持久化actor日志中读取消息。当一个持久化actor后来启动,并开始写新消息时,将默认自动更新相应的视图。

可以确定一条消息是从日志中发送,还是由用户定义的另一个调用isPersistent方法的actor发送。尽管有这样的功能,很多时候你根本不需要此信息,并可以简单地将相同的逻辑应用于这两种情况(跳过if isPersistent检查)。

更新

actor系统的所有视图的默认更新间隔是可配置的:

  1. akka.persistence.view.auto-update-interval = 5s

PersistentView实现类还可以重写autoUpdateInterval方法,以返回对特定的视图类或视图实例自定义的更新时间间隔。应用程序也可以在任何时候通过对一个视图发送Update消息触发额外的数据更新。

  1. val view = system.actorOf(Props[MyView])
  2. view ! Update(await = true)

如果await参数设置为true,在Update请求后面的消息在增量消息重播时会被处理,在这个更新请求处理完成时触发。如果设置为false(默认值),更新请求后的消息可能与重播的消息流交织。自动更新始终以await = false运行。

actor系统中所有视图的自动更新可以在配置中关闭:

  1. akka.persistence.view.auto-update = off

实现类可以通过重载autoUpdate方法重写配置的默认值。若要限制的每个更新请求的重播消息数量,应用程序可以配置自定义的akka.persistence.view.auto-update-replay-max值或重载autoUpdateReplayMax方法。手动更新的重播消息数目可以通过Update消息的replayMax参数进行限制。

恢复

持久化视图的初始化恢复过程和持久化actor的工作方式相同(即通过发送一个Recover消息到自己)。初始化恢复的最大重放消息数由autoUpdateReplayMax确定。关于自定义初始化恢复更多的可能性参见恢复一节。

标识符

一个持久化视图必须具有跨不同actor化身而不改变的标识符。必须使用viewId方法定义该标识符。

ViewId必须不同于引用的persistenceId,除非快照视图和其持久化actor是共享的(即应用程序通常不需要做的东西)。

快照

快照可以大幅减少持久化actor和视图的恢复时间。下面讨论的快照内容是基于持久化actor的上下文,但这也同样适用于持久化视图。

持久化actor可以通过调用saveSnapshot方法保存内部状态的快照。如果快照保存成功,持久化actor接收SaveSnapshotSuccess消息,否则SaveSnapshotFailure消息

  1. class MyProcessor extends Processor {
  2. var state: Any = _
  3. def receive = {
  4. case "snap" => saveSnapshot(state)
  5. case SaveSnapshotSuccess(metadata) => // ...
  6. case SaveSnapshotFailure(metadata, reason) => // ...
  7. }
  8. }

这里metadata的类型是SnapshotMetadata

  1. case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) {
  2. @deprecated("Use persistenceId instead.", since = "2.3.4")
  3. def processorId: String = persistenceId
  4. }

在恢复期间,持久化actor可以通过SnapshotOffer消息获取以前保存的快照,从中可以初始化内部状态。

  1. class MyProcessor extends Processor {
  2. var state: Any = _
  3. def receive = {
  4. case SnapshotOffer(metadata, offeredSnapshot) => state = offeredSnapshot
  5. case Persistent(payload, sequenceNr) => // ...
  6. }
  7. }

紧随着SnapshotOffer的重播消息,如果有的话,是比快照年轻的。他们帮助持久化actor恢复到其当前(即最新的)状态。

一般情况下,如果持久化actor之前保存了多份快照,且这些快照中至少有一个满足SnapshotSelectionCriteria并可被指定用于恢复的情况下,才会给持久化actor提供一个快照。

  1. processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(
  2. maxSequenceNr = 457L,
  3. maxTimestamp = System.currentTimeMillis))

如果未指定,他们默认为SnapshotSelectionCriteria.Latest,即选择最新(= 最小)的快照。若要禁用基于快照的恢复,应用程序应使用SnapshotSelectionCriteria.None。如果已保存的快照没有匹配指定的SnapshotSelectionCriteria,恢复时将重播所有日志消息。

快照删除

一个持久化actor可以通过调用deleteSnapshot方法并指定快照的序列号与的时间戳作为参数,来删除单个快照。要批量删除匹配SnapshotSelectionCriteria的快照,持久化actor应该使用deleteSnapshots方法。

至少一次投递

要在至少一次投递语义下发送消息到目的地,你可以在发送端的PersistentActor混入AtLeastOnceDelivery特质。如果他们在可配置的超时时间内未得到确认,它负责重新发送消息。

注意

至少一次投递意味着原始消息发送顺序并不总是保留的,以及目的地可能接收重复的消息。这意味着语义不匹配那些正常的ActorRef发送操作:

  • 它不是在最多一次投递
  • 同一个发件人-接收人对的消息顺序不保留,因为可重新发送
  • 崩溃并重新启动后,消息仍然会发送到目的地——向新actor化身

这些语义和ActorPath所表示的相似(见actor生命周期),因此你在发送消息时需要提供的是一个路径而不是一个引用。消息被发送到一个指向actor selection的路径。

deliver方法用于将消息发送到目的地。当目的地已回复一条确认消息,调用confirmDelivery方法。

  1. import akka.actor.{ Actor, ActorPath }
  2. import akka.persistence.AtLeastOnceDelivery
  3. case class Msg(deliveryId: Long, s: String)
  4. case class Confirm(deliveryId: Long)
  5. sealed trait Evt
  6. case class MsgSent(s: String) extends Evt
  7. case class MsgConfirmed(deliveryId: Long) extends Evt
  8. class MyPersistentActor(destination: ActorPath)
  9. extends PersistentActor with AtLeastOnceDelivery {
  10. def receiveCommand: Receive = {
  11. case s: String => persist(MsgSent(s))(updateState)
  12. case Confirm(deliveryId) => persist(MsgConfirmed(deliveryId))(updateState)
  13. }
  14. def receiveRecover: Receive = {
  15. case evt: Evt => updateState(evt)
  16. }
  17. def updateState(evt: Evt): Unit = evt match {
  18. case MsgSent(s) =>
  19. deliver(destination, deliveryId => Msg(deliveryId, s))
  20. case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId)
  21. }
  22. }
  23. class MyDestination extends Actor {
  24. def receive = {
  25. case Msg(deliveryId, s) =>
  26. // ...
  27. sender() ! Confirm(deliveryId)
  28. }
  29. }

deliverconfirmDelivery之间的相关性,是通过传入deliveryIdToMessage函数的deliveryId参数进行的。通常在消息中包含deliveryId传递到目的地,然后用一个包含相同deliveryId的消息进行答复。

deliveryId是无间隙严格单调递增序列号。相同的序列将用于所有目标actor,即当发送到多个目标时会看到序列中的空白,如果没有执行转译。

AtLeastOnceDelivery特质具有未经确认的消息和一个序列号组成的一个状态。它并不存储这个状态本身。你必须持久化从你的PersistentActor调用deliverconfirmDelivery所对应的事件,从而可以通过调用相同的方法在PersistentActor的恢复阶段恢复状态。有时这些事件可以来自其他业务级别的事件,而有时你必须创建单独的事件。在恢复过程中deliver的调用不会发出消息,但如果没有匹配的confirmDelivery执行,它将稍后发送。

支持快照功能是getDeliverySnapshotsetDeliverySnapshot提供的。AtLeastOnceDeliverySnapshot包含完整的投递状态,包括未经确认的消息。如果你需要一个自定义的快照保存actor其他部分的状态,你还必须包括AtLeastOnceDeliverySnapshot。它使用protobuf序列化,即利用Akka的通用序列化机制。最简单的方法是将AtLeastOnceDeliverySnapshot中的字节作为blob包含在你自定义的快照中。

重发尝试之间的间隔是由redeliverInterval方法定义的。其默认值可以用akka.persistence.at-least-once-delivery.redeliver-interval配置键来配置。可以在实现类中重写该方法来返回非默认值。

经过若干次尝试后,一个AtLeastOnceDelivery.UnconfirmedWarning消息将发送到self。重新发送仍会继续,但你可以选择调用confirmDelivery来取消重新发送。warnAfterNumberOfUnconfirmedAttempts方法定义发出警告之前传递尝试的次数。其默认值可以用akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts配置键配置。可以用实现类重写该方法来返回非默认值。

AtLeastOnceDelivery特质将消息保留在内存中,直到他们成功投递已被确认。actor能保留在内存中的未经确认的消息的最大数目限制是由maxUnconfirmedMessages方法定义的。如果超过了此限制deliver方法将不会接受更多的消息,它将抛出AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException。可以用akka.persistence.at-least-once-delivery.max-unconfirmed-messages配置键配置其默认值。可以用实现类重写该方法来返回非默认值。

存储插件

对于日志和快照存储的存储后端在Akka持久化中是可插拔的。默认日志插件将消息写入LevelDB(见本地LevelDB日志)。默认快照存储插件将快照作为单独的文件写入本地文件系统(请参阅本地快照存储区)。应用程序可以通过实现一个插件API并通过配置激活它们来提供他们自己的插件。插件开发需要以下引入:

  1. import akka.actor.ActorSystem
  2. import akka.persistence._
  3. import akka.persistence.journal._
  4. import akka.persistence.snapshot._
  5. import akka.testkit.TestKit
  6. import com.typesafe.config._
  7. import org.scalatest.WordSpec
  8. import scala.collection.immutable.Seq
  9. import scala.concurrent.Future
  10. import scala.concurrent.duration._
日志插件API

日志插件要扩展SyncWriteJournalAsyncWriteJournalSyncWriteJournal是一个actor,当存储后端的API只支持同步、阻塞写入时应扩展它。在这种情况下,要实现的方法是:

  1. /**
  2. * Plugin API: synchronously writes a batch of persistent messages to the journal.
  3. * The batch write must be atomic i.e. either all persistent messages in the batch
  4. * are written or none.
  5. */
  6. def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit
  7. /**
  8. * Plugin API: synchronously writes a batch of delivery confirmations to the journal.
  9. */
  10. @deprecated("writeConfirmations will be removed, since Channels will be removed.", since = "2.3.4")
  11. def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit
  12. /**
  13. * Plugin API: synchronously deletes messages identified by `messageIds` from the
  14. * journal. If `permanent` is set to `false`, the persistent messages are marked as
  15. * deleted, otherwise they are permanently deleted.
  16. */
  17. @deprecated("deleteMessages will be removed.", since = "2.3.4")
  18. def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit
  19. /**
  20. * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`
  21. * (inclusive). If `permanent` is set to `false`, the persistent messages are marked
  22. * as deleted, otherwise they are permanently deleted.
  23. */
  24. def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit

当存储后端的API支持异步、非阻塞写入时,应扩展AsyncWriteJournal这个actor。在这种情况下,要实现的方法是:

  1. /**
  2. * Plugin API: asynchronously writes a batch of persistent messages to the journal.
  3. * The batch write must be atomic i.e. either all persistent messages in the batch
  4. * are written or none.
  5. */
  6. def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit]
  7. /**
  8. * Plugin API: asynchronously writes a batch of delivery confirmations to the journal.
  9. */
  10. @deprecated("writeConfirmations will be removed, since Channels will be removed.", since = "2.3.4")
  11. def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit]
  12. /**
  13. * Plugin API: asynchronously deletes messages identified by `messageIds` from the
  14. * journal. If `permanent` is set to `false`, the persistent messages are marked as
  15. * deleted, otherwise they are permanently deleted.
  16. */
  17. @deprecated("asyncDeleteMessages will be removed.", since = "2.3.4")
  18. def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit]
  19. /**
  20. * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
  21. * (inclusive). If `permanent` is set to `false`, the persistent messages are marked
  22. * as deleted, otherwise they are permanently deleted.
  23. */
  24. def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]

消息重播和序列号恢复始终是异步的,因此任何日志插件必须实现:

  1. /**
  2. * Plugin API: asynchronously replays persistent messages. Implementations replay
  3. * a message by calling `replayCallback`. The returned future must be completed
  4. * when all messages (matching the sequence number bounds) have been replayed.
  5. * The future must be completed with a failure if any of the persistent messages
  6. * could not be replayed.
  7. *
  8. * The `replayCallback` must also be called with messages that have been marked
  9. * as deleted. In this case a replayed message's `deleted` method must return
  10. * `true`.
  11. *
  12. * The channel ids of delivery confirmations that are available for a replayed
  13. * message must be contained in that message's `confirms` sequence.
  14. *
  15. * @param persistenceId persistent actor id.
  16. * @param fromSequenceNr sequence number where replay should start (inclusive).
  17. * @param toSequenceNr sequence number where replay should end (inclusive).
  18. * @param max maximum number of messages to be replayed.
  19. * @param replayCallback called to replay a single message. Can be called from any
  20. * thread.
  21. *
  22. * @see [[AsyncWriteJournal]]
  23. * @see [[SyncWriteJournal]]
  24. */
  25. def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr Unit): Future[Unit]
  26. /**
  27. * Plugin API: asynchronously reads the highest stored sequence number for the
  28. * given `persistenceId`.
  29. *
  30. * @param persistenceId persistent actor id.
  31. * @param fromSequenceNr hint where to start searching for the highest sequence
  32. * number.
  33. */
  34. def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long]

日志插件可以以下最小配置下激活:

  1. # Path to the journal plugin to be used
  2. akka.persistence.journal.plugin = "my-journal"
  3. # My custom journal plugin
  4. my-journal {
  5. # Class name of the plugin.
  6. class = "docs.persistence.MyJournal"
  7. # Dispatcher for the plugin actor.
  8. plugin-dispatcher = "akka.actor.default-dispatcher"
  9. }

指定的插件class必须具有一个无参数构造函数。plugin-dispatcher是用于插件actor的调度程序。如果未指定,则默认是对SyncWriteJournal插件的akka.persistence.dispatchers.default-plugin-dispatcher和对AsyncWriteJournal插件的akka.actor.default-dispatcher

快照存储插件API

一个快照存储插件必须扩展SnapshotStoreactor并实现以下方法:

  1. /**
  2. * Plugin API: asynchronously loads a snapshot.
  3. *
  4. * @param persistenceId processor id.
  5. * @param criteria selection criteria for loading.
  6. */
  7. def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
  8. /**
  9. * Plugin API: asynchronously saves a snapshot.
  10. *
  11. * @param metadata snapshot metadata.
  12. * @param snapshot snapshot.
  13. */
  14. def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
  15. /**
  16. * Plugin API: called after successful saving of a snapshot.
  17. *
  18. * @param metadata snapshot metadata.
  19. */
  20. def saved(metadata: SnapshotMetadata)
  21. /**
  22. * Plugin API: deletes the snapshot identified by `metadata`.
  23. *
  24. * @param metadata snapshot metadata.
  25. */
  26. def delete(metadata: SnapshotMetadata)
  27. /**
  28. * Plugin API: deletes all snapshots matching `criteria`.
  29. *
  30. * @param persistenceId processor id.
  31. * @param criteria selection criteria for deleting.
  32. */
  33. def delete(persistenceId: String, criteria: SnapshotSelectionCriteria)

可通过以下最小配置激活快照存储插件:

  1. # Path to the snapshot store plugin to be used
  2. akka.persistence.snapshot-store.plugin = "my-snapshot-store"
  3. # My custom snapshot store plugin
  4. my-snapshot-store {
  5. # Class name of the plugin.
  6. class = "docs.persistence.MySnapshotStore"
  7. # Dispatcher for the plugin actor.
  8. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
  9. }

指定的插件class必须具有一个无参数构造函数。plugin-dispatcher是用于插件actor的调度程序。如果未指定,则默认为akka.persistence.dispatchers.default-plugin-dispatcher

插件TCK

为了帮助开发人员构建正确和高质量存储插件,我们提供技术兼容性工具包 (简称TCK)。

TCK可用于Java或Scala项目中,对Scala你需要引入akka-persistence-tck-experimental依赖关系:

  1. "com.typesafe.akka" %% "akka-persistence-tck-experimental" % "2.3.5" % "test"

要在你的测试套件中包括日志TCK的测试,只需要扩展提供的JournalSpec

  1. class MyJournalSpec extends JournalSpec {
  2. override val config = ConfigFactory.parseString(
  3. """
  4. |akka.persistence.journal.plugin = "my.journal.plugin"
  5. """.stripMargin)
  6. }

我们还提供一个简单的基准测试类JournalPerfSpec,包括所有JournalSpec有的测试,还会执行日志上的一些长操作,并打印性能统计数据。虽然它并旨在提供一个适当的基准测试环境,它仍可以被用于对典型的应用场景下你的日志表现进行粗略的感受。

要在你的测试套件中包括SnapshotStore TCK测试,只需要扩展SnapshotStoreSpec

  1. class MySnapshotStoreSpec extends SnapshotStoreSpec {
  2. override val config = ConfigFactory.parseString(
  3. """
  4. |akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
  5. """.stripMargin)
  6. }

在你的插件需要一些初始设置的情况下(启动模拟数据库,删除临时文件等),你可以重写beforeAllafterAll来钩入测试生命周期:

  1. class MyJournalSpec extends JournalSpec {
  2. override val config = ConfigFactory.parseString(
  3. """
  4. |akka.persistence.journal.plugin = "my.journal.plugin"
  5. """.stripMargin)
  6. val storageLocations = List(
  7. new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")),
  8. new File(config.getString("akka.persistence.snapshot-store.local.dir")))
  9. override def beforeAll() {
  10. super.beforeAll()
  11. storageLocations foreach FileUtils.deleteRecursively
  12. }
  13. override def afterAll() {
  14. storageLocations foreach FileUtils.deleteRecursively
  15. super.afterAll()
  16. }
  17. }

我们强烈建议在你的测试套件包括这些规格,因为从头编写一个插件时,它们涵盖了广泛的,你可能会遗忘的测试用例。

预先包装好的插件

本地LevelDB日志

默认日志插件是akka.persistence.journal.leveldb,它将消息写入到本地的LevelDB实例。LevelDB文件的默认位置是当前工作目录中一个名为journal的目录。此位置可以由配置中指定的相对或绝对的路径更改:

  1. akka.persistence.journal.leveldb.dir = "target/journal"

用这个插件,每个actor系统可运行其自己私有的LevelDB实例。

共享LevelDB日志

一个LevelDB实例还可以由多个actor系统(在相同或不同节点上)共享。它,例如,允许持久化actor进行故障转移到备份节点,并从备份节点继续使用共享的日志实例。

警告

共享的LevelDB实例是单点故障,因此应仅用于测试目的。高可用、带复本的日志可以从社区插件中获得。

通过实例化SharedLeveldbStoreactor可以启动一个共享的LevelDB实例。

  1. }
  2. }
  3. class MyJournal extends AsyncWriteJournal {
  4. def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ???
  5. def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ???
  6. def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ???
  7. def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
  8. def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ???
  9. def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ???
  10. }
  11. class MySnapshotStore extends SnapshotStore {
  12. def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
  13. def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
  14. def saved(metadata: SnapshotMetadata): Unit = ???
  15. def delete(metadata: SnapshotMetadata): Unit = ???
  16. def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ???
  17. }
  18. object PersistenceTCKDoc {
  19. new AnyRef {
  20. import akka.persistence.journal.JournalSpec
  21. class MyJournalSpec extends JournalSpec {
  22. override val config = ConfigFactory.parseString(
  23. """
  24. |akka.persistence.journal.plugin = "my.journal.plugin"
  25. """.stripMargin)
  26. }
  27. }
  28. new AnyRef {
  29. import akka.persistence.snapshot.SnapshotStoreSpec
  30. class MySnapshotStoreSpec extends SnapshotStoreSpec {
  31. override val config = ConfigFactory.parseString(
  32. """
  33. |akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
  34. """.stripMargin)
  35. }
  36. }
  37. new AnyRef {
  38. import java.io.File
  39. import akka.persistence.journal.JournalSpec
  40. import org.iq80.leveldb.util.FileUtils
  41. class MyJournalSpec extends JournalSpec {
  42. override val config = ConfigFactory.parseString(
  43. """
  44. |akka.persistence.journal.plugin = "my.journal.plugin"
  45. """.stripMargin)
  46. val storageLocations = List(
  47. new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")),
  48. new File(config.getString("akka.persistence.snapshot-store.local.dir")))
  49. override def beforeAll() {
  50. super.beforeAll()
  51. storageLocations foreach FileUtils.deleteRecursively
  52. }
  53. override def afterAll() {
  54. storageLocations foreach FileUtils.deleteRecursively
  55. super.afterAll()
  56. }
  57. }
  58. }
  59. }

默认情况下,共享的实例将日志消息写入到当前的工作目录中一个名为journal的本地目录。可以通过配置更改存储位置:

  1. akka.persistence.journal.leveldb-shared.store.dir = "target/shared"

使用共享的LevelDB存储的actor系统必须激活akka.persistence.journal.leveldb-shared插件。

  1. akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"

这个插件必须由注入(远程的)SharedLeveldbStore actor引用来进行初始化。注入是通过以actor引用作为参数调用SharedLeveldbJournal.setStore方法完成的。

  1. trait SharedStoreUsage extends Actor {
  2. override def preStart(): Unit = {
  3. context.actorSelection("akka.tcp://example@127.0.0.1:2552/user/store") ! Identify(1)
  4. }
  5. def receive = {
  6. case ActorIdentity(1, Some(store)) =>
  7. SharedLeveldbJournal.setStore(store, context.system)
  8. }
  9. }

内部日志命令(由持久化actor发送的)会缓冲直到注入完成。注入是幂等的,即只有第一次的注入被使用。

本地快照存储区

默认快照存储插件是akka.persistence.snapshot-store.local。它将快照文件写入本地文件系统。默认的存储位置是当前工作目录中一个名为snapshots 的目录。这可以通过配置中指定的相对或绝对的路径来更改:

  1. akka.persistence.snapshot-store.local.dir = "target/snapshots"

自定义序列化

快照序列化和Persistent消息的有效载荷是可以通过Akka序列化基础架构配置的。例如,如果应用程序想要序列化

  • 有效载荷的MyPayload类型与自定义的MyPayloadSerializer
  • 快照的类型MySnapshot与自定义的MySnapshotSerializer

它必须添加

  1. akka.actor {
  2. serializers {
  3. my-payload = "docs.persistence.MyPayloadSerializer"
  4. my-snapshot = "docs.persistence.MySnapshotSerializer"
  5. }
  6. serialization-bindings {
  7. "docs.persistence.MyPayload" = my-payload
  8. "docs.persistence.MySnapshot" = my-snapshot
  9. }
  10. }

在应用程序配置中。如果未指定,则使用默认的序列化程序。

测试

运行测试时使用sbt的LevelDB默认设置,请确保在你的sbt项目中设置fork := true,否则你将看到一个UnsatisfiedLinkError。或者,你可以切换到一个LevelDB Java 端口,通过这样的设置

  1. akka.persistence.journal.leveldb.native = off

  1. akka.persistence.journal.leveldb-shared.store.native = off

在Akka配置中。LevelDB 的Java端口仅用于测试目的。

杂项

状态机

状态机可以通过将FSM特质混入持久化actor来实现持久化。

  1. import akka.actor.FSM
  2. import akka.persistence.{ Persistent, Processor }
  3. class PersistentDoor extends Processor with FSM[String, Int] {
  4. startWith("closed", 0)
  5. when("closed") {
  6. case Event(Persistent("open", _), counter) =>
  7. goto("open") using (counter + 1) replying (counter)
  8. }
  9. when("open") {
  10. case Event(Persistent("close", _), counter) =>
  11. goto("closed") using (counter + 1) replying (counter)
  12. }
  13. }

配置

配置中有几个属性为持久化模块使用,请参阅参考配置