本节主要内容

  1. 定义Actor
  2. 创建Actor

    1. 定义Actor

    通过扩展akka.actor.Actor 特质并实现receive方法来定义Actor,代码示例如下
    1. //通过扩展Actor并实现receive方法来定义Actor
    2. class MyActor extends Actor {
    3. //获取LoggingAdapter,用于日志输出
    4. val log = Logging(context.system, this)
    5. //实现receive方法,定义Actor的行为逻辑,返回的是一个偏函数
    6. def receive = {
    7. case "test" => log.info("received test")
    8. case _ => log.info("received unknown message")
    9. }
    10. }
    receive方法被定义在Actor当中,方法标签如下
    1. //Actor中的receive方法定义,
    2. type Receive = PartialFunction[Any, Unit]
    3. def receive: Actor.Receive
    下面给出其完整使用代码:
    1. object Example_01 extends App{
    2. import akka.actor.Actor
    3. import akka.event.Logging
    4. import akka.actor.ActorSystem
    5. import akka.actor.Props
    6. class MyActor extends Actor {
    7. val log = Logging(context.system, this)
    8. def receive = {
    9. case "test" => log.info("received test")
    10. case _ => log.info("received unknown message")
    11. }
    12. }
    13. //创建ActorSystem对象
    14. val system = ActorSystem("MyActorSystem")
    15. //返回ActorSystem的LoggingAdpater
    16. val systemLog=system.log
    17. //创建MyActor,指定actor名称为myactor
    18. val myactor = system.actorOf(Props[MyActor], name = "myactor")
    19. systemLog.info("准备向myactor发送消息")
    20. //向myactor发送消息
    21. myactor!"test"
    22. myactor! 123
    23. //关闭ActorSystem,停止程序的运行
    24. system.shutdown()
    25. }
    代码运行结果:
    1. [INFO] [04/02/2016 09:29:54.223] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
    2. [INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
    3. [INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message
    输出“[INFO] [04/02/2016 09:29:54.224] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test”中的[MyActorSystem-akka.actor.default-dispatcher-3]为对应的线程名,[akka://MyActorSystem/user/myactor]为Actor路径信息, received test为
    1. def receive = {
    2. case "test" => log.info("received test")
    3. case _ => log.info("received unknown message")
    4. }
    方法处理后的输出。关于[akka://MyActorSystem/user/myactor]路径信息,将在后续内容中进行详细阐述。

也可以通过混入ActorLogging来实现日志功能,具体代码如下:

  1. class MyActor extends Actor with ActorLogging{
  2. def receive = {
  3. case "test" => log.info("received test")
  4. case _ => log.info("received unknown message")
  5. }
  6. }

ActorLogging的定义如下:

  1. trait ActorLogging { this: Actor
  2. private var _log: LoggingAdapter = _
  3. def log: LoggingAdapter = {
  4. // only used in Actor, i.e. thread safe
  5. if (_log eq null)
  6. _log = akka.event.Logging(context.system, this)
  7. _log
  8. }
  9. }

完整代码如下:

  1. /*
  2. *定义Actor时混入ActorLogging
  3. */
  4. object Example_02 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class MyActor extends Actor with ActorLogging{
  9. def receive = {
  10. case "test" => log.info("received test")
  11. case _ => log.info("received unknown message")
  12. }
  13. }
  14. //创建ActorSystem对象
  15. val system = ActorSystem("MyActorSystem")
  16. //返回ActorSystem的LoggingAdpater
  17. val systemLog=system.log
  18. //创建MyActor,指定actor名称为myactor
  19. val myactor = system.actorOf(Props[MyActor], name = "myactor")
  20. systemLog.info("准备向myactor发送消息")
  21. //向myactor发送消息
  22. myactor!"test"
  23. myactor! 123
  24. //关闭ActorSystem,停止程序的运行
  25. system.shutdown()
  26. }

代码运行结果:

  1. [INFO] [04/02/2016 09:39:21.088] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
  2. [INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received test
  3. [INFO] [04/02/2016 09:39:21.089] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/myactor] received unknown message

代码原理与Example_01类似,这里不再赘述。

2. 创建Actor

在前面两个例子中,通过

  1. val myactor = system.actorOf(Props[MyActor], name = "myactor")

创建Actor,需要注意的是system.actorOf方法返回的是ActorRef对象,ActorRef为Actor的引用,使用ActorRef对象可以进行消息的发送等操作。Props为配置对象,在创建Actor时使用,它是不可变的对象,因此它是线程案例且完全可共享的。Akka中创建Actor时,也允许直接传入MyActor对象的引用,例如

  1. //直接通过new MyActor的方式传入MyActor对象的引用,注意这里是Props(new MyActor)
  2. val myactor = system.actorOf(Props(new MyActor), name = "myactor")

但是Akka不推荐这么做,官方文档给出的解释是这种方式会导致不可序列化的Props对象且可能会导致竞争条件(破坏Actor的封装性)。另外需要特别注意的是,不允许通过下列代码创建Actor

  1. //下列两行代码编译可以通过,但运行时出抛出异常
  2. val myActor=new MyActor
  3. val myactor = system.actorOf(Props(myActor), name = "myactor")

完整运行代码如下:

  1. /*
  2. *创建Actor
  3. */
  4. object Example_03 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class MyActor extends Actor with ActorLogging{
  9. def receive = {
  10. case "test" => log.info("received test")
  11. case _ => log.info("received unknown message")
  12. }
  13. }
  14. val system = ActorSystem("MyActorSystem")
  15. val systemLog=system.log
  16. //下列两行代码编译可以通过,但运行时出抛出异常
  17. val myActor=new MyActor
  18. val myactor = system.actorOf(Props(myActor), name = "myactor")
  19. systemLog.info("准备向myactor发送消息")
  20. //向myactor发送消息
  21. myactor!"test"
  22. myactor! 123
  23. //关闭ActorSystem,停止程序的运行
  24. system.shutdown()
  25. }

运行结果如下:

  1. Exception in thread "main" akka.actor.ActorInitializationException: You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
  2. at akka.actor.ActorInitializationException$.apply(Actor.scala:167)
  3. at akka.actor.Actor$class.$init$(Actor.scala:423)
  4. at chapter02.Example_03$MyActor.<init>(MyActor.scala:73)
  5. at chapter02.Example_03$delayedInit$body.apply(MyActor.scala:84)
  6. at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
  7. at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
  8. at scala.App$$anonfun$main$1.apply(App.scala:71)
  9. at scala.App$$anonfun$main$1.apply(App.scala:71)
  10. at scala.collection.immutable.List.foreach(List.scala:318)
  11. at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
  12. at scala.App$class.main(App.scala:71)
  13. at chapter02.Example_03$.main(MyActor.scala:68)
  14. at chapter02.Example_03.main(MyActor.scala)
  15. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  16. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  17. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  18. at java.lang.reflect.Method.invoke(Method.java:606)
  19. at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

从“You cannot create an instance of [chapter02.Example_03$MyActor] explicitly using the constructor (new). You have to use one of the ‘actorOf’ factory methods to create a new actor.”可以看到,不能通过显式地调用构造函数创建Actor,只能使用actorOf工厂方法创建Actor。
下面介绍2种在实际中经常使用的Actor创建方法
(1)调用system.actorOf创建Actor

  1. val system = ActorSystem("mySystem")
  2. val myActor = system.actorOf(Props[MyActor], "myactor2")

完整代码在Example_01、Example_02中已经演示过了,这里需要说明的是通过system.actorOf工厂方法创建的Actor为顶级Actor
Akka并发编程——第二节:Actor模型(一) - 图1
在Akka框架中,每个Akka应用程序都会有一个守卫Actor,名称为user,所有通过system.actorOf工厂方法创建的Actor都为user的子Actor,也是整个Akka程序的顶级Actor。
(2)调用context.actorOf创建Actor
完整代码如下:

  1. /*
  2. *创建Actor,调用context.actorOf方法
  3. */
  4. object Example_04 extends App{
  5. import akka.actor.Actor
  6. import akka.actor.ActorSystem
  7. import akka.actor.Props
  8. class FirstActor extends Actor with ActorLogging{
  9. //通过context.actorOf方法创建Actor
  10. val child = context.actorOf(Props[MyActor], name = "myChild")
  11. def receive = {
  12. case x => child ! x;log.info("received "+x)
  13. }
  14. }
  15. class MyActor extends Actor with ActorLogging{
  16. def receive = {
  17. case "test" => log.info("received test")
  18. case _ => log.info("received unknown message")
  19. }
  20. }
  21. val system = ActorSystem("MyActorSystem")
  22. val systemLog=system.log
  23. //创建FirstActor对象
  24. val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  25. systemLog.info("准备向myactor发送消息")
  26. //向myactor发送消息
  27. myactor!"test"
  28. myactor! 123
  29. Thread.sleep(5000)
  30. //关闭ActorSystem,停止程序的运行
  31. system.shutdown()
  32. }

代码运行结果

  1. [INFO] [04/02/2016 15:05:34.770] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
  2. [INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received test
  3. [INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received test
  4. [INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] received 123
  5. [INFO] [04/02/2016 15:05:34.771] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myChild] received unknown message

通过代码的运行结果可以看到,FirstActor的Actor路径信息为akka://MyActorSystem/user/firstActor,而通过

  1. class FirstActor extends Actor with ActorLogging{
  2. //通过context.actorOf方法创建Actor
  3. val child = context.actorOf(Props[MyActor], name = "myChild")
  4. def receive = {
  5. case x => child ! x;log.info("received "+x)
  6. }
  7. }

代码使用context.actorOf创建的MyActor,其Actor路径信息为[akka://MyActorSystem/user/firstActor/myChild],这意味着mychild为firstActor的子Actor,层次结构如下图所示
Akka并发编程——第二节:Actor模型(一) - 图2
也就是说context.actorOf和system.actorOf的差别是system.actorOf创建的actor为顶级Actor,而context.actorOf方法创建的actor为调用该方法的Actor的子Actor