4.1、分区基本概述

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD 时指定。
在代码指定分区数的代码Demo:

  1. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
  2. val sparkContext = new SparkContext(sparkConf)
  3. val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4) // 指定RDD分区数为4
  4. val fileRDD: RDD[String] = sparkContext.textFile("input", 2) // 指定RDD最小分区数为2
  5. fileRDD.collect().foreach(println)
  6. sparkContext.stop()
  7. 第二个参数可以不设置,会有一个默认值,由参数 spark.default.parallelism 指定,如果获取不到就指定为totalCoreCount(当前环境最大使用核数)
  8. //RDD的计算一个分区内的数据是一个一个执行的,只有前面一个数据执行完毕后才会执行下一个数据(分区内有序)
  9. //不同分区内的计算是无序的
  10. // 不同分区数设置得到的效果
  11. val rdd1: RDD[Int] = ctx.parallelize(1 to 10)
  12. val rdd2: RDD[Int] = ctx.parallelize(1 to 10,3)
  13. val rdd3: RDD[Int] = ctx.makeRDD(1 to 10)
  14. val rdd4: RDD[Int] = ctx.makeRDD(1 to 10,3)
  15. val rdd5: RDD[String] = ctx.textFile("input/data.txt")
  16. val rdd6: RDD[String] = ctx.textFile("input/data.txt",3)
  17. val rdd7: RDD[String] = ctx.textFile("input/dir/")
  18. val rdd8: RDD[String] = ctx.textFile("input/dir/",3)
  19. val rdd9: RDD[(String, String)] = ctx.wholeTextFiles("input/dir/")
  20. val rdd10: RDD[(String, String)] = ctx.wholeTextFiles("input/dir/",3)
  21. println("rdd1.partitions.length = "+rdd1.partitions.length)
  22. println("rdd2.partitions.length = "+rdd2.partitions.length)
  23. println("rdd3.partitions.length = "+rdd3.partitions.length)
  24. println("rdd4.partitions.length = "+rdd4.partitions.length)
  25. println("rdd5.partitions.length = "+rdd5.partitions.length)
  26. println("rdd6.partitions.length = "+rdd6.partitions.length)
  27. println("rdd7.partitions.length = "+rdd7.partitions.length)
  28. println("rdd8.partitions.length = "+rdd8.partitions.length)
  29. println("rdd9.partitions.length = "+rdd9.partitions.length)
  30. println("rdd10.partitions.length = "+rdd10.partitions.length)
  31. // 结果
  32. rdd1.partitions.length = 8 // 默认为cpu核数
  33. rdd2.partitions.length = 3 // 手动设置了分区数为3
  34. rdd3.partitions.length = 8 // 默认为cpu核数
  35. rdd4.partitions.length = 3 // 手动设置了分区数为3
  36. rdd5.partitions.length = 3 //
  37. rdd6.partitions.length = 4 //
  38. rdd7.partitions.length = 10 // 由于该目录下有10个文件,每个文件一个分区,导致有10个分区
  39. rdd8.partitions.length = 10 // textFile对小文件读取不友好,每个文件最小一个分区
  40. rdd9.partitions.length = 1 // wholeTextFiles 会对小文件进行优化
  41. rdd10.partitions.length =1 //

Spark会通过DAG将一个Spark job中用到的所有RDD划分为不同的stage,每个stage内部都会有很多子任务处理数据,而每个stage的任务数是决定性能优劣的关键指标。
Spark job中最小执行单位为task,合理设置Spark job每个stage的task数是决定性能好坏的重要因素之一,但是Spark自己确定最佳并行度的能力有限,这就要求我们在了解其中内在机制的前提下,去各种测试、计算等来最终确定最佳参数配比。
Spark任务在执行时会将RDD划分为不同的stage,一个stage中task的数量跟最后一个RDD的分区数量相同。stage划分的关键是宽依赖,而宽依赖往往伴随着shuffle操作。对于一个stage接收另一个stage的输入,这种操作通常都会有一个参数numPartitions来显示指定分区数。最典型的就是一些ByKey算子,比如groupByKey(numPartitions: Int),但是这个分区数需要多次测试来确定合适的值。首先确定父RDD中的分区数(通过rdd.partitions().size()可以确定RDD的分区数),然后在此基础上增加分区数,多次调试直至在确定的资源任务能够平稳、安全的运行。
对于没有父RDD的RDD,比如通过加载HDFS上的数据生成的RDD,它的分区数由InputFormat切分机制决定。通常就是一个HDFS block块对应一个分区,对于不可切分文件则一个文件对应一个分区。
对于通过SparkContext的parallelize方法或者makeRDD生成的RDD分区数可以直接在方法中指定,如果未指定,则参考spark.default.parallelism的参数配置。下面是默认情况下确定defaultParallelism的源码:

  1. override def defaultParallelism(): Int = {
  2. conf.getInt(“spark.default.parallelism”, math.max(totalCoreCount.get(), 2))
  3. }

