有类型Actor

Akka 中的有类型Actor是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。

有类型 Actor 由两 “部分” 组成, 一个公开的接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API (公开接口的实例) 来将方法调用异步地委托给其实现的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,你不能使用become/unbecome

有类型Actor是使用JDK Proxies实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意:和普通Akka actor一样,有类型actor也一次处理一个消息。

1 什么时候使用有类型actor

有类型actor是actor系统和非actor代码的桥梁,因为它允许你在外部写看起来像面向对象的代码。可以把它们想象成门:它们适用于private和public之间的范围,但是你不想在你的房子里面安装很多门。

更多一点背景:TypedActors很容易被滥用为RPC,而这总所周知具有漏洞。所以,当我们想要构建高可用的并发软件时,TypedActors并不是我们编写正确代码第一个想到的东西。它们有它们实用的场景,我们应该小心地使用它们。

2 工具箱

在创建第一个有类型Actor之前,我们先了解一下我们手上可供使用的工具,它的位置在akka.actor.TypedActor中。

  1. import akka.actor.TypedActor
  2. //返回有类型actor扩展
  3. val extension = TypedActor(system) //system是一个Actor系统实例
  4. //判断一个引用是否是有类型actor代理
  5. TypedActor(system).isTypedActor(someReference)
  6. //返回一个外部有类型actor代理所代表的Akka actor
  7. TypedActor(system).getActorRefFor(someReference)
  8. //返回当前的ActorContext,
  9. // 此方法仅在一个TypedActor 实现的方法中有效
  10. val c: ActorContext = TypedActor.context
  11. //返回当前有类型actor的外部代理,
  12. //此方法仅在一个TypedActor实现的方法中有效
  13. val s: Squarer = TypedActor.self[Squarer]
  14. //返回一个有类型Actor扩展的上下文实例
  15. //这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor
  16. TypedActor(TypedActor.context)

注意:就象不应该暴露Akka actor的this一样, 不要暴露有类型Actor的this, 你应该传递其外部代理引用,它可以在你的有类型Actor中用TypedActor.self获得, 这是你的外部标识, 就象ActorRef是Akka actor的外部标识一样。

3 创建有类型Actor

要创建有类型Actor,需要一个或多个接口以及一个实现。

我们的示例接口:

  1. trait Squarer {
  2. def squareDontCare(i: Int): Unit //fire-forget
  3. def square(i: Int): Future[Int] //non-blocking send-request-reply
  4. def squareNowPlease(i: Int): Option[Int] //blocking send-request-reply
  5. def squareNow(i: Int): Int //blocking send-request-reply
  6. @throws(classOf[Exception]) //declare it or you will get an UndeclaredThrowableException
  7. def squareTry(i: Int): Int //blocking send-request-reply with possible exception
  8. }

SquarerImpl中实现接口定义的方法:

  1. class SquarerImpl(val name: String) extends Squarer {
  2. def this() = this("default")
  3. def squareDontCare(i: Int): Unit = i * i //Nobody cares :(
  4. def square(i: Int): Future[Int] = Future.successful(i * i)
  5. def squareNowPlease(i: Int): Option[Int] = Some(i * i)
  6. def squareNow(i: Int): Int = i * i
  7. def squareTry(i: Int): Int = throw new Exception("Catch me!")
  8. }

创建我们的Squarer有类型actor实例的最简单方法是:

  1. val mySquarer: Squarer =
  2. TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())

第一个类型是代理的类型,第二个类型是实现的类型. 如果要调用某特定的构造方法要这样做:

  1. val otherSquarer: Squarer =
  2. TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
  3. new SquarerImpl("foo")), "name")

由于你提供了一个 Props, 你可以指定使用哪个派发器, 缺省的超时时间及其它。

4 方法派发语义

方法返回:

  • Unit 会以 fire-and-forget 语义进行派发, 与ActorRef.tell完全一致。
  • akka.dispatch.Future[_] 会以 send-request-reply 语义进行派发, 与 ActorRef.ask完全一致。
  • scala.Option[_]或者akka.japi.Option<?> 会以send-request-reply语义派发, 但是会阻塞等待应答, 如果在超时时限内没有应答则返回 None , 否则返回包含结果的scala.Some/akka.japi.Some。在这个调用中发生的异常将被重新抛出。
  • 任何其它类型的值将以send-request-reply语义进行派发, 但会阻塞地等待应答, 如果超时会抛出java.util.concurrent.TimeoutException,如果发生异常则将异常重新抛出。

5 消息与不可变性

虽然Akka并不强制要求你传给有类型Actor方法的参数类型是不可变的, 我们强烈建议只传递不可变参数。

单向消息发送

  1. mySquarer.squareDontCare(10)

就是这么简单!方法会在另一个线程中异步地调用。

请求-响应消息发送

  1. val oSquare = mySquarer.squareNowPlease(10) //Option[Int]

如果需要,这会阻塞到有类型actor的Props中设置的超时时限。如果超时,会返回None。

  1. val iSquare = mySquarer.squareNow(10) //Int

