自定义分区器

比如说key是手机号,那么可以自定义分区器.

下面案例是根据key进行分区,偶数分到0号区域,奇数分到1号区域

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
  3. object Transformation_partitionBy {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
  8. rdd.mapPartitionsWithIndex {
  9. (index, datas) => {
  10. println(index + "------" + datas.mkString(","))
  11. datas
  12. }
  13. }.collect()
  14. /* 输出
  15. 0------(1,aaa)
  16. 2------(3,ccc)
  17. 1------(2,bbb)
  18. */
  19. println("----------------------------------------------")
  20. val newRDD: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
  21. newRDD.mapPartitionsWithIndex {
  22. (index, datas) => {
  23. println(index + "------" + datas.mkString(","))
  24. datas
  25. }
  26. }.collect()
  27. /*
  28. 输出
  29. 0------(2,bbb)
  30. 1------(1,aaa),(3,ccc)
  31. */
  32. // 关闭连接
  33. sc.stop()
  34. }
  35. }
  36. //自定义分区器
  37. class MyPartitioner(partitions: Int) extends Partitioner {
  38. //获取分区个数
  39. override def numPartitions: Int = partitions
  40. /*指定分区规则
  41. 分区的时候,每个元素都会调用getPartition方法,返回int类型的值,这个int类型的值就是分区号, 从0开始的分区号
  42. */
  43. override def getPartition(key: Any): Int = {
  44. val keyInt = key.toString.toInt
  45. if (keyInt % 2 == 0) { //偶数
  46. return 0
  47. } else { // 奇数
  48. return 1
  49. }
  50. }
  51. }