Spark目前支持Hash、Range分区和用户自定义分区,非Key-Value类型的RDD不分区。
8.1 获取RDD分区
使用RDD的paritition属性获取分区,返回scala.Option对象。
8.2 Hash分区
对于给定的key,计算hashcode,除以分区的个数并取余,得到分区ID。
8.3 Ranger分区
可能导致每个分区中数据量的不均匀。
- 抽取样本数据,将样本数据排序,并计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds。
判断key在rangeBounds中所处的范围,给出分区下标
8.4 自定义分区
继承org.apache.spark.Partitioner并实现
- numPartitions: Int,返回创建出来的分区数
- getPartition(key: Any),返回给定键的分区编号
- equals(): Java,Java里的那个equals ```scala val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“customerPartitionerDemo”) val sparkContext = new SparkContext(sparkConf)
val data = sparkContext.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6))) val par = data.partitionBy(new Partitioner { override def numPartitions: Int = 3
override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 3
override def equals(obj: Any): Boolean = super.equals(obj) }).mapPartitionsWithIndex((i,set) => set.map((i,_))).collect() println(par.mkString(“Array(“,”,”,”)”)) ```