什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
要点:
1、弹性:
存储弹性:内存不足,自动落盘
容错弹性:过程中断后,数据可以根据RDD流程重新读取并恢复数据
计算弹性:shuffle阶段如果访问下一RDD,会有默认的3次重试读取下一阶段的机会
分片弹性:也就是分片,和mr的切片一样,不同切片负责处理不同的block,在RDD中,也是不同的partition处理block
2、不可变
RDD封装计算逻辑,不保存数据
而且不可改变,要想改变,只能产生新的RDD
RDD五大特性
RDD类中对应这这5大特性的方法:
1、分区列表(getPartitions): 数据是分布式存储,所以在计算的时候也是分布式,一般读取HDFS文件的时候一个文件切片对应一个RDD分区
2、作用在每个分区上的计算函数(compute(Partition,TaskContex)): task计算的时候并行的,每个task计算逻辑是一样的,只是处理的数据不一样(同一个task被复制传输到不同节点上实现相同逻辑)
3、依赖关系(getDependencies): RDD不存储数据,RDD中只是封装数据的处理逻辑,如果数据出错需要根据依赖关系从头进行处理得到数据
4、分区器(partitioner): 在shuffle阶段,需要通过分区器将相同key的数据聚在一个分区统一处理
5、优先位置(getPreferredLocations(partition)): spark在分配task的时候最好将task分配到数据所在的节点,避免网络拉取数据影响效率
rdd编程
RDD的创建
通过集合创建:一般测试用
sc.makeRDD(集合)<br /> sc.parallelize(集合)<br /> makeRDD底层就是使用的parallelize
读取文件创建
sc.textFile(path)<br />总结:写绝对路径不会错:<br />想访问hdfs:sc.textFile("hdfs://hadoop102:8020/..../..")<br />想访问本地文件:sc.textFile("file:///../...")
**1、在spark集群中如果有配置HADOOP_CONF_DIR,此时默认读取的是HDFS文件**<br />1)读取HDFS文件的3中方式
sc.textFile("/.../..")
sc.textFile("hdfs://hadoop102:8020/..../..")
sc.textFile("hdfs:///../..")
2)若配置了此参数,但还想读取本地文件:
sc.textFile("file:///../...")
2、在spark集群中如果没有配置HADOOP_CONF_DIR,此时默认读取的是本地文件
1)读取HDFS文件的时候:
sc.textFile("hdfs://hadoop102:8020/..../..")
2)读取本地文件:
sc.textFile("/.../..")
sc.textFile("file:///../...")
通过其他RDD衍生
val rdd2 = rdd1.map/flatMap/..
分区规则
根据集合创建RDD的分区数
*<br /> * 1)numSlices参数没有配置,默认情况下<br /> * 如果有配置spark.default.parallelism,此时RDD的分区数 = spark.default.parallelism的值<br /> * 配置:new SparkConf().set("spark.default.parallelism","1").setMaster("local[4]").setAppName("test")<br /> * 如果没有配置spark.default.parallelism<br /> * 1、local模式<br /> * 如果是local[N]parallelism参数值<br /> * 如果是local[*],此时RDD的分区数 = cpu总核数<br /> * 如果是local, 此时RDD的分区数 = 1<br /> * 如果是standalone,此时RDD的分区数 = math.max(totalCoreCount.get(), 2)<br /> * totalCoreCount = 本次任务CPU总核数<br /> * 2、集群模式,此时RDD分区数 = 本次任务executor总核数<br /> * 2)如果numSlices有设置,此时RDD的分区数 = numSlices<br /> *<br /> */
package tcode.day02
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test
class $02_RddPartition {
val sc = new SparkContext(new SparkConf()/*.set("spark.default.parallelism","1")*/.setMaster("local[4]").setAppName("test"))
@Test
def createRddByCollectionPartition(): Unit ={
val rdd = sc.parallelize(List(1,4,7,2,5,9,10,20,43,55),4)
rdd.mapPartitionsWithIndex((index,it) => {
println(s"分区号=${index} 分区数据=${it.toList}")
it
}).collect()
println(rdd.partitions.length)
}
}
源码解释
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
//ParallelCollectionRDD.slice(data, numSlices).toArray
//data = List(1,4,7,2,5,9,10,20,43,55)
//numSlices = 4
//def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
// if (numSlices < 1) {
// throw new IllegalArgumentException("Positive number of partitions required")
// }
// def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
// //length=10 numSlices=4
//
// (0 until numSlices)
// // Range(0,1,2,3)
// .iterator.map { i =>
// //第一次执行 i = 0
// // start = 0 end = 2
// //第一次执行结果: (0,2)
// //第二次执行 i = 1
// // start = 2 end = 5
// // 第二次执行结果: (2,5)
// //第三次执行 i = 2
// // start = 5 end = 7
// // 第三次执行结果: (5,7)
// //第四次执行 i = 3
// // start = 7 end = 10
// // 第四次执行结果: (7,10)
// val start = ((i * length) / numSlices).toInt
// val end = (((i + 1) * length) / numSlices).toInt
// (start, end)
// }
//
// }
// seq match {
// case _ =>
// val array = seq.toArray
// // array = Array(1,4,7,2,5,9,10,20,43,55)
// //positions(10, 4)
// //positions(array.length, numSlices)
// //执行结果: Iterator[(0,2),(2,5),(5,7),(7,10)]
// .map { case (start, end) =>
// //第一次执行: (0,2)
// //array.slice(0,2) 结果: Array(1,4)
// //第二次执行: (2,5)
// //array.slice(2,5) 结果: Array(7,2,5)
// //第三次执行: (5,7)
// //array.slice(5,7) 结果: Array(9,10)
// //第四次执行: (7,10)
// //array.slice(7,10) 结果: Array(20,43,55)
// array.slice(start, end).toSeq
// }
//
// .toSeq
// //执行结果: Seq(Array(1,4),Array(7,2,5),Array(9,10),Array(20,43,55))
// }
//}
//slices = Array(Array(1,4),Array(7,2,5),Array(9,10),Array(20,43,55))
slices.indices
//0 until slices.length = [0,1,2,3]
.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
//第一次: i = 0 new ParallelCollectionPartition(id, 0, Array(1,4))
通过读取文件创建RDD
RDD默认分区数>= math.min(defaultParallelism,2)<br /> 读取文件创建的RDD的分区数 = 文件的切片数
/**
* 根据读取文件创建rdd的分区数 >= math.min(defaultParallelism, 2)
*/
@Test
def createRddByFilePartition(): Unit ={
val rdd = sc.textFile("datas/wc.txt",4)// 如果这里不设置是4,则默认是RDD默认分区数>= math.min(defaultParallelism,2)
println(rdd.partitions.length)
}
源码解释
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions)
new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path)
//得到文件所有切片
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
//对文件切片
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
//获取要读取的所有文件
FileStatus[] files = listStatus(job);
//统计要读取的所有文件的总大小
long totalSize = 0;
for (FileStatus file: files) {
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// totalSize = 75B
// numSplits = 4
// goalSize = 75 / 4 = 18 B
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
//
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
//minSize = Math.max(0,1) = 1
//创建一个装载切片的数组
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
//遍历要读取的文件
for (FileStatus file: files) {
//获取当前文件的大小
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
//获取文件块位置
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//
if (isSplitable(fs, path)) {
//获取当前文件块的大小 【HDFS文件块大小=128M,本地文件块的大小=32M】
long blockSize = file.getBlockSize();
// 计算切片大小
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
//splitSize = Math.max(1, Math.min(18, 32)) = 18B
//循环切片
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 因为goalSize是整除求出来的,而这里循环用的是double,所以会有5个切片
// 第一次循环: 75/18 = 4.12 >1.1
// 第一个切片: 从0B开始切18B
// bytesRemaining = 75-18 = 57
// 第二次循环: 57/18 = 3.2 >1.1
// 第二个切片: 从18B开始切18B
// bytesRemaining = 57-18 = 39
// 第三次循环: 39/18 = 2.25 >1.1
// 第三次切片: 从36B开始切18B
// bytesRemaining = 39-18 = 21
// 第四次循环: 21/18 = 1.16 >1.1
// 第四次切片: 从54B开始切18B
// bytesRemaining = 21-18 = 3
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
//获取最后一个切片
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
//第五个切片: 从72B开始切3B
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}
通过其他RDD衍生的RDD
衍生出的RDD的分区数 = 父RDD的分区
/**
* 根据rdd创建的rdd的分区数默认 = 依赖的rdd的分区数
*/
@Test
def createRddByRddPartition(): Unit ={
val rdd = sc.parallelize(List(1,4,7,2,5,9,10,20,43,55),20)
val rdd2 = rdd.map(x=>x*x)
println(rdd2.partitions.length)
}
//spark中算子中的函数是在executor中执行的,算子外面的代码是在Driver中执行的
分区种类
shuffle前分区
shuffle后分区(reduce)
默认使用hashPartitioner进行分区: