actors

Actor模型为编写并发和分布式系统提供了一种更高的抽象级别。它将开发人员从显式地处理锁和线程管理的工作中解脱出来,使编写并发和并行系统更加容易。Actor模型是在1973年Carl Hewitt的论文中提出的,但只是被Erlang语言采用后才变得流行起来,一个成功案例是爱立信使用Erlang非常成功地创建了高并发的可靠的电信系统。 Akka Actor的API与Scala Actor类似,并且从Erlang中借用了一些语法。

1 创建Actor

注:由于Akka采用强制性的父子监管,每一个actor都被监管着,并且会监管它的子actors;我们建议你熟悉一下Actor系统监管与监控

定义一个 Actor 类

要定义自己的Actor类,需要继承Actor类并实现receive方法。 receive方法需要定义一系列case语句(类型为PartialFunction[Any, Unit]) 来描述你的Actor能够处理哪些消息(使用标准的Scala模式匹配),以及实现对消息如何进行处理的代码。

下面是一个例子

  1. import akka.actor.Actor
  2. import akka.actor.Props
  3. import akka.event.Logging
  4. class MyActor extends Actor {
  5. val log = Logging(context.system, this)
  6. def receive = {
  7. case "test" => log.info("received test")
  8. case _ => log.info("received unknown message")
  9. }
  10. }

