自定义分区器
比如说key是手机号,那么可以自定义分区器.
下面案例是根据key进行分区,偶数分到0号区域,奇数分到1号区域
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object Transformation_partitionBy {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "------" + datas.mkString(","))
datas
}
}.collect()
/* 输出
0------(1,aaa)
2------(3,ccc)
1------(2,bbb)
*/
println("----------------------------------------------")
val newRDD: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "------" + datas.mkString(","))
datas
}
}.collect()
/*
输出
0------(2,bbb)
1------(1,aaa),(3,ccc)
*/
// 关闭连接
sc.stop()
}
}
//自定义分区器
class MyPartitioner(partitions: Int) extends Partitioner {
//获取分区个数
override def numPartitions: Int = partitions
/*指定分区规则
分区的时候,每个元素都会调用getPartition方法,返回int类型的值,这个int类型的值就是分区号, 从0开始的分区号
*/
override def getPartition(key: Any): Int = {
val keyInt = key.toString.toInt
if (keyInt % 2 == 0) { //偶数
return 0
} else { // 奇数
return 1
}
}
}