HashPartitioner
RangePartitioner
/**
 * When to use:
 *   If HashPartitioner's hashCode-mod-N placement sends most data to a single partition, switch to RangePartitioner;
 *   this requires the keys themselves to be fairly evenly distributed, so it is not a fix for key skew.
 * RangePartitioner: samples the RDD to determine N-1 boundary keys (the partition edges); each incoming key is then
 *   compared against those boundaries to decide which partition it belongs to.
 * With RangePartitioner the data is ordered across partitions but unordered within a partition.
 * RangePartitioner internally calls collect() on its sample, which triggers an extra job.
 */
package tcode.day05

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object $06_RangePartitioner {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val rdd = sc.parallelize(List("aa" -> 1, "bb" -> 2, "aa" -> 5, "dd" -> 10, "bb" -> 15, "cc" -> 16,
      "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1, "aa" -> 1))

    // Repartition into 2 key ranges; the boundary key is determined by sampling rdd
    val rdd3 = rdd.partitionBy(new RangePartitioner(2, rdd))

    rdd3.mapPartitionsWithIndex((index, it) => {
      val data = it.toList // materialize first so printing does not exhaust the returned iterator
      println(s"index=${index} data=${data}")
      data.iterator
    }).collect()

    // keep the driver alive so the local Spark UI can still be inspected
    Thread.sleep(1000000000)
  }
}
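To make the boundary comparison described in the comment above concrete, here is a minimal sketch of the idea, not Spark's actual code: the bounds array stands in for the N-1 boundary keys that sampling would produce, and its values ("bb", "cc") are made up for illustration.

// Sketch only: N-1 sampled boundary keys split the key space into N ordered ranges.
val bounds = Array("bb", "cc") // hypothetical boundaries for 3 partitions

def rangePartition(key: String): Int = {
  // the first boundary that is >= key decides the partition;
  // keys greater than every boundary fall into the last partition
  val idx = bounds.indexWhere(key <= _)
  if (idx < 0) bounds.length else idx
}

// "aa" -> 0, "bb" -> 0, "cc" -> 1, "dd" -> 2  (partitions are ordered by key range)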
Using partitionBy with the built-in HashPartitioner
@Test
def partitionBy() = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4,
    "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  // Repartition by key hash into 3 partitions
  val rdd1 = rdd.partitionBy(new HashPartitioner(3))
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"index=${index} data=${data}")
    data.iterator
  }).collect()
  println(rdd2.toList)
}
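For reference, HashPartitioner places a key by taking a non-negative modulus of its hashCode, which is the behavior the note at the top warns can pile keys into one partition. A simplified sketch of that rule (Spark's real implementation additionally routes null keys to partition 0):

// Simplified version of HashPartitioner's placement rule
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // keep the result non-negative
}

// With numPartitions = 3, each key lands wherever hashCode mod 3 falls; if many keys share the
// same remainder, one partition receives most of the data.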
// Custom partitioner
@Test
def myPartition() = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4,
    "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  // Use the custom partitioner; a Partitioner can also be passed to reduceByKey (see the sketch after the class below)
  val rdd1 = rdd.partitionBy(new myPartitioner(4))
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"${index},${data}")
    data.iterator
  }).collect()
}
}
// Custom partitioner: extend Partitioner and implement numPartitions and getPartition
import org.apache.spark.Partitioner

class myPartitioner(partitions: Int) extends Partitioner {

  override def numPartitions: Int = {
    // require at least 4 partitions so the indices returned by getPartition (1..3) always exist
    if (partitions < 4) 4 else partitions
  }

  override def getPartition(key: Any): Int = {
    // route keys to fixed partitions; anything that is neither "scala" nor "spark" goes to partition 3
    key match {
      case "scala" => 1
      case "spark" => 2
      case _       => 3
    }
  }
}
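As the comment in myPartition() notes, a custom Partitioner can also be passed straight to a shuffle operator such as reduceByKey instead of going through partitionBy. A minimal usage sketch, assuming rdd is the same pair RDD built in the tests above:

// Sketch: hand the custom partitioner to reduceByKey so the shuffle uses it directly
val summed = rdd.reduceByKey(new myPartitioner(4), _ + _)

summed.mapPartitionsWithIndex((index, it) => {
  val data = it.toList
  println(s"index=${index} data=${data}")
  data.iterator
}).collect()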