在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor。在第二篇当中,我们来看一下Actor消息传递的功能。这里还是延用之前使用的那个学生-老师的例子。
在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序。
仔细回顾下学生-老师模型
我们现在只考虑StudentSimulatorApp发送给TeacherActor的消息。这里我所说的StudentSimulatorApp指的是一个正常的主程序。
从图中可以看到:
(如果有陌生的术语,没关系,后面我们会详细解释的)
1. 学生创建了一个叫ActorSystem的东西。
2. 他通过ActorSystem来创建了一个叫ActorRef的对象。QuoteRequest消息就是发送给ActorRef的(它是TeacherActor的一个代理)
3. ActorRef将消息发送给Dispatcher
4. Dispatcher将消息投递到目标Actor的邮箱中。
5. 随后Dispatcher将Mailbox扔给一个线程去执行(这点下节会重点讲到)
6. MailBox将消息出队并最终将其委托给真实的Teacher Actor的接收方法去处理。
正如我所说的,看不懂也别担心。现在我们来一步步地详细地分析下。全部讲完后你可以再回过头来看下这五个步骤。
STUDENTSIMULATORAPP程序
我们用这个STUDENTSIMULATORAPP来启动JVM并初始化ActorSystem。
从图中可以看到,StudentSimulatorApp
1. 创建了一个ActorSystem
2. 通过ActorSystem创建了一个Teacher Actor的代理(ActorRef)
3. 将QuoteRequest消息发送给代理
我们现在只关注这三点。
1. 创建ActorSystem
ActorSystem是进入到Actor的世界的一扇大门。通过它你可以创建或中止Actor。甚至还可以把整个Actor环境给关闭掉。另一方面来说,Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object或者scala.Any的角色——也就是说,它是所有Actor的根对象。当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。
初始化ActorSystem的代码是这样的:
val system=ActorSystem("UniversityMessageSystem")
UniversityMessageSystem只是你给ActorSystem起的一个可爱的名字而已。
2. 创建一个TeacherActor的代理?
我们来看下下面这段代码:
val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])
actorOf是ActorSystem中创建Actor的方法。但是正如你所看到的,它并不会返回我们所需要的TeacherActor。它返回的是一个ActorRef。
这个ActorRef扮演了真实的Actor的一个代理的角色。客户端并不会直接和Actor通信。这也正是Actor模型中避免直接访问TeacherActor中任何的自定义/私有方法或者变量的一种方式。
再重复一遍,消息只会发送给ActorRef,最终才会到达真正的Actor。你是绝对无法直接和Actor进行通信的。如果你真的找到了什么拙劣的方式来直接通信,大家会恨你入骨的。
将消息发送给代理
还是只有一行代码。你只需告诉说把QuoteRequest消息发送到ActorRef就好了。Actor中的这个告诉的方式就是一个!号。(ActorRef中确实也有一个tell方法,不过它只是把这个调用委托给了!号)
这就可以了!
//send a message to the Teacher Actor
teacherActorRef!QuoteRequest
如果你认为我在骗你的话,看一下下面StudentSimulatorApp的完整代码:
STUDENTSIMULATORAPP.SCALA
package me.rerun.akkanotes.messaging.actormsg1
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._
object StudentSimulatorApp extends App{
//Initialize the ActorSystem
val actorSystem=ActorSystem("UniversityMessageSystem")
//construct the Teacher Actor Ref
val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])
//send a message to the Teacher Actor
teacherActorRef!QuoteRequest
//Let's wait for a couple of seconds before we shut down the system
Thread.sleep (2000)
//Shut down the ActorSystem.
actorSystem.shutdown()
}
好吧,我承认我撒了点小谎。你还得关掉ActorSystem,不然JVM会一直运行下去的。我还让主线程睡眠了一小会儿,以便给点时间让TeacherActor去完成它的任务。我知道这听起来很愚蠢。别担心。后面我们会通过些优雅的测试用例来替换掉这种取巧的方式。
消息
我们刚发送了一个QuoteMessage给ActorRef,但是,还压根儿没看着过这个消息类呢!
说曹操,曹操到:
(实践中推荐你把消息封装成一个好点的对象,这样维护起来容易些)
TeacherProtocol
package me.rerun.akkanotes.messaging.protocols
object TeacherProtocol{
case class QuoteRequest()
case class QuoteResponse(quoteString:String)
}
正如你所想的那样,QuoteRequest就是发送给TeacherActor的那个消息。Actor会回复一个QuoteResponse。
分发器及邮箱
ActorRef把消息处理功能委托给了Dispatcher。实际上,当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。我们来看下它们是干什么的。
邮箱
每个Actor都有一个MailBox(后面会介绍一种特殊的情况)。在我们这个比喻当中,每个老师也有一个邮箱。老师得去检查邮箱并处理消息。在Actor的世界中,则是另一种形式——邮箱一有机会就会要求Actor去完成自己的任务。
同样的,邮箱里也有一个队列来以FIFO的方式来存储并处理消息——它和实际的邮箱还有点不同,真实的邮箱新的信总是在最上面的。
现在讲到分发器了
Dispatcher会完成一些很酷的事。从它的角度来看,它只是从ActorRef中取出一条消息然后将它传给了MailBox。但是,在这后面发生了一件不可意义的事情:
Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。
看下Dispatcher里面的一段代码:
protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
...
try {
executorService execute mbox
...
}
什么,你是说要执行一下邮箱?
是的。我们看到MailBox中包含了队列里面的消息。由于Executor得去执行MailBox,所以它得是一个Thread类型。是的没错。MailBox的声明及构造器就是这样的。
下面是MailBox的签名信息。
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable
TeacherActor
当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。
当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。
TeacherActor只是一个很简单的类,它有一个名言的列表,而receive方法很明显就是用来处理消息的。
来看下代码:
TeacherActor.scala
package me.rerun.akkanotes.messaging.actormsg1
import scala.util.Random
import akka.actor.Actor
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._
/*
* Your Teacher Actor class.
*
* The class could use refinement by way of
* using ActorLogging which uses the EventBus of the Actor framework
* instead of the plain old System out
*
*/
class TeacherActor extends Actor {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
def receive = {
case QuoteRequest => {
import util.Random
//Get a random Quote from the list and construct a response
val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))
println (quoteResponse)
}
}
}
TeacherActor的receive方法的模式匹配只会匹配一种消息——QuoteRequest (事实上,模式匹配中最好匹配下默认的情况,不过这个就说来话长了)
receive方法做的就是
1. 匹配QuoteRequest的模式
2. 从名言列表中随机选取一条
3. 构造出一个QuoteResponse
4. 将QuoteResponse打印到控制台上
代码
整个项目的代码可以从Github中下载到。