Akka是一个用于构建高并发,分布式可扩展的基于事件驱动的应用工具包。
image.png

创建Actor

Api介绍

  • ActorSystem负责创建和监督Actor

    1. ActorSystem是一个重量级结构,需要分配多个线程
    2. ActorSystem通常是一个单例对象,可以创建很多Actor
    3. 直接使用context.system就可以获取到管理该Actor的ActorSystem的引用
  • 实现Actor类

    1. 定义类或单例对象继承Actor
    2. 实现receive方法,在方法里处理消息。不需要loop react Akka会自动调用receive接收消息
    3. 还可以实现preStart()方法
  • 加载Actor

    1. 要创建Akka的Actor,必须先获取创建一个ActorSystem。需要给一个指定名称,并加载一些配置项
    2. 调用ActorSystem.actorOf(Props(Actor对象), "Actor名字")来加载Actor
  1. case class SubmitTaskMessage(msg: String)
  2. case class SuccessSubmitTaskMessage(msg: String)
  3. object SenderActor {
  4. override def receive: Receive = {
  5. case "start" => {
  6. println("接收到start")
  7. // 获取ReceiverActor路径
  8. // 格式:akka://actorSystem的名字/user/具体获取对象的名字
  9. val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
  10. receiverActor ! SubmitTaskMessage("我在给你发消息")
  11. }
  12. case SuccessSubmitTaskMessage(msg) =>
  13. println(s"接收到消息${msg}")
  14. }
  15. }
  16. object ReceiverActor {
  17. override def receive: Receive = {
  18. case SubmitTaskMessage(msg) => {
  19. println(s"接收到的消息是${msg}")
  20. sender ! SuccessSubmitTaskMessage("接收任务成功!")
  21. }
  22. }
  23. object Entrance {
  24. def main(args: Array[String]): Unit = {
  25. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
  26. val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
  27. val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
  28. senderActor ! "start"
  29. }
  30. }

Akka定时任务

ActorSystem.scheduler.schedule()方法可以启动一个定时任务
def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExcutionContext)
def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit)(implicit executor: ExcutionContext)

  1. import akka.actor.Actor
  2. object MainActor{
  3. object ReceiverActor extends Actor {
  4. override def receive: Receive = {
  5. case x => println(x)
  6. }
  7. }
  8. def main(args: Array[String]): Unit = {
  9. val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
  10. val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
  11. //导入隐式转换,支持定时器
  12. import actorSystem,dispatcher
  13. // 导入隐式参数,用来给定时器设置默认值
  14. import scala.concurrent.duration._
  15. // 通过定时器3秒开始,间隔2秒,给ReceiverActor发送一句话
  16. // 方式一 采用发送消息形式
  17. actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "恭喜")
  18. // 方式二 采用自定义消息,结合函数
  19. actorSyste.scheduler.schedule(3 seconds, 2 seconds)(receiverActor ! "恭喜")
  20. // 实际写法
  21. actorSyste.scheduler.schedule(3 seconds, 2 seconds){
  22. receiverActor ! "恭喜"
  23. }
  24. }
  25. }

实现两个进程之间的通信

  1. object WorkerActor extends Actor {
  2. override def receive(): Receive = {
  3. case "setup" => {
  4. println("接收到消息setup")
  5. // 获取远程Actor
  6. val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActior")
  7. masterActor ! "connect"
  8. }
  9. case "success" => println("接收到消息success")
  10. }
  11. }
  12. object Entrance {
  13. def main(args: Array[String]): Unit = {
  14. val actorSystem = ActorSystem("actorSystem", ConfigFictory.load())
  15. val workerActor = actorSystem.actorOf(Props(WorkerAcotr),"workerActor")
  16. workerActor ! "setup"
  17. }
  18. }
  19. ------------------------------------------------------------
  20. // 对象地址: akka.tcp://actorSystem@127.0.0.1:8888
  21. object MasterActor extends Actor {
  22. override def receive: Receive = {
  23. case "connect" => {
  24. println("接收到消息connect")
  25. // 获取远程Actor
  26. sender ! "success"
  27. }
  28. }
  29. }
  30. object Entrance {
  31. def main(args: Array[String]): Unit = {
  32. val actorSystem = ActorSystem("actorSystem", ConfigFictory.load())
  33. val workerActor = actorSystem.actorOf(Props(MasterAcotr),"masterActor")
  34. }
  35. }

两个main函数都需要启动