AKKA 2.3.6 Scala 文档

有限状态机(FSM)

概述

FSM (有限状态机) 可以mixin到akka Actor中,其概念在Erlang 设计原则中有最好的描述。

一个 FSM 可以描述成一组具有如下形式的关系 :

State(S) x Event(E) -> Actions (A), State(S')

这些关系的意思可以这样理解:

如果我们当前处于状态S,发生了E事件,则我们应执行操作A,然后将状态转换为S’。

一个简单的例子

为了演示FSM trait的大部分功能,考虑一个actor,它接收到一组突然爆发的消息而将其送入邮箱队列,然后在消息爆发期过后或收到flush请求时再对消息进行发送。

首先,假设以下所有代码都使用这些import语句:

  1. import akka.actor.{ Actor, ActorRef, FSM }
  2. import scala.concurrent.duration._

我们的 “Buncher” actor的契约是接收或产生以下消息:

  1. // received events
  2. case class SetTarget(ref: ActorRef)
  3. case class Queue(obj: Any)
  4. case object Flush
  5. // sent events
  6. case class Batch(obj: immutable.Seq[Any])

SetTarget用来启动,为Batches设置发送目标;Queue会添加到内部队列而Flush标志着消息爆发的结束。

  1. // states
  2. sealed trait State
  3. case object Idle extends State
  4. case object Active extends State
  5. sealed trait Data
  6. case object Uninitialized extends Data
  7. case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data

这个actor可以处于两种状态: 队列中没有消息(即Idle)或有消息(即Active)。只要一直有消息进来并且没有flush请求,它就停留在active状态。这个actor的内部状态数据是由批消息的发送目标actor引用和实际的消息队列组成.

现在让我们看看我们的FSM actor的框架:

  1. class Buncher extends Actor with FSM[State, Data] {
  2. startWith(Idle, Uninitialized)
  3. when(Idle) {
  4. case Event(SetTarget(ref), Uninitialized) =>
  5. stay using Todo(ref, Vector.empty)
  6. }
  7. // transition elided ...
  8. when(Active, stateTimeout = 1 second) {
  9. case Event(Flush | StateTimeout, t: Todo) =>
  10. goto(Idle) using t.copy(queue = Vector.empty)
  11. }
  12. // unhandled elided ...
  13. initialize()
  14. }

基本策略就是声明actor类,混入FSM trait,并将可能的状态和数据设定为类型参数。在actor的内部使用一个DSL来声明状态机:

  • startsWith定义初始状态和初始数据
  • 然后对每一个状态有一个when() { … }声明待处理的事件(可以是多个,传入的PartialFunction将用orElse进行连接)
  • 最后使用initialize来启动它,这会执行到初始状态的转换并启动定时器(如果需要的话)。

在这个例子中,我们从IdleUninitialized状态开始,这两种状态下只处理SetTarget()消息;stay准备结束这个事件的处理而不离开当前状态,而using使得FSM将其内部状态(这时为Uninitialized)替换为一个新的包含目标actor引用的Todo()对象。Active状态声明了一个状态超时,意思是如果1秒内没有收到消息,将生成一个FSM.StateTimeout消息。在本例中这与收到Flush指令消息具有相同的效果,即转回Idle状态并将内部队列重置为空vector。但消息是如何进入队列的?由于在两种状态下都要做这件事,我们利用了任何when()块未处理的消息被发送到whenUnhandled()块这个事实:

  1. whenUnhandled {
  2. // common code for both states
  3. case Event(Queue(obj), t @ Todo(_, v)) =>
  4. goto(Active) using t.copy(queue = v :+ obj)
  5. case Event(e, s) =>
  6. log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
  7. stay
  8. }

这里第一个case是将Queue()请求加入内部队列中并进入Active状态(当然显然地,如果已经在Active状态则停留),前提是收到Queue()时FSM数据不是Uninitialized。否则——及其它所有未命中的情况——第二个case记录一个警告到日志并保持内部状态。

最后剩下的只有Batches实际上是如何发送到目标的,这里我们使用onTransition机制:你可以声明多个这样的块,在状态切换发生时(即只有当状态真正改变时)所有的块都将被尝试来作匹配。

  1. onTransition {
  2. case Active -> Idle =>
  3. stateData match {
  4. case Todo(ref, queue) => ref ! Batch(queue)
  5. }
  6. }

状态转换回调是一个偏函数,它以一对状态作为输入的——当前状态和下一个状态。FSM trait为此提供了一个方便的箭头形式的提取器,非常贴心地提醒你所匹配到的状态转换的方向。在状态转换过程中,如示例所示旧状态数据可以通过stateData获得,新状态数据可以通过nextStateData获得。

要确认这个buncher真实可用,可以很简单地利用测试Actor系统(Scala)中的工具写一个测试,它方便地将ScalaTest trait融入AkkaSpec中:

  1. import akka.actor.Props
  2. import scala.collection.immutable
  3. class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
  4. // fsm code elided ...
  5. "simple finite state machine" must {
  6. "demonstrate NullFunction" in {
  7. class A extends Actor with FSM[Int, Null] {
  8. val SomeState = 0
  9. when(SomeState)(FSM.NullFunction)
  10. }
  11. }
  12. "batch correctly" in {
  13. val buncher = system.actorOf(Props(classOf[Buncher], this))
  14. buncher ! SetTarget(testActor)
  15. buncher ! Queue(42)
  16. buncher ! Queue(43)
  17. expectMsg(Batch(immutable.Seq(42, 43)))
  18. buncher ! Queue(44)
  19. buncher ! Flush
  20. buncher ! Queue(45)
  21. expectMsg(Batch(immutable.Seq(44)))
  22. expectMsg(Batch(immutable.Seq(45)))
  23. }
  24. "not batch if uninitialized" in {
  25. val buncher = system.actorOf(Props(classOf[Buncher], this))
  26. buncher ! Queue(42)
  27. expectNoMsg
  28. }
  29. }
  30. }

