Spark DataFrame repartitioned by multiple columns
What are the internals?
How the problem came up
At work I have an edge table of A-B pairs, where both columns are String person ids. The hope was that repartitioning would put rows sharing person ids into the same partitions, to make downstream processing cheaper.
Taking 2000 records as a batch and counting the distinct person ids per batch, I found (see the measurement sketch after this list):
- repartition(50): roughly 3900 distinct person ids per batch
- repartition(50, col(A)): roughly 3600 distinct person ids per batch
- repartition(50, col(A), col(B)): roughly 3900 distinct person ids per batch, i.e. essentially no co-location gained
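For context, this is roughly how such a measurement can be taken. A minimal sketch, assuming a hypothetical DataFrame named edges with String columns "A" and "B" (the real table and job are not shown in this post):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Count distinct person ids per 2000-record batch inside each partition;
// fewer distinct ids per batch means better co-location.
// `edges`, "A", and "B" are stand-ins for the real edge table.
def distinctIdsPerBatch(edges: DataFrame, batchSize: Int = 2000): Unit = {
  edges.rdd
    .mapPartitionsWithIndex { (pid, rows) =>
      rows.grouped(batchSize).map { batch =>
        val ids = batch.flatMap(r => Seq(r.getAs[String]("A"), r.getAs[String]("B"))).toSet
        (pid, ids.size)
      }
    }
    .collect()
    .foreach { case (pid, n) => println(s"partition $pid: $n distinct ids in a batch") }
}

// e.g. distinctIdsPerBatch(edges.repartition(50, col("A"), col("B")))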
This was odd, so I did some searching and digging.
Investigation
import org.apache.spark.sql.functions.col

val tuples = Seq(
  ("1", "3"),
  ("1", "3"),
  ("1", "3"),
  ("2", "2"),
  ("2", "2"))
val df = spark.createDataFrame(tuples).toDF("src", "dst")
// Partitions sensibly by the src column: all rows with src = "1"
// end up in the same partition
val bySrc = df.repartition(2, col("src"))
// The first three records land in one partition, the last two in the other
val bySrcDst = df.repartition(2, col("src"), col("dst"))
// All five records land in the same partition
val byDstSrc = df.repartition(2, col("dst"), col("src"))
Tentative conclusions:
- With multiple input columns, the hashing rule is not obvious, so the partitioning result differs from what I expected
- The order of the input columns affects the final partitioning result (the plan inspection after this list offers a hint)
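Before diving into source, one hint is available from the engine itself: explain() prints the physical plan, and a column-based repartition shows up there as an Exchange node reading hashpartitioning(src, dst, 2) (column ids and formatting vary by Spark version). That suggests the Dataset API uses its own HashPartitioning expression over the columns, in order, rather than the RDD-level HashPartitioner examined below:

// Print the physical plan; expect an Exchange hashpartitioning(src, dst, 2) node
df.repartition(2, col("src"), col("dst")).explain()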
Looking at the HashPartitioner code (the partitioner used at the RDD level), it relies on the method Utils.nonNegativeMod:
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
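For reference, Utils.nonNegativeMod itself is a small helper: it wraps % so that a negative hashCode still maps to a valid partition index. It lives in org.apache.spark.util.Utils and is private[spark], so it is redefined locally below for the experiments that follow:

// Local copy of Utils.nonNegativeMod from org.apache.spark.util.Utils
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}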
Let's use this method to run a simple hash experiment on the multi-column case and see whether we can find any clues.
val tuples = Seq(
  ("1", "3"),
  ("1", "3"),
  ("1", "3"),
  ("2", "2"),
  ("2", "2"))

// Prints 1 1 1 0 0 -- splits the rows by src, like repartition(2, col("src")) did
tuples.foreach(tuple => {
  println(nonNegativeMod(tuple._1.hashCode, 2))
})
// Prints 0 0 0 0 0 -- hashing the concatenation dst + src
tuples.foreach(tuple => {
  println(nonNegativeMod((tuple._2 + tuple._1).hashCode, 2))
})
// Prints 0 0 0 0 0 -- summing the two columns' hash codes
tuples.foreach(tuple => {
  println(nonNegativeMod(tuple._2.hashCode + tuple._1.hashCode, 2))
})
- The prints above do not reveal how multiple columns are hashed into a partition id: neither concatenating the strings nor summing their hash codes reproduces the placements observed earlier
- Pinning this down will require reading the source; left as future work (though a follow-up experiment is sketched below)
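One hedged hypothesis to start that source dive with: the Dataset API exposes functions.hash, a Murmur3 hash over one or more columns, and the hashpartitioning node seen in explain() is generally understood to compute the partition id as pmod(murmur3_hash(cols...), numPartitions) over the columns in the given order. That would explain both the unexpected placements (Murmur3 has nothing to do with String.hashCode) and the sensitivity to column order. A sketch to test the hypothesis, reusing df from above:

import org.apache.spark.sql.functions.{col, hash, lit, pmod, spark_partition_id}

// If the hypothesis holds, "pid" and "expected" agree on every row
val n = 2
df.repartition(n, col("src"), col("dst"))
  .withColumn("pid", spark_partition_id())
  .withColumn("expected", pmod(hash(col("src"), col("dst")), lit(n)))
  .show()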
References
- https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
- https://ihainan.gitbooks.io/spark-source-code/content/section1/partitioner.html
- https://kontext.tech/column/spark/296/data-partitioning-in-spark-pyspark-in-depth-walkthrough
- https://towardsdatascience.com/should-i-repartition-836f7842298c