路由

消息可以通过路由器有效的发送到目的actor,这称为routees。一个路由器可以在actor内部和外部使用,你能够自己管理routees或者使用一个配置容量的自包含路由器actor。

根据你的应用程序的需求,可以使用不同的路由器策略。Akka包含了几个开箱可用的路由器策略。

1 一个简单的路由器

下面的例子证明了怎样使用Router以及管理routees

  1. import akka.routing.ActorRefRoutee
  2. import akka.routing.Router
  3. import akka.routing.RoundRobinRoutingLogic
  4. class Master extends Actor {
  5. var router = {
  6. val routees = Vector.fill(5) {
  7. val r = context.actorOf(Props[Worker])
  8. context watch r
  9. ActorRefRoutee(r)
  10. }
  11. Router(RoundRobinRoutingLogic(), routees)
  12. }
  13. def receive = {
  14. case w: Work =>
  15. router.route(w, sender())
  16. case Terminated(a) =>
  17. router = router.removeRoutee(a)
  18. val r = context.actorOf(Props[Worker])
  19. context watch r
  20. router = router.addRoutee(r)
  21. } }

我们创建一个路由器并且当路由消息到routee时指定使用RoundRobinRoutingLogic

Akka中自带的路由逻辑有:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

我们创建routees为包裹ActorRefRoutee的普通子actor,我们监视routees,当它们停止时可以置换它们。

通过路由器发送消息用route方法,如发送上例中的Work消息。

Router是不可变的,RoutingLogic是线程安全的。这意味着它可以在actor之外使用。

注:一般请求下,发送到路由器的任何消息将会进一步发送到routees,但是有一个例外, Broadcast Messages将会发送到所有路由器的routees

2 一个路由器actor

一个路由器可以创建为一个自包含的actor,它自己管理routees以及从配置中加载路由逻辑和其它设置。

这种类型的路由器actor有两种不同的风格:

  • Pool:路由器创建routees为子actors,如果它们终止了,那么就从路由器中删除它们
  • Group:外部创建routee actors给路由器,路由器使用actor selection发送消息到特定的路径,不观察它的终止

可以用配置或者编码定义路由器actor的设置。虽然路由器actor可以在配置文件中配置,但是它还是必须通过变成创建。如果你在配置文件中定义了路由器actor,那么这些设置将会被用来替换编程提供的参数。

你通过路由器actor发送消息到routees与普通的actor的方式(通过它的ActorRef)是一样的。路由器actor转发消息到它的routees而不需要改变它的原始发送者。当routee回复一个路由过的消息,这个回复将会发送到原始发送者,而不是路由器actor。

Pool