请注意Akka Actor receive消息循环是穷尽的(exhaustive), 这与Erlang和Scala的Actor行为不同。 这意味着你需要提供一个对它所能够接受的所有消息的模式匹配规则,如果你希望处理未知的消息,你需要象上例一样提供一个缺省的case分支。否则会有一个akka.actor.UnhandledMessage(message, sender, recipient)被发布到Actor系统(ActorSystem)的事件流(EventStream中。

注意,以上定义的行为的返回类型是Unit;如果actor需要回复接收的消息,这需要显示的实现。

receive方法的结果是一个偏函数对象,它作为初始化行为(initial behavior)保存在actor中,请查看[become/unbecome]了解更进一步的信息。

Props

Props是一个用来在创建actor时指定选项的配置类。因为它是不可变的,所以可以在创建actor时共享包含部署信息的props。以下是使用如何创建Props实例的一些例子。

  1. import akka.actor.Props
  2. val props1 = Props[MyActor]
  3. val props2 = Props(new ActorWithArgs("arg")) // careful, see below
  4. val props3 = Props(classOf[ActorWithArgs], "arg")

第二种方法显示了创建actor时如何传递构造器参数。但它应该仅仅用于actor外部。

最后一行显示了传递构造器参数的一种可能的方法,无论它使用的context是什么。如果在构造Props的时候,需要找到了一个匹配的构造器。如果没有或者有多个匹配的构造器,那么会报IllegalArgumentEception异常。

危险的方法

  1. // NOT RECOMMENDED within another actor:
  2. // encourages to close over enclosing class
  3. val props7 = Props(new MyActor)

在其它的actor中使用这种方法是不推荐的。因为它返回了没有实例化的Props以及可能的竞争条件(打破了actor的封装)。我们将在未来提供一个macro-based的解决方案提供相似的语法,从而将这种方法抛弃。

注意:在其它的actor中定义一个actor是非常危险的,它打破了actor的封装。永远不要传递actor的this引用到Props中去

推荐的方法

在每个actor的伴生对象中提供工厂方法是一个好主意。这可以保持合适Props的创建与actor的定义的距离。

  1. object DemoActor {
  2. /**
  3. * Create Props for an actor of this type.
  4. * @param magciNumber The magic number to be passed to this actor’s constructor.
  5. * @return a Props for creating this actor, which can then be further configured
  6. *
  7. (e.g. calling ‘.withDispatcher()‘ on it)
  8. */
  9. def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
  10. }
  11. class DemoActor(magicNumber: Int) extends Actor {
  12. def receive = {
  13. case x: Int => sender() ! (x + magicNumber)
  14. }
  15. }
  16. class SomeOtherActor extends Actor {
  17. // Props(new DemoActor(42)) would not be safe
  18. context.actorOf(DemoActor.props(42), "demo")
  19. // ...
  20. }

使用Props创建Actor

Actor可以通过将 Props 实例传入 actorOf 工厂方法来创建。这个工厂方法可以用于ActorSystemActorContext

  1. import akka.actor.ActorSystem
  2. // ActorSystem is a heavy object: create only one per application
  3. val system = ActorSystem("mySystem")
  4. val myActor = system.actorOf(Props[MyActor], "myactor2")

使用ActorSystem将会创建top-level actor,被actor系统提供的监护人actor监控,同时,利用actor的context将会创建子actor。

  1. class FirstActor extends Actor {
  2. val child = context.actorOf(Props[MyActor], name = "myChild")
  3. // plus some behavior ...
  4. }

推荐创建一个由子actor、孙actor等构成的树形结构,从而使其适合应用程序的逻辑错误处理结构。

调用actorOf会返回一个ActorRef对象。它是与actor实例交互的唯一手段。ActorRef是不可变的,并且和actor拥有一对一的关系。ActorRef也是可序列化的以及网络可感知的。这就意味着,你可以序列化它、通过网络发送它、在远程机器上使用它,它将一直代表同一个actor。

name参数是可选的,但是你最后为你的actor设置一个name,它可以用于日志消息以及识别actor。name不能为空或者以$开头,但它可以包含URL编码字符(如表示空格的20%)。如果给定的name在其他子actor中已经存在,那么会抛出InvalidActorNameException异常。

当创建actor后,它自动以异步的方式开始。

依赖注入

如上所述,当你的actor拥有一个持有参数的构造器,那么这些参数也必须是Props的一部分。但是,当一个工厂方法必须被使用时除外。例如,当实际的构造器参数由一个依赖注入框架决定。

  1. import akka.actor.IndirectActorProducer
  2. class DependencyInjector(applicationContext: AnyRef, beanName: String)
  3. extends IndirectActorProducer {
  4. override def actorClass = classOf[Actor]
  5. override def produce =
  6. // obtain fresh Actor instance from DI framework ...
  7. }
  8. val actorRef = system.actorOf(
  9. Props(classOf[DependencyInjector], applicationContext, "hello"),
  10. "helloBean")

收件箱(inbox)

当在actor外编写代码与actor进行通讯时,ask模式可以解决,但是有两件事情它们不能做:接收多个回复、观察其它actor的生命周期。我们可以用inbox类实现该目的:

  1. implicit val i = inbox()
  2. echo ! "hello"
  3. i.receive() should be("hello")

从inbox到actor引用的隐式转换意味着,在这个例子中,sender引用隐含在inbox中。它允许在最后一行接收回复。观察actor也非常简单。

  1. val target = // some actor
  2. val i = inbox()
  3. i watch target

2 Actor API

Actor trait 只定义了一个抽象方法,就是上面提到的receive, 用来实现actor的行为。

如果当前 actor 的行为与收到的消息不匹配,则会调用unhandled, 它的缺省实现是向actor系统的事件流中发布一个akka.actor.UnhandledMessage(message, sender, recipient)(设置akka.actor.debug.unhandled为on将它们转换为实际的Debug消息)。

另外,它还包括:

  • self 代表本actor的 ActorRef
  • sender 代表最近收到的消息的sender actor,通常用于下面将讲到的回应消息中
  • supervisorStrategy 用户可重写它来定义对子actor的监管策略
  • context 暴露actor和当前消息的上下文信息,如:

    • 用于创建子actor的工厂方法 (actorOf)
    • actor所属的系统
    • 父监管者
    • 所监管的子actor
    • 生命周期监控
    • hotswap行为栈

你可以import context的成员来避免总是要加上context.前缀

  1. class FirstActor extends Actor {
  2. import context._
  3. val myActor = actorOf(Props[MyActor], name = "myactor")
  4. def receive = {
  5. case x => myActor ! x
  6. }
  7. }

其余的可见方法是可以被用户重写的生命周期hook,描述如下:

  1. def preStart(): Unit = ()
  2. def postStop(): Unit = ()
  3. def preRestart(reason: Throwable, message: Option[Any]): Unit = { context.children foreach { child )
  4. context.unwatch(child)
  5. context.stop(child)
  6. }
  7. postStop() }
  8. def postRestart(reason: Throwable): Unit = {
  9. preStart()
  10. }

以上所示的实现是Actor trait 的缺省实现。

Actor 生命周期

