Spark DataFrame repartitioned by multiple columns
What happens internally?

Problem

At work I have an edge table with two columns, A and B, both String person ids. The hope was that after a repartition, rows inside one partition would share person ids as much as possible, to make the downstream processing cheaper.

Taking 2000 records as one batch and counting the distinct person ids per batch, I found:

  • repartition(50): roughly 3900 distinct person ids
  • repartition(50, col(A)): roughly 3600 distinct person ids
  • repartition(50, col(A), col(B)): roughly 3900 distinct person ids, i.e. essentially no clustering effect

This was surprising, so I did some searching and digging.
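The measurement was roughly along the following lines. This is only a sketch: the DataFrame name edges, the column names "A"/"B", the helper name distinctIdsPerBatch, and the exact batching logic are assumptions, not the original job.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  // edges: the A-B edge table, both columns String person ids (names assumed)
  def distinctIdsPerBatch(edges: DataFrame): Unit = {
    val repartitioned = edges.repartition(50, col("A"), col("B"))
    repartitioned.rdd
      .mapPartitions { rows =>
        // within each partition, group rows into batches of 2000
        // and count the distinct person ids seen in each batch
        rows.grouped(2000).map { batch =>
          batch.flatMap(r => Seq(r.getString(0), r.getString(1))).distinct.size
        }
      }
      .collect()
      .foreach(println)
  }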

Investigation

  import org.apache.spark.sql.functions.col

  val tuples = Seq(
    ("1", "3"),
    ("1", "3"),
    ("1", "3"),
    ("2", "2"),
    ("2", "2"))
  var df = spark.createDataFrame(tuples).toDF("src", "dst")

  // partitioning by src alone behaves as expected: every "1" row lands in the same partition
  df = df.repartition(2, col("src"))

  // the first three records land in one partition, the last two in the other
  df.repartition(2, col("src"), col("dst"))

  // all records land in the same partition
  df.repartition(2, col("dst"), col("src"))

Tentative conclusions:

  • With multiple input columns, the hashing rule is not obvious, so the partitioning result differs from what was expected
  • The order of the input columns affects the final partitioning result

Looking at the HashPartitioner code (the RDD-level hash partitioner), it relies on the method Utils.nonNegativeMod:

  class HashPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

    def numPartitions: Int = partitions

    // null keys go to partition 0; all other keys use hashCode modulo numPartitions
    def getPartition(key: Any): Int = key match {
      case null => 0
      case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    }

    override def equals(other: Any): Boolean = other match {
      case h: HashPartitioner =>
        h.numPartitions == numPartitions
      case _ =>
        false
    }

    override def hashCode: Int = numPartitions
  }
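For single keys this behavior is easy to verify in a spark-shell session:

  import org.apache.spark.HashPartitioner

  val partitioner = new HashPartitioner(2)
  // "1".hashCode is 49, so nonNegativeMod(49, 2) = 1; "2".hashCode is 50, giving 0
  println(partitioner.getPartition("1")) // 1
  println(partitioner.getPartition("2")) // 0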

Let's use this method to run a simple hash test on the multi-column case and see whether it reveals any clues.

  // Utils.nonNegativeMod is private[spark]; its implementation is reproduced here for the test
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  val tuples = Seq(
    ("1", "3"),
    ("1", "3"),
    ("1", "3"),
    ("2", "2"),
    ("2", "2"))

  // prints 1 1 1 0 0
  tuples.foreach(tuple => {
    println(nonNegativeMod(tuple._1.hashCode, 2))
  })

  // prints 0 0 0 0 0
  tuples.foreach(tuple => {
    println(nonNegativeMod((tuple._2 + tuple._1).hashCode, 2))
  })

  // prints 0 0 0 0 0
  tuples.foreach(tuple => {
    println(nonNegativeMod(tuple._2.hashCode + tuple._1.hashCode, 2))
  })
  • The printout above still does not pin down how multiple columns are hashed into partitions
  • That can only be settled by reading the source code; to be completed
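One pointer from the source, stated here as an assumption about the Spark version in use rather than a verified conclusion: DataFrame.repartition(n, cols...) is planned as RepartitionByExpression with HashPartitioning, whose partition id is pmod(Murmur3Hash(cols), n), not key.hashCode at all. Murmur3Hash folds the columns in order, feeding each column's hash in as the seed for the next, which would explain why swapping the column order changes the result. Since the SQL hash() function uses the same Murmur3 hash, the expected partition id can be computed next to the actual one:

  import org.apache.spark.sql.functions.{col, hash, lit, pmod, spark_partition_id}

  // expected: pmod(murmur3_hash(src, dst), 2), the id HashPartitioning should assign
  // actual:   the partition the row really landed in after the shuffle
  df.withColumn("expected", pmod(hash(col("src"), col("dst")), lit(2)))
    .repartition(2, col("src"), col("dst"))
    .withColumn("actual", spark_partition_id())
    .show()

If the two columns agree, that confirms the Murmur3-based rule and explains why the plain hashCode experiments above could not reproduce the partitioning.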

References:

https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-LogicalPlan-Repartition-RepartitionByExpression.html

https://ihainan.gitbooks.io/spark-source-code/content/section1/partitioner.html

https://kontext.tech/column/spark/296/data-partitioning-in-spark-pyspark-in-depth-walkthrough

https://towardsdatascience.com/should-i-repartition-836f7842298c