参考

FSM Trait 及 FSM Object

FSM trait只能被混入到Actor子类中。这里选择了 使用self类型的写法而不是继承Actor,这样是为了标明事实上创建的是一个actor。//TODO这里代码有误?

  1. class Buncher extends Actor with FSM[State, Data] {
  2. // fsm body ...
  3. initialize()
  4. }

注意

FSM特质定义receive方法处理内部消息,并将其它一切通过FSM逻辑传递(根据当前状态)。当重写receive方法,请牢记例如超时状态处理取决于实际通过FSM逻辑传递的消息。

FSM trait 有两个类型参数 :

  • 所有状态名称的父类型,通常是一个sealed trait,状态名称作为case object来继承它
  • 状态数据的类型,由FSM模块自己跟踪。

注意

状态数据与状态名称一起描述了状态机的内部状态;如果你坚持这种模式,不向FSM类中加入可变量成员,你就可以充分享受在一些周知的位置改变所有内部状态的好处.

定义状态

状态的定义是通过一次或多次调用

  1. when(<name>[, stateTimeout = <timeout>])(stateFunction)

方法。

给定的名称对象必须与为FSM trait指定的第一个参数类型相匹配。这个对象将被用作一个hash表的键,所以你必须确保它正确地实现了equals hashCode方法;特别是它不能为可变变量。满足这些条件的最简单的就是 case objects。

如果给定了stateTimeout参数,那么所有到这个状态的转换,包括停留,缺省都会收到这个超时。初始化转换时显式指定一个超时可以用来覆盖这个缺省行为,更多信息见发起状态转换。在操作执行的过程中可以通过使用setStateTimeout(state, duration)方法来修改任何状态的超时时间。这使得运行时配置(例如通过外部消息)成为可能。

参数stateFunction是一个PartialFunction[Event, State],可以很方便地用偏函数的语法来指定,见下例:

  1. when(Idle) {
  2. case Event(SetTarget(ref), Uninitialized) =>
  3. stay using Todo(ref, Vector.empty)
  4. }
  5. when(Active, stateTimeout = 1 second) {
  6. case Event(Flush | StateTimeout, t: Todo) =>
  7. goto(Idle) using t.copy(queue = Vector.empty)
  8. }

Event(msg: Any, data: D) case类将FSM持有的数据类型参数化,从而可以方便地进行模式匹配。

警告

它要求你为所有可能的FSM状态定义处理程序,否则尝试切换到未声明的状态时会有失败。

推荐的实践做法是,将状态声明为对象,继承自一个sealed特质,然后验证是否对每一个状态都有when子句。如果你想要一些状态“不被处理”(详见下文),它仍然需要像这样声明:

  1. when(SomeState)(FSM.NullFunction)
定义初始状态

每个FSM都需要一个起点,用以下代码声明

  1. startWith(state, data[, timeout])

其中可选的超时参数将覆盖所有为期望的初始状态所指定的值。如果你想要取消缺省的超时,使用Duration.Inf

未处理事件

如果一个状态未能处理一个收到的事件,日志中将记录一条警告。这种情况下如果你想做点其它的事,你可以使用whenUnhandled(stateFunction)来指定:

  1. whenUnhandled {
  2. case Event(x: X, data) =>
  3. log.info("Received unhandled event: " + x)
  4. stay
  5. case Event(msg, _) =>
  6. log.warning("Received unknown event: " + msg)
  7. goto(Error)
  8. }

