本将主要内容:
1. Actor引用、Actor路径

1. Actor引用、Actor路径

下图是Akka官方文档中给出的一张图
Akka并发编程——第四节:Actor模型(三) - 图1
该图清晰地说明了ActorPath,ActorRef,Actor及ActorSystem之间的关系,并说明了Actor整体的层次结构。前面我们提到,Akka应用程序会持有一个名称为user的Actor,该Actor被称为guardian supervisor(守卫监督器),无论是ActorSystem创建的Actor还是通过ActorContext创建的Actor都为user的子类,它是最顶级的Actor。

(一)ActorRef

对于ActorRef,我们已经很熟悉了,通过调用ActorSystem.actorOf方法可以创建Actor,返回的便是ActorRef,例如代码

  1. //创建FirstActor对象
  2. val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

返回的便是FirstActor的ActorRef对象,ActorRef最重要的作用便是向Actor发送消息,例如

  1. //向myactor发送消息
  2. myactor!"test"
  3. myactor! 123

另外,还可以通过context隐式对象获取父Actor和子Actor的ActorRef,示例代码如下:

  1. /*
  2. *Actor API:成员变量self及sender()方法的使用
  3. */
  4. object Example_07 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class FirstActor extends Actor with ActorLogging{
  9. //通过context.actorOf方法创建Actor
  10. var child:ActorRef = _
  11. override def preStart(): Unit ={
  12. log.info("preStart() in FirstActor")
  13. //通过context上下文创建Actor
  14. child = context.actorOf(Props[MyActor], name = "myActor")
  15. }
  16. def receive = {
  17. //向MyActor发送消息
  18. case x => child ! x;log.info("received "+x)
  19. }
  20. }
  21. class MyActor extends Actor with ActorLogging{
  22. var parentActorRef:ActorRef=_
  23. override def preStart(): Unit ={
  24. //通过context.parent获取其父Actor的ActorRef
  25. parentActorRef=context.parent
  26. }
  27. def receive = {
  28. case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
  29. case _ => log.info("received unknown message");
  30. }
  31. }
  32. val system = ActorSystem("MyActorSystem")
  33. val systemLog=system.log
  34. //创建FirstActor对象
  35. val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  36. //获取ActorPath
  37. val myActorPath=system.child("firstActor")
  38. //通过system.actorSelection方法获取ActorRef
  39. val myActor1=system.actorSelection(myActorPath)
  40. systemLog.info("准备向myactor发送消息")
  41. //向myActor1发送消息
  42. myActor1!"test"
  43. myActor1! 123
  44. Thread.sleep(5000)
  45. //关闭ActorSystem,停止程序的运行
  46. system.shutdown()
  47. }

代码运行结果

  1. [INFO] [04/02/2016 20:28:08.941] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
  2. [INFO] [04/02/2016 20:28:08.942] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
  3. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
  4. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
  5. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
  6. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
  7. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from ParentActorRef
  8. [INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

代码

  1. class MyActor extends Actor with ActorLogging{
  2. var parentActorRef:ActorRef=_
  3. override def preStart(): Unit ={
  4. //通过context.parent获取其父Actor的ActorRef
  5. parentActorRef=context.parent
  6. }
  7. def receive = {
  8. case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
  9. case _ => log.info("received unknown message");
  10. }
  11. }

中,使用

  1. //通过context.parent获取其父Actor的ActorRef
  2. parentActorRef=context.parent

获取MyActor 的直接父Actor的ActorRef,在本例中为FirstActor,因为在FirstActor中,我们使用

  1. //通过context上下文创建Actor
  2. child = context.actorOf(Props[MyActor], name = "myActor")

创建了MyActor,FirstActor便自动成为MyActor的直接Supervisor。如此便可以通过代码

  1. parentActorRef!"message from ParentActorRef"

发送消息。
另外,还可以通过

  1. //创建FirstActor对象
  2. val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  3. //获取ActorPath
  4. val myActorPath=system.child("firstActor")
  5. //通过system.actorSelection方法获取ActorRef
  6. val myActor1=system.actorSelection(myActorPath)

system.actorSelection(myActorPath)方法获取相应的ActorRef。然后再使用获取到的ActorRef向Acto发送消息

  1. //向myActor1发送消息
  2. myActor1!"test"
  3. myActor1! 123

现在,让我们来总结一下获取ActorRef的方法:
(1)通过ActorSystem的actorOf方法,不过这种方式是通过创建Actor,然后返回其ActorRef
(2)通过context.actorOf方法,这种方式也是通过创建Actor,然后返回其ActorRef
(3)通过context.parent、context.self、context.children方法获取当前Actor的父Actor、当前Actor及子Actor的ActorRef,这种方式是获取已经创建好的Actor的ActorRef
(4)通过val myActor1=system.actorSelection(myActorPath)方法来获取ActorRef,这种方式也是获取已经创建好的Actor的ActorRef

(二)ActorPath

在前面的例子中,我们通过

  1. val myActorPath=system.child("firstActor")

已经使用到了ActorPath。在Akka中,ActorPath采用统一资源定位符的方式进行组织,例如

  1. //本地ActorPath
  2. "akka://my-sys/user/service-a/worker1"
  3. //远程ActorPath
  4. "akka.tcp://my-sys@host.example.com:5678/user/service-b"

本地ActorPath是Akka并发编程中的ActorPath表示方式,而远程ActorPath是Akka分布式编程中常用的ActorPath表示方式,”akka.tcp://my-sys@host.example.com:5678/user/service-b”中的TCP表示使用的是TCP协议,也可以使用UPD协议,使用UDP协议的远程ActorPath表示方式有如下形式

  1. "akka.udp://my-sys@host.example.com:5678/user/service-b"

ActorPath当中,akka.udp表示的是使用的协议,my-sys表示的是ActorSytem名称,host.example.com:5678为远程机器地址及端口,user为guardian supervisor,service-b为当前应用程序的顶级Actor(通过system.actorOf方法创建的)

  1. /*
  2. *ActorPath
  3. */
  4. object Example_08 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class FirstActor extends Actor with ActorLogging {
  9. //通过context.actorOf方法创建Actor
  10. var child:ActorRef = _
  11. override def preStart(): Unit ={
  12. log.info("preStart() in FirstActor")
  13. //通过context上下文创建Actor
  14. child = context.actorOf(Props[MyActor], name = "myActor")
  15. }
  16. def receive = {
  17. //向MyActor发送消息
  18. case x => child ! x;log.info("received "+x)
  19. }
  20. }
  21. class MyActor extends Actor with ActorLogging {
  22. def receive = {
  23. case "test" => log.info("received test");
  24. case _ => log.info("received unknown message");
  25. }
  26. }
  27. val system = ActorSystem("MyActorSystem")
  28. val systemLog=system.log
  29. //创建FirstActor对象
  30. val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")
  31. //获取ActorPath
  32. val firstActorPath=system.child("firstActor")
  33. systemLog.info("firstActorPath--->{}",firstActorPath)
  34. //通过system.actorSelection方法获取ActorRef
  35. val myActor1=system.actorSelection(firstActorPath)
  36. //直接指定其路径
  37. val myActor2=system.actorSelection("/user/firstActor")
  38. //使用相对路径
  39. val myActor3=system.actorSelection("../firstActor")
  40. systemLog.info("准备向myactor发送消息")
  41. //向myActor1发送消息
  42. myActor2!"test"
  43. myActor2! 123
  44. Thread.sleep(5000)
  45. //关闭ActorSystem,停止程序的运行
  46. system.shutdown()
  47. }

