Actor介绍

java中,每个对象都有monitor逻辑监视器,用来控制对象的多线程访问,通过添加sychronized关键字来标记。但容易出现死锁等问题。
Actor并发编程模型,是Scala提供的一种事件模型的并发机制,Actor不共享数据,依赖消息传递,有效避免资源争夺,死锁等情况
image.png

  1. scala再2.11版本加入了Akka并发编程框架,老版本以废弃
  2. Actor和Akka很像

创建Actor

通过类或者单例对象,继承Actor特质的方式,来创建Actor对象

  1. 定义class或object继承Actor特质
  2. 重写act方法
  3. 调用Actor的start方法执行Actor

案例: 通过class实现

  1. import scala.actors.Actor
  2. object Test {
  3. class Actor1 extends Actor {
  4. override def act(): Unit = {
  5. // 打印1-10
  6. for(i <- 1 to 10) println("Actor1..." + i)
  7. }
  8. }
  9. class Actor2 extends Actor {
  10. override def act(): Unit = {
  11. // 打印1-10
  12. for(i <- 1 to 10) println("Actor1..." + i)
  13. }
  14. }
  15. def main(args: Array[String]): Unit = {
  16. val a1 = new Actor1()
  17. a1.start() // 自动调用act方法
  18. val a2 = new Actor2()
  19. a2.start()
  20. }
  21. }

案例:通过object实现

  1. object Actor1 extends Actor{
  2. override def act(): Unit = {
  3. // 打印1-10
  4. for(i <- 1 to 10) println("Actor1..." + i)
  5. }
  6. }
  7. def main(args: Array[String]): Unit = {
  8. Actor1.start()
  9. }

发送消息/接受消息

发送消息

! 发送异步消息,没有返回值
!? 发送同步消息,等待返回值
!! 发送异步消息,返回值是Future[Any]

给actor1发送一个异步字符串消息
actor1 ! "你好!"

接收消息

Actor中使用receive方法来接受消息,需要给receive方法传入一个偏函数

  1. {
  2. case 变量名1: 消息类型1 => 业务处理1
  3. case 变量名2: 消息类型2 => 业务处理2
  4. ...
  5. }

注意:receive方法只能接收一个消息,接受完后继续执行act方法

案例:发送接收一句话

  1. object Test {
  2. object ActorSender extends Actor {
  3. override def act(): Unit = {
  4. ActorReceiver ! "你好, 我是sender"
  5. ActorReceiver ! "你叫什么?"
  6. }
  7. }
  8. object ActorReceiver extends Actor {
  9. override def act(): Unit = {
  10. // receive()只能接收一次消息
  11. receive {
  12. case x: String => println(x)
  13. }
  14. }
  15. }
  16. def main(args:Array[String]): Unit = {
  17. ActorSender.start()
  18. ActorReceiver.start()
  19. }
  20. }
  21. // 结果: 你好, 我是sender

案例: 持续发送和接收消息

  1. object Test {
  2. object ActorSender extends Actor {
  3. override def act(): Unit = {
  4. while(true) {
  5. ActorReceiver ! "你好, 我是sender"
  6. // 休眠3秒
  7. TimeUnit.SECONDS.sleep(3)
  8. }
  9. }
  10. }
  11. object ActorReceiver extends Actor {
  12. override def act(): Unit = {
  13. while(true){
  14. receive {
  15. case x: String => println(x)
  16. }
  17. }
  18. }
  19. }
  20. def main(args:Array[String]): Unit = {
  21. ActorSender.start()
  22. ActorReceiver.start()
  23. }
  24. }

案例: 优化持续接收消息

  1. object ActorSender extends Actor {
  2. override def act(): Unit = {
  3. while(true) {
  4. ActorReceiver ! "你好, 我是sender"
  5. // 休眠3秒
  6. TimeUnit.SECONDS.sleep(3)
  7. }
  8. }
  9. }
  10. object ActorReceiver extends Actor {
  11. override def act(): Unit = {
  12. // 提高效率
  13. loop {
  14. react{
  15. case x: String => println(x)
  16. }
  17. }
  18. }
  19. }