actor lifecycle

在actor系统中,一个路径代表一个“位置(place)”,这个位置被一个存活的actor占据。path被初始化为空。当调用actorOf()时,它分配一个actor的incarnation给给定的path。一个actor incarnation通过path和一个UID来识别。重启仅仅会交换通过Props定义的actor实例,它的实体以及UID不会改变。

当actor停止是,它的incarnation的生命周期结束。在那时,适当的生命周期事件被调用,终止的观察actor被通知。actor incarnation停止后,通过actorOf()创建actor时,path可以被重用。在这种情况下,新的incarnation的名字和之前的incarnation的名字是相同的,不同的是它们的UID。

一个ActorRef总是表示incarnation而不仅仅是path。因此,如果一个actor被停止,拥有相同名字的新的actor被创建。旧的incarnation的ActorRef不会指向新的Actor。

另一方面,ActorSelection指向path(如果用到占位符,是多个path),它完全不知道哪一个incarnation占有它。正是因为这个原因,ActorSelection无法被观察。通过发送一个Identify消息到ActorSelection可以解析当前incarnation存活在path下的ActorRef。这个ActorSelection将会被回复一个包含正确引用的ActorIdentity。这也可以通过ActorSelectionresolveOne方法实现,这个方法返回一个匹配ActorRefFuture

使用DeathWatch进行生命周期监控

为了在其它actor结束时 (永久终止, 而不是临时的失败和重启)收到通知, actor可以将自己注册为其它actor在终止时所发布的Terminated消息的接收者。 这个服务是由actor系统的DeathWatch组件提供的。

注册一个监控器很简单:

  1. import akka.actor.{ Actor, Props, Terminated }
  2. class WatchActor extends Actor {
  3. val child = context.actorOf(Props.empty, "child")
  4. context.watch(child) // <-- this is the only call needed for registration
  5. var lastSender = system.deadLetters
  6. def receive = {
  7. case "kill" =>
  8. context.stop(child); lastSender = sender()
  9. case Terminated(‘child‘) => lastSender ! "finished"
  10. } }

要注意Terminated消息的产生与注册和终止行为所发生的顺序无关。特别是,观察actor将会接收到一个Terminated消息,即使观察actor在注册时已经终止了。

多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列,在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为一个已经终止的actor注册监控器会立刻导致Terminated消息的发生。

可以使用context.unwatch(target)来停止对另一个actor的生存状态的监控, 但很明显这不能保证不会接收到Terminated消息因为该消息可能已经进入了队列。

启动 Hook

actor启动后,它的preStart会被立即执行。

  1. override def preStart() {
  2. child = context.actorOf(Props[MyActor], "child")
  3. }

当actor第一次被创建时,该方法会被调用。在重启的过程中,它会在postRestart的默认实现中调用,这意味着,对于这个actor或者每一个重启,重写那个方法你可以决定初始化代码是否被仅仅调用一次。当创建一个actor实例时,作为actor构造器一部分的初始化代码(在每一次重启时发生)总是被调用。

重启 Hook

所有的Actor都是被监管的,以某种失败处理策略与另一个actor链接在一起。 如果在处理一个消息的时候抛出的异常,Actor将被重启。这个重启过程包括上面提到的Hook:

  • 要被重启的actor的preRestart被调用,携带着导致重启的异常以及触发异常的消息; 如果重启并不是因为消息的处理而发生的,所携带的消息为 None , 例如,当一个监管者没有处理某个异常继而被它自己的监管者重启时。 这个方法是用来完成清理、准备移交给新的actor实例的最佳位置。它的缺省实现是终止所有的子actor并调用postStop
  • 最初 actorOf 调用的工厂方法将被用来创建新的实例。
  • 新的actor的 postRestart 方法被调用,携带着导致重启的异常信息。默认情况下,preStart被调用。

actor的重启会替换掉原来的actor对象; 重启不影响邮箱的内容, 所以对消息的处理将在postRestart hook返回后继续。 触发异常的消息不会被重新接收。在actor重启过程中所有发送到该actor的消息将象平常一样被放进邮箱队列中。

终止 Hook

一个Actor终止后,它的postStop hook将被调用, 这可以用来取消该actor在其它服务中的注册。这个hook保证在该actor的消息队列被禁止后才运行, i.e. 之后发给该actor的消息将被重定向到ActorSystemdeadLetters中。

