为什么要有持久化

因为RDD是不存储数据的,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据,会使得性能下降。

RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存 在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

  1. // cache 操作会增加血缘关系,不改变原有的血缘关系
  2. println(wordToOneRdd.toDebugString)
  3. // 数据缓存。
  4. wordToOneRdd.cache()
  5. // 可以更改存储级别
  6. //mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别:
RDD持久化 - 图1

缓存级别 userDisk 是否使用磁盘 useMemory 是否使用内存 useOffHeap 是否使用堆外内存 deserialized 是否以反序列化形式存储 replication 副本数
NONE false false false false 1
DISK_ONLY true false false false 1
DISK_ONLY_2 true false false false 2
MEMORY_ONLY false true false true 1
MEMORY_ONLY_2 false true false true 2
MEMORY_ONLY_SER false true false false 1
MEMORY_ONLY_SER_2 false true false false 2
MEMORY_AND_DISK true true false true 1
MEMORY_AND_DISK_2 true true false true 2
MEMORY_AND_DISK_SER true true false false 1
MEMORY_AND_DISK_SER_2 true true false false 2
OFF_HEAP true true true false 1

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机 制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样 做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时 候,如果想重用数据,仍然建议调用 persist 或 cache。缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。 Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样 做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。

RDD CheckPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点 之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

  1. // 设置检查点路径
  2. sc.setCheckpointDir("./checkpoint1")
  3. // 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
  4. val lineRdd: RDD[String] = sc.textFile("input/1.txt")
  5. // 业务逻辑
  6. val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
  7. val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
  8. word => {
  9. (word, System.currentTimeMillis())
  10. } }
  11. // 增加缓存,避免再重新跑一个 job 做 checkpoint
  12. wordToOneRdd.cache()
  13. // 数据检查点:针对 wordToOneRdd 做检查点计算
  14. wordToOneRdd.checkpoint()
  15. // 触发执行逻辑
  16. wordToOneRdd.collect().foreach(println)

缓存和检查点的区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

Cache:将数据临时存储在内存中进行数据重用,会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据

Persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,如果作业执行完毕,临时保存的数据文件就会丢失。

Checkpoint:将数据长久的保存到磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,为了保证数据安全,所以一般情况下,会独立执行作业,为了能够提高效率,一般情况下,是需要和cache联合使用,执行过程中,会切断血缘关系,重新简历新的血缘关系,checkpoint等同于改变数据源。