HashPartitioner

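HashPartitioner is Spark's default partitioner. It assigns each record to the partition given by a non-negative modulus of the key's hashCode against the partition count, so identical keys always land in the same partition (a null key goes to partition 0). A minimal sketch of the assignment (the object name HashPartitionerDemo is just for illustration):

    import org.apache.spark.HashPartitioner

    object HashPartitionerDemo {
      def main(args: Array[String]): Unit = {
        // getPartition = non-negative (key.hashCode % numPartitions); no SparkContext needed
        val p = new HashPartitioner(3)
        List("aa", "bb", "cc", "dd").foreach { k =>
          println(s"$k -> partition ${p.getPartition(k)}")
        }
      }
    }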

RangePartitioner

When to use:
When taking hashCode mod the partition count sends all of the data to one partition, RangePartitioner is an alternative; it requires the keys to be distributed fairly evenly.
It should not be used when the data is skewed.

RangePartitioner samples the RDD to determine N-1 boundary values (the edges of each partition); each key is then compared against those boundaries to decide which partition it belongs to (see the sketch below).
Across partitions the data is ordered, but within a partition it is unordered.
RangePartitioner internally calls collect once, which produces an extra job.
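A simplified sketch of that boundary lookup (illustrative only, not Spark's actual implementation; the boundary values here are made up):

    // Suppose sampling produced the boundaries Array("bb", "dd") for 3 partitions.
    // A key belongs to the first partition whose boundary is >= the key;
    // keys greater than every boundary go to the last partition.
    val bounds = Array("bb", "dd")
    def partitionFor(key: String): Int = {
      val i = bounds.indexWhere(key <= _)
      if (i == -1) bounds.length else i
    }
    // partitionFor("aa") == 0, partitionFor("cc") == 1, partitionFor("zz") == 2

The full example below uses RangePartitioner itself: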
    package tcode.day05

    import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

    object $06_RangePartitioner {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        val rdd = sc.parallelize(List("aa" -> 1, "bb" -> 2, "aa" -> 5, "dd" -> 10, "bb" -> 15, "cc" -> 16,
          "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1))
        // Repartition by range; the RangePartitioner samples rdd to find the boundary between the 2 partitions
        val rdd3 = rdd.partitionBy(new RangePartitioner(2, rdd))
        rdd3.mapPartitionsWithIndex((index, it) => {
          // Materialize the iterator first: toList would otherwise exhaust it before it is returned
          val data = it.toList
          println(s"index=${index} data=${data}")
          data.iterator
        }).collect()
        // Keep the driver alive so the Spark web UI stays available for inspection
        Thread.sleep(1000000000)
      }
    }
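sortByKey relies on the same mechanism: it builds a RangePartitioner over the output partitions, which is why its result is globally ordered across partitions (and why it, too, triggers the extra sampling job). A small follow-on sketch, reusing the rdd from the example above:

    // sortByKey repartitions with a RangePartitioner, then sorts within each partition
    val sorted = rdd.sortByKey(ascending = true, numPartitions = 2)
    sorted.mapPartitionsWithIndex((index, it) => {
      val data = it.toList
      println(s"index=${index} data=${data}")
      data.iterator
    }).collect()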

Custom Partitioner

    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    import org.junit.Test

    // The enclosing test class and sc were not shown in the original; reconstructed so the tests compile
    class $07_Partitioner {

      val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

      @Test
      def partitionBy() = {
        val rdd = sc.parallelize(List("spark" -> 2, "scala" -> 4, "spark" -> 1, "scala" -> 5, "java" -> 4,
          "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
        // Repartition by the key's hashCode
        val rdd1 = rdd.partitionBy(new HashPartitioner(3))
        val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
          it
        }).collect()
        println(rdd2.toList)
      }

      // Custom partitioning
      @Test
      def myPartition() = {
        val rdd = sc.parallelize(List("spark" -> 2, "scala" -> 4, "spark" -> 1, "scala" -> 5, "java" -> 4,
          "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
        // Use the custom partitioner; it can also be passed to reduceByKey (see the sketch after the class below)
        val rdd1 = rdd.partitionBy(new myPartitioner(4))
        val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
          // Materialize first so the print does not exhaust the returned iterator
          val data = it.toList
          println(s"${index},${data}")
          data.iterator
        }).collect()
      }
    }
    // Custom partitioner: extend Partitioner and implement numPartitions and getPartition
    class myPartitioner(partitions: Int) extends Partitioner {

      // Enforce at least 4 partitions, since getPartition returns indexes up to 3
      override def numPartitions: Int = {
        if (partitions < 4) 4 else partitions
      }

      // Map each key to a partition index in [0, numPartitions)
      override def getPartition(key: Any): Int = {
        key match {
          case "scala" => 1
          case "spark" => 2
          case _       => 3
        }
      }
    }
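As the comment in myPartition() notes, a custom partitioner can also be handed directly to shuffle operators such as reduceByKey, so the aggregation and the custom partitioning happen in a single shuffle. A minimal sketch, assuming the same sc as above:

    // reduceByKey accepts a Partitioner as its first argument
    val counts = sc.parallelize(List("spark" -> 2, "scala" -> 4, "java" -> 1, "spark" -> 1))
      .reduceByKey(new myPartitioner(4), _ + _)
    counts.mapPartitionsWithIndex((index, it) => {
      val data = it.toList
      println(s"${index},${data}")
      data.iterator
    }).collect()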