3 通过Actor Selection识别actor

4 消息与不可变性

IMPORTANT: 消息可以是任何类型的对象,但必须是不可变的。目前Scala还无法强制不可变性,所以这一点必须作为约定。String, Int, Boolean这些原始类型总是不可变的。 除了它们以外,推荐的做法是使用Scala case class,它们是不可变的(如果你不专门暴露数据的话),并与接收方的模式匹配配合得非常好。

以下是一个例子:

  1. // define the case class
  2. case class Register(user: User)
  3. // create a new case class message
  4. val message = Register(user)

5 发送消息

向actor发送消息是使用下列方法之一:

  • ! 意思是“fire-and-forget”,表示异步发送一个消息并立即返回。也称为tell
  • ? 异步发送一条消息并返回一个 Future代表一个可能的回应。也称为ask

每一个消息发送者分别保证自己的消息的次序。

Tell: Fire-forget

这是发送消息的推荐方式。不会阻塞地等待消息。它拥有最好的并发性和可扩展性。

  1. actorRef ! message

如果是在一个Actor中调用,那么发送方的actor引用会被隐式地作为消息的sender(): ActorRef成员一起发送。目的actor可以使用它来向原actor发送回应,使用sender() ! replyMsg

如果不是从Actor实例发送的,sender缺省为deadLetters actor引用。

Ask: Send-And-Receive-Future

ask模式既包含actor也包含future, 所以它是作为一种使用模式,而不是ActorRef的方法:

  1. import akka.pattern.{ ask, pipe }
  2. import system.dispatcher // The ExecutionContext that will be used
  3. case class Result(x: Int, s: String, d: Double)
  4. case object Request
  5. implicit val timeout = Timeout(5 seconds) // needed for ‘?‘ below
  6. val f: Future[Result] =
  7. for {
  8. x <- ask(actorA, Request).mapTo[Int] // call pattern directly
  9. s <- (actorB ask Request).mapTo[String] // call by implicit conversion
  10. d <- (actorC ? Request).mapTo[Double] // call by symbolic name
  11. } yield Result(x, s, d)
  12. f pipeTo actorD // .. or ..
  13. pipe(f) to actorD

上面的例子展示了将 ask 与future上的pipeTo模式一起使用,因为这是一种非常常用的组合。 请注意上面所有的调用都是完全非阻塞和异步的:ask 产生 Future, 三个Future通过for-语法组合成一个新的Future,然后用pipeTo在future上安装一个onComplete-处理器来完成将收集到的Result发送到其它actor的动作。

使用ask将会象tell一样发送消息给接收方, 接收方必须通过sender ! reply发送回应来为返回的Future填充数据。ask操作包括创建一个内部actor来处理回应,必须为这个内部actor指定一个超时期限,过了超时期限内部actor将被销毁以防止内存泄露。

注意:如果要以异常来填充future你需要发送一个 Failure 消息给发送方。这个操作不会在actor处理消息发生异常时自动完成。

  1. try {
  2. val result = operation()
  3. sender() ! result
  4. } catch {
  5. case e: Exception =>
  6. sender() ! akka.actor.Status.Failure(e)
  7. throw e }

如果一个actor没有完成future, 它会在超时时限到来时过期, 以AskTimeoutException来结束。超时的时限是按下面的顺序和位置来获取的:

  • 显式指定超时:
  1. import scala.concurrent.duration._
  2. import akka.pattern.ask
  3. val future = myActor.ask("hello")(5 seconds)
  • 提供类型为akka.util.Timeout的隐式参数, 例如,
  1. import scala.concurrent.duration._
  2. import akka.util.Timeout
  3. import akka.pattern.ask
  4. implicit val timeout = Timeout(5 seconds)
  5. val future = myActor ? "hello"

Future的onComplete, onResult, 或 onTimeout方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

注意:在使用future回调如onComplete, onSuccess,和onFailure时, 在actor内部你要小心避免捕捉该actor的引用, 也就是不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引起同步bug和race condition, 因为回调会与此actor一同被并发调度。不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。

转发消息

你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的actor时会很有用。

  1. target forward message

6 接收消息