——————————————————————————————————————————
Spark对接不同的数据源,在第一次得到的分区数是不一样的,但都有一个共性:对于map类算子或者通过map算子产生的彼此之间具有窄依赖关系的RDD的分区数,子RDD分区与父RDD分区是一致的。而对于通过shuffle差生的子RDD则由分区器决定,当然默认分区器是HashPartitioner,我们完全可以根据实际业务场景进行自定义分区器,只需继承Parttioner组件,主要重写几个方法即可:

  1. abstract class Partitioner extends Serializable {
  2. def numPartitions: Int
  3. def getPartition(key: Any): Int
  4. }

以加载hdfs文件为例,Spark在读取hdfs文件还没有调用其他算子进行业务处理前,得到的RDD分区数由什么决定呢?关键在于文件是否可切分!
对于可切分文件,如text文件,那么通过加载文件得到的RDD的分区数默认与该文件的block数量保持一致;
对于不可切分文件,它只有一个block块,那么得到的RDD的分区数默认也就是1。
当然,我们可以通过调用一些算子对RDD进行重分区,如repartition。
这里必须要强调一点,很多小伙伴不理解,RDD既然不存储数据,那么加载过来的文件都跑哪里去了呢?这里先给大家提个引子——blockmanager,Spark自己实现的存储管理器。RDD的存储概念其实block,至于block的大小可以根据不同的数据源进行调整,blockmanager的数据存储、传输都是以block进行的。至于block内部传输的时候,它的大小也是可以通过参数控制的,比如广播变量、shuffle传输时block的大小等。

image.png
image.png
image.png

4.2、文件获取分片数源码

  1. /** Splits files returned by {@link #listStatus(JobConf)} when
  2. * they're too big.*/
  3. public InputSplit[] getSplits(JobConf job, int numSplits)
  4. throws IOException {
  5. Stopwatch sw = new Stopwatch().start();
  6. FileStatus[] files = listStatus(job);
  7. // Save the number of input files for metrics/loadgen
  8. job.setLong(NUM_INPUT_FILES, files.length);
  9. long totalSize = 0; // compute total size
  10. for (FileStatus file: files) { // check we have valid files
  11. if (file.isDirectory()) {
  12. throw new IOException("Not a file: "+ file.getPath());
  13. }
  14. totalSize += file.getLen(); //在这里获取要读取文件总数的大小,单位字节
  15. }
  16. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 由设置的分区数,计算出每个分片要存放多大字节的内容
  17. long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  18. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  19. // generate splits
  20. ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  21. NetworkTopology clusterMap = new NetworkTopology();
  22. for (FileStatus file: files) {
  23. Path path = file.getPath();
  24. long length = file.getLen();
  25. if (length != 0) {
  26. FileSystem fs = path.getFileSystem(job);
  27. BlockLocation[] blkLocations;
  28. if (file instanceof LocatedFileStatus) {
  29. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  30. } else {
  31. blkLocations = fs.getFileBlockLocations(file, 0, length);
  32. }
  33. if (isSplitable(fs, path)) {
  34. long blockSize = file.getBlockSize();
  35. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  36. long bytesRemaining = length;
  37. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //这里在所有分片分配完成后还有剩余的,如果剩余的大于分片容量的1.1倍则新开一个分片否则不开
  38. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
  39. length-bytesRemaining, splitSize, clusterMap);
  40. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  41. splitHosts[0], splitHosts[1]));
  42. bytesRemaining -= splitSize;
  43. }
  44. if (bytesRemaining != 0) {
  45. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
  46. - bytesRemaining, bytesRemaining, clusterMap);
  47. splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
  48. splitHosts[0], splitHosts[1]));
  49. }
  50. } else {
  51. String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
  52. splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
  53. }
  54. } else {
  55. //Create empty hosts array for zero length files
  56. splits.add(makeSplit(path, 0, length, new String[0]));
  57. }
  58. }
  59. sw.stop();
  60. if (LOG.isDebugEnabled()) {
  61. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  62. + ", TimeTaken: " + sw.elapsedMillis());
  63. }
  64. return splits.toArray(new FileSplit[splits.size()]);
  65. }

4.3、分区数据的分配

在Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若干个Partition组成。在Job运行期间,参与运算的Partition数据分布在多台机器的内存当中。这里可将RDD看成一个非常大的数组,其中Partition是数组中的每个元素,并且这些元素分布在多台机器中。图一中,RDD1包含了5个Partition,RDD2包含了3个Partition,这些Partition分布在4个节点中。
image.png

Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)。一般而言,对于初始读入的数据是不具有任何的数据分区方式的。数据分区方式只作用于形式的数据。因此,当一个Job包含Shuffle操作类型的算子时,如groupByKey,reduceByKey etc,此时就会使用数据分区方式来对数据进行分区,即确定某一个Key对应的键值对数据分配到哪一个Partition中。在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task对数据进行处理产生中间数据,然后再根据数据分区方式对中间数据进行分区。最终Shffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据。图2中描述了Shuffle阶段与Partition关系。下面则分别介绍Spark中存在的两种数据分区方式。
image.png

