一、Akka的远程通信

需要准备两个项目,master worker
master 启动一个端口,
worker 连接这个端口
Day07-Scala第六天 - 图1
需要创建一个空项目,里面有两个子项目。
image.png
创建两个子项目(maven项目)
image.png
image.png
image.png
导入坐标:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.scala-lang</groupId>
  4. <artifactId>scala-library</artifactId>
  5. <version>2.11.8</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.typesafe.akka</groupId>
  9. <artifactId>akka-actor_2.11</artifactId>
  10. <version>2.5.21</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>com.typesafe.akka</groupId>
  14. <artifactId>akka-remote_2.11</artifactId>
  15. <version>2.5.21</version>
  16. </dependency>
  17. </dependencies>

按照以上方式,再弄一个 app-workder项目(步骤省略)

开始准备代码:
思路:

开始的时候master启动,等待 worker进行消息的发送,workder发送消息之后,master给它回应一个消息。
跟昨天一样,只是本次是使用了两个项目,不同端口进行的通信。

image.png

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{
    override def receive: Receive = {
        //接收消息的
        case "connect" => println("MasterActor 收到connect消息")
            sender ! "success"
    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "8888"
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:13
 * @Version 1.0
 */
object WorkerActor extends Actor{
    override def receive: Receive = {
        case "setup" => {
            println("worker启动成功!")
            //要发送消息给masterWorker
            val masterActorRef: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:8888/user/masterActor")
            masterActorRef ! "connect"

        }
        case "success" => {
            println("连接master成功!")
        }
    }
}
object Worker {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
        // 加载masterActor
        val workerActor: ActorRef = workerActorSystem.actorOf(Props(WorkerActor), "workerActor")
        workerActor ! "setup"

    }
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "9999"

二、Akka的定时任务

Akka中,提供了scheduler 对象实现定时调度,需要导入对应的包。

schedule 方法中有几个参数:
参数一:  initxxDelay   延迟多久开启定时任务
参数二:  interval      每隔多久执行一次
参数三: ActorRef      将消息发送给哪个actor
参数四:  message      要发送的消息
(implicit xxxx) 隐式参数需要手动的导入
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import javafx.util.Duration

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:32
 * @Version 1.0
 */

object TimeActor extends  Actor{
    override def receive: Receive = {
        case x => println(x)
    }
}
object DingShi {

    def main(args: Array[String]): Unit = {

        val timeActorSystem: ActorSystem = ActorSystem("timeActorSystem", ConfigFactory.load())
        val timeActor: ActorRef = timeActorSystem.actorOf(Props(TimeActor), "timeActor")

        //导入一个隐式转换,_ 代表这个类下的所有隐式转换方法  不导入没有办法使用seconds
        import scala.concurrent.duration._
        // 导入隐式参数  不导入会报错
        import timeActorSystem.dispatcher

        timeActorSystem.scheduler.schedule(

            0 seconds,
            1 seconds,
            timeActor,
            "Hello"
        )
    }

}
schedule 方法中有几个参数:
参数一:  initxxDelay   延迟多久开启定时任务
参数二:  interval      每隔多久执行一次
此方法后面可以跟一个函数 (f:=>Unit)
(implicit xxxx) 隐式参数需要手动的导入

这个定时方法的意思是:每隔多久执行一次指定的函数

三、使用Akka编写一个关于Spark通信的模型

1、环境准备

image.png
首先准备环境:
spark-master : worker管理者
spark-worker : worker本身
spark-common : 此处存放两个项目共有的一些类
创建项目跟远程通信一样,此处省略了
image.png
image.png
以上创三个项目:
三个项目都需要做以下操作:
image.png
三个项目都需要创建文件夹
image.png
导入包:
master和worker 需要导入三个包:

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.5.21</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.5.21</version>
        </dependency>
    </dependencies>

common 项目中,只需要导入一个包:

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>


    </dependencies>

接着想一个问题:你为什么要创建common?
master和worker 需要导入common的坐标:

<dependency>
  <groupId>com.qfedu</groupId>
  <artifactId>spark-common</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

2、编写简单的通信(让worker可以连接到master即可)

复制之前的远程通信代码。
在common项目中,编写通用的模板消息:

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 11:42
 * @Version 1.0
 */
case class WorkerRegisterMessage(workerId:String,cpu:Int,mem:Int)
case object RegisterSuccessMessage

编写worker端,在worker启动时就发送注册信息给master:

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.{Random, UUID}

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:13
 * @Version 1.0
 */
object WorkerActor extends Actor{

    private var workerId:String = _
    private var cpu:Int = _
    private var mem:Int = _

    private val CPU_LIST = List(1,2,4,6,8)
    private val MEM_LIST = List(512,1024,2048,4096)

    // 在workActor 启动之前就加载了
    override def preStart(): Unit = {
        val masterActorRef: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")

        // 造数据,随机数据
        workerId = UUID.randomUUID().toString
        var random = new Random()
        cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
        mem = MEM_LIST(random.nextInt(MEM_LIST.length))


        masterActorRef ! WorkerRegisterMessage(workerId,cpu,mem)
    }


    override def receive: Receive = {
        case RegisterSuccessMessage => {
            println("worker已经收到了master的回复,注册成功")

        }
    }
}
object Worker {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
        // 加载masterActor
        val workerActor: ActorRef = workerActorSystem.actorOf(Props(WorkerActor), "workerActor")
        workerActor ! "setup"

    }
}

配置文件:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7001"

编写消息的接收方Master :

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{
    override def receive: Receive = {
        //接收消息的
        case WorkerRegisterMessage(workerId,cpu,mem) => {
            println(s"MasterActor: 接收到worker的注册信息:${workerId},${cpu},${mem}")
        }
            sender ! RegisterSuccessMessage
    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}

配置文件:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7777"

最终的效果是:
先启动master 一直等待 worker的连接。
启动worker :
启动成功就立即发送 连接请求,发送的消息模式是:WorkerRegisterMessage
master 接收到消息之后,打印 WorkerRegisterMessage 信息,立即发送一个 RegisterSuccessMessage
给worker ,worker接收到之后,打印连接成功!
image.png
image.png

3、Master获取Worker的注册信息

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.Date

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{

    // 定义一个可变的Map,存放worker注册过来的信息,需要使用一个可变的map集合
    private val regWorkerMap = collection.mutable.Map[String,WorkInfo]()

    override def receive: Receive = {
        //接收消息的
        case WorkerRegisterMessage(workerId,cpu,mem) => {
            println(s"MasterActor: 接收到worker的注册信息:${workerId},${cpu},${mem}")
            //传递过来的map需要保存下来:
            regWorkerMap += (workerId -> WorkInfo(workerId,cpu,mem,new Date().getTime))
            sender ! RegisterSuccessMessage
        }
        case WorkerHeartBeatMessage(workerId,cpu,mem) => {
            // 接收到心跳数据:更新最新的时间
            var workInfo: WorkInfo = regWorkerMap(workerId)
            // 每次接收到心跳数据后,都更新一下最新的时间
            workInfo.date = (new Date()).getTime
            regWorkerMap += (workerId -> workInfo)
            println(regWorkerMap)
        }

    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}

4、Worker定期发送心跳数据给Master

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{ConfigFactory}

import java.util.{Random, UUID}

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:13
 * @Version 1.0
 */
object WorkerActor extends Actor{

    private var workerId:String = _
    private var cpu:Int = _
    private var mem:Int = _

    private val CPU_LIST = List(1,2,4,6,8)
    private val MEM_LIST = List(512,1024,2048,4096)

    var masterActorRef: ActorSelection = _

    // 在workActor 启动之前就加载了
    override def preStart(): Unit = {
         masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")

        // 造数据,随机数据
        workerId = UUID.randomUUID().toString
        var random = new Random()
        cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
        mem = MEM_LIST(random.nextInt(MEM_LIST.length))


        masterActorRef ! WorkerRegisterMessage(workerId,cpu,mem)
    }


    override def receive: Receive = {
        case RegisterSuccessMessage => {
            println("worker已经收到了master的回复,注册成功")
            import scala.concurrent.duration._
            import context.dispatcher

            // 定时发送心跳数据,每隔 5 秒跳动一次
            context.system.scheduler.schedule(
                0 seconds,
                ConfigUtils.`worker.heartbeat.interval` seconds
            ){
                masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)
            }

        }
    }
}
object Worker {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
        // 加载masterActor
        val workerActor: ActorRef = workerActorSystem.actorOf(Props(WorkerActor), "workerActor")


    }
}

worker每隔5秒发送心跳数据,可以将这个参数写入conf文件中:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7001"


worker.heartbeat.interval = 5

编写工具类,读取配置文件中的值:

object ConfigUtils {

    private val config: Config = ConfigFactory.load()
    // 从配置文件中获取心跳的间隔时间 使用两个esc 下的 ``
    val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}

master接收心跳数据,更新WorkInfo的最新的时间:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.Date

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{

    // 定义一个可变的Map,存放worker注册过来的信息,需要使用一个可变的map集合
    private val regWorkerMap = collection.mutable.Map[String,WorkInfo]()

    override def receive: Receive = {
        //接收消息的
        case WorkerRegisterMessage(workerId,cpu,mem) => {
            println(s"MasterActor: 接收到worker的注册信息:${workerId},${cpu},${mem}")
            //传递过来的map需要保存下来:
            regWorkerMap += (workerId -> WorkInfo(workerId,cpu,mem,new Date().getTime))
            sender ! RegisterSuccessMessage
        }
        case WorkerHeartBeatMessage(workerId,cpu,mem) => {
            // 接收到心跳数据:更新最新的时间
            var workInfo: WorkInfo = regWorkerMap(workerId)
            // 每次接收到心跳数据后,都更新一下最新的时间
            workInfo.date = (new Date()).getTime
            regWorkerMap += (workerId -> workInfo)
            println(regWorkerMap)
        }

    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}

5、Master 定期的检查Map集合中的WorkInfo信息是否超时,如果超时,移除掉。

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.Date

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{

    // 定义一个可变的Map,存放worker注册过来的信息,需要使用一个可变的map集合
    private val regWorkerMap = collection.mutable.Map[String,WorkInfo]()

    // master 在一开始的时候就应该定时的检查regWorkerMap 数据是否超时了,超时就移除
    override def preStart(): Unit = {

        import scala.concurrent.duration._
        import context.dispatcher
        context.system.scheduler.schedule(
            0 seconds,
            ConfigUtils.`master.check.heartbeat.interval` seconds
        ){
            //获取超时的WorkInfo数据
            val timeOutWorkerMap = regWorkerMap.filter{
                keyVal => {
                    val lastBeatTime: Long = keyVal._2.date
                    if(new Date().getTime - lastBeatTime > ConfigUtils.`master.check.heartbeat.timeout` * 1000 ) true else false
                }
            }
            if(!timeOutWorkerMap.isEmpty){
                // 这个是我们的总的map, timeOutWorkerMap 超时的
                regWorkerMap --= timeOutWorkerMap.map(_._1)
                println("剩余的没过期的信息有:"+regWorkerMap)
            }
        }
    }

    override def receive: Receive = {
        //接收消息的
        case WorkerRegisterMessage(workerId,cpu,mem) => {
            println(s"MasterActor: 接收到worker的注册信息:${workerId},${cpu},${mem}")
            //传递过来的map需要保存下来:
            regWorkerMap += (workerId -> WorkInfo(workerId,cpu,mem,new Date().getTime))
            sender ! RegisterSuccessMessage
        }
        case WorkerHeartBeatMessage(workerId,cpu,mem) => {
            // 接收到心跳数据:更新最新的时间
            var workInfo: WorkInfo = regWorkerMap(workerId)
            // 每次接收到心跳数据后,都更新一下最新的时间
            workInfo.date = (new Date()).getTime
            regWorkerMap += (workerId -> workInfo)
            println(regWorkerMap)
        }

    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}

至此,结束。完整的代码示例:

common 项目中:

import java.util.Date

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 11:42
 * @Version 1.0
 */
case class WorkerRegisterMessage(workerId:String,cpu:Int,mem:Int)
case object RegisterSuccessMessage

case class WorkInfo(workId:String,cpu:Int,mem:Int,var date:Long)

case class WorkerHeartBeatMessage(workerId:String,cpu:Int,mem:Int)

master 代码:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7777"

// master 每隔多少时间查看一下map集合中的元素
master.check.heartbeat.interval = 8
// WorkInfo 中的数据和现在时间相比,过去多久算过期
master.check.heartbeat.timeout = 10
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.Date

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:03
 * @Version 1.0
 */
object MasterActor extends Actor{

    // 定义一个可变的Map,存放worker注册过来的信息,需要使用一个可变的map集合
    private val regWorkerMap = collection.mutable.Map[String,WorkInfo]()

    // master 在一开始的时候就应该定时的检查regWorkerMap 数据是否超时了,超时就移除
    override def preStart(): Unit = {

        import scala.concurrent.duration._
        import context.dispatcher
        context.system.scheduler.schedule(
            0 seconds,
            ConfigUtils.`master.check.heartbeat.interval` seconds
        ){
            //获取超时的WorkInfo数据
            val timeOutWorkerMap = regWorkerMap.filter{
                keyVal => {
                    val lastBeatTime: Long = keyVal._2.date
                    if(new Date().getTime - lastBeatTime > ConfigUtils.`master.check.heartbeat.timeout` * 1000 ) true else false
                }
            }
            if(!timeOutWorkerMap.isEmpty){
                // 这个是我们的总的map, timeOutWorkerMap 超时的
                regWorkerMap --= timeOutWorkerMap.map(_._1)
                println("剩余的没过期的信息有:"+regWorkerMap)
            }
        }
    }

    override def receive: Receive = {
        //接收消息的
        case WorkerRegisterMessage(workerId,cpu,mem) => {
            println(s"MasterActor: 接收到worker的注册信息:${workerId},${cpu},${mem}")
            //传递过来的map需要保存下来:
            regWorkerMap += (workerId -> WorkInfo(workerId,cpu,mem,new Date().getTime))
            sender ! RegisterSuccessMessage
        }
        case WorkerHeartBeatMessage(workerId,cpu,mem) => {
            // 接收到心跳数据:更新最新的时间
            var workInfo: WorkInfo = regWorkerMap(workerId)
            // 每次接收到心跳数据后,都更新一下最新的时间
            workInfo.date = (new Date()).getTime
            regWorkerMap += (workerId -> workInfo)
            println(regWorkerMap)
        }

    }
}
object Master {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem", ConfigFactory.load())
        // 加载masterActor
        val masterActor: ActorRef = masterActorSystem.actorOf(Props(MasterActor), "masterActor")

    }
}
import com.typesafe.config.{Config, ConfigFactory}

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 14:29
 * @Version 1.0
 */
object ConfigUtils {

    private val config: Config = ConfigFactory.load()
    // 从配置文件中获取心跳的间隔时间 使用两个esc 下的 ``
    val `master.check.heartbeat.interval`: Int = config.getInt("master.check.heartbeat.interval")
    val `master.check.heartbeat.timeout`: Int = config.getInt("master.check.heartbeat.timeout")

}

worker中的代码:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.hostname = "127.0.0.1"
akka.remote.netty.tcp.port = "7004"


worker.heartbeat.interval = 5
import com.typesafe.config.{Config, ConfigFactory}

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 14:29
 * @Version 1.0
 */
object ConfigUtils {

    private val config: Config = ConfigFactory.load()
    // 从配置文件中获取心跳的间隔时间 使用两个esc 下的 ``
    val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{ConfigFactory}

import java.util.{Random, UUID}

/**
 * @Author laoyan 
 * @Description TODO
 * @Date 2022/5/7 10:13
 * @Version 1.0
 */
object WorkerActor extends Actor{

    private var workerId:String = _
    private var cpu:Int = _
    private var mem:Int = _

    private val CPU_LIST = List(1,2,4,6,8)
    private val MEM_LIST = List(512,1024,2048,4096)

    var masterActorRef: ActorSelection = _

    // 在workActor 启动之前就加载了
    override def preStart(): Unit = {
         masterActorRef = context.actorSelection("akka.tcp://masterActorSystem@127.0.0.1:7777/user/masterActor")

        // 造数据,随机数据
        workerId = UUID.randomUUID().toString
        var random = new Random()
        cpu = CPU_LIST(random.nextInt(CPU_LIST.length))
        mem = MEM_LIST(random.nextInt(MEM_LIST.length))


        masterActorRef ! WorkerRegisterMessage(workerId,cpu,mem)
    }


    override def receive: Receive = {
        case RegisterSuccessMessage => {
            println("worker已经收到了master的回复,注册成功")
            import scala.concurrent.duration._
            import context.dispatcher

            // 定时发送心跳数据,每隔 5 秒跳动一次
            context.system.scheduler.schedule(
                0 seconds,
                ConfigUtils.`worker.heartbeat.interval` seconds
            ){
                masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)
            }

        }
    }
}
object Worker {

    def main(args: Array[String]): Unit = {
        // 构造 actorSystem
        //ConfigFactory.load() 是用来加载配置文件的(resources 下的配置文件)
        val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", ConfigFactory.load())
        // 加载masterActor
        val workerActor: ActorRef = workerActorSystem.actorOf(Props(WorkerActor), "workerActor")


    }
}

最后,演示多个Woker 。
image.png
记得点击OK键。
每次启动的时候,记得修改端口,点击 运行按钮启动,不要右键启动了。
image.png