什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

image.png
要点:

1、弹性:
存储弹性:内存不足,自动落盘
容错弹性:过程中断后,数据可以根据RDD流程重新读取并恢复数据
计算弹性:shuffle阶段如果访问下一RDD,会有默认的3次重试读取下一阶段的机会
分片弹性:也就是分片,和mr的切片一样,不同切片负责处理不同的block,在RDD中,也是不同的partition处理block
2、不可变
RDD封装计算逻辑,不保存数据
而且不可改变,要想改变,只能产生新的RDD

RDD五大特性

RDD五大特性.pngRDD类中对应这这5大特性的方法:
1、分区列表(getPartitions): 数据是分布式存储,所以在计算的时候也是分布式,一般读取HDFS文件的时候一个文件切片对应一个RDD分区
2、作用在每个分区上的计算函数(compute(Partition,TaskContex)): task计算的时候并行的,每个task计算逻辑是一样的,只是处理的数据不一样(同一个task被复制传输到不同节点上实现相同逻辑)
3、依赖关系(getDependencies): RDD不存储数据,RDD中只是封装数据的处理逻辑,如果数据出错需要根据依赖关系从头进行处理得到数据
4、分区器(partitioner): 在shuffle阶段,需要通过分区器将相同key的数据聚在一个分区统一处理
5、优先位置(getPreferredLocations(partition)): spark在分配task的时候最好将task分配到数据所在的节点,避免网络拉取数据影响效率

rdd编程

RDD的创建

通过集合创建:一般测试用

  1. sc.makeRDD(集合)<br /> sc.parallelize(集合)<br /> makeRDD底层就是使用的parallelize

读取文件创建

  1. sc.textFile(path)<br />总结:写绝对路径不会错:<br />想访问hdfssc.textFile("hdfs://hadoop102:8020/..../..")<br />想访问本地文件:sc.textFile("file:///../...")
  2. **1、在spark集群中如果有配置HADOOP_CONF_DIR,此时默认读取的是HDFS文件**<br />1)读取HDFS文件的3中方式
  1. sc.textFile("/.../..")
  2. sc.textFile("hdfs://hadoop102:8020/..../..")
  3. sc.textFile("hdfs:///../..")

2)若配置了此参数,但还想读取本地文件:

  1. sc.textFile("file:///../...")

2、在spark集群中如果没有配置HADOOP_CONF_DIR,此时默认读取的是本地文件
1)读取HDFS文件的时候:

  1. sc.textFile("hdfs://hadoop102:8020/..../..")
  1. 2)读取本地文件:
  1. sc.textFile("/.../..")
  2. sc.textFile("file:///../...")

通过其他RDD衍生

  1. val rdd2 = rdd1.map/flatMap/..

分区规则

/**

根据集合创建RDD的分区数

  1. *<br /> * 1numSlices参数没有配置,默认情况下<br /> * 如果有配置spark.default.parallelism,此时RDD的分区数 = spark.default.parallelism的值<br /> * 配置:new SparkConf().set("spark.default.parallelism","1").setMaster("local[4]").setAppName("test")<br /> * 如果没有配置spark.default.parallelism<br /> * 1local模式<br /> * 如果是local[N]parallelism参数值<br /> * 如果是local[*],此时RDD的分区数 = cpu总核数<br /> * 如果是local, 此时RDD的分区数 = 1<br /> * 如果是standalone,此时RDD的分区数 = math.max(totalCoreCount.get(), 2)<br /> * totalCoreCount = 本次任务CPU总核数<br /> * 2、集群模式,此时RDD分区数 = 本次任务executor总核数<br /> * 2)如果numSlices有设置,此时RDD的分区数 = numSlices<br /> *<br /> */
  1. package tcode.day02
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.junit.Test
  4. class $02_RddPartition {
  5. val sc = new SparkContext(new SparkConf()/*.set("spark.default.parallelism","1")*/.setMaster("local[4]").setAppName("test"))
  6. @Test
  7. def createRddByCollectionPartition(): Unit ={
  8. val rdd = sc.parallelize(List(1,4,7,2,5,9,10,20,43,55),4)
  9. rdd.mapPartitionsWithIndex((index,it) => {
  10. println(s"分区号=${index} 分区数据=${it.toList}")
  11. it
  12. }).collect()
  13. println(rdd.partitions.length)
  14. }
  15. }

