1.coalesce(numPartitions)缩减分区
缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率。
比如说通过distinct算子对rdd元素去重复,原来的话可能100W条,现在可能50W了,那么分区数不需要那么多了,就需要缩减分区了.
distinct算子可以缩减分区数量的参数,不用我们去处理.但是某些算子没有缩减分区数的功能,比如说filter算子.
filter算子将不符合元素的数据过滤掉了,可能原来有100w条数据,过滤掉还有20w条数据,那么我们就需要给分区数缩小一下了.
如果100G的数据,我们开100个分区处理还可以.如果我们只有1Mb的数据,还用100个分区处理那么就不合适了,这样效率就非常低了.
coalesce算子默认是没有shuffle的,所以扩大分区一定要指定shuffle
缩减分区案例
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object Spark10_Transformation_coalesce222 {def main(args: Array[String]): Unit = {//创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)//创建RDDval numRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)numRDD.mapPartitionsWithIndex{(index,datas)=>{println(index + "--->" + datas.mkString(","))datas}}.collect()/*0--->1,21--->3,42--->5,6*/println("********************************************")//缩减分区,默认是没shuffle/*参数1: 指定分区数量参数2: true就是开启shuffle* */val newRDD: RDD[Int] = numRDD.coalesce(2,true)newRDD.mapPartitionsWithIndex{(index,datas)=>{println(index + "--->" + datas.mkString(","))datas}}.collect()/*0--->1,3,51--->2,4,6*/// 关闭连接sc.stop()}}
2.repartition(numPartitions)
作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.
新的分区数相比以前可以多, 也可以少
当然你不用repartition也行,你可以只用coalesce也行,coalesce算子也有指定shuffle的参数设置.
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** -repartition* 底层调用的就是coalesce,只不过默认是执行shuffle,一般用于扩大分区*/object Transformation {def main(args: Array[String]): Unit = {//创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)//创建RDDval numRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)numRDD.mapPartitionsWithIndex {(index, datas) => {println(index + "--->" + datas.mkString(","))datas}}.collect()/*0--->1,22--->5,61--->3,4*/println("********************************************")//如果扩大分区 使用repartitionval newRDD: RDD[Int] = numRDD.repartition(4)newRDD.mapPartitionsWithIndex {(index, datas) => {println(index + "--->" + datas.mkString(","))datas}}.collect()/*2--->1--->0--->2,4,63--->1,3,5*/// 关闭连接sc.stop()}}
3.Coalasce和Repartition的区别
1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2. repartition实际上是调用的coalesce,进行shuffle。源码如下:
3. 如果是减少分区, 尽量避免 shuffle
4.关于shuffle
如果使用shuffle的话可以打乱分区里面所有的数据重组,这样可以防止数据倾斜的情况,
如果不用shuffle的话,不落盘,效率会高一些.这样效率会高.
具体要不要shuffle,就看你实际场景了,如果不需要shuffle就不用shuffle.
