RDD Cache

  1. import org.apache.spark.rdd.RDD
  2. val rootPath: String = _
  3. val file: String = s"${rootPath}/wikiOfSpark.txt"
  4. // 读取文件内容
  5. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  6. // 以行为单位做分词
  7. val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
  8. val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
  9. // 把RDD元素转换为(Key,Value)的形式
  10. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  11. // 按照单词做分组计数
  12. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  13. wordCounts.cache// 使用cache算子告知Spark对wordCounts加缓存
  14. wordCounts.count// 触发wordCounts的计算,并将wordCounts缓存到内存
  15. // 打印词频最高的5个词汇
  16. wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
  17. // 将分组计数结果落盘到文件
  18. val targetPath: String = _
  19. wordCounts.saveAsTextFile(targetPath)
  20. takesaveAsTextFile这两个操作执行的都很慢。
  21. cache函数并不会立即触发RDD在内存中的物化,因此还需要调用count算子来触发这一执行过程。
  22. cache函数实际上会进一步调用persisit来为其添加缓存,因此.cache与.persist(MEMORY_ONLY)是等价的

https://time.geekbang.org/column/article/418079
https://time.geekbang.org/column/article/421566
https://time.geekbang.org/column/article/423131

常用算子图表

image.png


Transformations

map

给定映射函数f,map(f)以元素为粒度对RDD做数据转换。其中f可以是带有明确签名的带名函数,也可以是匿名函数,它的型参类型必须与RDD的元素类型保持一致

  1. // 把普通RDD转换为Paired RDD
  2. // 匿名函数
  3. val cleanWordRDD: RDD[String] = _ // 省略完整代码
  4. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  5. // 带名函数
  6. def f(word: String): (String, Int) = {
  7. return (word, 1)
  8. }
  9. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

map的性能考虑

  1. // 把普通RDD转换为Paired RDD
  2. import java.security.MessageDigest
  3. val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
  4. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  5. // 获取MD5对象实例
  6. val md5 = MessageDigest.getInstance("MD5")
  7. // 使用MD5计算哈希值
  8. val hash = md5.digest(word.getBytes).mkString
  9. // 返回哈希值与数字1的Pair
  10. (hash, 1)
  11. }

对于RDD中的每一条数据记录,我们都需要实力化一个MessageDigest对象来计算这个元素的哈希值。如果RDD有上百万甚至上亿级别的数据记录,实力化对象的开销就会聚成沙。因此,可以用mapPartitions和mapPartitionsWithIndex来解决类似的问题

mapPartitions

以数据分区为粒度的数据转换

  1. // 把普通RDD转换为Paired RDD
  2. import java.security.MessageDigest
  3. val cleanWordRDD: RDD[String] = _ // 请参考第一讲获取完整代码
  4. val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  5. // 注意!这里是以数据分区为粒度,获取MD5对象实例
  6. val md5 = MessageDigest.getInstance("MD5")
  7. val newPartition = partition.map( word => {
  8. // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
  9. (md5.digest(word.getBytes()).mkString,1)
  10. })
  11. newPartition
  12. })

image.png

flatMap

从元素到集合,再从集合到元素
image.png

  1. // 读取文件内容
  2. val lineRDD: RDD[String] = _ // 请参考第一讲获取完整代码
  3. // 以行为单位提取相邻单词
  4. val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
  5. // 将行转换为单词数组
  6. val words: Array[String] = line.split(" ")
  7. // 将单个单词数组,转换为相邻单词数组
  8. for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
  9. })

image.png

filter

  1. // 定义特殊字符列表
  2. val list: List[String] = List("&", "|", "#", "^", "@")
  3. // 定义判定函数f
  4. def f(s: String): Boolean = {
  5. val words: Array[String] = s.split("-")
  6. val b1: Boolean = list.contains(words(0))
  7. val b2: Boolean = list.contains(words(1))
  8. return !b1 && !b2 // 返回不在特殊字符列表中的词汇对
  9. }
  10. // 使用filter(f)对RDD进行过滤
  11. val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)

Actions

1.将分布式计算结果直接落盘的操作,如DataFrame的write, RDD的saveAsTextFile
2.将分布式结果收集到Driver端的操作,如first, take, collect