Main topics in this section:

  1. Actor API walkthrough

    1. Actor API walkthrough

    The main member variables and methods of Actor are defined as follows:
        package akka.actor

        trait Actor extends scala.AnyRef {
          type Receive = akka.actor.Actor.Receive
          // context exposes contextual information about the current actor and the current message
          implicit val context : akka.actor.ActorContext = { /* compiled code */ }
          // self is the ActorRef of the current actor
          implicit final val self : akka.actor.ActorRef = { /* compiled code */ }
          // The sender (an actor) of the last message received by the current actor
          final def sender() : akka.actor.ActorRef = { /* compiled code */ }
          // receive: abstract method that defines the actor's behavior
          def receive : akka.actor.Actor.Receive
          // APIs for internal use
          protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ }
          protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ }
          protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ }
          protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }
          protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }
          // Supervisor strategy, used for actor fault tolerance
          def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ }
          // Hook methods, used to observe the actor lifecycle
          @scala.throws[T](classOf[scala.Exception])
          def preStart() : scala.Unit = { /* compiled code */ }
          @scala.throws[T](classOf[scala.Exception])
          def postStop() : scala.Unit = { /* compiled code */ }
          @scala.throws[T](classOf[scala.Exception])
          def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }
          @scala.throws[T](classOf[scala.Exception])
          def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }
          // Called when a message is sent to the actor but receive defines no handling logic for it
          def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ }
        }

        object Actor extends scala.AnyRef {
          type Receive = scala.PartialFunction[scala.Any, scala.Unit]
          // An empty behavior
          @scala.SerialVersionUID(1)
          object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive {
            def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ }
            def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ }
          }
          // A null sender
          @scala.SerialVersionUID(1)
          final val noSender : akka.actor.ActorRef = { /* compiled code */ }
        }
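
    Because Receive is only an alias for PartialFunction[Any, Unit], its behavior can be explored without an actor system. The following is a minimal plain-Scala sketch, not Akka code: ReceiveSketch, greet, count, empty, combined, and handled are hypothetical names introduced for illustration, and empty only imitates the idea behind emptyBehavior (a partial function that is defined for no message).

```scala
object ReceiveSketch {
  // Same shape as akka.actor.Actor.Receive, redeclared here so the sketch needs no Akka dependency.
  type Receive = PartialFunction[Any, Unit]

  // Records what each handler did, so the effect can be inspected afterwards.
  val handled = scala.collection.mutable.ListBuffer.empty[String]

  // A behavior that is only defined for the string "test".
  val greet: Receive = {
    case "test" => handled += "greet:test"
  }

  // A behavior that is only defined for Int messages.
  val count: Receive = {
    case n: Int => handled += s"count:$n"
  }

  // Imitates the idea of emptyBehavior: defined for no message at all.
  val empty: Receive = PartialFunction.empty

  // Behaviors compose with orElse; the first one defined for a message handles it.
  val combined: Receive = greet orElse count
}
```

    In this sketch, ReceiveSketch.combined.isDefinedAt("test") and ReceiveSketch.combined.isDefinedAt(123) are true, while ReceiveSketch.combined.isDefinedAt(3.14) is false. A message for which receive is not defined is exactly the case in which an actor treats the message as unhandled.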

    (1) Hook methods: using preStart() and postStop()

        /*
         * Actor API: hook methods
         */
        object Example_05 extends App {
          import akka.actor.Actor
          import akka.actor.ActorLogging
          import akka.actor.ActorRef
          import akka.actor.ActorSystem
          import akka.actor.Props

          class FirstActor extends Actor with ActorLogging {
            // Child actor, created with context.actorOf in preStart
            var child: ActorRef = _

            // Hook method preStart(): called when the actor starts, before it processes any message; used for initialization
            override def preStart(): Unit = {
              log.info("preStart() in FirstActor")
              // Create the child actor through the context
              child = context.actorOf(Props[MyActor], name = "myChild")
            }

            def receive = {
              // Pass every message on to MyActor, then log it
              case x =>
                child ! x
                log.info("received " + x)
            }

            // Hook method postStop(): called after the actor has stopped
            override def postStop(): Unit = {
              log.info("postStop() in FirstActor")
            }
          }

          class MyActor extends Actor with ActorLogging {
            // Hook method preStart(): called when the actor starts, before it processes any message; used for initialization
            override def preStart(): Unit = {
              log.info("preStart() in MyActor")
            }

            def receive = {
              case "test" => log.info("received test")
              case _      => log.info("received unknown message")
            }

            // Hook method postStop(): called after the actor has stopped
            override def postStop(): Unit = {
              log.info("postStop() in MyActor")
            }
          }

          val system = ActorSystem("MyActorSystem")
          val systemLog = system.log
          // Create the FirstActor instance
          val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
          // The log text means "about to send messages to myactor"
          systemLog.info("准备向myactor发送消息")
          // Send messages to myactor
          myactor ! "test"
          myactor ! 123
          Thread.sleep(5000)
          // Shut down the ActorSystem to end the program
          // (shutdown() is deprecated since Akka 2.4 in favor of system.terminate())
          system.shutdown()
        }
    Output of the run:
        [INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
        [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
        [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
        [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
        [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
        [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test
        [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message
        [INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor
        [INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor
    The class
        class FirstActor extends Actor with ActorLogging {
          // Child actor, created with context.actorOf in preStart
          var child: ActorRef = _

          // Hook method preStart(): called when the actor starts, before it processes any message; used for initialization
          override def preStart(): Unit = {
            log.info("preStart() in FirstActor")
            // Create the child actor through the context
            child = context.actorOf(Props[MyActor], name = "myChild")
          }

          def receive = {
            // Pass every message on to MyActor, then log it
            case x =>
              child ! x
              log.info("received " + x)
          }

          // Hook method postStop(): called after the actor has stopped; used for cleanup
          override def postStop(): Unit = {
            log.info("postStop() in FirstActor")
          }
        }
    overrides both preStart and postStop. In preStart, the line
        child = context.actorOf(Props[MyActor], name = "myChild")
    initializes the member variable child. The listing does not stop the child explicitly. postStop could do so with
        // Stop MyActor through the context
        context.stop(child)
    but this is optional, because Akka stops an actor's children automatically when the actor itself stops. When FirstActor is created with
        // Create the FirstActor instance
        val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
    its preStart method is called and creates MyActor, so the preStart() method of FirstActor runs first:
        [INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
    Then, as MyActor is created, the preStart method defined in MyActor runs:
        [INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
    When
        // Shut down the ActorSystem to end the program
        system.shutdown()
    is executed, FirstActor, as the supervisor of MyActor, stops MyActor first and then stops itself. The postStop method of MyActor is therefore called before the postStop method of FirstActor.

    (2) Using the member variable self and the member method sender()

    The complete code is as follows:
        /*
         * Actor API: using the member variable self and the sender() method
         */
        object Example_05 extends App {
          import akka.actor.Actor
          import akka.actor.ActorLogging
          import akka.actor.ActorRef
          import akka.actor.ActorSystem
          import akka.actor.Props

          class FirstActor extends Actor with ActorLogging {
            // Child actor, created with context.actorOf in preStart
            var child: ActorRef = _

            override def preStart(): Unit = {
              log.info("preStart() in FirstActor")
              // Create the child actor through the context
              child = context.actorOf(Props[MyActor], name = "myActor")
            }

            def receive = {
              // Pass every message on to MyActor, then log it
              case x =>
                child ! x
                log.info("received " + x)
            }
          }

          class MyActor extends Actor with ActorLogging {
            // Send a message to this actor itself while it is being constructed
            self ! "message from self reference"

            def receive = {
              case "test" =>
                log.info("received test")
                // Reply to whoever sent the "test" message
                sender() ! "message from MyActor"
              case "message from self reference" => log.info("message from self refrence")
              case _ => log.info("received unknown message")
            }
          }

          val system = ActorSystem("MyActorSystem")
          val systemLog = system.log
          // Create the FirstActor instance
          val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
          // The log text means "about to send messages to myactor"
          systemLog.info("准备向myactor发送消息")
          // Send messages to myactor
          myactor ! "test"
          myactor ! 123
          Thread.sleep(5000)
          // Shut down the ActorSystem to end the program
          // (shutdown() is deprecated since Akka 2.4 in favor of system.terminate())
          system.shutdown()
        }
    Output of the run:
        [INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
        [INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
    In the code
        class MyActor extends Actor with ActorLogging {
          self ! "message from self reference"

          def receive = {
            case "test" =>
              log.info("received test")
              sender() ! "message from MyActor"
            case "message from self reference" => log.info("message from self refrence")
            case _ => log.info("received unknown message")
          }
        }
    the statement
        self ! "message from self reference"
    sends a message to the actor itself, and the receive method handles that message with
        case "message from self reference" => log.info("message from self refrence")
    When the receive method handles the "test" message in
        def receive = {
          case "test" =>
            log.info("received test")
            sender() ! "message from MyActor"
    it calls
        sender() ! "message from MyActor"
    to send the "message from MyActor" message to the sender, which is FirstActor in this example. FirstActor handles it with
        def receive = {
          // Pass every message on to MyActor, then log it
          case x =>
            child ! x
            log.info("received " + x)
        }
    and, in doing so, sends the message back to MyActor. The final output therefore contains two "received unknown message" lines, one for 123 and one for "message from MyActor":
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
        [INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

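    (3) Using the unhandled method

    The unhandled method deals with messages that the receive method does not handle. The code below shows what happens when unhandled is not overridden: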
        /*
         * Actor API: the unhandled method
         */
        object Example_06 extends App {
          import akka.actor.Actor
          import akka.actor.ActorLogging
          import akka.actor.ActorSystem
          import akka.actor.Props

          class FirstActor extends Actor with ActorLogging {
            def receive = {
              // Only the "test" message is handled
              case "test" => log.info("received test")
            }
          }

          val system = ActorSystem("MyActorSystem")
          val systemLog = system.log
          // Create the FirstActor instance
          val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
          // The log text means "about to send messages to myactor"
          systemLog.info("准备向myactor发送消息")
          // Send messages to myactor
          myactor ! "test"
          myactor ! 123
          Thread.sleep(5000)
          // Shut down the ActorSystem to end the program
          // (shutdown() is deprecated since Akka 2.4 in favor of system.terminate())
          system.shutdown()
        }
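    Output:
        [INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
        [INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
    As the output shows, the message sent by
        myactor ! 123
    is not processed: receive has no matching case, and nothing is logged for it. (By default, unhandled publishes an akka.actor.UnhandledMessage to the actor system's event stream, which is logged only at DEBUG level when akka.actor.debug.unhandled is turned on.) In real development you may want to react to messages that cannot be handled. In that case, override the unhandled method, as in the following code: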
        /*
         * Actor API: using the unhandled method
         */
        object Example_06 extends App {
          import akka.actor.Actor
          import akka.actor.ActorLogging
          import akka.actor.ActorSystem
          import akka.actor.Props

          class FirstActor extends Actor with ActorLogging {
            def receive = {
              // Only the "test" message is handled
              case "test" => log.info("received test")
            }

            // Override unhandled to log every message that receive does not handle
            override def unhandled(message: Any): Unit = {
              log.info("unhandled message is {}", message)
            }
          }

          val system = ActorSystem("MyActorSystem")
          val systemLog = system.log
          // Create the FirstActor instance
          val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
          // The log text means "about to send messages to myactor"
          systemLog.info("准备向myactor发送消息")
          // Send messages to myactor
          myactor ! "test"
          myactor ! 123
          Thread.sleep(5000)
          // Shut down the ActorSystem to end the program
          // (shutdown() is deprecated since Akka 2.4 in favor of system.terminate())
          system.shutdown()
        }
    Output:
        [INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
        [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
        [INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123
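
    The fallback from receive to unhandled can be modeled in plain Scala with PartialFunction.applyOrElse. This is only a sketch of the dispatch idea, under the assumption that the actor runtime tries receive first and calls unhandled when receive is not defined for the message. DispatchSketch, dispatch, and log are hypothetical names and are not part of Akka.

```scala
object DispatchSketch {
  // Same shape as akka.actor.Actor.Receive, redeclared so the sketch needs no Akka dependency.
  type Receive = PartialFunction[Any, Unit]

  // Collects log lines instead of printing them, so the result can be inspected.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  // Mirrors FirstActor.receive from the example: only "test" is handled.
  val receive: Receive = {
    case "test" => log += "received test"
  }

  // Mirrors the overridden unhandled method from the example.
  def unhandled(message: Any): Unit =
    log += s"unhandled message is $message"

  // Try receive first; fall back to unhandled when receive is not defined for the message.
  def dispatch(message: Any): Unit =
    receive.applyOrElse(message, unhandled)
}
```

    Calling DispatchSketch.dispatch("test") followed by DispatchSketch.dispatch(123) leaves two entries in log, "received test" and "unhandled message is 123", which matches the order of the last two log lines in the output above.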
    The remaining methods, such as preRestart, are covered in the part on Akka fault tolerance.