Spark目前支持Hash、Range分区和用户自定义分区,非Key-Value类型的RDD不分区。

8.1 获取RDD分区

使用RDD的paritition属性获取分区,返回scala.Option对象。

8.2 Hash分区

对于给定的key,计算hashcode,除以分区的个数并取余,得到分区ID。

8.3 Ranger分区

可能导致每个分区中数据量的不均匀。

  1. 抽取样本数据,将样本数据排序,并计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds。
  2. 判断key在rangeBounds中所处的范围,给出分区下标

    8.4 自定义分区

  3. 继承org.apache.spark.Partitioner并实现

    1. numPartitions: Int,返回创建出来的分区数
    2. getPartition(key: Any),返回给定键的分区编号
    3. equals(): Java,Java里的那个equals ```scala val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“customerPartitionerDemo”) val sparkContext = new SparkContext(sparkConf)

val data = sparkContext.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6))) val par = data.partitionBy(new Partitioner { override def numPartitions: Int = 3

override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 3

override def equals(obj: Any): Boolean = super.equals(obj) }).mapPartitionsWithIndex((i,set) => set.map((i,_))).collect() println(par.mkString(“Array(“,”,”,”)”)) ```