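Note: the Scala Actor covered here is the actor library (scala.actors) from Scala 2.10.x and earlier.
From Scala 2.11.x on, Akka is the default actor implementation, and the old actor library is deprecated.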

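What Is a Scala Actor

Concept

Actors give Scala a powerful tool for parallel programming. They are a concurrency mechanism built on an event model: instead of sharing state between threads, actors cooperate by sending and receiving messages. This makes multithreaded applications easier to write in Scala.

Differences Between Traditional Java Concurrency and Scala Actor Programming

Actor Programming - Figure 1

In Java, multithreading requires guarding shared resources (variables, objects, and so on) with the synchronized keyword, through synchronized blocks, object locks, and similar constructs. Large try…catch blocks sprinkled with wait, notify, and notifyAll calls are a common headache. The root cause is that Java code mostly works with mutable objects. Once those objects are shared between threads, you must control contention and keep object state from being modified unexpectedly, and immutability is hard to guarantee. In Scala, by contrast, we can work with copies of immutable data (that is, objects; in Scala everything is an object, even functions) and do parallel programming through the Actor mechanism of sending and receiving messages.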

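Actor Method Execution Order

  1. First, call start() to start the Actor

  2. After start() is called, the Actor's act() method is executed

  3. Send messages to the Actor

Ways to Send Messages

! sends an asynchronous message and returns no value.
!? sends a synchronous message and waits for the reply.
!! sends an asynchronous message and returns a Future[Any].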

Actors in Practice

First Example

//Note: the import is scala.actors.Actor
import scala.actors.Actor
object MyActor1 extends Actor{
  //override the act method
  def act(){
    for(i <- 1 to 10){
      println("actor-1 " + i)
      Thread.sleep(2000)
    }
  }
}
object MyActor2 extends Actor{
  //override the act method
  def act(){
    for(i <- 1 to 10){
      println("actor-2 " + i)
      Thread.sleep(2000)
    }
  }
}
object ActorTest extends App{
  //start the Actors
  MyActor1.start()
  MyActor2.start()
}

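Explanation: the code above calls start() on two singleton objects, which causes their act() methods to run. This is equivalent to starting two threads in Java, where each thread's run() method is executed.
Note: the two Actors run in parallel; once the for loop in act() finishes, the Actor exits.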

Second Example (receiving messages continuously)

import scala.actors.Actor
class MyActor extends Actor {
  override def act(): Unit = {
    while (true) {
      receive {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(5000)
          println("stopped ...")
        }
      }
    }
  }
}
object MyActor {
  def main(args: Array[String]) {
    val actor = new MyActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("messages sent!")
  }
}

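Explanation: wrapping receive in a while (true) loop inside act() lets the Actor keep receiving messages.
Note: the start and stop messages are sent asynchronously, but the Actor processes the messages it receives synchronously, one after another in order.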
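
The "asynchronous send, sequential processing" behavior can be imitated without the actor library. The sketch below is only an analogy: it swaps the Actor's mailbox for a single-threaded executor from java.util.concurrent, and the names MailboxOrderSketch and process are made up for illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

object MailboxOrderSketch {
  // Hypothetical helper: "sends" every message without waiting,
  // then returns the order in which the messages were handled.
  def process(messages: Seq[String]): List[String] = {
    // One worker thread with a FIFO queue stands in for the Actor's mailbox.
    val mailbox = Executors.newSingleThreadExecutor()
    val handled = ListBuffer.empty[String] // written only by the worker thread
    messages.foreach { m =>
      // execute() returns immediately, like actor ! msg
      mailbox.execute(new Runnable {
        def run(): Unit = { Thread.sleep(50); handled += m }
      })
    }
    mailbox.shutdown()                            // accept no new messages
    mailbox.awaitTermination(5, TimeUnit.SECONDS) // wait for the queue to drain
    handled.toList
  }
}
```

Calling MailboxOrderSketch.process(Seq("start", "stop")) queues both messages at once, yet "stop" is handled only after "start" has finished, which mirrors what the Actor above does.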

Third Example (react reuses threads and is more efficient than receive)

Example 1

import scala.actors.Actor

class YourActor extends Actor {
/*
*  loop + react acts like adding a ThreadPool: it improves thread reuse across actors
*/
  override def act(): Unit = {
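    //call loop to repeat the body
    loop {
      //react takes a partial function (PartialFunction has two type parameters: input and output), so no match is needed
      //purpose: thread reuse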
      react {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(8000)
          println("stopped ...")
        }
      }
    }
  }
}

object YourActor {
  def main(args: Array[String]) {
    val actor = new YourActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("messages sent!")
  }
}

Example 2

package com.zhiyoulxj.actor.actor

import scala.actors.Actor
/*
*  Send messages to itself to implement Actor-model communication
* */
class MyActor extends Actor{
  override def act(): Unit = {
    while (true) {
//receive takes a partial function that matches the messages sent to this actor
      receive{
        case "start" =>{
          print("start.....")
          Thread.sleep(5000)
        }
        case "stop" =>{
          print("stop.....")
          Thread.sleep(5000)
        }
      }
    }
  }
}
/*
*  loop + react acts like adding a ThreadPool: it improves thread reuse across actors
*/
class MyActor1 extends Actor {
  override def act(): Unit = {
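    //call loop to repeat the body
    loop{
      //react takes a partial function (PartialFunction has two type parameters: input and output), so no match is needed
      //purpose: thread reuse
      react{
        case "start" => {
          println("starting...")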
        }
        case "stop" => {
          println("stopping...")
        }
      }
    }
  }
}
//companion object
object MyActor{
  def main(args: Array[String]): Unit = {
    /*val actor = new MyActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("messages sent")*/
    val actor1 = new MyActor1
    actor1.start()
    actor1 ! "start"
    actor1 ! "stop"
    println("input finished")
  }
}

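Explanation: to handle messages repeatedly with react, wrap it in loop, not while. react never returns normally to its caller, so the body of a while loop would not be entered a second time.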

Fourth Example (sending messages with case classes)

Example 1

import scala.actors.Actor
class AppleActor extends Actor {

  def act(): Unit = {
    while (true) {
      receive {
        case "start" => println("starting ...")
        case SyncMsg(id, msg) => {
          println(id + ",sync " + msg)
          Thread.sleep(5000)
          sender ! ReplyMsg(3,"finished")
        }
        case AsyncMsg(id, msg) => {
          println(id + ",async " + msg)
          Thread.sleep(5000)
        }
      }
    }
  }
}

object AppleActor {
  def main(args: Array[String]) {
    val a = new AppleActor
    a.start()
    //asynchronous message
    a ! AsyncMsg(1, "hello actor")
    println("asynchronous message sent")
    //synchronous message
    //val content = a.!?(1000, SyncMsg(2, "hello actor"))
    //println(content)
    val reply = a !! SyncMsg(2, "hello actor")
    println(reply.isSet)
    //println("123")
    val c = reply.apply()
    println(reply.isSet)
    println(c)
  }
}
case class SyncMsg(id : Int, msg: String)
case class AsyncMsg(id : Int, msg: String)
case class ReplyMsg(id : Int, msg: String)

Example 2

package com.zhiyoulxj.actor.actor

import scala.actors.Actor
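case class AsyMSG(id: Int,name: String)   //asynchronous
case class SynMSG(id: Int,name: String)   //synchronous
case class FutureMSG(id: Int,name: String)   //carries a return value: Future

/*
*  The three Actor send modes:  ! asynchronous, no return value   !? synchronous, waits for the return value    !! asynchronous, returns a Future
* */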
class CaseClassActor extends Actor{
  override def act(): Unit = {
    while (true) {
      receive {
        case "start" => println("starting....")
        case AsyMSG(id, name) => {
          println(id + " Asy... " + name)
        }
        case SynMSG(id, name) => {
          println(id + " Syn... " + name)
          sender ! FutureMSG(3,"wangwu finish")
        }
        case FutureMSG(id, name) => {
          println(id + " Future... " + name)
        }
      }
    }
  }
}
object CaseClassActor{
  def main(args: Array[String]): Unit = {
    val actor = new CaseClassActor
    actor.start()
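//    actor ! AsyMSG(1,"zhangsan async")
//    println("!: sends an asynchronous message with no return value")
//    val returnVal = actor !? (2000,SynMSG(2,"lisi sync"))
//    println(returnVal)
//    print("!?: sends a synchronous message and waits for the return value")

    //!! sends an asynchronous message and returns a Future[Any].
    // The send does not wait for the reply: the caller immediately takes away an "empty box", and the reply is put into it once it arrives
    // Reading the box afterwards yields the reply that was sent back
      val FutureVal = actor !! SynMSG(4,"zhaoliu sync with return value")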
      println(FutureVal.isSet)
      val c = FutureVal.apply()
    //FutureVal.apply()   ---->toString()
      println(c)
  }
}
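
The "empty box" idea from the comments above can be tried out with the standard library alone. Note that this sketch uses scala.concurrent.Future and Promise, which are different classes from the scala.actors.Future returned by !!; here isCompleted plays the role of isSet and Await.result plays the role of apply(). The names EmptyBoxSketch and demo are hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object EmptyBoxSketch {
  // Returns (filled before the reply?, the reply, filled after the reply?).
  def demo(): (Boolean, String, Boolean) = {
    val box = Promise[String]()              // the empty box
    val future: Future[String] = box.future  // what the sender holds on to
    val before = future.isCompleted          // false: nothing is in the box yet

    // A second thread stands in for the actor that replies later.
    val replier = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(100); box.success("finished") }
    })
    replier.start()

    // Blocks until the box is filled, like apply() on a scala.actors.Future.
    val value = Await.result(future, 5.seconds)
    (before, value, future.isCompleted)
  }
}
```

The sender can keep working between the send and the call to Await.result, which is the practical difference between !! and the blocking !?.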

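Exercise

Use actor-based concurrent programming to write a single-machine WordCount: take several files as input and, once the per-file tasks finish, aggregate them into the final result.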
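
Before bringing actors in, it can help to write the two stages of the exercise on plain collections. This is a minimal sketch, not a reference solution: countWords and mergeCounts are hypothetical helper names, and the input is assumed to be already read into lines of whitespace-separated words.

```scala
object WordCountSketch {
  // Per-file stage: count the words in the lines of one file.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  // Aggregation stage: merge the per-file maps by summing the counts of each word.
  def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.flatten
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

In an actor-based solution, each actor would run the per-file stage for one file and send its map back, and the main thread would run the aggregation stage over the collected replies.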

WordCount with Actors

Actor Programming - Figure 2

Example 1

import java.io.File

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

/**
  * Created by ZX on 2016/4/4.
  */
class Task extends Actor {

  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) => {
          val contents = Source.fromFile(new File(fileName)).mkString
          //split on both Windows (\r\n) and Unix (\n) line endings
          val arr = contents.split("\\r?\\n")
          val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
          //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
        case StopTask => {
          exit()
        }
      }
    }
  }
}

object WorkCount {
  def main(args: Array[String]) {
    val files = Array("c://words.txt", "c://words.log")

    val replaySet = new mutable.HashSet[Future[Any]]
    val resultList = new mutable.ListBuffer[ResultTask]

    for(f <- files) {
      val t = new Task
      val replay = t.start() !! SubmitTask(f)
      replaySet += replay
    }

    while(replaySet.size > 0){
      val toCompute = replaySet.filter(_.isSet)
      for(r <- toCompute){
        val result = r.apply()
        resultList += result.asInstanceOf[ResultTask]
        replaySet.remove(r)
      }
      Thread.sleep(100)
    }
    val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
    println(finalResult)
  }
}

case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])

Example 2

package com.zhiyoulxj.actor.actor

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

class ActorWordCount extends Actor{
  override def act(): Unit ={
    loop{
      react{
        case MapTask(filename) => {
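          //Map-side logic: 1. read the file  2. split into words
          // 3. map each word to (key,1)  4. combiner
          //the combiner aggregates locally; the result is a Map[String,Int]
//          Source.fromFile(filename) reads the file contents from the local disk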
          val result = Source.fromFile(filename).getLines().
            flatMap(_.split(" ")).map((_,1)).
            toList.groupBy(_._1).mapValues(_.size)
          //send the Map result to the Reduce side
          sender ! ReduceTask(result)
        }
        case ExistTask => {
          exit()
        }
      }
    }
  }
}
object ActorWordCount {
  def main(args: Array[String]): Unit = {
    val resSet = new mutable.HashSet[Future[Any]]()
    val resultList = new ListBuffer[ReduceTask]
    val files = Array[String]("D://words.txt", "D://MR.txt")
    for (filename <- files) {
      val actor = new ActorWordCount
      //start the actor and send it a MapTask message; the return value is a Future
      val res = actor.start() !! MapTask(filename)
      resSet += res //add each Future to the Set
    }
    while (resSet.size > 0){
      //isSet tells whether a Future's result is available
      val toHandle = resSet.filter(_.isSet)  //the completed results, ready to be processed
      for(f <- toHandle) {
        val result = f.apply()
        //get the ReduceTask instance
        val result1 = result.asInstanceOf[ReduceTask] //asInstanceOf is like a cast A-->B in Java
        resultList += result1 //move the completed result into another container
        resSet -= f //remove the consumed future from the set
      }
    }
//    println(resultList)
    //resultList:((hello,3),(hello,2),(tom,1)....)
    val r = resultList.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
    println(r.toBuffer)
  }
}

case class MapTask(filename: String)
case class ReduceTask(result:Map[String,Int])
case object ExistTask

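RPC Communication (Akka)

Master and Worker Classes

Actor Programming - Figure 3

Business logic:

  1. Start the Master, then start all the Workers.

  2. Each Worker establishes a connection to the Master.

  3. The Worker registers with the Master, sending its information wrapped in a dedicated class.

  4. On receiving the registration, the Master stores the Worker's information and replies to the Worker that registration succeeded.

  5. The Worker periodically sends heartbeats to the Master to signal that it is still alive.

  6. The Master periodically removes Workers that have timed out.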
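
Steps 4 to 6 above come down to a small piece of bookkeeping on the Master side. The sketch below isolates that logic without Akka so it can be run on its own. WorkerRegistry and its methods are hypothetical names, timestamps are passed in explicitly instead of being read from the clock, and a worker's registration time is assumed to count as its first heartbeat.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the Master's state; no timer or network involved.
class WorkerRegistry(timeoutMs: Long) {
  // worker id -> time of the last heartbeat, in milliseconds
  private val lastBeat = mutable.HashMap.empty[String, Long]

  // Step 4: store a new worker; returns false if it was already registered.
  def register(id: String, now: Long): Boolean =
    if (lastBeat.contains(id)) false
    else { lastBeat(id) = now; true }

  // Step 5: refresh the heartbeat time of a known worker.
  def heartbeat(id: String, now: Long): Unit =
    if (lastBeat.contains(id)) lastBeat(id) = now

  // Step 6: drop workers whose last heartbeat is older than the timeout
  // and return the ids of the workers that are still alive.
  def prune(now: Long): Set[String] = {
    val dead = lastBeat.filter { case (_, t) => now - t > timeoutMs }.keys.toList
    dead.foreach(id => lastBeat.remove(id))
    lastBeat.keySet.toSet
  }
}
```

With a 15000 ms timeout, a worker that last reported at 10000 survives a check at 20000, while one that has been silent since 0 is removed.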

Master Class

package com.zhiyoulxj.actor.akka

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable

class Master extends Actor {
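  //holds the information sent by each worker; using an entity class as the value makes its fields easy to use later
  val ids = new mutable.HashMap[String,WorkInfo]()
  //the WorkInfo values of the map kept as a set, so the Master can later sort them by their fields
  val workers = new mutable.HashSet[WorkInfo]()
  val CHECK_BEAT = 15000
  //lifecycle hook called when the Actor starts    ctrl+o   looks up unimplemented methods
  override def preStart(): Unit = {
    print("Actor is preStart ....")
    //timer that checks whether worker heartbeats are still arriving
    //brings the implicit ExecutionContext required by the scheduler into scope
    import context.dispatcher
    context.system.scheduler.schedule(0.millis,CHECK_BEAT.millis,self,CheckTimeOut)
  }
  //handles incoming messages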
  override def receive: Receive = {
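    //receive the worker's registration and store its data
    case RegisterWorker(id,mem,cores) =>{
      //check whether this worker has already registered
        if (!ids.contains(id)) {
          val workInfo = new WorkInfo(id, mem, cores)
          //storage options: 1. keep in memory  2. persist to disk  3. store in zookeeper
          //map.put operation
          ids(id) = workInfo
          //append to the Set
          workers += workInfo
          //send a case class instance confirming that registration succeeded
          //note: this string has no s prefix, so $masterHost and $masterPort are sent literally
          sender ! RegisterFinish("akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")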
        }
    }
    case HeartBeat(id) =>{
      if(ids.contains(id)){
        val workInfo = ids(id)
        val currentTime = System.currentTimeMillis()
        //replace the last heartbeat time with the current time
        workInfo.LastBeat = currentTime
      }
    }
    case CheckTimeOut => {
      val currentTime = System.currentTimeMillis()
      val toClean = workers.filter(x=>currentTime - x.LastBeat > CHECK_BEAT)
      for (w <- toClean){
        workers -= w
        ids -= w.id
      }
      println("live workers: "+workers.size)
    }
  }
}
object Master{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    //prepare the configuration
    val config =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
     """.stripMargin
    val cfg = ConfigFactory.parseString(config)
    //ActorSystem: the leader of the actors, it monitors all of them; singleton
    val actorSystem = ActorSystem("MasterSystem",cfg)
    //create the actor
    val master = actorSystem.actorOf(Props[Master],"Master")
    master ! "nihao"
    actorSystem.awaitTermination()  //keep the process waiting instead of exiting
  }
}

Worker Class

package com.zhiyoulxj.actor.akka

import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class Worker(val masterHost:String,val masterPort:Int,val mem:Int,var cores:Int) extends Actor {
  var master:ActorSelection = _
  val workersID = UUID.randomUUID().toString
  val HEART_BEAT = 10000
  //establish the connection
  override def preStart(): Unit = {
    //the path must include /user/Master
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    //send the registration to the master; it is matched by the case class pattern in the Master's receive method
    master ! RegisterWorker(workersID,mem,cores)
  }
  override def receive: Receive = {
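    //the worker receives the confirmation returned by the master
    case RegisterFinish(masterURL) =>{
      println(masterURL)
      //send heartbeats periodically    (scheduler = scheduling)
      //bring the implicit ExecutionContext into scope
      import context.dispatcher
      //send to itself: self (the receiver: this actor), SendHeartBeat (the message)
      context.system.scheduler.schedule(0.millis,HEART_BEAT.millis,self,SendHeartBeat)
    }
      //receive the heartbeat trigger sent to itself, then send a heartbeat to the master
    case SendHeartBeat =>{
      println("heartbeat trigger received")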
      master ! HeartBeat(workersID)
    }
  }
}
object Worker{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val mem = args(4).toInt
    val cores = args(5).toInt
    //prepare the configuration
    val config =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val cfg = ConfigFactory.parseString(config)
    //ActorSystem: singleton, the manager of the actors
    val actorSystem = ActorSystem("WorkerSystem",cfg)
    //actorOf instantiates an actor
    actorSystem.actorOf(Props(new Worker(masterHost,masterPort,mem,cores)),"Worker")
    actorSystem.awaitTermination()
  }
}

Case Classes Used

package com.zhiyoulxj.actor.akka

/*
*  Messages: these wrap the information exchanged between Master and Worker
*  1. registration  2. the Master's acknowledgement  3. heartbeat  4. timeout check
*  These are all case classes; because they travel over the network, they must be serializable
*/
trait IntervalMessage extends Serializable
  //registration case class, wraps the registration message from Worker to Master  worker-->master
  case class RegisterWorker(id:String,mem:Int,cores:Int)extends IntervalMessage
  //registration confirmation   master --->worker
  case class RegisterFinish(masterURL:String)extends  IntervalMessage
  //heartbeat trigger  worker --> worker (no serialization needed)
  case object SendHeartBeat
  //heartbeat  worker ---> master
  case class HeartBeat(id:String) extends IntervalMessage
  //timeout check master-->master
  case object CheckTimeOut

Class That Wraps Worker Information

package com.zhiyoulxj.actor.akka

/*
*  Wraps the worker information received by the master
*/
class WorkInfo(val id:String,val mem:Int,val cores:Int) {
  var LastBeat:Long=_
}