如果需要,这会阻塞到有类型actor的Props中设置的超时时限。如果超时,会抛出java.util.concurrent.TimeoutException

请求-以future作为响应的消息发送

  1. val fSquare = mySquarer.square(10) //A Future[Int]

这个调用是异步的,返回的Future可以用作异步组合。

6 终止有类型Actor

由于有类型actor底层还是Akka actor,所以在不需要的时候要终止它。

  1. TypedActor(system).stop(mySquarer)

这将会异步地终止与指定的代理关联的有类型Actor。

  1. TypedActor(system).poisonPill(otherSquarer)

在这个调用之前的所有调用完成后,这将会异步地终止与指定的代理关联的有类型Actor。

7 有类型Actor监管树

你可以通过传入ActorContext来获得有类型Actor上下文,所以你可以对它调用typedActorOf(..) 来创建子有类型actor。

  1. //Inside your Typed Actor
  2. val childSquarer: Squarer =
  3. TypedActor(TypedActor.context).typedActorOf(TypedProps[SquarerImpl]())
  4. //Use "childSquarer" as a Squarer

将给的ActorContext作为输入参数传人TypedActor.get(. . . ),你也可以在Akka actor中创建子有类型actor。

8 监管策略

通过让你的有类型Actor实现类实现 TypedActor.Supervisor方法 你可以定义用来监管子actor的策略

9 生命周期回调

通过使你的有类型actor实现类实现以下方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart

你可以hook进你的有类型actor的整个生命周期。

10 接收任意消息

如果你的有类型actor的实现类扩展了akka.actor.TypedActor.Receiver, 所有非方法调用(MethodCall)的消息会被传递到onReceive-方法(MethodCall‘‘s will be passed into the ‘‘onReceive-method)。

这使你能够对 DeathWatch的Terminated-消息或其它类型的消息进行处理, 例如,与无类型actor进行交互的场合。

11 代理

你可以使用带TypedPropsActorReftypedActorOf来将指定的ActorRef代理成一个有类型Actor。这在你需要与远程主机上的有类型Actor通信时会有用, 只需要将 ActorRef 传递给typedActorOf

注意:目标ActorRef需要能处理MethodCall消息。

12 查找和远程

由于TypedActor的背后仍然是actor,你可以使用typedActorOf代理远程节点上的ActorRefs

  1. val typedActor: Foo with Bar =TypedActor(system).typedActorOf(TypedProps[FooBar],actorRefToRemoteActor)
  2. //Use "typedActor" as a FooBar

13 功能扩充

以下是使用traits来为你的有类型actor混入行为的示例:

  1. trait Foo {
  2. def doFoo(times: Int): Unit = println("doFoo(" + times + ")")
  3. }
  4. trait Bar {
  5. def doBar(str: String): Future[String] =
  6. Future.successful(str.toUpperCase)
  7. }
  8. class FooBar extends Foo with Bar
  1. val awesomeFooBar: Foo with Bar =
  2. TypedActor(system).typedActorOf(TypedProps[FooBar]())
  3. awesomeFooBar.doFoo(10)
  4. val f = awesomeFooBar.doBar("yes")
  5. TypedActor(system).poisonPill(awesomeFooBar)

14 有类型的路由模式

有时你想在多个actor之间传播消息,在AKKA中最简单的方式就是使用路由,它能够实现一个特定的路由逻辑,如smallest-mailboxconsistent-hashing等。

路由不直接支持有类型actor,但是可以很简单的在一个无类型的路由前用一个有类型的代理来使用它。为了展示这一点,我们创建有类型actor并给它们分配一些随机id,所以,实际上我们知道,路由器将消息发送到了不同的actor。

  1. trait HasName {
  2. def name(): String
  3. }
  4. class Named extends HasName {
  5. import scala.util.Random
  6. private val id = Random.nextInt(1024)
  7. def name(): String = "name-" + id
  8. }

为了在这些actor的几个实例间进行轮训,你可以简单的创建一个非类型的路由链,然后将它表示为一个TypedActor。这可以工作是因为有类型的actor当然是可以通讯的,这个actor的机制相同。它们将消息转换为MethodCall消息在方法间发送(methods calls on them get transformed into message sends of MethodCall messages)。

  1. def namedActor(): HasName = TypedActor(system).typedActorOf(TypedProps[Named]())
  2. // prepare routees
  3. val routees: List[HasName] = List.fill(5) { namedActor() }
  4. val routeePaths = routees map { r =>
  5. TypedActor(system).getActorRefFor(r).path.toStringWithoutAddress
  6. }
  7. // prepare untyped router
  8. val router: ActorRef = system.actorOf(RoundRobinGroup(routeePaths).props())
  9. // prepare typed proxy, forwarding MethodCall messages to ‘router‘
  10. val typedRouter: HasName =
  11. TypedActor(system).typedActorOf(TypedProps[Named](), actorRef = router)
  12. println("actor was: " + typedRouter.name()) // name-184
  13. println("actor was: " + typedRouter.name()) // name-753
  14. println("actor was: " + typedRouter.name()) // name-320
  15. println("actor was: " + typedRouter.name()) // name-164