- Preface
- Definition
- Difference between Datasets and RDDs of Case Classes
- When to Use the Low-Level APIs?
- Types of RDDs
- Properties
- Creating RDDs
- Interoperating Between DataFrames, Datasets, and RDDs
- From a Local Collection
- From Data Sources
- Actions
- reduce
- take
- count
- first, max, and min
- Saving Files
- saveAsTextFile
- SequenceFiles
- Hadoop Files
- Caching
- Checkpointing
Preface
These notes summarize and organize Chapter 12 of Spark: The Definitive Guide.
Definition
An RDD is an immutable, partitioned collection of records. There is no concept of rows in an RDD; individual records are just Java/Scala/Python objects. Spark’s Structured APIs automatically store data in an optimized, compressed binary format, whereas with RDDs you need to implement such a format inside your objects manually.
Difference between Datasets and RDDs of Case Classes
Datasets take advantage of the optimizer and of the format conversions that the Structured APIs provide.
Datasets do not need to serialize the whole object.
When to Use the Low-Level APIs?
When you call a DataFrame transformation, it actually just becomes a set of RDD transformations, so the Structured APIs are usually the better choice. Use the low-level APIs in these situations:
- You need some functionality that you cannot find in the higher-level APIs; for example, very tight control over physical data placement across the cluster.
- You need to maintain a legacy codebase written using RDDs.
- You need to do some custom shared-variable manipulation.
Types of RDDs
Generic RDD
Key-Value RDD
Properties
Preferred locations: optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System [HDFS] file)
Partitioner: optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash-partitioned)
Dependencies: a list of dependencies on other RDDs
Compute function: a function for computing each split
Partitions: a list of partitions
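Most of the properties above can be inspected on a live RDD. The following is a minimal sketch, assuming a local SparkSession created only for illustration (in spark-shell, `spark` already exists); the names `base` and `keyed` are hypothetical.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("rdd-properties").getOrCreate()
val sc = spark.sparkContext

val base = sc.parallelize(1 to 10, 4)
// Turning the data into key-value pairs lets us attach a Partitioner.
val keyed = base.map(n => (n % 3, n)).partitionBy(new HashPartitioner(3))

val numBasePartitions = base.partitions.length   // the list of partitions
val basePartitioner = base.partitioner           // None: a generic RDD has no Partitioner
val keyedPartitioner = keyed.partitioner         // Some(HashPartitioner) after partitionBy
val keyedDependencies = keyed.dependencies       // dependencies on parent RDDs
// Preferred locations are reported per partition; for a local collection this is typically empty.
val locations = base.preferredLocations(base.partitions(0))
```

The compute function itself is not exposed as a value; it is what Spark runs for each partition when an action is triggered.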
Creating RDDs
Interoperating Between DataFrames, Datasets, and RDDs
- Dataset[T]→RDD[T]
// in Scala: converts a Dataset[Long] to RDD[Long]
spark.range(500).rdd
- DataFrame → RDD[Row]
Converting a DataFrame yields an RDD of Row objects. To operate on this data, you will need to convert each Row object to the correct data type or extract values out of it.
// in Scala: DataFrame -> RDD[Row] -> RDD[Long]
spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
- RDD → DataFrame
// in Scala (outside spark-shell, first run: import spark.implicits._)
spark.range(10).rdd.toDF()
From a Local Collection
// in Scala
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)
words.setName("myWords")
From Data Sources
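1. Read line by line: each line of each text file becomes one record
spark.sparkContext.textFile("/some/path/withTextFiles")
2. Read each whole file as a single record: the result is an RDD of (file path, file content) pairs
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")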
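To make the difference between the two readers concrete, here is a small sketch. It assumes a local SparkSession and writes two hypothetical sample files into a temporary directory; none of these file names come from the book.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("rdd-sources").getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample data: two small text files in a temporary directory.
val dir = Files.createTempDirectory("rdd-text-demo")
Files.write(dir.resolve("a.txt"), "one\ntwo\n".getBytes(StandardCharsets.UTF_8))
Files.write(dir.resolve("b.txt"), "three\n".getBytes(StandardCharsets.UTF_8))

val lines = sc.textFile(dir.toUri.toString)        // RDD[String]: one record per line
val files = sc.wholeTextFiles(dir.toUri.toString)  // RDD[(String, String)]: (path, content) per file

val lineCount = lines.count()  // 3 lines across both files
val fileCount = files.count()  // 2 files
```

wholeTextFiles is convenient when each file must be processed as a unit, but every file then has to fit in memory as a single record.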
Manipulating RDDs
All RDD operations follow a functional programming style and, unlike DataFrame operations, they work on native Java or Scala objects.
Transformations
filter
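Takes a function that returns a Boolean; only the records for which it returns true are kept
// in Scala
// helper predicate used by the filter below
def startsWithS(individual: String) = individual.startsWith("S")
words.filter(word => startsWithS(word)).collect()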
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
words2.filter(record => record._3).take(5)
sort
sortBy takes a function that extracts a value from each record and sorts the RDD by that value. The following sorts by word length, longest first:
words.sortBy(word => word.length() * -1).take(2)
distinct
words.distinct().count()
map
Takes one record as input, applies the given function, and outputs one record
// in Scala
val words2 = words.map(word => (word, word(0), word.startsWith("S")))
flatMap
Takes a function that returns an Iterable, so each input record can produce multiple output records
// in Scala
words.flatMap(word => word.toSeq).take(5)
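The contrast with map is easiest to see side by side. This is a minimal sketch, assuming a local SparkSession and a hypothetical two-word RDD named `sample`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("map-vs-flatmap").getOrCreate()
val sc = spark.sparkContext

val sample = sc.parallelize(Seq("Spark", "RDD"), 1)

// map keeps one output record per input record: two sequences of characters.
val mapped = sample.map(word => word.toSeq).collect()
// flatMap flattens the returned sequences: eight individual characters.
val flattened = sample.flatMap(word => word.toSeq).collect()
```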
randomSplit
Randomly splits an RDD into an Array of RDDs according to the given weights
// in Scala
val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))
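A short sketch of how the resulting array can be used, assuming a local SparkSession; the 80/20 weights, the seed, and the names `train` and `test` are illustrative choices, not taken from the book.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("random-split").getOrCreate()
val sc = spark.sparkContext

val numbers = sc.parallelize(1 to 100, 4)
// A fixed seed makes the split reproducible; the weights are normalized if they do not sum to 1.
val Array(train, test) = numbers.randomSplit(Array(0.8, 0.2), seed = 42L)

// Split sizes are only approximately 80 and 20, but every record lands in exactly one split.
val total = train.count() + test.count()
val overlap = train.intersection(test).count()
```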
Actions
Actions either collect data to the driver or write to an external data source
reduce
You can use the reduce method to specify a function to “reduce” an RDD of any kind of value to one value
// in Scala
spark.sparkContext.parallelize(1 to 20).reduce(_ + _) // 210
// in Scala
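def wordLengthReducer(leftWord: String, rightWord: String): String =
  if (leftWord.length > rightWord.length) leftWord else rightWord
// "Definitive" and "Processing" both have length 10, so either one may be returned,
// depending on the order in which the partitions are reduced
words.reduce(wordLengthReducer)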
take
words.take(5)
words.takeOrdered(5)
words.top(5)
val withReplacement = true
val numberToTake = 6
val randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)
take and its derivative methods take a number of values from your RDD. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
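The variants are easier to tell apart on numbers. A minimal sketch, assuming a local SparkSession and a hypothetical five-element RDD:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("take-variants").getOrCreate()
val sc = spark.sparkContext

val nums = sc.parallelize(Seq(5, 3, 9, 1, 7), 2)

val firstTwo = nums.take(2)            // first records in partition order: Array(5, 3)
val smallestTwo = nums.takeOrdered(2)  // ascending by the implicit ordering: Array(1, 3)
val largestTwo = nums.top(2)           // descending by the implicit ordering: Array(9, 7)
```

take does not sort, so its result depends on how the data is partitioned, whereas takeOrdered and top depend only on the ordering.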
count
words.count()
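val confidence = 0.95
val timeoutMilliseconds = 400
// if the timeout is exceeded, returns an approximate (possibly incomplete) result
words.countApprox(timeoutMilliseconds, confidence)
// takes the relative accuracy as its argument
words.countApproxDistinct(0.05)
// loads the whole result set into the driver's memory
words.countByValue()
// approximate variant: timeout in milliseconds, then confidence
words.countByValueApprox(1000, 0.95)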
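As a small illustration of what countByValue returns, here is a sketch assuming a local SparkSession and a hypothetical RDD of letters. Because the result is a local Map on the driver, it is only suitable when the number of distinct values is small.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("count-variants").getOrCreate()
val sc = spark.sparkContext

val letters = sc.parallelize(Seq("a", "b", "a", "c", "a"), 2)

val total = letters.count()           // 5 records
val counts = letters.countByValue()   // local Map on the driver: a -> 3, b -> 1, c -> 1
// An estimate of the number of distinct values, with 5% relative accuracy.
val distinctEstimate = letters.countApproxDistinct(0.05)
```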
first, max, and min
The first method returns the first value in the dataset:
words.first()
max and min return the maximum and minimum values, respectively:
spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()
Saving Files
Saving files means writing to plain-text files. With RDDs, you cannot actually “save” to a data source in the conventional sense. You must iterate over the partitions in order to save the contents of each partition to some external database. This is a low-level approach that reveals the underlying operation that is being performed in the higher-level APIs.
saveAsTextFile
words.saveAsTextFile("file:/tmp/bookTitle")
import org.apache.hadoop.io.compress.BZip2Codec
words.saveAsTextFile("file:/tmp/bookTitleCompressed", classOf[BZip2Codec])
SequenceFiles
A SequenceFile is a flat file consisting of binary key-value pairs. It is extensively used in MapReduce as an input/output format. saveAsObjectFile writes the RDD as a SequenceFile of serialized objects:
words.saveAsObjectFile("/tmp/my/sequenceFilePath")
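A file written this way can be read back with `SparkContext.objectFile`. The round trip below is a sketch, assuming a local SparkSession and a temporary output path chosen only for this example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("object-file").getOrCreate()
val sc = spark.sparkContext

// The output path must not exist yet, so we point at a new child of a temporary directory.
val out = Files.createTempDirectory("rdd-objfile-demo").resolve("words").toUri.toString

val original = sc.parallelize(Seq("Spark", "The", "Definitive", "Guide"), 2)
original.saveAsObjectFile(out)

// The element type has to be supplied when reading, because the file stores serialized objects.
val restored = sc.objectFile[String](out)
val restoredWords = restored.collect().sorted
```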
Hadoop Files
For the file formats that Hadoop supports, see Hadoop: The Definitive Guide
Caching
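cache stores the RDD in memory by default
persist lets you specify a StorageLevel
// in Scala
import org.apache.spark.storage.StorageLevel
words.getStorageLevel
words.cache()
// rdd1 stands for any other RDD: a storage level cannot be changed once it has been assigned
rdd1.persist(StorageLevel.MEMORY_AND_DISK)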
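The effect of cache and persist can be observed through getStorageLevel. A minimal sketch, assuming a local SparkSession; the RDD names `a` and `b` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("rdd-caching").getOrCreate()
val sc = spark.sparkContext

val a = sc.parallelize(1 to 5)
val before = a.getStorageLevel       // StorageLevel.NONE: nothing is persisted yet
a.cache()                            // shorthand for persist(StorageLevel.MEMORY_ONLY)
val afterCache = a.getStorageLevel

// A different RDD can use a different level, here spilling to disk when memory runs out.
val b = sc.parallelize(1 to 5).persist(StorageLevel.MEMORY_AND_DISK)
val levelOfB = b.getStorageLevel

// Release the cached data once it is no longer needed.
a.unpersist()
b.unpersist()
```

Both calls are lazy: the data is actually materialized the first time an action runs on the RDD.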
Checkpointing
Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source
spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing")
words.checkpoint()
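Note that checkpoint() only marks the RDD; the data is written when the next action runs. The sketch below shows this with isCheckpointed, assuming a local SparkSession and a temporary checkpoint directory chosen only for this example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("rdd-checkpoint").getOrCreate()
val sc = spark.sparkContext

// Hypothetical checkpoint location; on a cluster this should be a reliable store such as HDFS.
sc.setCheckpointDir(Files.createTempDirectory("rdd-checkpoint-demo").toUri.toString)

val doubled = sc.parallelize(1 to 10, 2).map(_ * 2)
doubled.checkpoint()                         // must be called before any action on this RDD
val beforeAction = doubled.isCheckpointed    // false: nothing has been written yet
val n = doubled.count()                      // the action triggers the checkpoint
val afterAction = doubled.isCheckpointed     // true: partitions now live in the checkpoint dir
```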
Pipe RDDs to System Commands
words.pipe("wc -l").collect()
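mapPartitions
Maps over an individual partition
This function is similar to map, except that the argument of the mapping function is an iterator over each partition of the RDD rather than each element of the RDD. If the mapping needs to create extra objects frequently, mapPartitions is much more efficient than map.
The result of the following line is the number of partitions in words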
// in Scala
words.mapPartitions(part => Iterator[Int](1)).sum()
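// a standalone example (not for spark-shell, where a SparkContext already exists)
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
val sc = new SparkContext(new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local"))
val arrayRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
arrayRDD.mapPartitions { elements =>
  // the buffer is created once per partition rather than once per element
  val result = new ArrayBuffer[Int]()
  elements.foreach(element => result += element)
  result.iterator
}.foreach(println)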
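The efficiency argument for mapPartitions is about per-partition setup. The sketch below assumes a local SparkSession and uses java.text.DecimalFormat as a stand-in for any helper object that is costly to create; that choice is illustrative and not from the book.

```scala
import java.text.DecimalFormat
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("map-partitions").getOrCreate()
val sc = spark.sparkContext

val nums = sc.parallelize(1 to 9, 3)

val formatted = nums.mapPartitions { iter =>
  // Created once per partition (3 times here) instead of once per element (9 times with map).
  val formatter = new DecimalFormat("000")
  iter.map(n => formatter.format(n.toLong))
}.collect()

// The function sees a whole partition at once, so per-partition aggregates are easy to compute.
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum)).collect()
```

Returning `iter.map(...)` keeps the processing lazy, which avoids holding a whole partition in memory as the ArrayBuffer version does.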
mapPartitionsWithIndex
Works like mapPartitions, but the function takes two parameters; the first is the index of the partition
def indexedFunc(partitionIndex:Int, withinPartIterator: Iterator[String]) = {
withinPartIterator.toList.map(
value => s"Partition: $partitionIndex => $value").iterator
}
words.mapPartitionsWithIndex(indexedFunc).collect()
foreachPartition
words.foreachPartition { iter =>
import java.io._
import scala.util.Random
val randomFileName = new Random().nextInt()
val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
while (iter.hasNext) {
pw.write(iter.next())
}
pw.close()
}
glom
glom is an interesting function that takes every partition in your dataset and converts each one to an array. This can be useful if you’re going to collect the data to the driver and want to have an array for each partition
spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
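One practical use of glom is checking how records are distributed across partitions. A minimal sketch, assuming a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("glom-demo").getOrCreate()
val sc = spark.sparkContext

// Each of the two partitions holds one word, so glom yields two single-element arrays.
val grouped = sc.parallelize(Seq("Hello", "World"), 2).glom().collect()

// Mapping each partition's array to its length gives the partition sizes.
val sizes = sc.parallelize(1 to 10, 3).glom().map(_.length).collect()
```

Since glom materializes each partition as an array, it can cause memory problems on large partitions; computing only the lengths, as above, keeps the collected result small.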