
An Akka mailbox holds the messages that are destined for an actor. Normally each actor has its own mailbox, but with, for example, a BalancingPool all routees share a single mailbox instance.

1 Mailbox Selection


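It is possible to require a certain type of message queue for a certain type of actor by having that actor extend the parameterized trait RequiresMessageQueue. Here is an example: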

    import akka.dispatch.RequiresMessageQueue
    import akka.dispatch.BoundedMessageQueueSemantics

    class MyBoundedActor extends MyActor
      with RequiresMessageQueue[BoundedMessageQueueSemantics]

The type parameter of the RequiresMessageQueue trait needs to be mapped to a mailbox in the configuration, as shown here:

    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 1000
      mailbox-push-timeout-time = 10s
    }

    akka.actor.mailbox.requirements {
      "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
    }





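A dispatcher may also declare a requirement for the mailbox type used by the actors running on it, using the mailbox-requirement key:

    my-dispatcher {
      mailbox-requirement = org.example.MyInterface
    }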




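When an actor is created, its mailbox is determined by the first of the following rules that applies:

  • If the actor's deployment configuration section contains a mailbox key, that key names a configuration section describing the mailbox type to be used.
  • If the actor's Props contains a mailbox selection (that is, withMailbox was called on it), that selection names a configuration section describing the mailbox type to be used.
  • If the dispatcher's configuration section contains a mailbox-type key, the same section is used to configure the mailbox type.
  • If the dispatcher requires a mailbox type, the mapping for that requirement is used to determine the mailbox type.
  • Otherwise, the default mailbox akka.actor.default-mailbox is used.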
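
The precedence in the list above can be modelled as a chain of optional lookups. The following is only a simplified sketch of that list, not Akka's actual implementation: `Inputs`, its fields, and `select` are hypothetical names introduced for illustration, and each field stands for the configuration section that the corresponding rule would resolve to.

```scala
object MailboxSelectionSketch {
  // Hypothetical, simplified inputs; none of these are Akka types.
  final case class Inputs(
    deploymentMailbox: Option[String] = None,     // "mailbox" key in the actor's deployment section
    propsMailbox: Option[String] = None,          // section named via Props.withMailbox
    dispatcherMailboxType: Option[String] = None, // dispatcher section that has a mailbox-type key
    dispatcherRequirement: Option[String] = None) // section mapped from the dispatcher's requirement

  val DefaultMailbox = "akka.actor.default-mailbox"

  // The first rule that yields a section wins; the default is the last resort.
  def select(in: Inputs): String =
    in.deploymentMailbox
      .orElse(in.propsMailbox)
      .orElse(in.dispatcherMailboxType)
      .orElse(in.dispatcherRequirement)
      .getOrElse(DefaultMailbox)

  def main(args: Array[String]): Unit = {
    println(select(Inputs(propsMailbox = Some("prio-mailbox"))))
    println(select(Inputs()))
  }
}
```

Using `Option.orElse` keeps the order of the rules visible in the code, so adding or reordering a rule is a one-line change.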




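The default mailbox can be changed in the configuration. For example, to use SingleConsumerOnlyUnboundedMailbox as the default:

    akka.actor.default-mailbox {
      mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
    }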


Each mailbox type is implemented by a class that extends MailboxType and takes two constructor arguments: an ActorSystem.Settings object and a Config section. The latter is computed by obtaining the named configuration section from the actor system's configuration, overriding its id key with the configuration path of the mailbox type, and adding a fall-back to the default mailbox configuration section.
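
The three steps described above (look up the named section, override `id`, fall back to the default mailbox section) can be sketched with the Typesafe Config API. This is a minimal illustration rather than Akka's internal code: the configuration contents and the `mailboxConfig` helper are assumptions made for the example.

```scala
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }

object MailboxConfigSketch {
  // Hypothetical stand-in for the actor system's configuration.
  val systemConfig: Config = ConfigFactory.parseString(
    """
    akka.actor.default-mailbox {
      mailbox-type = "akka.dispatch.UnboundedMailbox"
      mailbox-capacity = 1000
      mailbox-push-timeout-time = 10s
    }
    bounded-mailbox {
      mailbox-type = "akka.dispatch.BoundedMailbox"
      mailbox-capacity = 50
    }
    """)

  // Named section, with "id" set to its own path, falling back to the defaults.
  def mailboxConfig(id: String): Config =
    systemConfig.getConfig(id)
      .withValue("id", ConfigValueFactory.fromAnyRef(id))
      .withFallback(systemConfig.getConfig("akka.actor.default-mailbox"))

  def main(args: Array[String]): Unit = {
    val cfg = mailboxConfig("bounded-mailbox")
    println(cfg.getString("id"))                        // the section's own path
    println(cfg.getInt("mailbox-capacity"))             // taken from bounded-mailbox
    println(cfg.getString("mailbox-push-timeout-time")) // inherited from the default section
  }
}
```

Keys that the named section does not define, such as `mailbox-push-timeout-time` here, are resolved from the fall-back, which is why a custom mailbox section only needs to list the settings it changes.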

2 Built-in Implementations


  • UnboundedMailbox (the default mailbox)
    • Backed by a java.util.concurrent.ConcurrentLinkedQueue
    • Blocking: No
    • Bounded: No
    • Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
  • SingleConsumerOnlyUnboundedMailbox
    • Backed by a very efficient multiple-producer single-consumer queue; cannot be used with a BalancingDispatcher
    • Blocking: No
    • Bounded: No
    • Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  • BoundedMailbox
    • Backed by a java.util.concurrent.LinkedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
    • Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
  • UnboundedPriorityMailbox
    • Backed by a java.util.concurrent.PriorityBlockingQueue
    • Blocking: Yes
    • Bounded: No
    • Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
  • BoundedPriorityMailbox
    • Backed by a java.util.PriorityBlockingQueue wrapped in an akka.util.BoundedBlockingQueue
    • Blocking: Yes
    • Bounded: Yes
    • Configuration name: "akka.dispatch.BoundedPriorityMailbox"

3 Mailbox Configuration Examples


How to create a priority mailbox:

    import akka.actor.ActorSystem
    import akka.actor.PoisonPill
    import akka.dispatch.PriorityGenerator
    import akka.dispatch.UnboundedPriorityMailbox
    import com.typesafe.config.Config

    // We inherit, in this case, from UnboundedPriorityMailbox
    // and seed it with the priority generator
    class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
      extends UnboundedPriorityMailbox(
        // Create a new PriorityGenerator, lower prio means more important
        PriorityGenerator {
          // 'highpriority messages should be treated first if possible
          case 'highpriority => 0

          // 'lowpriority messages should be treated last if possible
          case 'lowpriority  => 2

          // PoisonPill when no other left
          case PoisonPill    => 3

          // We default to 1, which is in between high and low
          case otherwise     => 1
        })
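
To see what such a priority function does to delivery order without starting an actor system, the same mapping can be applied to a plain java.util.concurrent.PriorityBlockingQueue, the queue type that backs UnboundedPriorityMailbox. This is a sketch under stated assumptions: `PriorityOrderSketch` and its members are hypothetical names, messages are plain strings instead of symbols, and "poison" stands in for PoisonPill.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object PriorityOrderSketch {
  // Same mapping as the generator above; lower value means more important.
  def priority(msg: String): Int = msg match {
    case "highpriority" => 0
    case "lowpriority"  => 2
    case "poison"       => 3 // stand-in for PoisonPill
    case _              => 1
  }

  private val byPriority: Comparator[String] = new Comparator[String] {
    def compare(a: String, b: String): Int =
      Integer.compare(priority(a), priority(b))
  }

  // Enqueues every message, then drains the queue and returns the
  // priorities in the order the messages come out.
  def drainPriorities(messages: Seq[String]): List[Int] = {
    val queue = new PriorityBlockingQueue[String](11, byPriority)
    messages.foreach(m => queue.put(m))
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(priority).toList
  }

  def main(args: Array[String]): Unit = {
    val sent = Seq("lowpriority", "lowpriority", "highpriority",
      "pigdog", "pigdog2", "pigdog3", "highpriority", "poison")
    println(drainPriorities(sent)) // List(0, 0, 1, 1, 1, 2, 2, 3)
  }
}
```

The sketch reports priorities rather than the messages themselves because PriorityBlockingQueue makes no guarantee about the relative order of elements with equal priority.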


Then add it to the configuration:

    prio-dispatcher {
      mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
      // Other dispatcher configuration goes here
    }


An example of how to use it:

    import akka.actor.{ Actor, PoisonPill, Props }
    import akka.event.{ Logging, LoggingAdapter }

    // We create a new Actor that just prints out what it processes
    class Logger extends Actor {
      val log: LoggingAdapter = Logging(context.system, this)

      self ! 'lowpriority
      self ! 'lowpriority
      self ! 'highpriority
      self ! 'pigdog
      self ! 'pigdog2
      self ! 'pigdog3
      self ! 'highpriority
      self ! PoisonPill

      def receive = {
        case x => log.info(x.toString)
      }
    }

    val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
      "prio-dispatcher"))

    /*
     * Logs:
     * 'highpriority
     * 'highpriority
     * 'pigdog
     * 'pigdog2
     * 'pigdog3
     * 'lowpriority
     * 'lowpriority
     */


It is also possible to configure a mailbox type directly:

    prio-mailbox {
      mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
      // Other mailbox configuration goes here
    }

    akka.actor.deployment {
      /priomailboxactor {
        mailbox = prio-mailbox
      }
    }


Then use it either from deployment, by creating the actor under the configured name:

    import akka.actor.Props
    val myActor = context.actorOf(Props[MyActor], "priomailboxactor")


Or from code, by selecting the mailbox on the Props:

    import akka.actor.Props
    val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))

4 Creating Your Own Mailbox Type


The following example defines a custom unbounded mailbox:

    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import akka.dispatch.Envelope
    import akka.dispatch.MailboxType
    import akka.dispatch.MessageQueue
    import akka.dispatch.ProducesMessageQueue
    import com.typesafe.config.Config
    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.Option

    // Marker trait used for mailbox requirements mapping
    trait MyUnboundedMessageQueueSemantics

    object MyUnboundedMailbox {
      // This is the MessageQueue implementation
      class MyMessageQueue extends MessageQueue
        with MyUnboundedMessageQueueSemantics {

        private final val queue = new ConcurrentLinkedQueue[Envelope]()

        // these should be implemented; queue used as example
        def enqueue(receiver: ActorRef, handle: Envelope): Unit =
          queue.offer(handle)
        def dequeue(): Envelope = queue.poll()
        def numberOfMessages: Int = queue.size
        def hasMessages: Boolean = !queue.isEmpty
        // Drain any remaining messages to dead letters when the mailbox is disposed
        def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
          while (hasMessages) {
            deadLetters.enqueue(owner, dequeue())
          }
        }
      }
    }

    // This is the Mailbox implementation
    class MyUnboundedMailbox extends MailboxType
      with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

      import MyUnboundedMailbox._

      // This constructor signature must exist, it will be called by Akka
      def this(settings: ActorSystem.Settings, config: Config) = {
        // put your initialization code here
        this()
      }

      // The create method is called to create the MessageQueue
      final override def create(owner: Option[ActorRef],
                                system: Option[ActorSystem]): MessageQueue =
        new MyMessageQueue()
    }


Note: Make sure to include a constructor that takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, because this constructor is invoked reflectively to construct your mailbox type. The config passed in as the second argument is the section of the configuration that describes the dispatcher or mailbox setting using this mailbox type. The mailbox type is instantiated once for each dispatcher or mailbox setting that uses it.


You can also use the mailbox as a requirement on the dispatcher:

    custom-dispatcher {
      mailbox-requirement =
        "docs.dispatcher.MyUnboundedMessageQueueSemantics"
    }

    akka.actor.mailbox.requirements {
      "docs.dispatcher.MyUnboundedMessageQueueSemantics" =
        custom-dispatcher-mailbox
    }

    custom-dispatcher-mailbox {
      mailbox-type = "docs.dispatcher.MyUnboundedMailbox"
    }


Or define the requirement on your actor class:

    class MySpecialActor extends Actor
      with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
      // ...
    }

5 Special Semantics of system.actorOf


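system.actorOf returns an ActorRef immediately, while the actor itself is created asynchronously behind the scenes. Until that has happened, messages sent to the ActorRef are queued locally, and only once the real actor has been swapped in are they transferred into the real mailbox. As a result, a check like the following will probably fail unless you allow some time to pass and retry it: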


    val props: Props = ...
    // this actor uses MyCustomMailbox, which is assumed to be a singleton
    system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
    assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")
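
Because the message may reach the real mailbox only after a short delay, a check like the one above is more robust when it polls until the condition holds. The helper below is a plain-Scala sketch of that idea with no Akka dependency: `AwaitSketch`, its `awaitCond`, and the background thread that simulates the delayed transfer are all hypothetical. In an Akka test you would more likely use the `awaitCond` provided by akka-testkit.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.duration._

object AwaitSketch {
  // Polls `cond` every `interval` until it holds or `max` has elapsed.
  // Returns true if the condition was observed in time, false otherwise.
  def awaitCond(cond: => Boolean, max: FiniteDuration,
                interval: FiniteDuration = 10.millis): Boolean = {
    val deadline = max.fromNow
    @tailrec def loop(): Boolean =
      if (cond) true
      else if (deadline.isOverdue()) false
      else { Thread.sleep(interval.toMillis); loop() }
    loop()
  }

  def main(args: Array[String]): Unit = {
    val lastEnqueued = new AtomicReference[String](null)
    // Stand-in for the asynchronous hand-over into the real mailbox.
    val transfer = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(50); lastEnqueued.set("bang") }
    })
    transfer.start()
    // An immediate assert could fail here; polling tolerates the delay.
    println(awaitCond(lastEnqueued.get() == "bang", 1.second))
  }
}
```

Passing the condition by name means it is re-evaluated on every poll, which is what lets the helper observe a value that changes on another thread.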
