1 并发编程模型Akka

1.1 Akka 介绍

写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。
netty actor 对于大量的数据
Akka是JVM平台上构建高并发、分布式和容错应用的工具包和运行平台。Akka是用 Scala语言编程的一个并发编程框架,该框架基于Actor编程模型。

1.2 Akka中Actor模型

在基于Actor的系统里,所有的事物都是Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是Actor模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor与Actor之间只能通过消息通信。

可以将Actor当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。传递消息是actor模型的基础。

Actor是一种编程模型,通过发送消息的方式实现并发编程
必须有多个相同业务逻辑的Actor才可以实现并发编程,一个Actor相当于一个实例(老师/学生),消息是发送的数据(邮件)
Actor和Actor之间可以相互发送消息
每一个Actor中都要相应的业务逻辑,如果使用编程,定义一个类,在类中定义方法,通过new一个实例,即可得到一个Actor
Actor是ActorSystem创建的,ActorSystem的职责是负责创建并管理自己创建的Actor,ActorSystem的单例的,一个JVM进程中有一个即可,而Acotr是多例的。ActorSystem相当于老大,Actor相当于干活的小弟

1.3 maven工程:

maven:
1,项目管理
2,jar包管理 hdfs 添加jar包 -> maven 配置一个依赖jar包

maven + jekins 实现项目的自动部署
自动的把项目进行编译,打包,部署到测试服务器上
版本库: svn git 代码托管 github
项目组:4-8
产品组:10
写代码,把代码提交到版本库(用户名,密码,代码的托管地址),下载
项目管理
管理jar,

项目的开发流程;

  1. 需求分析,项目/产品 有哪些功能 用来做什么
  2. 架构 技术方案选型 mapreduce hive sparkSQL spark hbase

hdfs + 数据的分析

  1. 代码开发(数据预处理 -> 存储 )
  2. 测试
  3. 上线

    1.3.1 IDEA中配置maven

    默认加载本机的maven目录,所以正常不用配置
    part4-并发编程 - 图1

    1.3.2 创建maven工程

    part4-并发编程 - 图2
    part4-并发编程 - 图3

1.3.3 Pom依赖:

完整参考pom.xml文件

| <properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties>

<dependencies>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

  1. _<!-- 添加akkaactor依赖 --><br /> _<**dependency**><br /> <**groupId**>com.typesafe.akka</**groupId**><br /> <**artifactId**>akka-actor_${scala.compat.version}</**artifactId**><br /> <**version**>${akka.version}</**version**><br /> </**dependency**>
  2. _<!-- 多进程之间的Actor通信 --><br /> _<**dependency**><br /> <**groupId**>com.typesafe.akka</**groupId**><br /> <**artifactId**>akka-remote_${scala.compat.version}</**artifactId**><br /> <**version**>${akka.version}</**version**><br /> </**dependency**>

</dependencies>
| | —- |

1.3.4 补充:IDEA中打jar包(非Maven工程):

part4-并发编程 - 图4part4-并发编程 - 图5
part4-并发编程 - 图6
part4-并发编程 - 图7
再回到主页中:
part4-并发编程 - 图8

1.4 案例1:Actor入门案例

一个Actor就是一个类,要想实现actor功能,需要实现Actor特质
需要重写receive方法,进行消息匹配接收

程序运行需要执行入口,所以需要有main方法。
actor必须通过ActorSystem来创建

// 通过字符串添加配置信息
val str =
“””
|akka.actor.provider = “akka.remote.RemoteActorRefProvider”
|akka.remote.netty.tcp.hostname = “127.0.0.1”
|akka.remote.netty.tcp.port = “8888”
“””
.stripMargin
// 创建配置工厂 val conf = ConfigFactory.parseString(str)
// 创建actorsystem
val as: ActorSystem = ActorSystem.create(“actorsystem-test”,conf)
// 通过类型反射创建actor的引用对象 val actor: ActorRef = as.actorOf(Props[HelloActor],“actor-test”)
// 向actor引用对象发送异步消息
actor ! “hello”

actor发送消息:! 发送异步消息

1.5 案例 人机Ping Pang大战

思路:
需要两个Actor,通过发送消息进行通信。
一个Actor在启动时,需要向另一个actor发送连接请求。
连接请求在preStart()方法中实现。

class AlphGo extends Actor{
// 表示接受的方法,进行模式匹配 会被调用多次
override def receive: Receive = {
}
}
object AlphGo{
def main(args: Array[String]): Unit = { }
}
class MaLong extends Actor{
// 在构造器之后,receive方法之前执行
override def preStart(): Unit = {
// worker向master建立连接,并向master发送消息
val proxy: ActorSelection = context.actorSelection(“akka.tcp://acs-go@127.0.0.1:8888/user/ac-go”)
proxy ! “request get connect”
}
override def receive: Receive = {
}
}
object MaLong{
def main(args: Array[String]): Unit = {
}}

1.6 案例 Spark Master Worker进程通信示例

1.6.1 业务流程:

1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
0,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息)

1.6.2 业务实现

class Master extends Actor{
val workersMap = mutable.HashMapString,WorkerInfo
// master 启动之后,会启动一个定时任务,作用是检测活着的worker信息
override def preStart(): Unit = {
import context.dispatcher
import scala.concurrent.duration.
_context
.system.scheduler.schedule(0 millis,15000 millis,self,CheckWorkerStatus)
}
// 表示接受的方法
override def receive: Receive = {
}
}object Master{
}
}

| class Worker(val mastername:String,val master_port:Int,var memory:Int,var cores:Int) extends Actor{
var _master_proxy
: ActorSelection=
val _workerId
= UUID.randomUUID().toString
// 在构造器之后,receive方法之前执行
override def preStart(): Unit = {
// worker向master建立连接,并向master发送消息
master_proxy
= context.actorSelection(s”akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$mastername:$master_port/user/${Master._ACTOR_NAME})
// 向master发送注册消息 封装数据 workerid memory cores
master_proxy
! Register2Master(workerId,memory,cores)
println(“connect to master”)
}

override def receive: Receive = {
}
}
object Worker{
}
} | | —- |

需要传递的消息:

// 数据在网络间传输,需要实现序列化特质
// worker向master发送的注册消息

// master 向worker回复的注册成功信息case object RegisteredMaster extends Serializable// worker向master发送的心跳信息(带workerid)
// worker定时器 发送的心跳信息
// master定时器 检测超时worker

1.6.3 windows机器测试

1.6.4 Linux并发测试

选择设置Master主类,并打成jar包

<transformer implementation=”org.apache.maven.plugins.shade.resource.ManifestResourceTransformer”>
<mainClass>cn.edu360.akka.actor.Master</mainClass>
</transformer>

part4-并发编程 - 图9
修改jar包名称。

选择设置Worker主类,并打成jar包
part4-并发编程 - 图10

上传master jar包到hdp-01机器上
执行启动:
part4-并发编程 - 图11

上传WORKER jar包到其他节点上
执行启动:注意worker的启动参数
part4-并发编程 - 图12

验证:
part4-并发编程 - 图13