0. 序列化介绍
- 序列化
- 内存中的Java对象—> 硬盘/网络
- 反序列化
- 把硬盘/网络中的对象—->内存
序列化技术
- Serializable 接口 序列化出来的文件太大, 不利于文件的网络传输
- JSON 把对象转换为一个json字符串 {“name”:”tom”,”age:18,”sex”:”M”}
- Writable Hadoop内置序列化方式 —DataInputStream DataOutPutStream
- 读写的顺序必须一致
-
1. 对象的序列化
我们都知道,当我们需要把一个 Java 对象从内存中保存到磁盘中或者从内存中的对象通过网络传输到另外 一个地方的,我们都必须把对象进行序列化操作
在我们的 Spark 程序中,会有很多对象都需要进行网络通 信,比如说我们需要把我们在 Driver 端生成的 Task 任务发送到 Executor 中去执行任务,那么我们需要先对 这个 Task 任务进行序列化才能在网络上传输,但是很多时候我们在写 Spark 程序的时候,会保存对象没有 序列化操作,所以我们先来彻底的了解一下序列化的操作1.1. Java 对象的序列化
实现序列化接口的对象
public class Student implements Serializable {
private Long id;
private String name;
private Integer age;
public Student() {}
public Student(Long id, String name, Integer age) {
this.id = id;
this.name = name;
this.ag = age;
}
}
将一个java对象序列化后写入磁盘中
然后再将磁盘的对象序列化文件读取,生成一个对象//java 的对象序列化
public class JavaSerDemo {
//序列化到磁盘
@Test
public void testwrite() throws Exception {
//1 创建一个对象
Student s1 = new Student(1L, "hesj", 18);
//day06.cn.wolfcode.spark.ser.Student@ba4d54
System.out.println("序列化之前的对象:"+s1);
//2 把对象保存到磁盘
FileOutputStream fos = new FileOutputStream("d:/s1.dat");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(s1);
//关闭流对象,同时会关闭 fos
oos.close();
}
//从磁盘读取文件
@Test
public void testRead() throws Exception {
FileInputStream fis = new FileInputStream("d:/s1.dat");
ObjectInputStream ois = new ObjectInputStream(fis);
Student s1 = (Student) ois.readObject();
//day06.cn.wolfcode.spark.ser.Student@c818063
System.out.println("反序列化的一个对象:"+s1);
ois.close();
FileInputStream fis2 = new FileInputStream("d:/s1.dat");
ObjectInputStream ois2 = new ObjectInputStream(fis2);
Student s2 = (Student) ois2.readObject();
//day06.cn.wolfcode.spark.ser.Student@c818032
System.out.println("反序列化的一个对象:"+s2);
ois.close();
}
}
通过上面这个程序, 我们可以看出来
- 实现序列化操作需要实现 Serializable
反序列化创建的一个文件和我们序列化出去的一个对象不再是同一个对象
1.2. 理解 Spark 中的序列化
方式一
定义一个映射规则对象
- 因为规则映射是在 Executor 中使用的,所以在每个 Executor 中都创建一个规则映射对象
- 也即:如果有 10 个executor,则会创建10个规则映射对象 ```scala class Rules { //定义一个映射的规则 val rulesMap = Map(“hadoop” -> 2.7, “spark” -> 2.2,”java”->1.8) //获取到当前创建改 Rules 对象的一个主机名 val hostname = InetAddress.getLocalHost.getHostName //打印主机名 println(hostname + “@@@@@@@@@@@@@@@@”) }
object SerTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(“SerTest”) val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile(args(0)) val r = lines.map(word => { val rules = new Rules //获取当前的主机名称 val hostname = InetAddress.getLocalHost.getHostName //获取当前运行的线程名字 val threadName = Thread.currentThread().getName //rules 的实际是在 Executor 中使用的 (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString) }) r.saveAsTextFile(args(1)) sc.stop() } }
通过日志我们发现两个问题:
- 对于我们的具体的任务执行,创建规则对象是在 Executor 端创建的
- 对于不同的线程中所创建的任务规则也是不一样的, 所以我们需要一个 Rules 对象都会创建再立即销
毁
<a name="fnTqI"></a>
### 方式二
- 对于方式一,显示是不合理的
- 使用 scala 的**闭包**特性,在匿名函数内,可以使用函数外面的变量
- 此时就避免了广播的操作
- 但注意:存在网络传输,还是需要序列化的
- 我们可以在 Driver 中只创建一次,然后后面在各个 Executor 中一直引用同一个对
象即可
```scala
class Rules extends Serializable # 类需要实现序列化
object SerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(0))
//在 Driver 端创建我们的规则
val rules = new Rules
val r = lines.map(word => {
//获取当前的主机名称
val hostname = InetAddress.getLocalHost.getHostName
//获取当前运行的线程名字
val threadName = Thread.currentThread().getName
//rules 的实际是在 Executor 中使用的
(hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString)
})
//当我们在执行调度我们的任务的时候, task 会序列化我们的 Rule 规则
r.saveAsTextFile(args(1))
sc.stop()
}
}
方式三
- 使用广播的方式(对象也需要序列化)
- 每个 Executor 有一份,不是每个 Task 一份
```scala
object SerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(“SerTest”)
val sc = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(0))
//在 Driver 端创建我们的规则
val rules = new Rules
val rulesRef: Broadcast[Rules] = sc.broadcast(rules)
val r = lines.map(word => {
}) //当我们在执行调度我们的任务的时候, task 会序列化我们的 Rule 规则 r.saveAsTextFile(args(1)) sc.stop() } }val currentRule: Rules = rulesRef.value
//获取当前的主机名称
val hostname = InetAddress.getLocalHost.getHostName
//获取当前运行的线程名字
val threadName = Thread.currentThread().getName
//rules 的实际是在 Executor 中使用的
(hostname, threadName, currentRule.rulesMap.getOrElse(word, 0), currentRule.toString)
方式四
- 使用一个单例对象
,然后各个 Executor 使用同一个对象
```scala
object Rules {
//定义一个映射的规则
val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2,"java"->1.8)
//获取到当前创建改 Rules 对象的一个主机名
val hostname = InetAddress.getLocalHost.getHostName
//打印主机名
println(hostname + "@@@@@@@@@@@@@@@@")
}
2. Spark 中的并发
并发问题出现地方
- 被抽出去的公共静态类的成员属性
并发问题原因
- 公共静态类会被多个线程同时调用执行,此时成员属性就面临并发问题
这里我们通过一个简单的并发的实例来了解一下
object GameKPI {
def main(args: Array[String]): Unit = {
//"2016-02-01"
val startDate = args(0)
//"2016-02-02"
val endDate = args(1)
//查询条件
val dateFormat1 = new SimpleDateFormat("yyyy-MM-dd")
//查寻条件的的起始时间
val startTime = dateFormat1.parse(startDate).getTime
//查寻条件的的截止时间
val endTime = dateFormat1.parse(endDate).getTime
//Driver 定义的一个 simpledataformat
val dateFormat2 = new SimpleDateFormat("yyyy 年 MM 月 dd 日,E,HH:mm:ss")
val conf = new SparkConf().setAppName("GameKPI").setMaster("local[2]")
val sc = new SparkContext(conf)
//从哪里读取数据
val lines: RDD[String] = sc.textFile(args(2))
//整理并过滤
val splited: RDD[Array[String]] = lines.map(line => line.split("[|]"))
//按日期过过滤
val filterd = splited.filter(fields => {
val t = fields(0)
val time = fields(1)
val timeLong = dateFormat2.parse(time).getTime
t.equals("1") && timeLong >= startTime && timeLong < endTime
})
val dnu = filterd.count()
println(dnu)
sc.stop()
}
}
上述程序可以正常访问资源
定义一个工具类, 简化代码和业务操作
并发问题:
```java object FilterUtils{ //如果 object 使用了成员变量,那么会出现线程安全问题,因为 object 是一个单例,多线程可以同时调用这个方法 //val dateFormat = new SimpleDateFormat(“yyyy 年 MM 月 dd 日,E,HH:mm:ss”)//FastDateFormat 是线程安全的 val dateFormat = FastDateFormat.getInstance(“yyyy 年 MM 月 dd 日,E,HH:mm:ss”)
def filterByType(fields: Array[String], tp: String) = {
val _tp = fields(0)
_tp == tp
}
def filterByTime(fields: Array[String], startTime: Long, endTime: Long) = {
val time = fields(1)
val timeLong = dateFormat.parse(time).getTime
timeLong >= startTime && timeLong < endTime
} }
object GameKPIV2 { def main(args: Array[String]): Unit = { //“2016-02-01” val startDate = args(0) //“2016-02-05” val endDate = args(1) val dateFormat1 = new SimpleDateFormat(“yyyy-MM-dd”) val startTime = dateFormat1.parse(startDate).getTime val endTime = dateFormat1.parse(endDate).getTime val conf = new SparkConf().setAppName(“GameKPIV2”).setMaster(“local[2]”) val sc = new SparkContext(conf) //以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(2)) //整理并过滤 val splited: RDD[Array[String]] = lines.map(line => line.split(“[|]”)) // 按日期过过滤 val filteredByType = splited.filter(fields => { FilterUtils.filterByType(fields, “1”) }) val filtered = filteredByType.filter(fields => { FilterUtils.filterByTime(fields, startTime, endTime) }) val dnu = filtered.count() println(dnu) sc.stop() } }
```