案例:发送和接收自定义消息(同步有返回)

  1. object Test {
  2. case class Message(id: Int, message: String)
  3. case class ReplyMessage(message: String, name: String)
  4. object MsgActor extends Actor {
  5. override def act(): Unit = {
  6. loop {
  7. react{
  8. case Message(id, message) => {
  9. println(s"我是MsgActor,我接受到的消息是${message}")
  10. // 回复
  11. sender ! ReplyMessage("我很好","赵丽颖")
  12. }
  13. }
  14. }
  15. }
  16. // main方法底层也是一个Actor,MainActor
  17. def main(args: Array[String]): Unit = {
  18. MsgActor.start()
  19. // 发送并接收
  20. val result: Any = MsgActor !? Message(1, "你好,我是MainActor")
  21. // 转型
  22. val reply: ReplyMessage = result.asInstanceOf[ReplyMessage]
  23. }
  24. }

案例:发送和接收自定义消息(异步有返回)

  • 使用!!
  • 返回类型为Future[Any]
  • Future表示异步返回数据的封装,虽然获取到Future的返回值,但不一定有值,可能在未来某一时刻才会有返回消息
  • Future的isSet()可检查是否已经收到返回消息,apply()方法可以获取返回数据

image.png

  1. object Test {
  2. case class Message(id: Int, message: String)
  3. case class ReplyMessage(message: String, name: String)
  4. object MsgActor extends Actor {
  5. override def act(): Unit = {
  6. loop {
  7. react{
  8. case Message(id, message) => {
  9. println(s"我是MsgActor,我接受到的消息是${message}")
  10. // 回复
  11. sender ! ReplyMessage("我很好","赵丽颖")
  12. }
  13. }
  14. }
  15. }
  16. // main方法底层也是一个Actor,MainActor
  17. def main(args: Array[String]): Unit = {
  18. MsgActor.start()
  19. // 发送并接收
  20. val future: Future[Any] = MsgActor !! Message(1, "你好,我是MainActor")
  21. while(!future.isSet){}
  22. val reply: ReplyMessage = future.apply().asInstanceOf[ReplyMessage]
  23. }
  24. }

WordCount

需求:

  1. 进行单词统计文本文件名发送给WordCountActor
  2. 接收每一个WordCountActor返回的结果进行合并

步骤:
1. 获取所有文件的全路径
2. 根据文件数量,创建指定个数的WordCountActor对象
wordCountActor对象要和文件拉链到一起
3. 启动WordCountActor对象,统计文件中单词数量
4. 每个WordCountActor都需要对文件中的单词数量进行统计
5. 每个WordCountActor统计后的结果,都要返回给MainActor
6. 对接收到的每个返回数据统计并打印

  1. object MainActor {
  2. def main(args: Array[String]): Unit = {
  3. // 1. 获取所有要统计的文件的路径
  4. var dir = "./data/"
  5. var fileNameList = new File(dir).list().toList
  6. val fileDirList = fileNameList.map(dir + _)
  7. println(fileDirList)
  8. // 2. 根据文件数量创建对应个数的WordCountActor对象
  9. val wordCountList = fileNameList.map(_ => new WordCountActor)
  10. val actorWithFile = wordCountList.zip(fileDirList)
  11. // WordCountActor1 -> ./data/1.txt WordCountActor2 -> ./data/2.txt
  12. println(actorWithFile)
  13. // 3.
  14. val futureList: List[Future[Any]] = actorWithFile.map {
  15. keyVal =>
  16. val actor = keyVal._1
  17. actor.start()
  18. val future = actor !! WordCountTask(keyVal._2)
  19. future
  20. }
  21. // 如果没有返回具体值的future对象个数不为零,说明还有Actor没有返回值
  22. while(futureList.filter(!_.isSet).size != 0){}
  23. val wordCountMap = futureList.map(_.apply().asInstanceOf[WordCountResult].wordCountMap)
  24. val result = wordCountMap.flatten.groupBy(_._1).map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum)
  25. println(result)
  26. }
  27. }
  28. class WordCountActor extends Actor {
  29. override def act(): Unit = {
  30. loop {
  31. react {
  32. case WordCountTask(fileName) =>
  33. println(s"获取到的任务是 ${fileName}")
  34. //4.
  35. val lineList = Source.fromFile(fileName).getLines().toList // List("hadoop sqoop hadoop" , "hadoop hadoop flume")
  36. val strList = lineList.flatMap(_.split(" ")) // List("hadoop", "sqoop", "hadoop", "hadoop", "hadoop", flume")
  37. val wordCount = strList.map(_ -> 1) // List("hadoop"->1, "sqoop"->1, "hadoop"->1, "hadoop"->1, "hadoop"->1, flume"->1)
  38. val groupMap = wordCount.groupBy(_._1) // "hadoop" -> List("hadoop"->1, "hadpop"->1), "sqoop" -> List("sqoop" -> 1)
  39. // ↓ ↓
  40. val wordCountMap = groupMap.map(keyVal => keyVal._1 -> keyVal._2.map(_._2).sum )
  41. //5.
  42. sender ! WordCountResult(wordCountMap)
  43. }
  44. }
  45. }
  46. }
  47. case class WordCountTask(fileName: String)
  48. case class WordCountResult(wordCountMap: Map[String, Int])
  49. 1.txt
  50. hadoop sqoop hadoop
  51. hadoop hadoop flume