源码解释

  1. val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  2. //ParallelCollectionRDD.slice(data, numSlices).toArray
  3. //data = List(1,4,7,2,5,9,10,20,43,55)
  4. //numSlices = 4
  5. //def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  6. // if (numSlices < 1) {
  7. // throw new IllegalArgumentException("Positive number of partitions required")
  8. // }
  9. // def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  10. // //length=10 numSlices=4
  11. //
  12. // (0 until numSlices)
  13. // // Range(0,1,2,3)
  14. // .iterator.map { i =>
  15. // //第一次执行 i = 0
  16. // // start = 0 end = 2
  17. // //第一次执行结果: (0,2)
  18. // //第二次执行 i = 1
  19. // // start = 2 end = 5
  20. // // 第二次执行结果: (2,5)
  21. // //第三次执行 i = 2
  22. // // start = 5 end = 7
  23. // // 第三次执行结果: (5,7)
  24. // //第四次执行 i = 3
  25. // // start = 7 end = 10
  26. // // 第四次执行结果: (7,10)
  27. // val start = ((i * length) / numSlices).toInt
  28. // val end = (((i + 1) * length) / numSlices).toInt
  29. // (start, end)
  30. // }
  31. //
  32. // }
  33. // seq match {
  34. // case _ =>
  35. // val array = seq.toArray
  36. // // array = Array(1,4,7,2,5,9,10,20,43,55)
  37. // //positions(10, 4)
  38. // //positions(array.length, numSlices)
  39. // //执行结果: Iterator[(0,2),(2,5),(5,7),(7,10)]
  40. // .map { case (start, end) =>
  41. // //第一次执行: (0,2)
  42. // //array.slice(0,2) 结果: Array(1,4)
  43. // //第二次执行: (2,5)
  44. // //array.slice(2,5) 结果: Array(7,2,5)
  45. // //第三次执行: (5,7)
  46. // //array.slice(5,7) 结果: Array(9,10)
  47. // //第四次执行: (7,10)
  48. // //array.slice(7,10) 结果: Array(20,43,55)
  49. // array.slice(start, end).toSeq
  50. // }
  51. //
  52. // .toSeq
  53. // //执行结果: Seq(Array(1,4),Array(7,2,5),Array(9,10),Array(20,43,55))
  54. // }
  55. //}
  56. //slices = Array(Array(1,4),Array(7,2,5),Array(9,10),Array(20,43,55))
  57. slices.indices
  58. //0 until slices.length = [0,1,2,3]
  59. .map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  60. //第一次: i = 0 new ParallelCollectionPartition(id, 0, Array(1,4))

通过读取文件创建RDD

  1. RDD默认分区数>= math.min(defaultParallelism,2)<br /> 读取文件创建的RDD的分区数 = 文件的切片数
  1. /**
  2. * 根据读取文件创建rdd的分区数 >= math.min(defaultParallelism, 2)
  3. */
  4. @Test
  5. def createRddByFilePartition(): Unit ={
  6. val rdd = sc.textFile("datas/wc.txt",4)// 如果这里不设置是4,则默认是RDD默认分区数>= math.min(defaultParallelism,2)
  7. println(rdd.partitions.length)
  8. }

