1 并发编程模型Akka
1.1 Akka 介绍
写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。
netty actor 对于大量的数据
Akka是JVM平台上构建高并发、分布式和容错应用的工具包和运行平台。Akka是用 Scala语言编程的一个并发编程框架,该框架基于Actor编程模型。
1.2 Akka中Actor模型
在基于Actor的系统里,所有的事物都是Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是Actor模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor与Actor之间只能通过消息通信。
可以将Actor当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。传递消息是actor模型的基础。
Actor是一种编程模型,通过发送消息的方式实现并发编程
必须有多个相同业务逻辑的Actor才可以实现并发编程,一个Actor相当于一个实例(老师/学生),消息是发送的数据(邮件)
Actor和Actor之间可以相互发送消息
每一个Actor中都要相应的业务逻辑,如果使用编程,定义一个类,在类中定义方法,通过new一个实例,即可得到一个Actor
Actor是ActorSystem创建的,ActorSystem的职责是负责创建并管理自己创建的Actor,ActorSystem的单例的,一个JVM进程中有一个即可,而Acotr是多例的。ActorSystem相当于老大,Actor相当于干活的小弟
1.3 maven工程:
maven:
1,项目管理
2,jar包管理 hdfs 添加jar包 -> maven 配置一个依赖jar包
maven + jekins 实现项目的自动部署
自动的把项目进行编译,打包,部署到测试服务器上
版本库: svn git 代码托管 github
项目组:4-8
产品组:10
写代码,把代码提交到版本库(用户名,密码,代码的托管地址),下载
项目管理
管理jar,
项目的开发流程;
- 需求分析,项目/产品 有哪些功能 用来做什么
- 架构 技术方案选型 mapreduce hive sparkSQL spark hbase
hdfs + 数据的分析
1.3.3 Pom依赖:
完整参考pom.xml文件
| <properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
_<!-- 添加akka的actor依赖 --><br /> _<**dependency**><br /> <**groupId**>com.typesafe.akka</**groupId**><br /> <**artifactId**>akka-actor_${scala.compat.version}</**artifactId**><br /> <**version**>${akka.version}</**version**><br /> </**dependency**>
_<!-- 多进程之间的Actor通信 --><br /> _<**dependency**><br /> <**groupId**>com.typesafe.akka</**groupId**><br /> <**artifactId**>akka-remote_${scala.compat.version}</**artifactId**><br /> <**version**>${akka.version}</**version**><br /> </**dependency**>
</dependencies>
|
| —- |
1.3.4 补充:IDEA中打jar包(非Maven工程):
再回到主页中:
1.4 案例1:Actor入门案例
一个Actor就是一个类,要想实现actor功能,需要实现Actor特质
需要重写receive方法,进行消息匹配接收
程序运行需要执行入口,所以需要有main方法。
actor必须通过ActorSystem来创建
// 通过字符串添加配置信息 val str = “”” |akka.actor.provider = “akka.remote.RemoteActorRefProvider” |akka.remote.netty.tcp.hostname = “127.0.0.1” |akka.remote.netty.tcp.port = “8888” “””.stripMargin // 创建配置工厂 val conf = ConfigFactory.parseString(str) // 创建actorsystem val as: ActorSystem = ActorSystem.create(“actorsystem-test”,conf) // 通过类型反射创建actor的引用对象 val actor: ActorRef = as.actorOf(Props[HelloActor],“actor-test”) // 向actor引用对象发送异步消息 actor ! “hello” |
---|
actor发送消息:! 发送异步消息
1.5 案例 人机Ping Pang大战
思路:
需要两个Actor,通过发送消息进行通信。
一个Actor在启动时,需要向另一个actor发送连接请求。
连接请求在preStart()方法中实现。
class AlphGo extends Actor{ // 表示接受的方法,进行模式匹配 会被调用多次 override def receive: Receive = { } } object AlphGo{ def main(args: Array[String]): Unit = { } } |
---|
class MaLong extends Actor{ // 在构造器之后,receive方法之前执行 override def preStart(): Unit = { // worker向master建立连接,并向master发送消息 val proxy: ActorSelection = context.actorSelection(“akka.tcp://acs-go@127.0.0.1:8888/user/ac-go”) proxy ! “request get connect” } override def receive: Receive = { } } object MaLong{ def main(args: Array[String]): Unit = { }} |
---|
1.6 案例 Spark Master Worker进程通信示例
1.6.1 业务流程:
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
0,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息)
1.6.2 业务实现
class Master extends Actor{ val workersMap = mutable.HashMapString,WorkerInfo // master 启动之后,会启动一个定时任务,作用是检测活着的worker信息 override def preStart(): Unit = { import context.dispatcher import scala.concurrent.duration. _context.system.scheduler.schedule(0 millis,15000 millis,self,CheckWorkerStatus) } // 表示接受的方法 override def receive: Receive = { } }object Master{ } } |
---|
| class Worker(val mastername:String,val master_port:Int,var memory:Int,var cores:Int) extends Actor{
var _master_proxy: ActorSelection=
val _workerId = UUID.randomUUID().toString
// 在构造器之后,receive方法之前执行
override def preStart(): Unit = {
// worker向master建立连接,并向master发送消息
master_proxy = context.actorSelection(s”akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$mastername:$master_port/user/${Master._ACTOR_NAME}“)
// 向master发送注册消息 封装数据 workerid memory cores
master_proxy ! Register2Master(workerId,memory,cores)
println(“connect to master”)
}
override def receive: Receive = {
}
}
object Worker{
}
} |
| —- |
需要传递的消息:
// 数据在网络间传输,需要实现序列化特质 // worker向master发送的注册消息 // master 向worker回复的注册成功信息case object RegisteredMaster extends Serializable// worker向master发送的心跳信息(带workerid) // worker定时器 发送的心跳信息 // master定时器 检测超时worker |
---|
1.6.3 windows机器测试
1.6.4 Linux并发测试
选择设置Master主类,并打成jar包
<transformer implementation=”org.apache.maven.plugins.shade.resource.ManifestResourceTransformer”> <mainClass>cn.edu360.akka.actor.Master</mainClass> </transformer> |
---|
修改jar包名称。
选择设置Worker主类,并打成jar包
上传master jar包到hdp-01机器上
执行启动:
上传WORKER jar包到其他节点上
执行启动:注意worker的启动参数
验证: