Chapter 1. Action Operators
1.reduce
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class $01_Action {

  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

  /**
   * reduce(func: (element type, element type) => element type): aggregates all elements of the RDD
   * reduce first aggregates the data within each partition using the function, then sends each
   * partition's result to the driver, where the driver performs the final aggregation
   */
  @Test
  def reduce(): Unit = {
    val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 10, 9, 7))
    val result = rdd.reduce((agg, curr) => {
      println(s"${Thread.currentThread().getName} agg=${agg} curr=${curr}")
      agg + curr
    })
    println(result)
    // keep the application alive so the run can be inspected (e.g., in the Spark UI)
    Thread.sleep(100000)
  }
}
```
2.collect
```scala
/**
 * collect: gathers the data of every partition of the RDD back to the Driver <key point>
 * If the RDD holds a lot of data, collect risks overflowing the Driver's memory,
 * which defaults to only 1 GB
 */
@Test
def collect(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 10, 9, 7))
  println(rdd.collect().toList)
}
```
3.count
```scala
/**
 * count: returns the number of elements in the RDD
 */
@Test
def count(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 10, 9, 7))
  println(rdd.count())
}
```
4.first
```scala
/**
 * first: returns the first element of the RDD
 */
@Test
def first(): Unit = {
  // requires: import org.apache.spark.HashPartitioner
  val rdd1 = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  rdd1.mapPartitionsWithIndex((index, it) => {
    // materialize first so the iterator is not consumed by toList before it is returned
    val data = it.toList
    println(s"rdd1 index=${index} data=${data}")
    data.iterator
  }).collect()
  println(rdd1.first())

  val rdd2 = rdd1.map(x => (x, x)).partitionBy(new HashPartitioner(3))
  rdd2.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"rdd2 index=${index} data=${data}")
    data.iterator
  }).collect()
  println(rdd2.first())
}
```
5.take
```scala
/**
 * take: returns the first N elements
 */
@Test
def take(): Unit = {
  val rdd1 = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  println(rdd1.take(3).toList)
}
```
6.takeOrdered
```scala
/**
 * takeOrdered: sorts the RDD's elements and returns the first N
 */
@Test
def takeOrdered(): Unit = {
  val rdd1 = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  println(rdd1.takeOrdered(3).toList)
}
```
7.aggregate
```scala
/**
 * aggregate(zeroValue)(seqOp: (zero type, element type) => zero type, combOp: (zero type, zero type) => zero type):
 * first aggregates the data within each partition, then sends every partition's result to the driver for the final merge
 * seqOp: aggregates the data within each partition; the first computation in each partition starts with zeroValue as the first argument
 * combOp: merges the per-partition results on the Driver; the Driver's first computation also starts with zeroValue as the first argument
 */
@Test
def aggregate(): Unit = {
  val rdd = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  rdd.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"rdd index=${index} data=${data}")
    data.iterator
  }).collect()
  rdd.aggregate(1000)((agg, curr) => {
    println(s"partition aggregation: agg=${agg} curr=${curr}")
    agg + curr
  }, (agg, curr) => {
    println(s"driver merge: agg=${agg} curr=${curr}")
    agg + curr
  })
}
```
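A note on the result (assuming local[4] splits the eight elements into 4 partitions): seqOp applies the zero value 1000 once per partition and combOp applies it once more on the driver, so the final value is 53 + 5 × 1000 = 5053 (the element sum is 53), not 53 + 1000. The same arithmetic applies to fold below.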
8.fold
```scala
/**
 * Difference between fold and aggregate:
 * aggregate can use different functions for the per-partition and Driver-side aggregation
 * fold uses the same function for both
 */
@Test
def fold(): Unit = {
  val rdd = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  val result = rdd.fold(1000)((agg, curr) => {
    println(s"aggregation: agg=${agg} curr=${curr}")
    agg + curr
  })
  println(result)
}
```
9.countByKey
```scala
/**
 * countByKey: counts the number of occurrences of each key
 * countByKey is usually combined with sample: when data skew shows up at work, first use sample
 * to draw a sample of the data, then use countByKey to count each key in the sample
 * and identify which keys are skewed
 */
@Test
def countByKey(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "bb" -> 20, "aa" -> 3, "aa" -> 4, "cc" -> 50, "bb" -> 1))
  println(rdd.countByKey)
}
```
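A minimal sketch of that skew-detection pattern (the 10% sampling fraction is an arbitrary illustration):

```scala
// Draw a ~10% sample without replacement, then count keys in the sample;
// keys with disproportionately large counts are the likely skew culprits
val sampled = rdd.sample(withReplacement = false, fraction = 0.1)
println(sampled.countByKey())
```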
10.save
```scala
/**
 * save: persists the RDD's data to disk (here as text via saveAsTextFile)
 */
@Test
def save(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "bb" -> 20, "aa" -> 3, "aa" -> 4, "cc" -> 50, "bb" -> 1))
  rdd.saveAsTextFile("output/text")
}
```
11.foreach
```scala
/**
 * foreach(func: element type => Unit): iterates over the elements
 * Differences between foreach and map:
 *   map is a transformation and produces a new RDD
 *   foreach is an action and returns nothing
 * The function passed to foreach operates per element: it is invoked once for every element of the RDD
 */
@Test
def foreach(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "bb" -> 20, "aa" -> 3, "aa" -> 4, "cc" -> 50, "bb" -> 1))
  rdd.foreach(x => println(x))
}
```
12.foreachPartition
```scala
/**
 * foreachPartition(func: Iterator[element type] => Unit): Unit <commonly used>
 * foreachPartition is typically used to write data out to MySQL, HBase, Redis, etc.,
 * since it lets you open one connection per partition instead of one per element
 */
@Test
def foreachPartition(): Unit = {
  // requires: import java.sql.{Connection, DriverManager, PreparedStatement}
  val rdd = sc.parallelize(List("zhangsan" -> 10, "lisi" -> 20, "wangwu" -> 3, "zhaoliu" -> 4))
  rdd.foreachPartition(it => {
    var connection: Connection = null
    var statement: PreparedStatement = null
    try {
      connection = DriverManager.getConnection(".....")
      statement = connection.prepareStatement("insert into .. values(?,?)")
      var i = 0
      it.foreach(x => {
        statement.setString(1, x._1)
        statement.setInt(2, x._2)
        statement.addBatch()
        i = i + 1
        if (i % 1000 == 0) {
          // commit a full batch of 1000 rows
          statement.executeBatch()
          statement.clearBatch()
        }
      })
      // commit the final batch of fewer than 1000 rows
      statement.executeBatch()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
  })
}
```
Chapter 2. RDD Serialization
1.Closures
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class $02_closePackage {

  /**
   * Closure: a function whose body references variables defined outside the function
   */
  @Test
  def m(): Unit = {
    val y = 10
    // a closure over y
    val func = (x: Int) => x + y
    println(func(100))

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
    // Code inside a Spark operator runs on the executors; code outside runs on the Driver.
    // Here y is an Int, which is serializable, so the closure can be shipped to the executors.
    val rdd2 = rdd.map(x => {
      x * y
    })
    println(rdd2.collect().toList)
  }
}
```
2.Spark serialization
- When a closure uses a non-serializable external variable
```scala
@Test
def n(): Unit = {
  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
  val p = new Person
  // Code inside a Spark operator runs on the executors; code outside runs on the Driver.
  // Here p is of type Person, defined on the driver, and Person is not serializable.
  val rdd2 = rdd.map(x => {
    x * p.y
  })
  println(rdd2.collect().toList)
}

class Person {
  var y = 10
}
```
Running it throws an error:
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图1](/uploads/projects/liuye-6lcqc@gws1uf/2975ab4f0eb35fbdef7b5f931484145e.png)
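The most direct fix, shown here as a sketch (not part of the original test runs), is to make the class itself serializable so the closure can ship the instance to the executors:

```scala
// With Serializable mixed in, Spark can serialize p and the map above runs without error
class Person extends Serializable {
  var y = 10
}
```

The approaches below solve the same problem without marking the class serializable.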
- When the variable used on the driver side is serializable
```scala
@Test
def p(): Unit = {
  // requires: import org.apache.spark.rdd.RDD
  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
  val p = new Person1
  // Code inside a Spark operator runs on the executors; code outside runs on the Driver.
  // Inside cover, a is a local variable of type Int, which can be serialized,
  // so the closure no longer has to capture the non-serializable Person1 instance.
  val rdd2 = p.cover(rdd)
  println(rdd2.collect().toList)
}

class Person1 {
  var y = 10

  def cover(rdd: RDD[Int]) = {
    val a: Int = y
    rdd.map(x => x * a)
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图2](/uploads/projects/liuye-6lcqc@gws1uf/fbac2107377ab3a5a99721f19f987143.png)
- Use a case class: case classes implement the Serializable interface under the hood
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object $03_closePackage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
    val p = new Person2
    // Code inside a Spark operator runs on the executors; code outside runs on the Driver.
    // Person2 is a case class, hence serializable, so the closure may reference its field y directly.
    val rdd2 = p.cover(rdd)
    println(rdd2.collect().toList)
  }

  case class Person2() {
    var y = 10

    def cover(rdd: RDD[Int]) = {
      val a: Int = y
      rdd.map(x => x * y)
    }
  }
}
```
- Spark serialization mechanisms

Why does Spark need serialization? (key point)

- Code inside a Spark operator runs on the Executors, while code outside runs on the Driver. If an operator uses an object defined on the Driver, that object must be serialized and sent to the Tasks.

Spark has two serialization mechanisms:

- Java serialization: Spark's default
    - Java serialization writes out the fully qualified class name, inheritance information, field names, field types, and all other class metadata
- Kryo serialization
    - Kryo serializes only the basic information of a class, such as the class name, field names, and field types
    - Kryo performs roughly 10x better than Java serialization
    - Kryo is what is generally used in production

How do you enable serialization in Spark? (key point)

- Configure Spark's default serializer in SparkConf: new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") [a standard Spark optimization]
- Register the classes whose objects Kryo should serialize [optional]: conf.registerKryoClasses(Array(classOf[Student]))
Verifying that Kryo serialization outperforms Java serialization
- Java serialization
```scala
package com.atguigu.spark.day05

import org.junit.Test
import java.io.{FileOutputStream, ObjectOutputStream}
import scala.beans.BeanProperty

class $03_Ser {

  /**
   * Java serialization
   */
  @Test
  def javaSer(): Unit = {
    val student = new Student
    student.setName("lisi")
    student.setAge(20)
    val oos = new ObjectOutputStream(new FileOutputStream("d:/java.txt"))
    oos.writeObject(student)
    oos.flush()
    oos.close()
  }
}

class Student extends Serializable {
  @BeanProperty
  var name: String = _
  @BeanProperty
  var age: Int = _
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图3](/uploads/projects/liuye-6lcqc@gws1uf/e23fa46007277cc97d9d501e0436fce1.png)
- Kryo serialization
```scala
package com.atguigu.spark.day05

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import org.junit.Test
import java.io.FileOutputStream

// reuses the Student class defined above
class $04_Ser {

  /**
   * Kryo serialization
   */
  @Test
  def kryoSer(): Unit = {
    val student = new Student
    student.setName("lisi")
    student.setAge(20)
    val kryo = new Kryo()
    val output = new Output(new FileOutputStream("d:/kryo.txt"))
    kryo.writeObject(output, student)
    output.flush()
    output.close()
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图4](/uploads/projects/liuye-6lcqc@gws1uf/5f5587ce0dd00e98f40cd36c6394bf33.png)
- Reading back a Kryo-serialized object
```scala
package com.atguigu.spark.day05

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import org.junit.Test
import java.io.FileInputStream

// continues the $04_Ser class and reuses the Student class defined above
class $04_Ser {

  /**
   * Read back the Kryo-serialized object
   */
  @Test
  def kryoRead(): Unit = {
    val kryo = new Kryo()
    val input = new Input(new FileInputStream("d:/kryo.txt"))
    val student = kryo.readObject(input, classOf[Student])
    println(student.name)
    println(student.age)
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图5](/uploads/projects/liuye-6lcqc@gws1uf/ef1a9da9ca124717c3c8576a664f4bf8.png)
Configuring serialization in Spark
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object $03_closePackage {
  def main(args: Array[String]): Unit = {
    /**
     * Configure Spark serialization
     */
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[4]")
      .setAppName("test")
    conf.registerKryoClasses(Array(classOf[Student]))
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
    val p = new Person2
    // Person2 is a case class, hence serializable, so the closure can ship it to the executors
    val rdd2 = p.cover(rdd)
    println(rdd2.collect().toList)
  }

  case class Person2() {
    var y = 10

    def cover(rdd: RDD[Int]) = {
      val a: Int = y
      rdd.map(x => x * y)
    }
  }
}
```
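Note that registerKryoClasses is optional: Kryo can still serialize unregistered classes, but it then has to write the full class name alongside every object, so registering classes saves space by replacing the name with a small numeric ID.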
Chapter 3. RDD Dependencies
1.Lineage
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Lineage: the chain of dependencies linking the RDDs of a job, like the relationships of a family tree.
 * An RDD's lineage can be inspected with toDebugString.
 */
object $05_Lineage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map(x => (x, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    println(rdd4.toDebugString)
    println(rdd4.collect().toList)
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图6](/uploads/projects/liuye-6lcqc@gws1uf/2c232f8dce63f482bcbda4496c74de05.png)
2.Dependencies
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Dependency: the relationship between a parent RDD and its child RDD
 */
object $04_Dependcy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    println("-" * 100)
    val rdd1 = sc.textFile("datas/wc.txt")
    println(rdd1.dependencies)
    println("-" * 100)
    val rdd2 = rdd1.flatMap(_.split(" "))
    println(rdd2.dependencies)
    println("-" * 100)
    val rdd3 = rdd2.map(x => (x, 1))
    println(rdd3.dependencies)
    println("-" * 100)
    val rdd4 = rdd3.reduceByKey(_ + _)
    println(rdd4.dependencies)
    println("-" * 100)
    println(rdd4.collect().toList)
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图7](/uploads/projects/liuye-6lcqc@gws1uf/2f204070d3839974692e9498a4a6d343.png)
- There are two kinds of dependencies between RDDs:

Wide dependency: a dependency with a shuffle [one partition of the parent RDD is used by multiple partitions of the child RDD]

Narrow dependency: a dependency without a shuffle [one partition of the parent RDD is used by only one partition of the child RDD]

- When a job is split into stages, Spark starts from the last RDD's dependencies and walks backwards; each time it meets a wide dependency it cuts a new stage, then keeps walking until it reaches the first RDD
- The stages of a job execute from front to back, because a later stage's input is the previous stage's output
- Application: one SparkContext corresponds to one Application
- job: each action operator produces one job
- stage: the number of stages in a job = number of shuffles + 1
- task: the number of tasks in a stage = the partition count of the last RDD in that stage (both counting rules are applied to a concrete job in the sketch after this list)
- The jobs within one Application run in parallel
- The stages within one job run serially
- The tasks within one stage run in parallel
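A minimal sketch applying the counting rules above to the word-count job from the lineage example (the partition counts are illustrative assumptions, not measured values):

```scala
val rdd4 = sc.textFile("datas/wc.txt")  // assume textFile yields 2 partitions
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)                   // the only shuffle in this job
rdd4.collect()                          // 1 action => 1 job

// 1 shuffle => 1 + 1 = 2 stages:
//   stage 0: textFile -> flatMap -> map  => 2 tasks (partition count of the map RDD)
//   stage 1: reduceByKey -> collect      => tasks = partition count of rdd4
```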
3.Stage-splitting source code
```scala
// entry point of collect(); this is the RDD on which the collect operator was called
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
runJob(rdd, func, 0 until rdd.partitions.length)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// submit the job
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
// post the job to the event queue, where a dedicated thread processes it
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties)))
// handle the job submission
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// split the stages and return the last stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// split stages
val parents = getOrCreateParentStages(rdd, jobId)
// find the shuffle dependencies closest to the current rdd
getShuffleDependencies(rdd)
// create the stage
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
// walk further back from the shuffle dependency to find earlier shuffles
getMissingAncestorShuffleDependencies(shuffleDep.rdd)
// create the stage
createShuffleMapStage(shuffleDep, firstJobId)
// create the stage object
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
// create the last stage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
// submit the stage
submitStage(finalStage)
// get all parent stages of the current stage, sorted by id
val missing = getMissingParentStages(stage).sortBy(_.id)
// execute the stage
submitMissingTasks(stage, jobId.get)
// turn the stage into tasks
val tasks: Seq[Task[_]] = ....
// submit the tasks
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
// schedule execution
backend.reviveOffers()
// send the Driver the command to submit the tasks
override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}
// the Driver receives the command and starts submitting tasks
case ReviveOffers => makeOffers()
// submit the tasks
launchTasks(taskDescs)
// send the executor the command to run the task
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

// Example job:
// val rdd1 = sc.textFile("...")
// val rdd2 = rdd1.flatMap(_.split(" "))
// val rdd3 = rdd2.map(x => (x, 1))
// val rdd4 = rdd3.reduceByKey(_ + _)
// val rdd5 = rdd4.coalesce(1)
// rdd5.collect

// find the shuffle dependencies closest to the given rdd
private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // container for the shuffle dependencies found
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // container for the rdds already visited
  val visited = new HashSet[RDD[_]]
  // container for the rdds waiting to be visited
  val waitingForVisit = new ListBuffer[RDD[_]]
  // put the current rdd into the waiting container
  waitingForVisit += rdd
  // loop while there are rdds waiting to be visited
  while (waitingForVisit.nonEmpty) {
    // take the first rdd from the waiting container and visit it
    val toVisit = waitingForVisit.remove(0)
    // check whether this rdd has been visited before
    if (!visited(toVisit)) {
      // mark it visited so it is not processed again later
      visited += toVisit
      // get the dependencies of the rdd being visited
      toVisit.dependencies
        // examine each dependency
        .foreach {
          // a shuffle dependency: record it and stop walking this branch
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          // a narrow dependency: keep walking backwards through its parent rdd
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
    }
  }
  parents
}
```
Chapter 4. RDD Persistence
1.Why RDD persistence
- Without RDD persistence
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

object $06_Cache {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setLogLevel("error")
    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(line => {
      // this print runs once per line per job, showing that rdd2 is recomputed by both collect() jobs
      println(s"----------------------->${line}")
      line.split(" ")
    })
    val rdd3 = rdd2.map(x => (x, 1))
    val rdd4 = rdd2.map(x => x.length)
    rdd3.collect()
    rdd4.collect()
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图8](/uploads/projects/liuye-6lcqc@gws1uf/0194e897832a9faa82e832729d7950de.png)
- With RDD persistence
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Why persist an RDD: if the same RDD is reused by multiple jobs, then by default every step leading up to
 * that RDD is re-executed for each job, which hurts performance. Persisting the RDD's data after
 * the first job lets later jobs read it directly instead of recomputing it.
 *
 * When to persist an RDD:
 *   1. When the same RDD is reused across jobs, to cut down how many times it is recomputed
 *   2. When a job's dependency chain is very long, to avoid spending a lot of time recomputing after a failure
 *
 * How to persist an RDD
 *   1. Caching
 *      Where the data lives: memory / the local disk of the host holding each partition
 *      How to cache:
 *        1. rdd.cache()
 *        2. rdd.persist
 *      cache vs persist:
 *        cache stores the data in memory only
 *        persist lets you choose where the data is stored [memory/disk]
 *      Storage levels:
 *        NONE: no storage
 *        DISK_ONLY: disk only
 *        DISK_ONLY_2: disk only, two replicas
 *        MEMORY_ONLY: memory only
 *        MEMORY_ONLY_2: memory only, two replicas
 *        MEMORY_ONLY_SER: memory only, stored in serialized form
 *        MEMORY_ONLY_SER_2: memory only, serialized, two replicas
 *        MEMORY_AND_DISK: partly in memory, partly on disk
 *        MEMORY_AND_DISK_2: partly in memory, partly on disk, two replicas
 *        MEMORY_AND_DISK_SER: partly in memory, partly on disk, serialized
 *        MEMORY_AND_DISK_SER_2: partly in memory, partly on disk, serialized, two replicas
 *        OFF_HEAP: stored in off-heap memory
 *      Levels commonly used at work: MEMORY_AND_DISK <for large data volumes>, MEMORY_ONLY <for small data volumes>
 *   2. checkpoint
 */
object $06_Cache {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setLogLevel("error")
    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(line => {
      println(s"----------------------->${line}")
      line.split(" ")
    })
    // cache the RDD so later jobs can reuse its data directly
    // val rdd21 = rdd2.cache()
    val rdd21 = rdd2.persist()
    val rdd3 = rdd21.map(x => (x, 1))
    val rdd4 = rdd21.map(x => x.length)
    rdd3.collect()
    rdd4.collect()
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图9](/uploads/projects/liuye-6lcqc@gws1uf/83dc978055ca8eef5d0d86526c538ba9.png)
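To pick an explicit level from the list above, persist takes a StorageLevel argument; a minimal sketch, assuming the same rdd2 as in the example:

```scala
import org.apache.spark.storage.StorageLevel

// persist() with no arguments defaults to MEMORY_ONLY, i.e. the same as cache();
// MEMORY_AND_DISK keeps what fits in memory and spills the rest to local disk
val rdd21 = rdd2.persist(StorageLevel.MEMORY_AND_DISK)
```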
2.CheckPoint
checkpoint
- Why: cached data lives in the memory/local disk of the machines holding each partition, so it can be lost; lost data has to be recomputed, which hurts performance, so it is better to store the data in a reliable storage medium [HDFS]
- Where the data is stored: on HDFS
- How to use checkpoint
    - Set the persistence path: sc.setCheckpointDir(…)
    - Persist the RDD: rdd.checkpoint()
- When the checkpoint runs: after the first job that uses the RDD finishes, the checkpoint of that RDD is triggered [this launches a new job that re-runs all the processing steps leading up to that RDD]
- The RDD can be cached before checkpointing to avoid that recomputation
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

object $07_CheckPoint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setLogLevel("error")
    // set the checkpoint path (an HDFS path in production)
    sc.setCheckpointDir("checkpoint")
    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(line => {
      println(s"----------------------->${line}")
      line.split(" ")
    })
    // cache first so the checkpoint job does not recompute rdd2 from scratch
    rdd2.cache()
    rdd2.checkpoint()
    val rdd3 = rdd2.map(x => (x, 1))
    val rdd4 = rdd2.map(x => x.length)
    rdd3.collect()
    rdd4.collect()
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图10](/uploads/projects/liuye-6lcqc@gws1uf/3c7a4f40c1f3bbdbcd8e78584e2e5aa2.png)
3.Differences between cache and checkpoint
Differences between cache and checkpoint:

- The data is persisted to different places
    - cache persists data to the memory/local disk of the host holding each partition
    - checkpoint persists data to HDFS
- They differ in whether the dependency chain is severed
    - with cache, the data lives in the memory/local disk of the partitions' hosts; if a server goes down and the data is lost, it must be recomputed from the RDD's dependencies, so the lineage is kept
    - with checkpoint, the data lives on HDFS and will not be lost, so the RDD's dependency chain is severed (the sketch below makes the difference visible)
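A minimal sketch showing the lineage change via toDebugString (assuming a local checkpoint directory; in production this would be an HDFS path):

```scala
sc.setCheckpointDir("checkpoint")
val rdd = sc.textFile("datas/wc.txt").flatMap(_.split(" "))
rdd.cache()                 // avoid recomputation when the checkpoint job runs
rdd.checkpoint()
println(rdd.toDebugString)  // full lineage back to the text file
rdd.collect()               // the first job finishes, then the checkpoint job writes the data out
println(rdd.toDebugString)  // lineage now starts at a ReliableCheckpointRDD: the parents are cut off
```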
4.Shuffle and caching
```scala
package com.atguigu.spark.day05

import org.apache.spark.{SparkConf, SparkContext}

/**
 * The data produced by a shuffle operator is written to disk, which effectively acts as a built-in cache:
 * a later job that reuses the shuffled RDD can skip the stages before the shuffle
 */
object $08_Shuffle {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setLogLevel("error")
    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(line => {
      println(s"----------------------->${line}")
      line.split(" ")
    })
    val rdd3 = rdd2.map(x => (x, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.map(x => (x._1, x._2 + 100))
    val rdd6 = rdd4.map(x => (x._1, x._2 + 1000))
    rdd5.collect()
    rdd6.collect()
  }
}
```
![$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图11](/uploads/projects/liuye-6lcqc@gws1uf/ecb34d80e348a9b30e62763f0cf14b5b.png)