Actor必须实现receive方法来接收消息:

  1. type Receive = PartialFunction[Any, Unit]
  2. def receive: Actor.Receive

这个方法应返回一个PartialFunction, 例如 一个 ‘match/case’ 子句,消息可以与其中的不同分支进行scala模式匹配。如下例:

  1. import akka.actor.Actor
  2. import akka.actor.Props
  3. import akka.event.Logging
  4. class MyActor extends Actor {
  5. val log = Logging(context.system, this)
  6. def receive = {
  7. case "test" => log.info("received test")
  8. case _ => log.info("received unknown message")
  9. }
  10. }

7 回应消息

如果你需要一个用来发送回应消息的目标,可以使用sender(), 它是一个Actor引用. 你可以用sender() ! replyMsg 向这个引用发送回应消息。你也可以将这个Actor引用保存起来将来再作回应。如果没有sender() (不是从actor发送的消息或者没有future上下文) 那么sender()缺省为 ‘死信’ actor的引用。

  1. case request =>
  2. val result = process(request)
  3. sender() ! result // will have dead-letter actor as default

8 接收超时

在接收消息时,如果在一段时间内没有收到第一条消息,可以使用超时机制。 要检测这种超时你必须设置 receiveTimeout 属性并声明一个处理ReceiveTimeout对象的匹配分支。

  1. import akka.actor.ReceiveTimeout
  2. import scala.concurrent.duration._
  3. class MyActor extends Actor {
  4. // To set an initial delay
  5. context.setReceiveTimeout(30 milliseconds)
  6. def receive = {
  7. case "Hello" =>
  8. // To set in a response to a message
  9. context.setReceiveTimeout(100 milliseconds)
  10. case ReceiveTimeout =>
  11. // To turn it off
  12. context.setReceiveTimeout(Duration.Undefined)
  13. throw new RuntimeException("Receive timed out")
  14. } }

9 终止Actor

通过调用ActorRefFactory 也就是ActorContextActorSystem 的stop方法来终止一个actor , 通常context用来终止子actor,而system用来终止顶级actor。实际的终止操作是异步执行的,也就是说stop可能在actor被终止之前返回。

如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem 的死信, 但是这取决于邮箱的实现。

actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用postStop, 销毁邮箱, 向DeathWatch发布Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

postStop hook 是在actor被完全终止以后调用的。这是为了清理资源:

  1. override def postStop() {
  2. // clean up some resources ...
  3. }

注意:由于actor的终止是异步的, 你不能马上使用你刚刚终止的子actor的名字;这会导致InvalidActorNameException. 你应该监视正在终止的 actor 而在最终到达的Terminated消息的处理中创建它的替代者。

PoisonPill

你也可以向actor发送akka.actor.PoisonPill消息, 这个消息处理完成后actor会被终止。PoisonPill与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

优雅地终止

如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

  1. import akka.pattern.gracefulStop
  2. import scala.concurrent.Await
  3. try {
  4. val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
  5. Await.result(stopped, 6 seconds)
  6. // the actor has been stopped
  7. } catch {
  8. // the actor wasn’t stopped within 5 seconds
  9. case e: akka.pattern.AskTimeoutException =>
  10. }
  1. object Manager {
  2. case object Shutdown
  3. }
  4. class Manager extends Actor {
  5. import Manager._
  6. val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))
  7. def receive = {
  8. case "job" => worker ! "crunch"
  9. case Shutdown =>
  10. worker ! PoisonPill
  11. context become shuttingDown
  12. }
  13. def shuttingDown: Receive = {
  14. case "job" => sender() ! "service unavailable, shutting down"
  15. case Terminated(‘worker‘) =>
  16. context stop self
  17. } }

10 Become/Unbecome

升级

Akka支持在运行时对Actor消息循环的实现进行实时替换: 在actor中调用context.become方法。Become 要求一个 PartialFunction[Any, Unit]参数作为新的消息处理实现。被替换的代码被存在一个栈中,可以被push和pop。

请注意actor被其监管者重启后将恢复其最初的行为。

使用 become 替换Actor的行为:

  1. class HotSwapActor extends Actor {
  2. import context._
  3. def angry: Receive = {
  4. case "foo" => sender() ! "I am already angry?"
  5. case "bar" => become(happy)
  6. }
  7. def happy: Receive = {
  8. case "bar" => sender() ! "I am already happy :-)"
  9. case "foo" => become(angry)
  10. }
  11. def receive = {
  12. case "foo" => become(angry)
  13. case "bar" => become(happy)
  14. } }

