I. Akka remote communication
Two projects are needed: master and worker.
The master listens on a port,
and the worker connects to that port.
Create an empty project that contains two sub-projects (Maven projects).


Add the dependencies (Maven coordinates):
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.21</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.21</version>
</dependency>
</dependencies>
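Create a second project, app-worker, the same way (steps omitted).
Now for the code.
The idea:
The master starts first and waits for a worker to send it a message. When the worker sends one, the master replies with a message of its own.
This is the same as the previous lesson, except that this time two separate projects communicate over different ports.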

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
* @Author laoyan
* @Description TODO
* @Date 2022/5/7 10:03
* @Version 1.0
*/
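// An actor should be a class created through Props, not a singleton object,
// so that Akka can create (and re-create on restart) its own instance
class MasterActor extends Actor {
  override def receive: Receive = {
    // Handle incoming messages
    case "connect" =>
      println("MasterActor received the connect message")
      // Reply to the actor that sent "connect"
      sender() ! "success"
  }
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}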
Master configuration (application.conf under resources):
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "8888"
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class WorkerActor extends Actor {
  override def receive: Receive = {
    case "setup" =>
      println("Worker started successfully!")
      // Send a message to the masterActor
      val masterActorRef: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
      masterActorRef ! "connect"
    case "success" =>
      println("Connected to the master successfully!")
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
    // Create the workerActor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props[WorkerActor], "workerActor")
    workerActor ! "setup"
  }
}
Worker configuration (application.conf under resources):
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "9999"
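The address that the worker passes to context.actorSelection is built from the master's settings. It combines the master's ActorSystem name (masterActorSystem), the hostname and port from the master's application.conf, and the name given to actorOf (masterActor). That name sits under /user, which is the guardian for actors created with actorOf. The stdlib-only sketch below spells out the format through a hypothetical helper, remoteActorPath, so that a mismatch such as a wrong port is easier to spot:

```scala
object RemotePath {
  // Hypothetical helper (not part of Akka): builds the classic Akka remoting path
  // akka.tcp://<ActorSystem name>@<host>:<port>/user/<actor name>
  def remoteActorPath(system: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$system@$host:$port/user/$actorName"
}
```

For example, `context.actorSelection(RemotePath.remoteActorPath("masterActorSystem", "127.0.0.1", 8888, "masterActor"))` selects the same actor as the hard-coded string in WorkerActor.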
II. Scheduled tasks in Akka
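Akka provides a scheduler (available as system.scheduler) for scheduled tasks. The corresponding packages must be imported.
The schedule method takes these parameters:
Parameter 1: initialDelay, how long to wait before the first run
Parameter 2: interval, how often to run after that
Parameter 3: receiver (an ActorRef), the actor the message is sent to
Parameter 4: message, the message to send
(implicit ...) an implicit ExecutionContext, which must be brought into scope manually (for example `import system.dispatcher`)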
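The sketch below makes the first two parameters concrete. It is stdlib only, and ScheduleModel and fireTimes are hypothetical names, not Akka API. It lists when the first n messages are due, measured from the call to schedule: the first after initialDelay, then one every interval. The real scheduler works in ticks, so actual send times can be slightly later.

```scala
import scala.concurrent.duration._

object ScheduleModel {
  // Hypothetical model (not Akka API): offsets from the schedule call at which
  // the first `n` messages are due: initialDelay, then one more every interval
  def fireTimes(initialDelay: FiniteDuration, interval: FiniteDuration, n: Int): Seq[FiniteDuration] =
    (0 until n).map(i => initialDelay + interval * i.toLong)
}
```

With the values used in the example below, `ScheduleModel.fireTimes(0.seconds, 1.second, 3)` gives 0 s, 1 s and 2 s.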
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class TimeActor extends Actor {
  override def receive: Receive = {
    case x => println(x)
  }
}
object DingShi {
  def main(args: Array[String]): Unit = {
    val timeActorSystem: ActorSystem = ActorSystem("timeActorSystem", ConfigFactory.load())
    val timeActor: ActorRef = timeActorSystem.actorOf(Props[TimeActor], "timeActor")
    // Import the implicit conversions from this package (_ means all of them); without it, `seconds` is not available on Int
    import scala.concurrent.duration._
    // Import the implicit ExecutionContext; without it, schedule does not compile
    import timeActorSystem.dispatcher
    timeActorSystem.scheduler.schedule(
      0.seconds, // initialDelay
      1.second,  // interval
      timeActor, // receiver
      "Hello"    // message
    )
  }
}
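A second overload of schedule takes these parameters:
Parameter 1: initialDelay, how long to wait before the first run
Parameter 2: interval, how often to run after that
followed by a block of code (f: => Unit)
and the same implicit ExecutionContext, which must be brought into scope manually.
This variant runs the given block once every interval.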
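The `(f: => Unit)` in this overload is a by-name parameter. The block is not evaluated when schedule is called. Instead, it is evaluated each time it is referenced, which is how the scheduler can run it again on every tick. A minimal stdlib-only illustration follows. Here runTimes is a hypothetical stand-in for the scheduler, not Akka API:

```scala
object ByNameDemo {
  // `f: => Unit` is a by-name parameter: the block passed in is evaluated
  // every time `f` is referenced, here once per iteration
  def runTimes(n: Int)(f: => Unit): Unit =
    (1 to n).foreach(_ => f)
}
```

Calling `ByNameDemo.runTimes(3) { ticks += 1 }` on a `var ticks = 0` leaves `ticks` at 3, because the block ran three times. With a by-value `Unit` parameter it would have run only once, at the call site.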
III. Writing a Spark-style communication model with Akka
1. Environment setup

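First, prepare the environment:
spark-master: manages the workers
spark-worker: the worker itself
spark-common: classes shared by the other two projects
The projects are created the same way as in the remote-communication example, so those steps are omitted here.

Create the three projects above.
Each of the three projects needs the following:
create the required folders,
and add the dependencies.
master and worker need these three dependencies: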
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.21</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.21</version>
</dependency>
</dependencies>
The common project needs only one dependency:
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
</dependencies>
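Now consider: why create common at all? Because master and worker must exchange the very same message classes, and defining them once in common keeps both sides in sync.
So master and worker both depend on common through its coordinates: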
<dependency>
<groupId>com.qfedu</groupId>
<artifactId>spark-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
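2. Simple communication (the worker only needs to connect to the master)
Start from a copy of the remote-communication code above.
In the common project, write the shared message classes: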
case class WorkerRegisterMessage(workerId:String,cpu:Int,mem:Int)
case object RegisterSuccessMessage
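Even inside a single JVM, two case classes with the same name and fields are different types if they are defined in different places. A receive written against one of them never matches the other. Across the network the rule is stricter, because the receiving side has to load the message class by its fully qualified name. The stdlib-only sketch below shows the in-JVM part. MasterSide, WorkerSide and SharedMessageDemo are hypothetical names, imagining that each project had defined its own copy:

```scala
// Two hypothetical copies of the same message, as if each project had defined its own
object MasterSide { final case class WorkerRegisterMessage(workerId: String, cpu: Int, mem: Int) }
object WorkerSide { final case class WorkerRegisterMessage(workerId: String, cpu: Int, mem: Int) }

object SharedMessageDemo {
  // Mimics the master's receive: the pattern matches only the master's own class
  def masterAccepts(msg: Any): Boolean = msg match {
    case MasterSide.WorkerRegisterMessage(_, _, _) => true
    case _                                         => false
  }
}
```

The worker's copy falls through to `case _`, even though it carries identical data. Keeping a single WorkerRegisterMessage in spark-common rules this out.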
Write the worker side. As soon as the worker starts, it sends its registration info to the master:
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import java.util.{Random, UUID}
class WorkerActor extends Actor {
  private var workerId: String = _
  private var cpu: Int = _
  private var mem: Int = _
  private val CPU_LIST = List(1, 2, 4, 6, 8)
  private val MEM_LIST = List(512, 1024, 2048, 4096)
  // preStart runs once after the actor has started, before it handles its first message
  override def preStart(): Unit = {
    val masterActorRef: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")
    // Generate random test data
    workerId = UUID.randomUUID().toString
    val random = new Random()
    cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
    mem = MEM_LIST(random.nextInt(MEM_LIST.length))
    masterActorRef ! WorkerRegisterMessage(workerId, cpu, mem)
  }
  override def receive: Receive = {
    case RegisterSuccessMessage =>
      println("Worker received the master's reply: registration succeeded")
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
    // Create the workerActor; its preStart sends the registration, so no "setup" message is needed
    val workerActor: ActorRef = workerActorSystem.actorOf(Props[WorkerActor], "workerActor")
  }
}
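The random cpu/mem generation in preStart can be pulled out into a small function. The sketch below uses a hypothetical FakeResources object and scala.util.Random in place of java.util.Random. It takes the generator as a parameter, so a seeded instance produces reproducible values in tests:

```scala
import scala.util.Random

object FakeResources {
  val CPU_LIST: List[Int] = List(1, 2, 4, 6, 8)
  val MEM_LIST: List[Int] = List(512, 1024, 2048, 4096)

  // Pick one element uniformly at random; nextInt(n) returns a value in [0, n)
  def pick[A](xs: List[A], rng: Random): A = xs(rng.nextInt(xs.length))

  // Random (cpu, mem) pair, like the one the worker generates in preStart
  def randomResources(rng: Random): (Int, Int) = (pick(CPU_LIST, rng), pick(MEM_LIST, rng))
}
```

In the actor you would call `FakeResources.randomResources(new Random())`. In a test you would pass `new Random(seed)` to get the same pair on every run.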
Configuration file (application.conf):
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7001"
Now write the receiving side, the Master:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class MasterActor extends Actor {
  override def receive: Receive = {
    // Handle incoming messages
    case WorkerRegisterMessage(workerId, cpu, mem) =>
      println(s"MasterActor: received worker registration: $workerId, $cpu, $mem")
      // Reply to the worker that sent the registration
      sender() ! RegisterSuccessMessage
  }
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}
Configuration file (application.conf):
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7777"
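The end result:
Start the master first. It keeps waiting for workers to connect.
Start the worker:
as soon as it starts, it sends a registration request as a WorkerRegisterMessage.
When the master receives it, the master prints the WorkerRegisterMessage contents and immediately replies to the worker with a RegisterSuccessMessage. When the worker receives that reply, it prints that registration succeeded.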
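The exchange above is small enough to write as a pure function, which lets you check the protocol without starting two ActorSystems. HandshakeModel and masterReply are hypothetical names for this sketch. The two messages are redefined locally only so that the block compiles on its own:

```scala
object HandshakeModel {
  // Local copies of the two common messages, so this sketch is self-contained
  sealed trait Msg
  final case class WorkerRegisterMessage(workerId: String, cpu: Int, mem: Int) extends Msg
  case object RegisterSuccessMessage extends Msg

  // The master's side of the exchange as a pure function:
  // a registration is answered with RegisterSuccessMessage, anything else gets no reply
  def masterReply(msg: Msg): Option[Msg] = msg match {
    case WorkerRegisterMessage(_, _, _) => Some(RegisterSuccessMessage)
    case _                              => None
  }
}
```

In the real MasterActor, the `Some` case corresponds to `sender() ! RegisterSuccessMessage`.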
3. The Master saves the Workers' registration info
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import java.util.Date
class MasterActor extends Actor {
  // Mutable Map that stores the info each worker registered (workerId -> WorkInfo)
  private val regWorkerMap = collection.mutable.Map[String, WorkInfo]()
  override def receive: Receive = {
    // Registration: save the worker's info together with the current time
    case WorkerRegisterMessage(workerId, cpu, mem) =>
      println(s"MasterActor: received worker registration: $workerId, $cpu, $mem")
      regWorkerMap += (workerId -> WorkInfo(workerId, cpu, mem, new Date().getTime))
      sender() ! RegisterSuccessMessage
    // Heartbeat: refresh the worker's last-heartbeat time. An unknown worker
    // is re-registered instead of throwing NoSuchElementException
    case WorkerHeartBeatMessage(workerId, cpu, mem) =>
      regWorkerMap(workerId) = WorkInfo(workerId, cpu, mem, new Date().getTime)
      println(regWorkerMap)
  }
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}
4. The Worker periodically sends heartbeats to the Master
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{ConfigFactory}
import java.util.{Random, UUID}
class WorkerActor extends Actor {
  private var workerId: String = _
  private var cpu: Int = _
  private var mem: Int = _
  private val CPU_LIST = List(1, 2, 4, 6, 8)
  private val MEM_LIST = List(512, 1024, 2048, 4096)
  private var masterActorRef: ActorSelection = _
  // preStart runs once after the actor has started, before it handles its first message
  override def preStart(): Unit = {
    masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")
    // Generate random test data
    workerId = UUID.randomUUID().toString
    val random = new Random()
    cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
    mem = MEM_LIST(random.nextInt(MEM_LIST.length))
    masterActorRef ! WorkerRegisterMessage(workerId, cpu, mem)
  }
  override def receive: Receive = {
    case RegisterSuccessMessage =>
      println("Worker received the master's reply: registration succeeded")
      import scala.concurrent.duration._
      import context.dispatcher
      // Send a heartbeat every worker.heartbeat.interval seconds (5 in application.conf)
      context.system.scheduler.schedule(
        0.seconds,
        ConfigUtils.`worker.heartbeat.interval`.seconds
      ) {
        masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)
      }
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
    // Create the workerActor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props[WorkerActor], "workerActor")
  }
}
The worker sends a heartbeat every 5 seconds. This interval can be moved into the conf file:
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7001"
worker.heartbeat.interval = 5
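Write a utility object that reads values from the configuration file:
import com.typesafe.config.{Config, ConfigFactory}

object ConfigUtils {
  private val config: Config = ConfigFactory.load()
  // Heartbeat interval (seconds) read from the config file. The name is wrapped in
  // backticks (the ` key, below Esc) so that it can contain dots and mirror the config key
  val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}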
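The backtick syntax used in ConfigUtils is plain Scala and does not depend on Typesafe Config. Any string in backticks is a legal identifier, dots included, and every reference to it also needs the backticks. Below is a minimal stdlib-only illustration. BacktickDemo and its hard-coded 5 are hypothetical and stand in for the value read from the file:

```scala
object BacktickDemo {
  // Backticks turn any string into a legal identifier, including one with dots
  val `worker.heartbeat.interval`: Int = 5

  // Referring to the value also requires the backticks
  def intervalMillis: Long = `worker.heartbeat.interval` * 1000L
}
```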
The master receives the heartbeats and updates each WorkInfo's last-heartbeat time:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import java.util.Date
class MasterActor extends Actor {
  // Mutable Map that stores the info each worker registered (workerId -> WorkInfo)
  private val regWorkerMap = collection.mutable.Map[String, WorkInfo]()
  override def receive: Receive = {
    // Registration: save the worker's info together with the current time
    case WorkerRegisterMessage(workerId, cpu, mem) =>
      println(s"MasterActor: received worker registration: $workerId, $cpu, $mem")
      regWorkerMap += (workerId -> WorkInfo(workerId, cpu, mem, new Date().getTime))
      sender() ! RegisterSuccessMessage
    // Heartbeat: refresh the worker's last-heartbeat time. An unknown worker
    // is re-registered instead of throwing NoSuchElementException
    case WorkerHeartBeatMessage(workerId, cpu, mem) =>
      regWorkerMap(workerId) = WorkInfo(workerId, cpu, mem, new Date().getTime)
      println(regWorkerMap)
  }
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}
5. The Master periodically checks the WorkInfo entries in the Map and removes those that have timed out
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import java.util.Date
class MasterActor extends Actor {
  import MasterActor.CheckTimeOutWorker
  // Mutable Map that stores the info each worker registered (workerId -> WorkInfo)
  private val regWorkerMap = collection.mutable.Map[String, WorkInfo]()
  // From the start, the master periodically checks regWorkerMap for timed-out workers and removes them.
  // The scheduler sends a message to self instead of running a closure. That way the Map is only
  // touched inside receive, whereas a closure would modify it from another thread.
  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    context.system.scheduler.schedule(
      0.seconds,
      ConfigUtils.`master.check.heartbeat.interval`.seconds,
      self,
      CheckTimeOutWorker
    )
  }
  override def receive: Receive = {
    // Registration: save the worker's info together with the current time
    case WorkerRegisterMessage(workerId, cpu, mem) =>
      println(s"MasterActor: received worker registration: $workerId, $cpu, $mem")
      regWorkerMap += (workerId -> WorkInfo(workerId, cpu, mem, new Date().getTime))
      sender() ! RegisterSuccessMessage
    // Heartbeat: refresh the last-heartbeat time (an unknown worker is re-registered
    // instead of throwing NoSuchElementException)
    case WorkerHeartBeatMessage(workerId, cpu, mem) =>
      regWorkerMap(workerId) = WorkInfo(workerId, cpu, mem, new Date().getTime)
      println(regWorkerMap)
    // Periodic check: collect the WorkInfo entries whose last heartbeat is older than the timeout
    case CheckTimeOutWorker =>
      val now = new Date().getTime
      val timeOutWorkerMap = regWorkerMap.filter { case (_, workInfo) =>
        now - workInfo.date > ConfigUtils.`master.check.heartbeat.timeout` * 1000L
      }
      if (timeOutWorkerMap.nonEmpty) {
        // Remove the timed-out workers from the main map
        regWorkerMap --= timeOutWorkerMap.keys
        println("Remaining workers that have not timed out: " + regWorkerMap)
      }
  }
}
object MasterActor {
  // Internal tick the master sends to itself; it never crosses the network, so it is not in common
  case object CheckTimeOutWorker
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}
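The timeout rule itself does not need Akka. Factoring it into a pure function over an immutable Map makes the boundary easy to test. The name TimeoutCheck.split is hypothetical, and last-heartbeat times are plain Long milliseconds, as in WorkInfo.date. One thing the tests confirm: a worker exactly at the limit is kept, because the comparison is strict.

```scala
object TimeoutCheck {
  // Splits workerId -> last heartbeat time (ms) into (alive, timedOut) using the
  // master's rule: a worker has timed out when now - lastBeat > timeoutSec * 1000
  def split(lastBeat: Map[String, Long], now: Long, timeoutSec: Int): (Map[String, Long], Map[String, Long]) = {
    val (timedOut, alive) = lastBeat.partition { case (_, t) => now - t > timeoutSec * 1000L }
    (alive, timedOut)
  }
}
```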
That's it. The complete code:
In the common project:
import java.util.Date
case class WorkerRegisterMessage(workerId:String,cpu:Int,mem:Int)
case object RegisterSuccessMessage
case class WorkInfo(workId:String,cpu:Int,mem:Int,var date:Long)
case class WorkerHeartBeatMessage(workerId:String,cpu:Int,mem:Int)
Master code, starting with its application.conf:
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7777"
// How often (in seconds) the master scans the map for timed-out workers
master.check.heartbeat.interval = 8
// How long (in seconds) after its last heartbeat a WorkInfo counts as expired
master.check.heartbeat.timeout = 10
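These values can be checked by hand. The worker beats every 5 s (worker.heartbeat.interval), and the master checks every 8 s with a 10 s timeout. A dead worker is removed by the first check after its timeout expires, and checks are 8 s apart. So removal happens more than 10 s and at most 18 s after the last heartbeat, ignoring network delay and scheduler jitter. A live worker is kept as long as it beats more often than the timeout. The arithmetic as a small sketch (HeartbeatTiming is a hypothetical name):

```scala
object HeartbeatTiming {
  // A dead worker is removed by the first check that runs after its timeout expires;
  // checks are checkInterval apart, so removal happens within (timeout, timeout + checkInterval]
  def worstCaseRemovalSec(timeoutSec: Int, checkIntervalSec: Int): Int = timeoutSec + checkIntervalSec

  // A live worker is safe only if it beats more often than the timeout
  // (in practice with some margin for network delay)
  def liveWorkerIsSafe(heartbeatIntervalSec: Int, timeoutSec: Int): Boolean =
    heartbeatIntervalSec < timeoutSec
}
```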
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import java.util.Date
class MasterActor extends Actor {
  import MasterActor.CheckTimeOutWorker
  // Mutable Map that stores the info each worker registered (workerId -> WorkInfo)
  private val regWorkerMap = collection.mutable.Map[String, WorkInfo]()
  // From the start, the master periodically checks regWorkerMap for timed-out workers and removes them.
  // The scheduler sends a message to self instead of running a closure. That way the Map is only
  // touched inside receive, whereas a closure would modify it from another thread.
  override def preStart(): Unit = {
    import scala.concurrent.duration._
    import context.dispatcher
    context.system.scheduler.schedule(
      0.seconds,
      ConfigUtils.`master.check.heartbeat.interval`.seconds,
      self,
      CheckTimeOutWorker
    )
  }
  override def receive: Receive = {
    // Registration: save the worker's info together with the current time
    case WorkerRegisterMessage(workerId, cpu, mem) =>
      println(s"MasterActor: received worker registration: $workerId, $cpu, $mem")
      regWorkerMap += (workerId -> WorkInfo(workerId, cpu, mem, new Date().getTime))
      sender() ! RegisterSuccessMessage
    // Heartbeat: refresh the last-heartbeat time (an unknown worker is re-registered
    // instead of throwing NoSuchElementException)
    case WorkerHeartBeatMessage(workerId, cpu, mem) =>
      regWorkerMap(workerId) = WorkInfo(workerId, cpu, mem, new Date().getTime)
      println(regWorkerMap)
    // Periodic check: collect the WorkInfo entries whose last heartbeat is older than the timeout
    case CheckTimeOutWorker =>
      val now = new Date().getTime
      val timeOutWorkerMap = regWorkerMap.filter { case (_, workInfo) =>
        now - workInfo.date > ConfigUtils.`master.check.heartbeat.timeout` * 1000L
      }
      if (timeOutWorkerMap.nonEmpty) {
        // Remove the timed-out workers from the main map
        regWorkerMap --= timeOutWorkerMap.keys
        println("Remaining workers that have not timed out: " + regWorkerMap)
      }
  }
}
object MasterActor {
  // Internal tick the master sends to itself; it never crosses the network, so it is not in common
  case object CheckTimeOutWorker
}
object Master {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
    // Create the masterActor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props[MasterActor], "masterActor")
  }
}
import com.typesafe.config.{Config, ConfigFactory}
object ConfigUtils {
private val config: Config = ConfigFactory.load()
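// Check interval and timeout (seconds) read from the config file. The names are wrapped in backticks (the ` key, below Esc) so they can contain dots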
val `master.check.heartbeat.interval`: Int = config.getInt("master.check.heartbeat.interval")
val `master.check.heartbeat.timeout`: Int = config.getInt("master.check.heartbeat.timeout")
}
Worker code, starting with its application.conf:
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7004"
worker.heartbeat.interval = 5
import com.typesafe.config.{Config, ConfigFactory}
object ConfigUtils {
private val config: Config = ConfigFactory.load()
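// Heartbeat interval (seconds) read from the config file. The name is wrapped in backticks (the ` key, below Esc) so it can contain dots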
val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{ConfigFactory}
import java.util.{Random, UUID}
class WorkerActor extends Actor {
  private var workerId: String = _
  private var cpu: Int = _
  private var mem: Int = _
  private val CPU_LIST = List(1, 2, 4, 6, 8)
  private val MEM_LIST = List(512, 1024, 2048, 4096)
  private var masterActorRef: ActorSelection = _
  // preStart runs once after the actor has started, before it handles its first message
  override def preStart(): Unit = {
    masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")
    // Generate random test data
    workerId = UUID.randomUUID().toString
    val random = new Random()
    cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
    mem = MEM_LIST(random.nextInt(MEM_LIST.length))
    masterActorRef ! WorkerRegisterMessage(workerId, cpu, mem)
  }
  override def receive: Receive = {
    case RegisterSuccessMessage =>
      println("Worker received the master's reply: registration succeeded")
      import scala.concurrent.duration._
      import context.dispatcher
      // Send a heartbeat every worker.heartbeat.interval seconds (5 in application.conf)
      context.system.scheduler.schedule(
        0.seconds,
        ConfigUtils.`worker.heartbeat.interval`.seconds
      ) {
        masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)
      }
  }
}
object Worker {
  def main(args: Array[String]): Unit = {
    // Build the ActorSystem.
    // ConfigFactory.load() loads the configuration file (application.conf) from resources
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
    // Create the workerActor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props[WorkerActor], "workerActor")
  }
}
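Finally, run several Workers at once.
Remember to click OK when editing the run configuration.
Before each launch, change the port in the worker's application.conf. Then start it with the Run button, not from the right-click menu.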
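Editing application.conf before every launch is easy to forget. The sketch below shows one alternative, assuming the port is passed as the first program argument. It reads the port from args and turns it into a config line. PortArg, portFrom and portOverride are hypothetical names:

```scala
import scala.util.Try

object PortArg {
  // Hypothetical helper: take the port from the first program argument,
  // falling back to `default` when it is missing or not a number
  def portFrom(args: Array[String], default: Int): Int =
    args.headOption.flatMap(a => Try(a.trim.toInt).toOption).getOrElse(default)

  // HOCON line that overrides the port configured in application.conf
  def portOverride(port: Int): String = s"akka.remote.netty.tcp.port = $port"
}
```

In Worker.main, this line could then be layered over the file with Typesafe Config. Build `ConfigFactory.parseString(PortArg.portOverride(PortArg.portFrom(args, 7001))).withFallback(ConfigFactory.load())` and pass it to ActorSystem in place of `ConfigFactory.load()`. Each run then needs only a different program argument.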
