有闭包就需要进行序列化
    /*
    spark算子的函数是在Executor中执行,算子外的代码是在Driver中执行
    所以如果算子中的函数有使用外部变量,此时要求该变量能够序列化,否则Driver定义的对象不能在Executor中使用

    spark中序列化方式有两种:
    1、java序列化[默认]: 会序列化类的信息、类的继承信息、类的属性信息、类型信息等等
    2、kryo序列化: 只会序列化列的基本信息[类名、属性名、属性值等]
    kryo序列化的性能比java序列化性能要高,工作中一般推荐使用kryo序列化
    kryo序列化性能要比java序列化性能高10倍
    spark中如何使用kryo序列化:
    1、配置spark 序列化方式: sparkconf..set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
    2、注册哪些类后续使用kryo序列化[可选]: .registerKryoClasses(Array(classOf[Student]),写了就不会序列化全类名,效率更高些,但如果需要序列化的类太多,手动写不方便,不建议使用
    */

    1. package tcode.day05
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.{SparkConf, SparkContext}
    4. import org.junit.Test
    5. class $01_Ser{
    6. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test").set("spark.serializer","org.apache.spark.serializer.KryoSerializer"))
    7. /**
    8. * 闭包: 函数体中使用了外部变量的函数称之为闭包
    9. */
    10. @Test
    11. def closePackage(): Unit ={
    12. val x = 10
    13. val func = (a:Int) => a+x
    14. println(func(20))
    15. }
    16. @Test
    17. def sparkSeri(): Unit ={
    18. val rdd = sc.parallelize(List(10,20,30,40,50))
    19. //val x = 2
    20. val person = Person()
    21. val rdd2 = person.to(rdd)
    22. /*val rdd2 = rdd.map(y=> {
    23. y * person.age
    24. })*/
    25. println(rdd2.collect().toList)
    26. }
    27. }
    28. /**
    29. * 如果是内部类中有rdd方法,在类中调用内部类的rdd类型的函数,那么类中如果使用了内部类的属性,那么本类也需要继承序列化类
    30. * 如果类在外部被定义且也有rdd方法,在类中调用外部类中的rdd方法,外部类需要继承序列化类,而本类不需要序列化,但外部类的rdd方法内部定义了变量,那么使用该变量不需要序列化
    31. * 如果外部类没有继承,那么也可以使用样例类,样例类底层继承了序列化类
    32. */
    33. case class Person() {
    34. var name:String = "lisi"
    35. var age:Int = 100
    36. def to(rdd:RDD[Int]):RDD[Int]={
    37. rdd.map(x=> x * age)
    38. }
    39. }

    序列化大小实验

    1. package tcode.day05
    2. import com.esotericsoftware.kryo.Kryo
    3. import com.esotericsoftware.kryo.io.{Input, Output}
    4. import org.junit.Test
    5. import java.io.{FileInputStream, FileOutputStream, ObjectOutputStream}
    6. class $02_SparkSer {
    7. /**
    8. * java序列化写入
    9. */
    10. @Test
    11. def javaSerWrite(): Unit ={
    12. val stu = new Student()
    13. stu.name="lisi"
    14. stu.age=20
    15. val oos = new ObjectOutputStream( new FileOutputStream("d:/obj.txt") )
    16. oos.writeObject(stu)
    17. oos.flush()
    18. oos.close()
    19. }
    20. /**
    21. * kryo序列化写入
    22. */
    23. @Test
    24. def kryoWrite(): Unit ={
    25. val stu = new Student()
    26. stu.name="lisi"
    27. stu.age=20
    28. val kryo = new Kryo()
    29. val output = new Output(new FileOutputStream("d:/kryoobj.txt"))
    30. kryo.writeObject(output,stu)
    31. output.flush()
    32. output.close()
    33. }
    34. @Test
    35. def kryoRead(): Unit ={
    36. val kryo = new Kryo()
    37. val input = new Input(new FileInputStream("d:/kryoobj.txt"))
    38. val student = kryo.readObject(input,classOf[Student])
    39. println(student.name)
    40. }
    41. }
    42. class Student() extends Serializable{
    43. var name:String = _
    44. var age:Int = _
    45. }