1.coalesce(numPartitions)缩减分区
缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率。
比如说通过distinct算子对rdd元素去重复,原来的话可能100W条,现在可能50W了,那么分区数不需要那么多了,就需要缩减分区了.
distinct算子可以缩减分区数量的参数,不用我们去处理.但是某些算子没有缩减分区数的功能,比如说filter算子.
filter算子将不符合元素的数据过滤掉了,可能原来有100w条数据,过滤掉还有20w条数据,那么我们就需要给分区数缩小一下了.
如果100G的数据,我们开100个分区处理还可以.如果我们只有1Mb的数据,还用100个分区处理那么就不合适了,这样效率就非常低了.
coalesce算子默认是没有shuffle的,所以扩大分区一定要指定shuffle
缩减分区案例
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark10_Transformation_coalesce222 {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val numRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),3)
numRDD.mapPartitionsWithIndex{
(index,datas)=>{
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*
0--->1,2
1--->3,4
2--->5,6
*/
println("********************************************")
//缩减分区,默认是没shuffle
/*参数1: 指定分区数量
参数2: true就是开启shuffle
* */
val newRDD: RDD[Int] = numRDD.coalesce(2,true)
newRDD.mapPartitionsWithIndex{
(index,datas)=>{
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*
0--->1,3,5
1--->2,4,6
*/
// 关闭连接
sc.stop()
}
}
2.repartition(numPartitions)
作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.
新的分区数相比以前可以多, 也可以少
当然你不用repartition也行,你可以只用coalesce也行,coalesce算子也有指定shuffle的参数设置.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* -repartition
* 底层调用的就是coalesce,只不过默认是执行shuffle,一般用于扩大分区
*/
object Transformation {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val numRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
numRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*
0--->1,2
2--->5,6
1--->3,4
*/
println("********************************************")
//如果扩大分区 使用repartition
val newRDD: RDD[Int] = numRDD.repartition(4)
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*
2--->
1--->
0--->2,4,6
3--->1,3,5
*/
// 关闭连接
sc.stop()
}
}
3.Coalasce和Repartition的区别
1. coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
2. repartition实际上是调用的coalesce,进行shuffle。源码如下:
3. 如果是减少分区, 尽量避免 shuffle
4.关于shuffle
如果使用shuffle的话可以打乱分区里面所有的数据重组,这样可以防止数据倾斜的情况,
如果不用shuffle的话,不落盘,效率会高一些.这样效率会高.
具体要不要shuffle,就看你实际场景了,如果不需要shuffle就不用shuffle.