本节主要内容:
- Actor API解析
1. Actor API解析
Actor中的主要成员变量和方法定义如下:package akka.actortrait Actor extends scala.AnyRef {type Receive = akka.actor.Actor.Receive//context变量暴露当前Actor的上下文信息及当前消息implicit val context : akka.actor.ActorContext = { /* compiled code */ }//self作为当前ActorRef的引用implicit final val self : akka.actor.ActorRef = { /* compiled code */ }//当前Actor接收到最后一条消息对应的消息发送者(Actor)final def sender() : akka.actor.ActorRef = { /* compiled code */ }//receive方法,抽象方法,定义Actor的行为逻辑def receive : akka.actor.Actor.Receive//内部使用APIprotected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ }protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ }protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ }protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }//监督策略,用于Actor容错处理def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ }//Hook方法,用于Actor生命周期监控@scala.throws[T](classOf[scala.Exception])def preStart() : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def postStop() : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }@scala.throws[T](classOf[scala.Exception])def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }//发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ }}object Actor extends scala.AnyRef {type Receive = scala.PartialFunction[scala.Any, scala.Unit]//空的行为逻辑@scala.SerialVersionUID(1)object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive {def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ }def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ }}//Sender为null@scala.SerialVersionUID(1)final val noSender : akka.actor.ActorRef = { /* compiled code */ }}
(1) Hook方法,preStart()、postStop()方法的使用
代码运行结果:/**Actor API: Hook方法*/object Example_05 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{//通过context.actorOf方法创建Actorvar child:ActorRef = _//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in FirstActor")//通过context上下文创建Actorchild = context.actorOf(Props[MyActor], name = "myChild")}def receive = {//向MyActor发送消息case x => child ! x;log.info("received "+x)}//Hook方法,postStop(),Actor停止之后调用override def postStop(): Unit = {log.info("postStop() in FirstActor")}}class MyActor extends Actor with ActorLogging{//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in MyActor")}def receive = {case "test" => log.info("received test")case _ => log.info("received unknown message")}//Hook方法,postStop(),Actor停止之后调用override def postStop(): Unit = {log.info("postStop() in MyActor")}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//创建FirstActor对象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("准备向myactor发送消息")//向myactor发送消息myactor!"test"myactor! 123Thread.sleep(5000)//关闭ActorSystem,停止程序的运行system.shutdown()}
在代码[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message[INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor[INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor
中分别对postStop、preStart方法进行了重写,在preStart方法中通过代码class FirstActor extends Actor with ActorLogging{//通过context.actorOf方法创建Actorvar child:ActorRef = _//Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作override def preStart(): Unit ={log.info("preStart() in FirstActor")//通过context上下文创建Actorchild = context.actorOf(Props[MyActor], name = "myChild")}def receive = {//向MyActor发送消息case x => child ! x;log.info("received "+x)}//Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作override def postStop(): Unit = {log.info("postStop() in FirstActor")}}
对成员变量child进行初始化,然后在postStop方法中使用child = context.actorOf(Props[MyActor], name = "myChild")
停止MyActor的运行。在使用代码//通过context上下文停止MyActor的运行context.stop(child)
创建FirstActor时,便会调用preStart方法完成MyActor的创建,因此首先会执行FirstActor中的preStart()方法//创建FirstActor对象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
然后在创建MyActor时执行MyActor中定义的preStart方法dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
在执行[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。//关闭ActorSystem,停止程序的运行system.shutdown()
(2) 成员变量self及成员方法sender方法的使用
整体代码如下:
运行结果:/**Actor API:成员变量self及sender()方法的使用*/object Example_05 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{//通过context.actorOf方法创建Actorvar child:ActorRef = _override def preStart(): Unit ={log.info("preStart() in FirstActor")//通过context上下文创建Actorchild = context.actorOf(Props[MyActor], name = "myActor")}def receive = {//向MyActor发送消息case x => child ! x;log.info("received "+x)}}class MyActor extends Actor with ActorLogging{self!"message from self reference"def receive = {case "test" => log.info("received test");sender()!"message from MyActor"case "message from self reference"=>log.info("message from self refrence")case _ => log.info("received unknown message");}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//创建FirstActor对象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("准备向myactor发送消息")//向myactor发送消息myactor!"test"myactor! 123Thread.sleep(5000)//关闭ActorSystem,停止程序的运行system.shutdown()}
代码:[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息[INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
中使用class MyActor extends Actor with ActorLogging{self!"message from self reference"def receive = {case "test" => log.info("received test");sender()!"message from MyActor"case "message from self reference"=>log.info("message from self refrence")case _ => log.info("received unknown message");}}
向自身发送了一条消息,receive方法通过self!"message from self reference"
对这条消息进行处理。receive方法在处理case "message from self reference"=>log.info("message from self refrence")
“test”消息时,会调用def receive = {case "test" => log.info("received test");sender()!"message from MyActor"
向sender(本例中为FirstActor)发送”message from MyActor”消息,FirstActor使用sender()!"message from MyActor"
处理消息时又向MyActor回送该消息,因此最终的输出有两个unknown message,分别对应123和”message from MyActor”def receive = {//向MyActor发送消息case x => child ! x;log.info("received "+x)}
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
(3) unhandled方法的使用
unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码
代码输出:/**Actor API:unhandled方法*/object Example_06 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{def receive = {//向MyActor发送消息case "test" => log.info("received test")}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//创建FirstActor对象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("准备向myactor发送消息")//向myactor发送消息myactor!"test"myactor! 123Thread.sleep(5000)//关闭ActorSystem,停止程序的运行system.shutdown()}
不难看出,对于[INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息[INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
发送的这条消息没有被处理,没有任何的处理逻辑。在实际开发过程中,可能会对不能被处理的消息增加一些应对逻辑,此时可以重写unhandled方法,代码如下:myactor! 123
代码输出结果:/**Actor API:unhandled方法的使用*/object Example_06 extends App{import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsclass FirstActor extends Actor with ActorLogging{def receive = {//向MyActor发送消息case "test" => log.info("received test")}//重写unhandled方法override def unhandled(message: Any): Unit = {log.info("unhandled message is {}",message)}}val system = ActorSystem("MyActorSystem")val systemLog=system.log//创建FirstActor对象val myactor = system.actorOf(Props[FirstActor], name = "firstActor")systemLog.info("准备向myactor发送消息")//向myactor发送消息myactor!"test"myactor! 123Thread.sleep(5000)//关闭ActorSystem,停止程序的运行system.shutdown()}
其它如preRestart等方法的使用将在Akka容错部分进行讲解。[INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123
 
