邮箱

一个AKKA邮箱保有发向actor的消息,一般情况下,每个actor都有自己的邮箱,but with for example a BalancingPool all routees will share a single mailbox instance。

1 邮箱选择

为一个actor指定一个消息队列类型

通过继承参数化的trait RequiresMessageQueue为一个确定类型的actor指定一个确定类型的消息类型是可能的。下面是一个例子。

  1. import akka.dispatch.RequiresMessageQueue
  2. import akka.dispatch.BoundedMessageQueueSemantics
  3. class MyBoundedActor extends MyActor
  4. with RequiresMessageQueue[BoundedMessageQueueSemantics]

RequiresMessageQueue trait需要在配置文件中映射到一个邮箱上。如下所示:

  1. bounded-mailbox {
  2. mailbox-type = "akka.dispatch.BoundedMailbox"
  3. mailbox-capacity = 1000
  4. mailbox-push-timeout-time = 10s
  5. }
  6. akka.actor.mailbox.requirements {
  7. "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
  8. }

现在,你每次创建MyBoundedActor类型的actor,它都会尝试得到一个有边界的邮箱。如果在部署中有不同的邮箱配置(或者是直接的,或者是通过一个拥有特定邮箱类型的派发器),那将覆盖这个映射。

注意:为一个actor创建的邮箱类型队列将会在trait中进行核对,检查是不是所需类型。如果队列没有实现所需类型,actor创建将会失败

为一个派发器指定一个消息队列类型

一个派发器可能也有这样的需求:需要一个邮箱类型。一个例子是BalancingDispatcher需要一个消息队列,这个队列对多个并发消费者来说是线程安全的。这个需求在派发器的配置中配置,如下所示:

  1. my-dispatcher {
  2. mailbox-requirement = org.example.MyInterface
  3. }

mailbox-requirement需要一个类或者接口名,这个接口然后能够被确认为消息队列实现的超类型。万一冲突了-如果actor需要的邮箱类型不能满足这个需求,actor创建将会失败。

怎样选择邮箱类型

当一个actor创建时,ActorRefProvider首先确定执行它的派发器,如何确定邮箱,如下所述:

  • 如果actor的部署配置片段包含了一个邮箱key,那么这个配置部分描述的邮箱类型将被采用
  • 如果actor的Props包含邮箱选择-在其上调用了withMailbox,那么这个配置部分描述的邮箱类型将被采用
  • 如果派发器的配置片段包含了一个mailbox-type key,这个配置片段将会用来配置邮箱类型
  • 如果派发器需要一个邮箱类型,这个需求的映射将会用来决定使用的邮箱类型
  • 缺省的akka.actor.default-mailbox将会被使用

缺省的邮箱

当邮箱没有指定时,缺省的邮箱将会使用。默认情况下,这个邮箱是没有边界的。由java.util.concurrent.ConcurrentLinkedQueue.SingleConsumerOnlyUnboundedMailbox支持的邮箱是效率更高的邮箱,可以作为缺省的邮箱,但是不能作为和BalancingDispatcher一起使用。

配置SingleConsumerOnlyUnboundedMailbox为缺省邮箱,如下所示:

  1. akka.actor.default-mailbox {
  2. mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  3. }

哪个配置传递给邮箱类型

每个邮箱类型通过一个继承MailboxType的类实现,这个类的构造器有两个参数,一个ActorSystem.Settings对象,一个Config片段。后者通过从actor系统配置中获得的具名配置片段计算得到,用邮箱类型的配置路径覆盖它的id key,并且添加一个fall-back到缺省的邮箱配置片段。(he latter is computed by obtaining the named configuration section from the actor system’s configuration, overriding its id key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section)

2 内置实现

Akka内置有几个邮箱实现:

  • UnboundedMailbox-默认的邮箱

    • java.util.concurrent.ConcurrentLinkedQueue支持
    • 阻塞:无
    • 边界:无
    • 配置名:unbounded或者akka.dispatch.UnboundedMailbox
  • SingleConsumerOnlyUnboundedMailbox

    • 由一个高效的的多生产者单消费者队列支持,不能和BalancingDispatcher一起使用
    • 阻塞:无
    • 边界:无
    • 配置名:akka.dispatch.SingleConsumerOnlyUnboundedMailbox
  • BoundedMailbox

    • java.util.concurrent.LinkedBlockingQueue支持
    • 阻塞:有
    • 边界:有
    • 配置名:bounded或者akka.dispatch.BoundedMailbox
  • UnboundedPriorityMailbox

    • java.util.concurrent.PriorityBlockingQueue支持
    • 阻塞:有
    • 边界:无
    • 配置名:akka.dispatch.UnboundedPriorityMailbox
  • BoundedPriorityMailbox

    • java.util.PriorityBlockingQueue包裹一个akka.util.BoundedBlockingQueue支持
    • 阻塞:有
    • 边界:有
    • 配置名:kka.dispatch.BoundedPriorityMailbox

3 邮箱配置例子

怎样创建一个PriorityMailbox

  1. import akka.dispatch.PriorityGenerator
  2. import akka.dispatch.UnboundedPriorityMailbox
  3. import com.typesafe.config.Config
  4. // We inherit, in this case, from UnboundedPriorityMailbox
  5. // and seed it with the priority generator
  6. class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
  7. extends UnboundedPriorityMailbox(
  8. // Create a new PriorityGenerator, lower prio means more important
  9. PriorityGenerator {
  10. // ’highpriority messages should be treated first if possible
  11. case highpriority => 0
  12. // ’lowpriority messages should be treated last if possible
  13. case lowpriority => 2
  14. // PoisonPill when no other left
  15. case PoisonPill => 3
  16. // We default to 1, which is in between high and low
  17. case otherwise => 1
  18. })