4.3.1、HashPartitioner(哈希分区)

1、HashPartitioner原理简介
HashPartitioner采用哈希的方式对键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。图3简单描述了HashPartitioner的数据分区过程。
2、HashPartitioner源码详解

  1. class HashPartitioner(partitions: Int) extends Partitioner {
  2. require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  3. /**
  4. * 包含的分区个数
  5. */
  6. def numPartitions: Int = partitions
  7. /**
  8. * 获得Key对应的partitionId
  9. */
  10. def getPartition(key: Any): Int = key match {
  11. case null => 0
  12. case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  13. }
  14. override def equals(other: Any): Boolean = other match {
  15. case h: HashPartitioner =>
  16. h.numPartitions == numPartitions
  17. case _ =>
  18. false
  19. }
  20. override def hashCode: Int = numPartitions
  21. }
  22. def nonNegativeMod(x: Int, mod: Int): Int = {
  23. val rawMod = x % mod
  24. rawMod + (if (rawMod < 0) mod else 0)
  25. }

4.3.2、RangePartitioner(范围分区)

1、RangePartitioner原理简介

  1. Spark引入RangePartitioner的目的是**为了解决HashPartitioner所带来的分区倾斜问题**,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。图4简单描述了RangePartitioner的数据分区过程。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/2446452/1636649344978-30795b46-bf12-4bc2-a315-ae1d225dcd9d.png#clientId=u4257a166-05c5-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=260&id=u1643b1b1&margin=%5Bobject%20Object%5D&name=image.png&originHeight=346&originWidth=1066&originalType=url&ratio=1&rotation=0&showTitle=false&size=73781&status=done&style=none&taskId=u62f3af2d-31e4-4a83-8ab9-5a35a145818&title=&width=800)

2、RangePartitioner源码详解

① 确定采样数据的规模:RangePartitioner默认对生成的子RDD中的每个Partition采集20条数据,样本数据最大为1e6条。

  1. // 总共需要采集的样本数据个数,其中partitions代表最终子RDD中包含的Partition个数
  2. val sampleSize = math.min(20.0 * partitions, 1e6)

② 确定父RDD中每个Partition中应当采集的数据量:这里注意的是,对父RDD中每个Partition采集的数据量会在平均值上乘以3,这里是为了后继在进行判断一个Partition是否发生了倾斜,当一个Partition包含的数据量超过了平均值的三倍,此时会认为该Partition发生了数据倾斜,会对该Partition调用sample算子进行重新采样。

  1. // 被采样的RDD中每个partition应该被采集的数据,这里将平均采集每个partition中数据的3倍
  2. val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

③ 调用sketch方法进行数据采样:sketch方法返回的结果为<采样RDD的数据量,>。在sketch方法中会使用水塘抽样算法对待采样的各个分区进行数据采样,这里采用水塘抽样算法是由于实现无法知道每个Partition中包含的数据量,而水塘抽样算法可以保证在不知道整体的数据量下仍然可以等概率地抽取出每条数据。图4简单描述了水塘抽样过程。
image.png

  1. // 使用sketch方法进行数据抽样
  2. val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
  3. /**
  4. * @param rdd 需要采集数据的RDD
  5. * @param sampleSizePerPartition 每个partition采集的数据量
  6. * @return <采样RDD数据总量,<partitionId, 当前分区的数据量,当前分区采集的数据量>>
  7. */
  8. def sketch[K : ClassTag](
  9. rdd: RDD[K],
  10. sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  11. val shift = rdd.id
  12. val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
  13. val seed = byteswap32(idx ^ (shift << 16))
  14. // 使用水塘抽样算法进行抽样,抽样结果是个二元组<Partition中抽取的样本量,Partition中包含的数据量>
  15. val (sample, n) = SamplingUtils.reservoirSampleAndCount(
  16. iter, sampleSizePerPartition, seed)
  17. Iterator((idx, n, sample))
  18. }.collect()
  19. val numItems = sketched.map(_._2).sum
  20. (numItems, sketched)
  21. }

