作用
对 RDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程.假如说原来是根据hash分区的,你又分区,但是还是根据hash分区的,此时就不会出现shuffle过程.
RDD的特性是一组分区,每个分区的数据不一样. 默认分区器是HashPartitioner,HashPartitioner是根据每个值的key,然后去调用hashcode方法拿到一个hash值(int整数),然后用int整数对当前分区数做取模运算,比如说 有两个分区. hash值%2,不是0就是1, 如果结果是0就放0号分区,如果结果是1就放1号分区里面.
案例
默认HashPartitioner的使用
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object Transformation_partitionBy2 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//创建RDD
//注意:RDD本身是没有partitionBy这个算子的,通过隐式转换动态给kv类型的RDD扩展的功能
// val rdd2: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// rdd2.partitionBy()如果是单个值的类型的话,调用rdd的partitionBy方法是不行的,因为没这个方法
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "------" + datas.mkString(","))
datas
}
}.collect()
/*
1------(2,bbb)
2------(3,ccc)
0------(1,aaa)
*/
println("----------------------------------------------")
//用hash 分区
val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "------" + datas.mkString(","))
datas
}
}.collect()
/*
1------(1,aaa),(3,ccc)
0------(2,bbb)
*/
sc.stop()
}
}