代码运行结果:

  1. [INFO] [04/02/2016 21:04:59.612] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor
  2. [INFO] [04/02/2016 21:04:59.612] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
  3. [INFO] [04/02/2016 21:04:59.615] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
  4. [INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
  5. [INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
  6. [INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received test
  7. [INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

本例中的重点代码如下

  1. //获取ActorPath
  2. val firstActorPath=system.child("firstActor")
  3. systemLog.info("myActorPath--->{}",firstActorPath)
  4. //通过system.actorSelection方法获取ActorRef
  5. val myActor1=system.actorSelection(firstActorPath)
  6. //直接指定其路径
  7. val myActor2=system.actorSelection("/user/firstActor")
  8. //使用相对路径
  9. val myActor3=system.actorSelection("../firstActor")

通过 val firstActorPath=system.child(“firstActor”)构造一个ActorPath,然后使用

  1. val myActor1=system.actorSelection(firstActorPath)

获取路径下的ActorRef,除这种方式外,还可以通过绝对路径 val myActor2=system.actorSelection(“/user/firstActor”)及相对路径 val myActor3=system.actorSelection(“../firstActor”)的方式获取ActorRef
,需要注意的是绝对路径使用的是guardian supevisor,即/user/firstActor的这种方式。如果要获取myActor,则基方法是类型的,例如

  1. //获取ActorPath
  2. val myActorPath=system.child("firstActor").child("myActor")
  3. systemLog.info("firstActorPath--->{}",myActorPath)
  4. //通过system.actorSelection方法获取ActorRef
  5. val myActor1=system.actorSelection(myActorPath)
  6. //直接指定其路径
  7. val myActor2=system.actorSelection("/user/firstActor/myActor")
  8. //使用相对路径
  9. val myActor3=system.actorSelection("../firstActor/myActor")

完整代码如下:

  1. /*
  2. *ActorPath,获取myActor
  3. */
  4. object Example_09 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class FirstActor extends Actor with ActorLogging{
  9. //通过context.actorOf方法创建Actor
  10. var child:ActorRef = _
  11. override def preStart(): Unit ={
  12. log.info("preStart() in FirstActor")
  13. //通过context上下文创建Actor
  14. child = context.actorOf(Props[MyActor], name = "myActor")
  15. }
  16. def receive = {
  17. //向MyActor发送消息
  18. case x => child ! x;log.info("received "+x)
  19. }
  20. }
  21. class MyActor extends Actor with ActorLogging{
  22. def receive = {
  23. case "test" => log.info("received test");
  24. case _ => log.info("received unknown message");
  25. }
  26. }
  27. val system = ActorSystem("MyActorSystem")
  28. val systemLog=system.log
  29. //创建FirstActor对象
  30. val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")
  31. //获取ActorPath
  32. val myActorPath=system.child("firstActor").child("myActor")
  33. systemLog.info("firstActorPath--->{}",myActorPath)
  34. //通过system.actorSelection方法获取ActorRef
  35. val myActor1=system.actorSelection(myActorPath)
  36. //直接指定其路径
  37. val myActor2=system.actorSelection("/user/firstActor/myActor")
  38. //使用相对路径
  39. val myActor3=system.actorSelection("../firstActor/myActor")
  40. systemLog.info("准备向myactor发送消息")
  41. //向myActor1发送消息
  42. myActor1!"test"
  43. myActor1! 123
  44. Thread.sleep(5000)
  45. //关闭ActorSystem,停止程序的运行
  46. system.shutdown()
  47. }

代码运行结果:

  1. [INFO] [04/02/2016 21:21:14.377] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor/myActor
  2. [INFO] [04/02/2016 21:21:14.377] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
  3. [INFO] [04/02/2016 21:21:14.381] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
  4. [INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
  5. [INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

关于远程ActorRef的获取,我们将在后续章节中进行讲述。