1.coalesce(numPartitions)缩减分区


缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率。

比如说通过distinct算子对rdd元素去重复,原来的话可能100W条,现在可能50W了,那么分区数不需要那么多了,就需要缩减分区了.
distinct算子可以缩减分区数量的参数,不用我们去处理.但是某些算子没有缩减分区数的功能,比如说filter算子.
filter算子将不符合元素的数据过滤掉了,可能原来有100w条数据,过滤掉还有20w条数据,那么我们就需要给分区数缩小一下了.


如果100G的数据,我们开100个分区处理还可以.如果我们只有1Mb的数据,还用100个分区处理那么就不合适了,这样效率就非常低了.

coalesce算子默认是没有shuffle的,所以扩大分区一定要指定shuffle

缩减分区案例

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark10_Transformation_coalesce222 {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //创建RDD
  10. val numRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
  11. numRDD.mapPartitionsWithIndex{
  12. (index,datas)=>{
  13. println(index + "--->" + datas.mkString(","))
  14. datas
  15. }
  16. }.collect()
  17. /*
  18. 0--->1,2
  19. 1--->3,4
  20. 2--->5,6
  21. */
  22. println("********************************************")
  23. //缩减分区,默认是没shuffle
  24. /*参数1: 指定分区数量
  25. 参数2: true就是开启shuffle
  26. * */
  27. val newRDD: RDD[Int] = numRDD.coalesce(2,true)
  28. newRDD.mapPartitionsWithIndex{
  29. (index,datas)=>{
  30. println(index + "--->" + datas.mkString(","))
  31. datas
  32. }
  33. }.collect()
  34. /*
  35. 0--->1,3,5
  36. 1--->2,4,6
  37. */
  38. // 关闭连接
  39. sc.stop()
  40. }
  41. }

2.repartition(numPartitions)

作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.
新的分区数相比以前可以多, 也可以少
当然你不用repartition也行,你可以只用coalesce也行,coalesce算子也有指定shuffle的参数设置.

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * -repartition
  5. * 底层调用的就是coalesce,只不过默认是执行shuffle,一般用于扩大分区
  6. */
  7. object Transformation {
  8. def main(args: Array[String]): Unit = {
  9. //创建SparkConf并设置App名称
  10. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  11. //创建SparkContext,该对象是提交Spark App的入口
  12. val sc: SparkContext = new SparkContext(conf)
  13. //创建RDD
  14. val numRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
  15. numRDD.mapPartitionsWithIndex {
  16. (index, datas) => {
  17. println(index + "--->" + datas.mkString(","))
  18. datas
  19. }
  20. }.collect()
  21. /*
  22. 0--->1,2
  23. 2--->5,6
  24. 1--->3,4
  25. */
  26. println("********************************************")
  27. //如果扩大分区 使用repartition
  28. val newRDD: RDD[Int] = numRDD.repartition(4)
  29. newRDD.mapPartitionsWithIndex {
  30. (index, datas) => {
  31. println(index + "--->" + datas.mkString(","))
  32. datas
  33. }
  34. }.collect()
  35. /*
  36. 2--->
  37. 1--->
  38. 0--->2,4,6
  39. 3--->1,3,5
  40. */
  41. // 关闭连接
  42. sc.stop()
  43. }
  44. }

3.Coalasce和Repartition的区别



1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2. repartition实际上是调用的coalesce,进行shuffle。源码如下:
3. 如果是减少分区, 尽量避免 shuffle

4.关于shuffle

如果使用shuffle的话可以打乱分区里面所有的数据重组,这样可以防止数据倾斜的情况,
如果不用shuffle的话,不落盘,效率会高一些.这样效率会高.

具体要不要shuffle,就看你实际场景了,如果不需要shuffle就不用shuffle.