源码解释

  1. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions)
  2. new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path)
  3. //得到文件所有切片
  4. val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
  5. //对文件切片
  6. public InputSplit[] getSplits(JobConf job, int numSplits)
  7. throws IOException {
  8. //获取要读取的所有文件
  9. FileStatus[] files = listStatus(job);
  10. //统计要读取的所有文件的总大小
  11. long totalSize = 0;
  12. for (FileStatus file: files) {
  13. if (file.isDirectory()) {
  14. throw new IOException("Not a file: "+ file.getPath());
  15. }
  16. totalSize += file.getLen();
  17. }
  18. // totalSize = 75B
  19. // numSplits = 4
  20. // goalSize = 75 / 4 = 18 B
  21. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  22. //
  23. long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  24. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  25. //minSize = Math.max(0,1) = 1
  26. //创建一个装载切片的数组
  27. ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  28. NetworkTopology clusterMap = new NetworkTopology();
  29. //遍历要读取的文件
  30. for (FileStatus file: files) {
  31. //获取当前文件的大小
  32. long length = file.getLen();
  33. if (length != 0) {
  34. FileSystem fs = path.getFileSystem(job);
  35. BlockLocation[] blkLocations;
  36. //获取文件块位置
  37. if (file instanceof LocatedFileStatus) {
  38. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  39. } else {
  40. blkLocations = fs.getFileBlockLocations(file, 0, length);
  41. }
  42. //
  43. if (isSplitable(fs, path)) {
  44. //获取当前文件块的大小 【HDFS文件块大小=128M,本地文件块的大小=32M】
  45. long blockSize = file.getBlockSize();
  46. // 计算切片大小
  47. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  48. //splitSize = Math.max(1, Math.min(18, 32)) = 18B
  49. //循环切片
  50. long bytesRemaining = length;
  51. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  52. // 因为goalSize是整除求出来的,而这里循环用的是double,所以会有5个切片
  53. // 第一次循环: 75/18 = 4.12 >1.1
  54. // 第一个切片: 从0B开始切18B
  55. // bytesRemaining = 75-18 = 57
  56. // 第二次循环: 57/18 = 3.2 >1.1
  57. // 第二个切片: 从18B开始切18B
  58. // bytesRemaining = 57-18 = 39
  59. // 第三次循环: 39/18 = 2.25 >1.1
  60. // 第三次切片: 从36B开始切18B
  61. // bytesRemaining = 39-18 = 21
  62. // 第四次循环: 21/18 = 1.16 >1.1
  63. // 第四次切片: 从54B开始切18B
  64. // bytesRemaining = 21-18 = 3
  65. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
  66. length-bytesRemaining, splitSize, clusterMap);
  67. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  68. splitHosts[0], splitHosts[1]));
  69. bytesRemaining -= splitSize;
  70. }
  71. //获取最后一个切片
  72. if (bytesRemaining != 0) {
  73. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
  74. - bytesRemaining, bytesRemaining, clusterMap);
  75. //第五个切片: 从72B开始切3B
  76. splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
  77. splitHosts[0], splitHosts[1]));
  78. }
  79. } else {
  80. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
  81. splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
  82. }
  83. } else {
  84. //Create empty hosts array for zero length files
  85. splits.add(makeSplit(path, 0, length, new String[0]));
  86. }
  87. }
  88. sw.stop();
  89. if (LOG.isDebugEnabled()) {
  90. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  91. + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  92. }
  93. return splits.toArray(new FileSplit[splits.size()]);
  94. }

通过其他RDD衍生的RDD

  1. 衍生出的RDD的分区数 = RDD的分区
  1. /**
  2. * 根据rdd创建的rdd的分区数默认 = 依赖的rdd的分区数
  3. */
  4. @Test
  5. def createRddByRddPartition(): Unit ={
  6. val rdd = sc.parallelize(List(1,4,7,2,5,9,10,20,43,55),20)
  7. val rdd2 = rdd.map(x=>x*x)
  8. println(rdd2.partitions.length)
  9. }

//spark中算子中的函数是在executor中执行的,算子外面的代码是在Driver中执行的

分区种类

shuffle前分区

按照索引分区:
image.png

shuffle后分区(reduce)

默认使用hashPartitioner进行分区:
image.png