become 方法还有很多其它的用处,一个特别好的例子是用它来实现一个有限状态机 (FSM)。

以下是另外一个使用 become和unbecome的例子:

  1. case object Swap
  2. class Swapper extends Actor {
  3. import context._
  4. val log = Logging(system, this)
  5. def receive = {
  6. case Swap =>
  7. log.info("Hi")
  8. become({
  9. case Swap =>
  10. log.info("Ho")
  11. unbecome() // resets the latest ’become’ (just for fun)
  12. }, discardOld = false) // push on top instead of replace
  13. }
  14. }
  15. object SwapperApp extends App {
  16. val system = ActorSystem("SwapperSystem")
  17. val swap = system.actorOf(Props[Swapper], name = "swapper")
  18. swap ! Swap // logs Hi
  19. swap ! Swap // logs Ho
  20. swap ! Swap // logs Hi
  21. swap ! Swap // logs Ho
  22. swap ! Swap // logs Hi
  23. swap ! Swap // logs Ho
  24. }

11 Stash

Stash trait可以允许一个actor临时存储消息,这些消息不能或者不应被当前的行为处理。通过改变actor的消息处理,也就是调用context.become或者context.unbecome,所有存储的消息都会unstashed,从而将它们预先放入邮箱中。通过这种方法,可以与接收消息时相同的顺序处理存储的消息。

注意:Stash trait继承自标记trait RequiresMessageQueue[DequeBasedMessageQueueSemantics]。这个trait需要系统自动的选择一个基于邮箱实现的双端队列给actor。

如下是一个例子:

  1. import akka.actor.Stash
  2. class ActorWithProtocol extends Actor with Stash {
  3. def receive = {
  4. case "open" =>
  5. unstashAll()
  6. context.become({
  7. case "write" => // do writing...
  8. case "close" =>
  9. unstashAll()
  10. context.unbecome()
  11. case msg => stash()
  12. }, discardOld = false) // stack on top instead of replacing
  13. case msg => stash()
  14. } }

调用stash()添加当前消息到actor的stash中。通常情况下,actor的消息处理默认case会调用它处理其它case无法处理的stash消息。相同的消息stash两次是不对的,它会导致一个IllegalStateException异常被抛出。stash也是可能有边界的,在这种情况下,调用stash()可能导致容量问题,抛出StashOverflowException异常。可以通过在邮箱的配置文件中配置stash-capacity选项来设置容量。

调用unstashAll()从stash中取数据到邮箱中,直至邮箱容量(如果有)满为止。在这种情况下,有边界的邮箱会溢出,抛出MessageQueueAppendFailedException异常。调用unstashAll()后,stash可以确保为空。

stash保存在scala.collection.immutable.Vector中。即使大数量的消息要stash,也不会存在性能问题。

12 杀死actor

你可以发送Kill消息来杀死actor。这会让actor抛出一个ActorKilledException异常,触发一个错误。actor将会暂停它的操作,并且询问它的监控器如何处理错误。这可能意味着恢复actor、重启actor或者完全终止它。

用下面的方式使用Kill

  1. // kill the ’victim’ actor
  2. victim ! Kill

13 Actor 与异常

在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

消息会怎样

如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己捕捉异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。

邮箱会怎样

如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

actor会怎样

如果抛出了异常,actor将会被暂停,监控过程将会开始。依据监控器的决定,actor会恢复、重试或者终止。

14 使用 PartialFunction 链来扩展actor

有时,在几个actor之间共享相同的行为或者用多个更小的函数组成一个actor的行为是非常有用的。因为一个actor的receive方法返回一个Actor.Receive,所以这是可能实现的。Actor.ReceivePartialFunction[Any,Unit]的类型别名,偏函数可以用PartialFunction#orElse方法链接在一起。你可以链接任意多的函数,但是你应该记住“first match”将会赢-当这些合并的函数均可以处理同类型的消息时,这很重要。