④ 数据抽样完成后,需要对不均衡的Partition重新进行抽样,默认当Partition中包含的数据量大于平均值的三倍时,该Partition是不均衡的。当采样完成后,利用样本容量和RDD中包含的数据总量,可以得到整体的一个数据采样率fraction。利用此采样率对不均衡的Partition调用sample算子重新进行抽样。

  1. // 计算数据采样率
  2. val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
  3. // 存放采样Key以及采样权重
  4. val candidates = ArrayBuffer.empty[(K, Float)]
  5. // 存放不均衡的Partition
  6. val imbalancedPartitions = mutable.Set.empty[Int]
  7. //(idx, n, sample)=> (partition id, 当前分区数据个数,当前partition的采样数据)
  8. sketched.foreach { case (idx, n, sample) =>
  9. // 当一个分区中的数据量大于平均分区数据量的3倍时,认为该分区是倾斜的
  10. if (fraction * n > sampleSizePerPartition) {
  11. imbalancedPartitions += idx
  12. }
  13. // 在三倍之内的认为没有发生数据倾斜
  14. else {
  15. // 每条数据的采样间隔 = 1/采样率 = 1/(sample.size/n.toDouble) = n.toDouble/sample.size
  16. val weight = (n.toDouble / sample.length).toFloat
  17. // 对当前分区中的采样数据,对每个key形成一个二元组<key, weight>
  18. for (key <- sample) {
  19. candidates += ((key, weight))
  20. }
  21. }
  22. }
  23. // 对于非均衡的partition,重新采用sample算子进行抽样
  24. if (imbalancedPartitions.nonEmpty) {
  25. val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
  26. val seed = byteswap32(-rdd.id - 1)
  27. val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
  28. val weight = (1.0 / fraction).toFloat
  29. candidates ++= reSampled.map(x => (x, weight))
  30. }

⑤ 确定各个Partition的Key范围:使用determineBounds方法来确定每个Partition中包含的Key范围,先对采样的Key进行排序,然后计算每个Partition平均包含的Key权重,然后采用平均分配原则来确定各个Partition包含的Key范围。如当前采样Key以及权重为:<1, 0.2>, <2, 0.1>, <3, 0.1>, <4, 0.3>, <5, 0.1>, <6, 0.3>,现在将其分配到3个Partition中,则每个Partition的平均权重为:(0.2 + 0.1 + 0.1 + 0.3 + 0.1 + 0.3) / 3 = 0.36。此时Partition1 ~ 3分配的Key以及总权重为

  1. /**
  2. * @param candidates 未按采样间隔排序的抽样数据
  3. * @param partitions 最终生成的RDD包含的分区个数
  4. * @return 分区边界
  5. */
  6. def determineBounds[K : Ordering : ClassTag](
  7. candidates: ArrayBuffer[(K, Float)],
  8. partitions: Int): Array[K] = {
  9. val ordering = implicitly[Ordering[K]]
  10. // 对样本按照key进行排序
  11. val ordered = candidates.sortBy(_._1)
  12. // 抽取的样本容量
  13. val numCandidates = ordered.size
  14. // 抽取的样本对应的采样间隔之和
  15. val sumWeights = ordered.map(_._2.toDouble).sum
  16. // 平均每个分区的步长
  17. val step = sumWeights / partitions
  18. var cumWeight = 0.0
  19. var target = step
  20. // 分区边界值
  21. val bounds = ArrayBuffer.empty[K]
  22. var i = 0
  23. var j = 0
  24. var previousBound = Option.empty[K]
  25. while ((i < numCandidates) && (j < partitions - 1)) {
  26. val (key, weight) = ordered(i)
  27. cumWeight += weight
  28. // 当前的采样间隔小于target,继续迭代,也即这些key应该放在同一个partition中
  29. if (cumWeight >= target) {
  30. // Skip duplicate values.
  31. if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
  32. bounds += key
  33. target += step
  34. j += 1
  35. previousBound = Some(key)
  36. }
  37. }
  38. i += 1
  39. }
  40. bounds.toArray
  41. }

⑥ 计算每个Key所在Partition:当分区范围长度在128以内,使用顺序搜索来确定Key所在的Partition,否则使用二分查找算法来确定Key所在的Partition。

  1. /**
  2. * 获得每个Key所在的partitionId
  3. */
  4. def getPartition(key: Any): Int = {
  5. val k = key.asInstanceOf[K]
  6. var partition = 0
  7. // 如果得到的范围不大于128,则进行顺序搜索
  8. if (rangeBounds.length <= 128) {
  9. // If we have less than 128 partitions naive search
  10. while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
  11. partition += 1
  12. }
  13. }
  14. // 范围大于128,则进行二分搜索该key所在范围,即可得到该key所在的partitionId
  15. else {
  16. // Determine which binary search method to use only once.
  17. partition = binarySearch(rangeBounds, k)
  18. // binarySearch either returns the match location or -[insertion point]-1
  19. if (partition < 0) {
  20. partition = -partition-1
  21. }
  22. if (partition > rangeBounds.length) {
  23. partition = rangeBounds.length
  24. }
  25. }
  26. if (ascending) {
  27. partition
  28. } else {
  29. rangeBounds.length - partition
  30. }
  31. }