然后将它添加到配置文件中:

  1. prio-dispatcher {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other dispatcher configuration goes here
  4. }

然后是一个如何使用它的例子:

  1. // We create a new Actor that just prints out what it processes
  2. class Logger extends Actor {
  3. val log: LoggingAdapter = Logging(context.system, this)
  4. self ! lowpriority
  5. self ! lowpriority
  6. self ! highpriority
  7. self ! pigdog
  8. self ! pigdog2
  9. self ! pigdog3
  10. self ! highpriority
  11. self ! PoisonPill
  12. def receive = {
  13. case x => log.info(x.toString)
  14. } }
  15. val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
  16. "prio-dispatcher"))
  17. /*
  18. * Logs:
  19. * ’highpriority
  20. * ’highpriority
  21. * ’pigdog
  22. * ’pigdog2
  23. * ’pigdog3
  24. * ’lowpriority
  25. * ’lowpriority
  26. */

也可以用下面的方式直接配置邮箱

  1. prio-mailbox {
  2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  3. //Other mailbox configuration goes here
  4. }
  5. akka.actor.deployment {
  6. /priomailboxactor {
  7. mailbox = prio-mailbox
  8. }
  9. }

然后使用它:

  1. import akka.actor.Props
  2. val myActor = context.actorOf(Props[MyActor], "priomailboxactor")

或者:

  1. import akka.actor.Props
  2. val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))

4 创建你自己的邮箱类型

一个例子胜过千言万语

  1. import akka.actor.ActorRef
  2. import akka.actor.ActorSystem
  3. import akka.dispatch.Envelope
  4. import akka.dispatch.MailboxType
  5. import akka.dispatch.MessageQueue
  6. import akka.dispatch.ProducesMessageQueue
  7. import com.typesafe.config.Config
  8. import java.util.concurrent.ConcurrentLinkedQueue
  9. import scala.Option
  10. // Marker trait used for mailbox requirements mapping
  11. trait MyUnboundedMessageQueueSemantics
  12. object MyUnboundedMailbox {
  13. // This is the MessageQueue implementation
  14. class MyMessageQueue extends MessageQueue
  15. with MyUnboundedMessageQueueSemantics {
  16. private final val queue = new ConcurrentLinkedQueue[Envelope]()
  17. // these should be implemented; queue used as example
  18. def enqueue(receiver: ActorRef, handle: Envelope): Unit =queue.offer(handle)
  19. def dequeue(): Envelope = queue.poll()
  20. def numberOfMessages: Int = queue.size
  21. def hasMessages: Boolean = !queue.isEmpty
  22. def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
  23. while (hasMessages) {
  24. deadLetters.enqueue(owner, dequeue())
  25. }
  26. }
  27. }
  28. }
  29. // This is the Mailbox implementation
  30. class MyUnboundedMailbox extends MailboxType
  31. with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
  32. import MyUnboundedMailbox._
  33. // This constructor signature must exist, it will be called by Akka
  34. def this(settings: ActorSystem.Settings, config: Config) = {
  35. // put your initialization code here
  36. this()
  37. }
  38. // The create method is called to create the MessageQueue
  39. final override def create(owner: Option[ActorRef],
  40. system: Option[ActorSystem]): MessageQueue =
  41. new MyMessageQueue()
  42. }

然后,你仅仅需要你的MailboxType的全类名作为派发器配置项或者邮箱配置项mailbox-type的值。

注意:确保包含一个构造器,这个构造器持有akka.actor.ActorSystem.Settingscom.typesafe.config.Config两个参数,这个构造器在构造你的邮箱类型时会被直接调用。The config passed in as second argument is that section from the configuration which describes the dispatcher or mailbox setting using this mailbox type; the mailbox type will be instantiated once for each dispatcher or mailbox setting using it

你也可以使用邮箱作为派发器的requirement

  1. custom-dispatcher {
  2. mailbox-requirement =
  3. "docs.dispatcher.MyUnboundedJMessageQueueSemantics"
  4. }
  5. akka.actor.mailbox.requirements {
  6. "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
  7. custom-dispatcher-mailbox
  8. }
  9. custom-dispatcher-mailbox {
  10. mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
  11. }

或者在你的actor类里面定义requirement

  1. class MySpecialActor extends Actor
  2. with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
  3. // ...
  4. }

5 system.actorOf 的特殊语义

为了使system.actorOf在保证(keeping)返回类型ActorRef的同时(返回ref的语义是完全函数式的)既是同步的又是非阻塞的,特殊的处理将会进行。在后台,一个空的actor引用被构造,它被送到系统监控actor,这个监控actor创建actor和它的context,并且将它们放入引用。直到这发生后,发送给ActorRef的消息将会在本地排队,并且只在交换真实的内容后才将它们转移到邮箱。

Until that has happened, messages sent to the ActorRef will be queued locally, and only upon swapping the real filling in will they be transferred into the real mailbox。

所以,

  1. val props: Props = ...
  2. // this actor uses MyCustomMailbox, which is assumed to be a singleton
  3. system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
  4. assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")

将有可能失败。