例如,假设你有一组actor,要么是Producers,要么是Consumers。有时候,一个actor拥有这两者的行为是有意义的。可以在不重复代码的情况下简单的实现该目的。那就是,抽取行为到trait中,实现actor的receive函数当作这些偏函数的合并。

  1. trait ProducerBehavior {
  2. this: Actor =>
  3. val producerBehavior: Receive = {
  4. case GiveMeThings =>
  5. sender() ! Give("thing")
  6. }
  7. }
  8. trait ConsumerBehavior {
  9. this: Actor with ActorLogging =>
  10. val consumerBehavior: Receive = {
  11. case ref: ActorRef =>
  12. ref ! GiveMeThings
  13. case Give(thing) =>
  14. log.info("Got a thing! It’s {}", thing)
  15. } }
  16. class Producer extends Actor with ProducerBehavior {
  17. def receive = producerBehavior
  18. }
  19. class Consumer extends Actor with ActorLogging with ConsumerBehavior {
  20. def receive = consumerBehavior
  21. }
  22. class ProducerConsumer extends Actor with ActorLogging
  23. with ProducerBehavior with ConsumerBehavior {
  24. def receive = producerBehavior orElse consumerBehavior
  25. }
  26. // protocol
  27. case object GiveMeThings
  28. case class Give(thing: Any)

15 初始化模式

actor丰富的生命周期hooks提供了一个有用的工具包实现多个初始化模式。在ActorRef的生命中,一个actor可能经历多次重启,旧的实例被新的实例替换,这对外部的观察者来说是不可见的,它们只能看到ActorRef

人们可能会想到作为”incarnations”的新对象,一个actor的每个incarnation的初始化可能也是必须的,但是有些人可能希望当ActorRef创建时,初始化仅仅发生在第一个实例出生时。下面介绍几种不同的初始化模式。

通过构造器初始化

用构造器初始化有几个好处。第一,它使用val保存状态成为可能,这个状态在actor实例的生命期内不会改变。这使actor的实现更有鲁棒性。actor的每一个incarnation都会调用构造器,因此,在actor的内部组件中,总是假设正确的初始化已经发生了。这也是这种方法的缺点,因为有这样的场景,人们不希望在重启时重新初始化actor内部组件。例如,这种方法在重启时保护子actor非常有用。下面将介绍适合这种场景的模式。

通过preStart初始化

在初始化第一个actor实例时,actor的preStart()方法仅仅被直接调用一次,那就是创建它的ActorRef。在重启的情况下,preStart()postRestart()中调用,因此,如果不重写,preStart()会在每一个incarnation中调用。然而,我们可以重写postRestart()禁用这个行为,保证只有一个调用preStart()

这中模式有用之处是,重启时它禁止为子actors创建新的ActorRefs。这可以通过重写来实现:

  1. override def preStart(): Unit = {
  2. // Initialize children here
  3. }
  4. // Overriding postRestart to disable the call to preStart()
  5. // after restarts
  6. override def postRestart(reason: Throwable): Unit = ()
  7. // The default implementation of preRestart() stops all the children
  8. // of the actor. To opt-out from stopping the children, we
  9. // have to override preRestart()
  10. override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  11. // Keep the call to postStop(), but no stopping of children
  12. postStop() }

注意,子actors仍然重启了,但是没有创建新的ActorRefs。人们可以为子actor递归的才有这个准则,保证preStart()只在它们的refs创建时被调用一次。

通过消息传递初始化

不可能在构造器中为actor的初始化传递所有的必须的信息,例如循环依赖。在这种情况下,actor应该监听初始化消息,用become()或者有限状态机状态转换器去编码actor初始的或者未初始的状态。

  1. var initializeMe: Option[String] = None
  2. override def receive = {
  3. case "init" =>
  4. initializeMe = Some("Up and running")
  5. context.become(initialized, discardOld = true)
  6. }
  7. def initialized: Receive = {
  8. case "U OK?" => initializeMe foreach { sender() ! _ }
  9. }

如果actor在初始化之前收到了消息,一个有用的工具Stash可以保存消息直到初始化完成,待初始化完成之后,重新处理它们。

注意:这种模式应该小心使用,只有在以上模式不可用是使用。一个潜在的问题是,当发送到远程actor时,消息有可能丢失。另外,在一个未初始化的状态下发布一个ActorRefs可能导致它在初始化之前接收一个用户消息。