下面的代码和配置片段显示了怎样创建一个轮询的路由器,这个路由器转发消息到五个Worker routees。routees将会创建为路由器的孩子。

  1. akka.actor.deployment {
  2. /parent/router1 {
  3. router = round-robin-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router1: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router1")

下面是一个相同的例子,只是路由器配置通过编码而不是配置文件获得。

  1. val router2: ActorRef =
  2. context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

远程部署的Routees

除了可以将本地创建的actors作为routees, 你也可以让路由actor将自己创建的子actors部署到一组远程主机上; 这是以轮询的方式执行的。要完成这个工作,将配置包在RemoteRouterConfig中, 并附上作为部署目标的结点的远程地址。自然地这要求你在classpath中包括akka-remote模块:

  1. import akka.actor.{ Address, AddressFromURIString }
  2. import akka.remote.routing.RemoteRouterConfig
  3. val addresses = Seq(
  4. Address("akka.tcp", "remotesys", "otherhost", 1234),
  5. AddressFromURIString("akka.tcp://othersys@anotherhost:1234"))
  6. val routerRemote = system.actorOf(
  7. RemoteRouterConfig(RoundRobinPool(5), addresses).props(Props[Echo]))

发送者(Sender)

默认情况下,当一个routee发送消息时,它将隐式地设置它自己为发送者

  1. sender() ! x // replies will go to this actor

然而,对于routees而言,设置路由器为发送者通常是有用的。例如,你有想隐藏路由器背后routees的细节时,你有可能想设置路由器为发送者。下面的代码片段显示怎样设置父路由器为发送者。

  1. sender().tell("reply", context.parent) // replies will go back to parent
  2. sender().!("reply")(context.parent) // alternative syntax

监视(Supervision)

通过一个pool路由器创建的routees是路由器的子actors,所有路由器是子actors的监视器。

路由器actor的监视策略可以通过Pool的supervisorStrategy的属性配置。如果没有提供配置,那么缺省的策略是“一直升级(always escalate)”。这意味着错误会传递到路由器的监视器上进行处理。路由器的监视器将会决定去做什么。

注意路由器监控器将会将错误当作一个带有路由器本身的错误。因此,一个停止或者重启指令将会造成路由器自己停止或者重启。路由器的停止又会造成子actors停止或者重启。

需要提出的一点是,路由器的重启行为已经被重写了,所以它将会重新创建子actors,并且保证Pool中拥有相同数量的actors。

这意味着,如果你没有指定路由器或者它的父actor的supervisorStrategy,routees中的失败将会升级到路由器的父actor,这将默认导致路由器重启,进而重启所有的routees。这是因为默认行为-添加withRouter到子actor的定义,不会改变应用到子actor的监控策略。这可能是无效的,所以你应该避免在定义路由器时指定监督策略。

This means that if you have not specified supervisorStrategy of the router or its parent a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.

可以很简单的设置策略:

  1. val escalator = OneForOneStrategy() {
  2. case e => testActor ! e; SupervisorStrategy.Escalate
  3. }
  4. val router = system.actorOf(RoundRobinPool(1, supervisorStrategy = escalator).props(
  5. routeeProps = Props[TestActor]))

注:一个Pool路由器的子actors终止,Pool路由器不会自动创建一个新的子actor。如果一个Pool路由器的所有子actors终止,路由器自己也会终止,除非它是一个动态路由器,如使用一个resizer

Group

有时,与其用路由器actor创建它的routees,分开创建routees并把它们提供给路由器使用更令人满意。你可以通过传递一个routees的路径到路由器的配置做到这一点。消息将会利用ActorSelection发送到这些路径。

下面的例子显示了通过提供给路由器三个routee actors的路径字符串来创建这个路由器。

  1. akka.actor.deployment {
  2. /parent/router3 {
  3. router = round-robin-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router3: ActorRef =
  2. context.actorOf(FromConfig.props(), "router3")

下面是相同的例子,只是路由器的配置通过编程设置而不是配置文件。

  1. val router4: ActorRef =
  2. context.actorOf(RoundRobinGroup(paths).props(), "router4")

routee actors在路由器外部创建:

  1. system.actorOf(Props[Workers], "workers")
  1. class Workers extends Actor {
  2. context.actorOf(Props[Worker], name = "w1")
  3. context.actorOf(Props[Worker], name = "w2")
  4. context.actorOf(Props[Worker], name = "w3")
  5. // ...

路径可能包括为运行在远程机器上的actors提供的协议和地址信息。Remoting需要将akka-remote模块包含在类路径下

  1. akka.actor.deployment {
  2. /parent/remoteGroup {
  3. router = round-robin-group
  4. routees.paths = [
  5. "akka.tcp://app@10.0.0.1:2552/user/workers/w1",
  6. "akka.tcp://app@10.0.0.2:2552/user/workers/w1",
  7. "akka.tcp://app@10.0.0.3:2552/user/workers/w1"]
  8. } }

3 路由器的使用

这一章,我们将介绍怎样创建不同类型的路由器actor。

这一章的路由器actors通过一个名叫parent的顶级actor创建。注意在配置中,部署路径以/parent/开头,后跟着路由器actor的名字。

  1. system.actorOf(Props[Parent], "parent")

RoundRobinPool和RoundRobinGroup

以轮询的方式路由到routee

在配置文件中定义的RoundRobinPool

  1. akka.actor.deployment {
  2. /parent/router1 {
  3. router = round-robin-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router1: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router1")

在代码中定义的RoundRobinPool

  1. val router2: ActorRef =
  2. context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router2")

在配置文件中定义的RoundRobinGroup

  1. akka.actor.deployment {
  2. /parent/router3 {
  3. router = round-robin-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router3: ActorRef =
  2. context.actorOf(FromConfig.props(), "router3")

在代码中定义的RoundRobinGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router4: ActorRef =
  3. context.actorOf(RoundRobinGroup(paths).props(), "router4")

RandomPool和RandomGroup

这个路由器类型为每个消息随机选择一个routee

在配置文件中定义的RandomGroup

  1. akka.actor.deployment {
  2. /parent/router5 {
  3. router = random-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router5: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router5")

在代码中定义的RandomGroup

  1. val router6: ActorRef =
  2. context.actorOf(RandomPool(5).props(Props[Worker]), "router6")

在配置文件中定义的RandomGroup

  1. akka.actor.deployment {
  2. /parent/router7 {
  3. router = random-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router7: ActorRef =
  2. context.actorOf(FromConfig.props(), "router7")

在代码中定义的RandomGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router8: ActorRef =
  3. context.actorOf(RandomGroup(paths).props(), "router8")

BalancingPool

这个路由器重新分配工作,从繁忙的routees到空闲的routees。所有的routees共享相同的邮箱

在配置文件中定义的BalancingPool

  1. akka.actor.deployment {
  2. /parent/router9 {
  3. router = balancing-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router9: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router9")

在代码中定义的BalancingPool

  1. val router10: ActorRef =
  2. context.actorOf(BalancingPool(5).props(Props[Worker]), "router10")

平衡派发器有额外的配置,这可以被Pool使用,在路由器部署配置的pool-dispatcher片段中配置。

  1. akka.actor.deployment {
  2. /parent/router9b {
  3. router = balancing-pool
  4. nr-of-instances = 5
  5. pool-dispatcher {
  6. attempt-teamwork = off
  7. }
  8. } }

SmallestMailboxPool

这个路由器选择未挂起的邮箱中消息数最少的routee。选择顺序如下所示:

  • 选取任何一个空闲的(没有正在处理的消息)邮箱为空的 routee
  • 选择任何邮箱为空的routee
  • 选择邮箱中等待的消息最少的 routee
  • 选择任何一个远程 routee, 由于邮箱大小未知,远程actor被认为具有低优先级

定义在配置文件中的SmallestMailboxPool

  1. akka.actor.deployment {
  2. /parent/router11 {
  3. router = smallest-mailbox-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router11: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router11")

在代码中定义的SmallestMailboxPool

  1. val router12: ActorRef =
  2. context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")

BroadcastPool和BroadcastGroup

一个广播路由器转发消息到所有的routees

定义在配置文件中的BroadcastPool

  1. akka.actor.deployment {
  2. /parent/router13 {
  3. router = broadcast-pool
  4. nr-of-instances = 5
  5. }
  6. }
  1. val router13: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router13")

定义在代码中的BroadcastPool

  1. val router14: ActorRef =
  2. context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14")

定义在配置文件中的BroadcastGroup

  1. akka.actor.deployment {
  2. /parent/router15 {
  3. router = broadcast-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. }
  6. }
  1. val router15: ActorRef =
  2. context.actorOf(FromConfig.props(), "router15")

定义在代码中的BroadcastGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router16: ActorRef =
  3. context.actorOf(BroadcastGroup(paths).props(), "router16")

ScatterGatherFirstCompletedPool和ScatterGatherFirstCompletedGroup

ScatterGatherFirstCompletedRouter发送消息到它的每个routees,然后等待返回的第一个回复。这个结果将会返回原始发送者(original sender)。其它回复丢弃。

它期待至少一个带有配置时间的回复。否则它将回复一个带有akka.pattern.AskTimeoutExceptionakka.actor.Status.Failure

在配置文件中定义的ScatterGatherFirstCompletedPool

  1. akka.actor.deployment {
  2. /parent/router17 {
  3. router = scatter-gather-pool
  4. nr-of-instances = 5
  5. within = 10 seconds
  6. } }
  1. val router17: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router17")

在代码中定义的ScatterGatherFirstCompletedPool

  1. val router18: ActorRef =
  2. context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).
  3. props(Props[Worker]), "router18")

在配置文件中定义的ScatterGatherFirstCompletedGroup

  1. akka.actor.deployment {
  2. /parent/router19 {
  3. router = scatter-gather-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. within = 10 seconds
  6. } }
  1. val router19: ActorRef =
  2. context.actorOf(FromConfig.props(), "router19")

在代码中定义的ScatterGatherFirstCompletedGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router20: ActorRef =
  3. context.actorOf(ScatterGatherFirstCompletedGroup(paths,
  4. within = 10.seconds).props(), "router20")

TailChoppingPool和TailChoppingGroup

TailChoppingPool首先发送一个消息给一个随机选择的routee,然后等待一段时间,发送第二个消息给一个随机选择的routee,依此类推。它等待返回的第一个回复,然后讲回复发送给原始发送者。其它回复丢弃。

这个路由器的目标是减少通过到多个routees的路由冗余查询而产生的性能延迟,假定其它actors中的某一个比初始化的那个反应速度快。

在配置文件中定义的TailChoppingPool

  1. akka.actor.deployment {
  2. /parent/router21 {
  3. router = tail-chopping-pool
  4. nr-of-instances = 5
  5. within = 10 seconds
  6. tail-chopping-router.interval = 20 milliseconds
  7. } }
  1. val router21: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router21")

在代码中定义的TailChoppingPool

  1. val router22: ActorRef =
  2. context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis).
  3. props(Props[Worker]), "router22")

在配置文件中定义的TailChoppingGroup

  1. akka.actor.deployment {
  2. /parent/router23 {
  3. router = tail-chopping-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. within = 10 seconds
  6. tail-chopping-router.interval = 20 milliseconds
  7. } }
  1. val router23: ActorRef =
  2. context.actorOf(FromConfig.props(), "router23")

在代码中定义的TailChoppingGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router24: ActorRef =
  3. context.actorOf(TailChoppingGroup(paths,
  4. within = 10.seconds, interval = 20.millis).props(), "router24")

ConsistentHashingPool 和 ConsistentHashingGroup

ConsistentHashingPool使用一致性哈希选择routee发送消息。这个文章给出了一致性哈希实现的一些好的建议。

对于一致性哈希而言有三种方式定义使用哪些数据。

  • 你可以定义路由器的hashMapping去映射输入消息到它们的一致性哈希key,这使得决策对发送者(sender)是透明的。
  • 消息可能实现akka.routing.ConsistentHashingRouter.ConsistentHashable。key是消息的一部分,把它和消息定义在一起是方便的。
  • 消息可以包裹在akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中定义一致性哈希key使用的数据。发送者知道使用的key。

定义一致性哈希的这些方式可以一起使用,并且可以在一个路由器上同时使用。首先尝试使用hashMapping

例子代码:

  1. import akka.actor.Actor
  2. import akka.routing.ConsistentHashingRouter.ConsistentHashable
  3. class Cache extends Actor {
  4. var cache = Map.empty[String, String]
  5. def receive = {
  6. case Entry(key, value) => cache += (key -> value)
  7. case Get(key) => sender() ! cache.get(key)
  8. case Evict(key) => cache -= key
  9. }
  10. }
  11. case class Evict(key: String)
  12. case class Get(key: String) extends ConsistentHashable {
  13. override def consistentHashKey: Any = key
  14. }
  15. case class Entry(key: String, value: String)
  1. import akka.actor.Props
  2. import akka.routing.ConsistentHashingPool
  3. import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
  4. import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
  5. def hashMapping: ConsistentHashMapping = {
  6. case Evict(key) => key
  7. }
  8. val cache: ActorRef =
  9. context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).
  10. props(Props[Cache]), name = "cache")
  11. cache ! ConsistentHashableEnvelope(
  12. message = Entry("hello", "HELLO"), hashKey = "hello")
  13. cache ! ConsistentHashableEnvelope(
  14. message = Entry("hi", "HI"), hashKey = "hi")
  15. cache ! Get("hello")
  16. expectMsg(Some("HELLO"))
  17. cache ! Get("hi")
  18. expectMsg(Some("HI"))
  19. cache ! Evict("hi")
  20. cache ! Get("hi")
  21. expectMsg(None)

在上面的例子中,你可以看到Get消息实现了ConsistentHashableEntry消息包裹在了ConsistentHashableEnvelope中。Evict消息通过hashMapping偏函数处理。

定义在配置文件中的ConsistentHashingPool

  1. akka.actor.deployment {
  2. /parent/router25 {
  3. router = consistent-hashing-pool
  4. nr-of-instances = 5
  5. virtual-nodes-factor = 10
  6. }
  7. }
  1. val router25: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router25")

定义在代码中的ConsistentHashingPool

  1. val router26: ActorRef =
  2. context.actorOf(ConsistentHashingPool(5).props(Props[Worker]),
  3. "router26")

定义在配置文件中的ConsistentHashingGroup

  1. akka.actor.deployment {
  2. /parent/router27 {
  3. router = consistent-hashing-group
  4. routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
  5. virtual-nodes-factor = 10
  6. }
  7. }
  1. val router27: ActorRef =
  2. context.actorOf(FromConfig.props(), "router27")

定义在代码中的ConsistentHashingGroup

  1. val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
  2. val router28: ActorRef =
  3. context.actorOf(ConsistentHashingGroup(paths).props(), "router28")

virtual-nodes-factor是每个routee 虚拟节点的个数,这些节点用在一致哈希环中使分布更加均匀。

4 特殊处理的消息

大部分发送到路由器actor的消息都会根据路由器的路由逻辑转发。然而有几种类型的消息有特殊的行为。

注意,除了Broadcast消息,这些特殊的消息仅仅被自包含的路由器actor处理,而不会被akka.routing.Router组件处理。

广播消息

一个广播消息可以被用来发送消息到所有个routees。当一个路由器接收到一个广播消息,它将广播这个消息的负载(payload)到所有routees,不管这个路由器如何路由这个消息。

下面的例子展示了怎样使用一个广播消息发送一个非常重要的消息到路由器的每个routee。

  1. import akka.routing.Broadcast
  2. router ! Broadcast("Watch out for Davy Jones’ locker")

在这个例子中,路由器接收到广播消息,抽取它的负载“Watch out for Davy Jones’ locker”,然后发送这个负载到所有的routees。

PoisonPill消息

PoisonPill消息对所有的actors(包括路由器)拥有特殊的处理。当任何actor收到一个PoisonPill消息,这个actor将被停止。

  1. import akka.actor.PoisonPill
  2. router ! PoisonPill

对于一个正常传递消息到routees的路由器来说,认识到PoisonPill消息仅仅被路由器处理是非常重要的。发送到路由器的PoisonPill消息不会被转发给routees。

然而,发送给路由器的PoisonPill消息仍然有可能影响到它的routees。因为它将停止路由器,路由器停止了,它的子actors也会停止。停止子actors是正常的actor行为。路由器将会停止routees。每个子actor将会处理它当前的消息然后停止。这可能导致一些消息没有被处理。

如果你希望停止一个路由器以及它的routees,但是希望routees首先处理它们邮箱中的所有消息,你不应该发送一个PoisonPill消息到路由器,而应该在Broadcast消息内包裹一个PoisonPill消息,这样每个routee都会收到PoisonPill消息。注意这将停止所有的routees,即使这个routee不是路由器的子actor。

  1. import akka.actor.PoisonPill
  2. import akka.routing.Broadcast
  3. router ! Broadcast(PoisonPill)

上面的代码显示,每个routee都将收到PoisonPill消息。每个routee都将正常处理消息,最后处理PoisonPill消息。这将造成routee停止。当所有的routee停止后,路由器也会自动停止,除非它是动态路由器。

Kill消息

Kill消息是另一种有特殊处理的消息。

当一个Kill消息发送到路由器,路由器将会在内部处理这个消息,不会把它发送到routees。路由器将会抛出一个ActorKilledException异常并且失败。然后它将会恢复、重启或者终止,这依赖于它被怎样监视。

作为路由器子actor的routees也将悬起,这受到应用于路由器的监督指令的影响。不是路由器子actor的routees不会收到影响。

  1. import akka.actor.Kill
  2. router ! Kill

杀死路由器的所有routees可以将kill消息包裹在广播消息中

  1. import akka.actor.Kill
  2. import akka.routing.Broadcast
  3. router ! Broadcast(Kill)

管理消息

  • 发送akka.routing.GetRoutees给路由器actor将会使它返回其当前在一个akka.routing.Routees消息中使用的routee。
  • 发送akka.routing.AddRoutee给路由器actor将会添加这个routee到它的routee集合中
  • 发送akka.routing.RemoveRoutee给路由器将会从routee集合中删除这个routee
  • 发送akka.routing.AdjustPoolSize给一个Pool路由器actor将会调整包含routees的集合的容量

管理消息可能在其它消息之后处理,所以如果你发送AddRoutee消息之后马上发送一个普通的消息到路由器,你无法保证当普通消息路由时,routees已经改变了。如果你需要知道哪些改变已经应用了,你可以在AddRoutee之后发送GetRoutees,当你收到Routees的回复之后你就能指定之前的那个改变应用了。

5 动态调整Pool大小

大部分的池被使用,它们有一个固定数量routees。也可以用调整大小策略动态调整routees的数量。

在配置文件中定义的可调整大小的Pool

  1. akka.actor.deployment {
  2. /parent/router29 {
  3. router = round-robin-pool
  4. resizer {
  5. lower-bound = 2
  6. upper-bound = 15
  7. messages-per-resize = 100
  8. } }
  9. }
  1. val router29: ActorRef =
  2. context.actorOf(FromConfig.props(Props[Worker]), "router29")

在代码中定义的可调整大小的Pool

  1. val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
  2. val router30: ActorRef =
  3. context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
  4. "router30")

如果你在配置文件中定义了router,那么这个值会代替任何代码中设置的值。

6 Akka中的路由是如何设计的

表面上路由器看起来像一般的actor,但是它们实际的实现是完全不同的。路由器被设计为非常有效地接收消息然后很快地传递它们到routees。

一个一般的actor可以被用来路由消息,但是一个actor单线程的处理可能成为一个瓶颈。路由器可以获得更高的吞吐量,它使用的消息处理管道可以允许并发路由。这可以通过直接嵌套路由器的路由逻辑到ActorRef而不是路由器actor来实现。发送到ActorRef的消息可以立即路由到routee,完全绕过单线程路由器actor。

当然,这样的代价比一般actor实现的路由器的路由代码更加复杂。幸运的是,所有的复杂性对消费者来说是不可见的。

7 自定义路由器

你也可以创建你自己的路由器。

下面的例子创建一个路由器复制每个消息到几个目的地。

从路由逻辑开始:

  1. import scala.collection.immutable
  2. import scala.concurrent.forkjoin.ThreadLocalRandom
  3. import akka.routing.RoundRobinRoutingLogic
  4. import akka.routing.RoutingLogic
  5. import akka.routing.Routee
  6. import akka.routing.SeveralRoutees
  7. class RedundancyRoutingLogic(nbrCopies: Int) extends RoutingLogic {
  8. val roundRobin = RoundRobinRoutingLogic()
  9. def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
  10. val targets = (1 to nbrCopies).map(_ => roundRobin.select(message, routees))
  11. SeveralRoutees(targets)
  12. }
  13. }

在这个例子中,通过重用已经存在的RoundRobinRoutingLogic,包裹结果到SeveralRoutees实例。为每个消息调用select,并且通过轮询方式选择几个目的地。

路由逻辑的实现必须是线程安全的,因为它可能会在actor外部使用。

一个路由逻辑的单元测试

  1. case class TestRoutee(n: Int) extends Routee {
  2. override def send(message: Any, sender: ActorRef): Unit = ()
  3. }
  4. val logic = new RedundancyRoutingLogic(nbrCopies = 3)
  5. val routees = for (n <- 1 to 7) yield TestRoutee(n)
  6. val r1 = logic.select("msg", routees)
  7. r1.asInstanceOf[SeveralRoutees].routees should be(
  8. Vector(TestRoutee(1), TestRoutee(2), TestRoutee(3)))
  9. val r2 = logic.select("msg", routees)
  10. r2.asInstanceOf[SeveralRoutees].routees should be(
  11. Vector(TestRoutee(4), TestRoutee(5), TestRoutee(6)))
  12. val r3 = logic.select("msg", routees)
  13. r3.asInstanceOf[SeveralRoutees].routees should be(
  14. Vector(TestRoutee(7), TestRoutee(1), TestRoutee(2)))

让我们继续把这变成一个自包含、可配置的路由器actor。

创建一个继承自PoolGroup或者CustomRouterConfig的类。这个类是路由逻辑的工厂类,保有路由器的配置。

  1. import akka.dispatch.Dispatchers
  2. import akka.routing.Group
  3. import akka.routing.Router
  4. import akka.japi.Util.immutableSeq
  5. import com.typesafe.config.Config
  6. case class RedundancyGroup(override val paths: immutable.Iterable[String], nbrCopies: Int) extends Group {
  7. def this(config: Config) = this(
  8. paths = immutableSeq(config.getStringList("routees.paths")),
  9. nbrCopies = config.getInt("nbr-copies"))
  10. override def createRouter(system: ActorSystem): Router =
  11. new Router(new RedundancyRoutingLogic(nbrCopies))
  12. override val routerDispatcher: String = Dispatchers.DefaultDispatcherId
  13. }

这可以作为被Akka支持的路由器actors正确的使用

  1. for (n <- 1 to 10) system.actorOf(Props[Storage], "s" + n)
  2. val paths = for (n <- 1 to 10) yield ("/user/s" + n)
  3. val redundancy1: ActorRef =
  4. system.actorOf(RedundancyGroup(paths, nbrCopies = 3).props(),
  5. name = "redundancy1")
  6. redundancy1 ! "important"

注意我们在RedundancyGroup中增加了一个构造函数持有Config参数。这使它可以在配置文件中配置。

  1. akka.actor.deployment {
  2. /redundancy2 {
  3. router = "docs.routing.RedundancyGroup"
  4. routees.paths = ["/user/s1", "/user/s2", "/user/s3"]
  5. nbr-copies = 5
  6. }
  7. }

router属性中描述完整的类名。路由器类必须继承自akka.routing.RouterConfig (Pool, Group or CustomRouterConfig),并且拥有一个包含com.typesafe.config.Config参数的构造器。

  1. val redundancy2: ActorRef = system.actorOf(FromConfig.props(),
  2. name = "redundancy2")
  3. redundancy2 ! "very important"

8 配置派发器

为了简单地定义Pool的routees的派发器,你可以在配置文件的部署片段定义派发器。

  1. akka.actor.deployment {
  2. /poolWithDispatcher {
  3. router = random-pool
  4. nr-of-instances = 5
  5. pool-dispatcher {
  6. fork-join-executor.parallelism-min = 5
  7. fork-join-executor.parallelism-max = 5
  8. }
  9. }
  10. }

这是为一个Pool提供派发器唯一需要做的事情。

如果你使用一个actor的group路由到它的路径,它将会一直使用在Props中配置的相同的派发器,在actor创建之后,不能修改actor的派发器

“head”路由actor, 不能运行在同样的派发器上, 因为它并不处理相同的消息,这个特殊的actor并不使用 Props中配置的派发器, 而是使用从RouterConfig 来的 routerDispatcher , 它缺省为actor系统的缺省派发器. 所有的标准路由actor都允许在其构造方法或工厂方法中设置这个属性,自定义路由必须以合适的方式自己实现这个方法。

  1. val router: ActorRef = system.actorOf(
  2. // “head” router actor will run on "router-dispatcher" dispatcher
  3. // Worker routees will run on "pool-dispatcher" dispatcher
  4. RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
  5. name = "poolWithDispatcher")

注:不允许为一个akka.dispatch.BalancingDispatcherConfigurator配置routerDispatcher,因为特殊路由器的消息不能被其它的消息处理