在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor。在第二篇当中,我们来看一下Actor消息传递的功能。这里还是延用之前使用的那个学生-老师的例子。
在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序。

仔细回顾下学生-老师模型

我们现在只考虑StudentSimulatorApp发送给TeacherActor的消息。这里我所说的StudentSimulatorApp指的是一个正常的主程序。
Akka笔记之消息传递 - 图1
从图中可以看到:
(如果有陌生的术语,没关系,后面我们会详细解释的)
1. 学生创建了一个叫ActorSystem的东西。
2. 他通过ActorSystem来创建了一个叫ActorRef的对象。QuoteRequest消息就是发送给ActorRef的(它是TeacherActor的一个代理)
3. ActorRef将消息发送给Dispatcher
4. Dispatcher将消息投递到目标Actor的邮箱中。
5. 随后Dispatcher将Mailbox扔给一个线程去执行(这点下节会重点讲到)
6. MailBox将消息出队并最终将其委托给真实的Teacher Actor的接收方法去处理。
正如我所说的,看不懂也别担心。现在我们来一步步地详细地分析下。全部讲完后你可以再回过头来看下这五个步骤。

STUDENTSIMULATORAPP程序

我们用这个STUDENTSIMULATORAPP来启动JVM并初始化ActorSystem。
Akka笔记之消息传递 - 图2
从图中可以看到,StudentSimulatorApp
1. 创建了一个ActorSystem
2. 通过ActorSystem创建了一个Teacher Actor的代理(ActorRef)
3. 将QuoteRequest消息发送给代理
我们现在只关注这三点。

1. 创建ActorSystem

ActorSystem是进入到Actor的世界的一扇大门。通过它你可以创建或中止Actor。甚至还可以把整个Actor环境给关闭掉。另一方面来说,Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object或者scala.Any的角色——也就是说,它是所有Actor的根对象。当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。
Akka笔记之消息传递 - 图3
初始化ActorSystem的代码是这样的:

  1. val system=ActorSystem("UniversityMessageSystem")

UniversityMessageSystem只是你给ActorSystem起的一个可爱的名字而已。

2. 创建一个TeacherActor的代理?

我们来看下下面这段代码:

  1. val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

actorOf是ActorSystem中创建Actor的方法。但是正如你所看到的,它并不会返回我们所需要的TeacherActor。它返回的是一个ActorRef。
这个ActorRef扮演了真实的Actor的一个代理的角色。客户端并不会直接和Actor通信。这也正是Actor模型中避免直接访问TeacherActor中任何的自定义/私有方法或者变量的一种方式。
再重复一遍,消息只会发送给ActorRef,最终才会到达真正的Actor。你是绝对无法直接和Actor进行通信的。如果你真的找到了什么拙劣的方式来直接通信,大家会恨你入骨的。
Akka笔记之消息传递 - 图4

将消息发送给代理

还是只有一行代码。你只需告诉说把QuoteRequest消息发送到ActorRef就好了。Actor中的这个告诉的方式就是一个!号。(ActorRef中确实也有一个tell方法,不过它只是把这个调用委托给了!号)
这就可以了!

  1. //send a message to the Teacher Actor
  2. teacherActorRef!QuoteRequest

如果你认为我在骗你的话,看一下下面StudentSimulatorApp的完整代码:

STUDENTSIMULATORAPP.SCALA
  1. package me.rerun.akkanotes.messaging.actormsg1
  2. import akka.actor.ActorSystem
  3. import akka.actor.Props
  4. import akka.actor.actorRef2Scala
  5. import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._
  6. object StudentSimulatorApp extends App{
  7. //Initialize the ActorSystem
  8. val actorSystem=ActorSystem("UniversityMessageSystem")
  9. //construct the Teacher Actor Ref
  10. val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])
  11. //send a message to the Teacher Actor
  12. teacherActorRef!QuoteRequest
  13. //Let's wait for a couple of seconds before we shut down the system
  14. Thread.sleep (2000)
  15. //Shut down the ActorSystem.
  16. actorSystem.shutdown()
  17. }