在此处理程序中可以使用stateName方法查询FSM的状态。

重要:这个处理器不会入栈叠加,这意味着whenUnhandled的每一次调用都会覆盖先前指定的处理程序。

发起状态转换

任何stateFunction的结果都必须是下一个状态的定义,除非是终止FSM,这种情况在从内部终止中介绍。状态定义可以是当前状态,由stay指令描述,或由goto(state)指定的另一个状态。结果对象可以通过下面列出的修饰器作进一步的限制:

  • forMax(duration)

这个修饰器为新状态指定了一个状态超时。这意味着将启动一个定时器,它过期时将向FSM发送一个StateTimeout消息。其间接收到任何其它消息,定时器都将被取消;你可以确定的事实是StateTimeout消息不会在任何一个中间消息之后被处理。

这个修饰器也可以用于覆盖任何对目标状态指定的缺省超时。如果要取消缺省超时,可以使用Duration.Inf

  • using(data)

这个修饰器用给定的新数据替换旧的状态数据。如果你遵循上面的建议,这是内部状态数据被修改的唯一位置。

  • replying(msg)

这个修饰器为当前处理完的消息发送一个应答,不同的是它不会改变状态转换。

所有的修饰器都可以链式调用来获得优美简洁的表达方式:

  1. when(SomeState) {
  2. case Event(msg, _) =>
  3. goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
  4. }

事实上这里所有的括号都不是必须的,但它们在视觉上将修饰器和它们的参数区分开,因而使代码对于他人来说有更好的可读性。

注意

请注意return语句不可以用于when或类似的代码块中;这是Scala的限制。要么使用if () … else …重构你的代码,要么将它改写到一个方法定义中。

监控状态转换

在概念上,转换发生在“两个状态之间”,也就是在你放在事件处理代码块执行的任何操作之后;这是显然的,因为只有在事件处理逻辑返回了值以后,才能确定下一个状态。相对于设置内部状态变量,你不需要担心操作顺序的细节,因为FSM actor中的所有代码都是在一个线程中运行的。

内部监控

到目前为止,FSM DSL都围绕着状态和事件。另外一种视角是将其描述成一系列的状态转换。是通过这个方法实现的

  1. onTransition(handler)

它将操作与状态转换联系起来,而不是联系状态与事件。这个处理器是一个偏函数,它以一对状态作为输入;不需要结果状态,因为不可能改变正在进行的状态转换。

  1. onTransition {
  2. case Idle -> Active => setTimer("timeout", Tick, 1 second, true)
  3. case Active -> _ => cancelTimer("timeout")
  4. case x -> Idle => log.info("entering Idle from " + x)
  5. }

提取器->用来解开状态对,并以清晰的形式表达了状态转换的方向。与通常的模式匹配一样,可以用_来表示不关心的内容;或者你可以将不关心的状态绑定到一个无约束的变量,例如像上一个例子那样供记日志使用。

也可以向onTransition传递一个以两个状态为参数的函数,此时你的状态转换处理逻辑是定义成方法的:

  1. onTransition(handler _)
  2. def handler(from: StateType, to: StateType) {
  3. // handle it here ...
  4. }

用这个方法注册的处理器是堆栈迭加的,这样你可以将onTransition块和when块分散定义以适应设计的需要。但需要注意的是,所有的处理器对每一次状态转换都会被调用,而不只是最先匹配的那个。这是有意设计的,使得你可以将某一部分状态转换处理放在某一个地方,而不用担心先前的定义会屏蔽后面的逻辑;当然这些操作还是按定义的顺序执行的。

注意

这种内部监控可以用于通过状态转换来构建你的FSM,这样在添加新的目标状态时,不会忘记例如在离开某个状态时,取消定时器这种操作。

外部监控

可以通过发送一个SubscribeTransitionCallBack(actorRef)消息注册外部actor,来接收状态转换的通知。这个被命名的actor将立即收到CurrentState(self, stateName)消息,并在之后每次进入新状态时收到Transition(actorRef, oldState, newState)消息。可以通过向FSM actor发送UnsubscribeTransitionCallBack(actorRef)来注销外部监控actor。

停止一个监听器,而不注销,将不会从注册列表中移除它;需要在停止监听器前使用UnsubscribeTransitionCallback

转换状态

when()传递的偏函数参数,可以使用Scala充分的函数式编程工具来转换。为了保留类型推断,还有一个辅助函数,它可以在通用处理逻辑中使用,并被应用到不同子句:

  1. when(SomeState)(transform {
  2. case Event(bytes: ByteString, read) => stay using (read + bytes.length)
  3. } using {
  4. case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 =>
  5. goto(Processing)
  6. })

