Custom Partitioner
For example, if the key is a phone number, you can write a custom partitioner to control where each record goes (a sketch of this case follows the example below).
The following example partitions by key: even keys go to partition 0, odd keys go to partition 1.
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object Transformation_partitionBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)

    // Print each partition's index and contents (mkString consumes the iterator;
    // we only use this transformation for printing).
    rdd.mapPartitionsWithIndex {
      (index, datas) => {
        println(index + "------" + datas.mkString(","))
        datas
      }
    }.collect()
    /* Output:
       0------(1,aaa)
       2------(3,ccc)
       1------(2,bbb)
    */

    println("----------------------------------------------")

    // Repartition with the custom partitioner: 2 partitions, even/odd rule
    val newRDD: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))

    newRDD.mapPartitionsWithIndex {
      (index, datas) => {
        println(index + "------" + datas.mkString(","))
        datas
      }
    }.collect()
    /* Output:
       0------(2,bbb)
       1------(1,aaa),(3,ccc)
    */

    // Stop the SparkContext
    sc.stop()
  }
}

// Custom partitioner
class MyPartitioner(partitions: Int) extends Partitioner {
  // Number of partitions
  override def numPartitions: Int = partitions

  /*
    Partitioning rule: getPartition is called for every element and returns an Int,
    which is the partition number (numbered from 0).
  */
  override def getPartition(key: Any): Int = {
    val keyInt = key.toString.toInt
    if (keyInt % 2 == 0) 0 // even key -> partition 0
    else 1                 // odd key  -> partition 1
  }
}
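For the phone-number case mentioned at the top, a minimal sketch might route records by the number's prefix so that numbers sharing a prefix land in the same partition. The class name PhonePrefixPartitioner, the 3-digit prefix rule, and the partition count are illustrative assumptions, not part of the original example.

import org.apache.spark.Partitioner

// Illustrative sketch only: partitions phone-number string keys by their first 3 digits.
// The class name and the prefix rule are assumptions for demonstration.
class PhonePrefixPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val prefix = key.toString.take(3)                 // e.g. "138" from "13812345678"
    // Non-negative hash of the prefix, mapped into [0, partitions)
    (prefix.hashCode & Integer.MAX_VALUE) % partitions
  }
}

// Usage (assuming pairRdd is an RDD of (phoneNumber, record) pairs):
// val partitioned = pairRdd.partitionBy(new PhonePrefixPartitioner(4))

Hashing only the prefix (rather than the whole key, as HashPartitioner would) keeps all numbers with the same prefix together, at the cost of possible skew if one prefix dominates the data.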