好吧,我承认我撒了点小谎。你还得关掉ActorSystem,不然JVM会一直运行下去的。我还让主线程睡眠了一小会儿,以便给点时间让TeacherActor去完成它的任务。我知道这听起来很愚蠢。别担心。后面我们会通过些优雅的测试用例来替换掉这种取巧的方式。

消息

我们刚发送了一个QuoteMessage给ActorRef,但是,还压根儿没看着过这个消息类呢!
说曹操,曹操到:
(实践中推荐你把消息封装成一个好点的对象,这样维护起来容易些)

TeacherProtocol
  1. package me.rerun.akkanotes.messaging.protocols
  2. object TeacherProtocol{
  3. case class QuoteRequest()
  4. case class QuoteResponse(quoteString:String)
  5. }

正如你所想的那样,QuoteRequest就是发送给TeacherActor的那个消息。Actor会回复一个QuoteResponse。

分发器及邮箱

ActorRef把消息处理功能委托给了Dispatcher。实际上,当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。我们来看下它们是干什么的。
Akka笔记之消息传递 - 图5

邮箱

每个Actor都有一个MailBox(后面会介绍一种特殊的情况)。在我们这个比喻当中,每个老师也有一个邮箱。老师得去检查邮箱并处理消息。在Actor的世界中,则是另一种形式——邮箱一有机会就会要求Actor去完成自己的任务。
同样的,邮箱里也有一个队列来以FIFO的方式来存储并处理消息——它和实际的邮箱还有点不同,真实的邮箱新的信总是在最上面的。

现在讲到分发器了

Dispatcher会完成一些很酷的事。从它的角度来看,它只是从ActorRef中取出一条消息然后将它传给了MailBox。但是,在这后面发生了一件不可意义的事情:
Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。
看下Dispatcher里面的一段代码:

  1. protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
  2. ...
  3. try {
  4. executorService execute mbox
  5. ...
  6. }

什么,你是说要执行一下邮箱?

是的。我们看到MailBox中包含了队列里面的消息。由于Executor得去执行MailBox,所以它得是一个Thread类型。是的没错。MailBox的声明及构造器就是这样的。
下面是MailBox的签名信息。

  1. private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable

TeacherActor

Akka笔记之消息传递 - 图6
当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。
当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。
TeacherActor只是一个很简单的类,它有一个名言的列表,而receive方法很明显就是用来处理消息的。
来看下代码:

TeacherActor.scala

  1. package me.rerun.akkanotes.messaging.actormsg1
  2. import scala.util.Random
  3. import akka.actor.Actor
  4. import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._
  5. /*
  6. * Your Teacher Actor class.
  7. *
  8. * The class could use refinement by way of
  9. * using ActorLogging which uses the EventBus of the Actor framework
  10. * instead of the plain old System out
  11. *
  12. */
  13. class TeacherActor extends Actor {
  14. val quotes = List(
  15. "Moderation is for cowards",
  16. "Anything worth doing is worth overdoing",
  17. "The trouble is you think you have time",
  18. "You never gonna know if you never even try")
  19. def receive = {
  20. case QuoteRequest => {
  21. import util.Random
  22. //Get a random Quote from the list and construct a response
  23. val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))
  24. println (quoteResponse)
  25. }
  26. }
  27. }

TeacherActor的receive方法的模式匹配只会匹配一种消息——QuoteRequest (事实上,模式匹配中最好匹配下默认的情况,不过这个就说来话长了)
receive方法做的就是
1. 匹配QuoteRequest的模式
2. 从名言列表中随机选取一条
3. 构造出一个QuoteResponse
4. 将QuoteResponse打印到控制台上

代码

整个项目的代码可以从Github中下载到。