不用说此方法的参数也可能被存储,被多次使用,例如在几个不同的when()代码块中应用相同的转换:

  1. val processingTrigger: PartialFunction[State, State] = {
  2. case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 =>
  3. goto(Processing)
  4. }
  5. when(SomeState)(transform {
  6. case Event(bytes: ByteString, read) => stay using (read + bytes.length)
  7. } using processingTrigger)
定时器

除了状态超时,FSM还管理以String类型名称为标识的定时器。你可以用下面代码设置定时器

  1. setTimer(name, msg, interval, repeat)

其中msg是经过interval时间以后发送的消息对象。 如果repeat设成true,定时器将以interval参数指定的时间段重复规划。添加新的计时器之前,具有相同名称的任何现有计时器将被自动取消。

可以用下面代码取消定时器

  1. cancelTimer(name)

取消操作确保立即执行,这意味着在这个调用之后,定时器已经规划的消息将不会执行,即使定时器已经发起并入队该消息。任何定时器的状态可以用下面的代码进行查询

  1. isTimerActive(name)

这些具名定时器是对状态超时的补充,因为它们不受中间收到的其它消息的影响。

从内部终止

可以像下面这样指定结果状态来终止FSM

  1. stop([reason[, data]])

其中的reason必须是Normal(默认值)、ShutdownFailure(reason)之一,可以提供第二个参数来改变状态数据,在终止处理器中可以使用该数据。

注意

必须注意stop并不会中止当前的操作并立即停止FSM。stop操作必须像状态转换一样从事件处理器中返回(但要注意return语句不能用在when代码块中)。

  1. when(Error) {
  2. case Event("stop", _) =>
  3. // do cleanup ...
  4. stop()
  5. }

你可以用onTermination(handler)来指定当FSM停止时要运行的代码。其中的handler是以一个StopEvent(reason, stateName, stateData)为参数的偏函数:

  1. onTermination {
  2. case StopEvent(FSM.Normal, state, data) => // ...
  3. case StopEvent(FSM.Shutdown, state, data) => // ...
  4. case StopEvent(FSM.Failure(cause), state, data) => // ...
  5. }

对于使用whenUnhandled的场合,这个处理器不会堆栈迭加,所以每次onTermination调用都会替换先前指定的处理器。

从外部终止

当与FSM关联的ActorRefstop方法停止后,它的postStop hook 将被执行。在FSM特质中的缺省实现是执行onTermination处理器(如果有的话)来处理StopEvent(Shutdown, …)事件。

警告

如果你重写postStop并而希望你的onTermination处理器被调用,不要忘记调用super.postStop

测试和调试有限状态机

在开发和调试过程中,FSM和其它actor一样需要照顾。测试有限状态机以及下文中介绍了一些专门的工具。

事件跟踪

配置文件中的akka.actor.debug.fsm打开用LoggingFSM实例完成的事件跟踪日志:

  1. import akka.actor.LoggingFSM
  2. class MyFSM extends Actor with LoggingFSM[StateType, Data] {
  3. // body elided ...
  4. }

这个FSM将以DEBUG级别记录日志:

  • 所有处理完的事件,包括StateTimeout和计划的定时器消息
  • 所有具名定时器的设置和取消
  • 所有的状态转换

生命周期变化及特殊消息可以如Actor中所述进行日志记录。

滚动事件日志

LoggingFSM特质为FSM添加了一个新的特性:一个滚动的事件日志,它可以在debugging中使用(跟踪为什么FSM会进入某个失败的状态)或其它的新用法:

  1. import akka.actor.LoggingFSM
  2. class MyFSM extends Actor with LoggingFSM[StateType, Data] {
  3. override def logDepth = 12
  4. onTermination {
  5. case StopEvent(FSM.Failure(_), state, data) =>
  6. val lastEvents = getLog.mkString("\n\t")
  7. log.warning("Failure in state " + state + " with data " + data + "\n" +
  8. "Events leading up to this point:\n\t" + lastEvents)
  9. }
  10. // ...
  11. }

logDepth缺省值为0,意思是关闭事件日志。

警告

日志缓冲区是在actor创建时分配的,这也是为什么logDepth的配置使用了虚方法调用。如果你想用一个val对其进行覆盖,必须保证它的初始化在LoggingFSM的初始化之前完成,而且在缓冲区分配完成后不要修改logDepth返回的值。

事件日志的内容可以用getLog方法获取,它返回一个IndexedSeq[LogEntry],其中最老的条目下标为0。

示例

Typesafe Activator的模板工程Akka FSM in Scala中,可以找到一个比Actor’s become/unbecome更大的FSM示例。