作用


对 RDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程.假如说原来是根据hash分区的,你又分区,但是还是根据hash分区的,此时就不会出现shuffle过程.


RDD的特性是一组分区,每个分区的数据不一样. 默认分区器是HashPartitioner,HashPartitioner是根据每个值的key,然后去调用hashcode方法拿到一个hash值(int整数),然后用int整数对当前分区数做取模运算,比如说 有两个分区. hash值%2,不是0就是1, 如果结果是0就放0号分区,如果结果是1就放1号分区里面.
image.png

案例

默认HashPartitioner的使用

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
  3. object Transformation_partitionBy2 {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. //创建RDD
  8. //注意:RDD本身是没有partitionBy这个算子的,通过隐式转换动态给kv类型的RDD扩展的功能
  9. // val rdd2: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  10. // rdd2.partitionBy()如果是单个值的类型的话,调用rdd的partitionBy方法是不行的,因为没这个方法
  11. val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
  12. rdd.mapPartitionsWithIndex {
  13. (index, datas) => {
  14. println(index + "------" + datas.mkString(","))
  15. datas
  16. }
  17. }.collect()
  18. /*
  19. 1------(2,bbb)
  20. 2------(3,ccc)
  21. 0------(1,aaa)
  22. */
  23. println("----------------------------------------------")
  24. //用hash 分区
  25. val newRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
  26. newRDD.mapPartitionsWithIndex {
  27. (index, datas) => {
  28. println(index + "------" + datas.mkString(","))
  29. datas
  30. }
  31. }.collect()
  32. /*
  33. 1------(1,aaa),(3,ccc)
  34. 0------(2,bbb)
  35. */
  36. sc.stop()
  37. }
  38. }