消息派发器

Akka MessageDispatcher是维持 Akka Actor “运作”的部分, 可以说它是整个机器的引擎. 所有的MessageDispatcher实现也同时是一个ExecutionContext, 这意味着它们可以用来执行任何代码。

1 缺省派发器

在没有为 Actor作配置的情况下,一个ActorSystem将有一个缺省的派发器。 缺省派发器是可配置的,缺省情况下是一个拥有特定default-executorDispatcher。创建ActorSystem时传递了一个ExecutionContext给它,那么这个ExecutionContext作为所有派发器的默认执行者使用。如果没有给定ExecutionContext,它将会使用在akka.actor.default-dispatcher.default-executor.fallback中指定的执行者。缺省情况下这是一个“fork-join-executor”,在大多数情况下拥有非常好的性能。

2 发现派发器

派发器实现了ExecutionContext接口,因此可以用来运行Future调用。

  1. // for use with Futures, Scheduler, etc.
  2. implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

3 为 Actor 指定派发器

如果你想为你的actor分配与缺省派发器不同的派发器,你需要做两件事情,第一件事情是配置派发器:

  1. my-dispatcher {
  2. # Dispatcher 是基于事件的派发器的名称
  3. type = Dispatcher
  4. # 使用何种ExecutionService
  5. executor = "fork-join-executor"
  6. # 配置 fork join 池
  7. fork-join-executor {
  8. # 容纳基于倍数的并行数量的线程数下限
  9. parallelism-min = 2
  10. #并行数(线程) ... ceil(可用CPU数*倍数)
  11. parallelism-factor = 2.0
  12. #容纳基于倍数的并行数量的线程数上限
  13. parallelism-max = 10
  14. }
  15. # Throughput 定义了线程切换到另一个actor之前处理的消息数上限
  16. # 设置成1表示尽可能公平.
  17. throughput = 100
  18. }

以下是另一个使用 “thread-pool-executor” 的例子:

  1. my-thread-pool-dispatcher {
  2. # Dispatcher是基于事件的派发器的名称
  3. type = Dispatcher
  4. # 使用何种 ExecutionService
  5. executor = "thread-pool-executor"
  6. #配置线程池
  7. thread-pool-executor {
  8. #容纳基于倍数的并行数量的线程数下限
  9. core-pool-size-min = 2
  10. # 核心线程数 .. ceil(可用CPU数*倍数)
  11. core-pool-size-factor = 2.0
  12. #容纳基于倍数的并行数量的线程数上限
  13. core-pool-size-max = 10
  14. }
  15. #Throughput 定义了线程切换到另一个actor之前处理的消息数上限
  16. # 设置成1表示尽可能公平.
  17. throughput = 100
  18. }

然后你就可以正常的创建actor,并在部署配置中定义派发器。

  1. import akka.actor.Props
  2. val myActor = context.actorOf(Props[MyActor], "myactor")
  3. akka.actor.deployment {
  4. /myactor {
  5. dispatcher = my-dispatcher
  6. }
  7. }

部署配置的另一种选择是在代码中定义派发器。如果你在部署配置中定义派发器,这个值将会覆盖编程提供的参数值。

  1. import akka.actor.Props
  2. val myActor =
  3. context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

你在withDispatcher中指定的 “dispatcherId” 其实是配置中的一个路径. 所以在这种情况下它位于配置的顶层,但你可以把它放在下面的层次,用.来代表子层次,象这样: “foo.bar.my-dispatcher”

4 派发器的类型

一共有4种类型的消息派发器:

  • Dispatcher

    • 这是一个基于事件的分派器,它将一个actor集合与一个线程池结合在一起,它是缺省的派发器
    • 可共享性: 无限制
    • 邮箱: 任何,为每一个Actor创建一个
    • 使用场景: 缺省派发器,Bulkheading
    • 底层使用: java.util.concurrent.ExecutorService,用fork-join-executor, thread-pool-executor 或者一个akka.dispatcher.ExecutorServiceConfigurator的全类名来指定ExecutorService。
  • PinnedDispatcher

    • 可共享性: 无
    • 邮箱: 任何,为每个Actor创建一个
    • 使用场景: Bulkheading
    • 底层使用: 任何akka.dispatch.ThreadPoolExecutorConfigurator,缺省为一个thread-pool-executor
  • BalancingDispatcher

    • 可共享性: 仅对同一类型的Actor共享
    • 邮箱: 任何,为所有的Actor创建一个
    • 使用场景: Work-sharing
    • 底层使用: java.util.concurrent.ExecutorService,用fork-join-executor, thread-pool-executor 或者一个akka.dispatcher.ExecutorServiceConfigurator的全类名来指定ExecutorService。
    • 注意你不能使用BalancingDispatcher作为路由派发器
  • CallingThreadDispatcher

    • 可共享性: 无限制
    • 邮箱: 任何,每Actor每线程创建一个(需要时)
    • 使用场景: 测试
    • 底层使用: 调用的线程 (duh)

更多 dispatcher 配置的例子

配置 PinnedDispatcher:

  1. my-pinned-dispatcher {
  2. executor = "thread-pool-executor"
  3. type = PinnedDispatcher
  4. }

然后使用它:

  1. val myActor =
  2. context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")