有闭包就需要进行序列化
/*
spark算子的函数是在Executor中执行,算子外的代码是在Driver中执行
所以如果算子中的函数有使用外部变量,此时要求该变量能够序列化,否则Driver定义的对象不能在Executor中使用
spark中序列化方式有两种:
1、java序列化[默认]: 会序列化类的信息、类的继承信息、类的属性信息、类型信息等等
2、kryo序列化: 只会序列化列的基本信息[类名、属性名、属性值等]
kryo序列化的性能比java序列化性能要高,工作中一般推荐使用kryo序列化
kryo序列化性能要比java序列化性能高10倍
spark中如何使用kryo序列化:
1、配置spark 序列化方式: sparkconf..set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
2、注册哪些类后续使用kryo序列化[可选]: .registerKryoClasses(Array(classOf[Student]),写了就不会序列化全类名,效率更高些,但如果需要序列化的类太多,手动写不方便,不建议使用
*/
package tcode.day05
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test
class $01_Ser{
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test").set("spark.serializer","org.apache.spark.serializer.KryoSerializer"))
/**
* 闭包: 函数体中使用了外部变量的函数称之为闭包
*/
@Test
def closePackage(): Unit ={
val x = 10
val func = (a:Int) => a+x
println(func(20))
}
@Test
def sparkSeri(): Unit ={
val rdd = sc.parallelize(List(10,20,30,40,50))
//val x = 2
val person = Person()
val rdd2 = person.to(rdd)
/*val rdd2 = rdd.map(y=> {
y * person.age
})*/
println(rdd2.collect().toList)
}
}
/**
* 如果是内部类中有rdd方法,在类中调用内部类的rdd类型的函数,那么类中如果使用了内部类的属性,那么本类也需要继承序列化类
* 如果类在外部被定义且也有rdd方法,在类中调用外部类中的rdd方法,外部类需要继承序列化类,而本类不需要序列化,但外部类的rdd方法内部定义了变量,那么使用该变量不需要序列化
* 如果外部类没有继承,那么也可以使用样例类,样例类底层继承了序列化类
*/
case class Person() {
var name:String = "lisi"
var age:Int = 100
def to(rdd:RDD[Int]):RDD[Int]={
rdd.map(x=> x * age)
}
}
序列化大小实验
package tcode.day05
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.junit.Test
import java.io.{FileInputStream, FileOutputStream, ObjectOutputStream}
class $02_SparkSer {
/**
* java序列化写入
*/
@Test
def javaSerWrite(): Unit ={
val stu = new Student()
stu.name="lisi"
stu.age=20
val oos = new ObjectOutputStream( new FileOutputStream("d:/obj.txt") )
oos.writeObject(stu)
oos.flush()
oos.close()
}
/**
* kryo序列化写入
*/
@Test
def kryoWrite(): Unit ={
val stu = new Student()
stu.name="lisi"
stu.age=20
val kryo = new Kryo()
val output = new Output(new FileOutputStream("d:/kryoobj.txt"))
kryo.writeObject(output,stu)
output.flush()
output.close()
}
@Test
def kryoRead(): Unit ={
val kryo = new Kryo()
val input = new Input(new FileInputStream("d:/kryoobj.txt"))
val student = kryo.readObject(input,classOf[Student])
println(student.name)
}
}
class Student() extends Serializable{
var name:String = _
var age:Int = _
}