Given the dataset below, we need to perform the following operations:
val df = spark.sparkContext.parallelize(
Seq(
("1", 15), ("1", 120), ("1", 16), ("1", 14),
("2", 17), ("2", 12), ("3", 13), ("3", 14)
)
).toDF("lot", "cnt")
-------------------------------------------------------------------------------------
+---+---+
|lot|cnt|
+---+---+
|1 |15 |
|1 |120|
|1 |16 |
|1 |14 |
|2 |17 |
|2 |12 |
|3 |13 |
|3 |14 |
+---+---+
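1. Build a JSON string from each row;
2. Group by lot;
3. Merge the JSON strings of each group into an array;
4. Sort the elements of each array by the cnt field (in descending order in the code below).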
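To pin down the intended result, the four steps can be sketched on plain Scala collections with no Spark involved. This is an illustration under assumptions: `toJson` is a hypothetical hand-rolled serializer that only handles these two simple fields, and the sort is descending to match the output shown later.

```scala
// Spark-free sketch of the four steps on local collections.
val rows = Seq(
  ("1", 15), ("1", 120), ("1", 16), ("1", 14),
  ("2", 17), ("2", 12), ("3", 13), ("3", 14)
)

// Hypothetical helper: only valid for a string lot and an integer cnt.
def toJson(lot: String, cnt: Int): String = s"""{"lot":"$lot","cnt":$cnt}"""

// Step 1: build a JSON string per row, keeping the numeric cnt next to it for sorting.
val withJson = rows.map { case (lot, cnt) => (lot, cnt, toJson(lot, cnt)) }

// Steps 2-4: group by lot, sort each group by the numeric cnt (descending), keep only the JSON.
val result: Map[String, List[String]] =
  withJson.groupBy(_._1).map { case (lot, group) =>
    lot -> group.sortBy(t => -t._2).map(_._3).toList
  }

result.toSeq.sortBy(_._1).foreach { case (lot, arr) =>
  println(s"$lot -> ${arr.mkString("[", ", ", "]")}")
}
```

The design choice here is to sort on the number while it is still a number and only then keep the JSON text. In Spark the analogous idea would presumably be to collect structs whose first field is cnt and sort those with sort_array (which compares structs field by field) before calling to_json; that variant is not run here, and its JSON would list cnt before lot.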
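Approach 1: to_json + groupBy + collect_list + sort_array (conclusion first: it does not work ❌)
After collect_list, the array elements are JSON strings, so sort_array compares them as strings (character by character) instead of by the numeric value of cnt, and the order comes out wrong. For lot 1, all four strings share the prefix {"lot":"1","cnt":1, so the next character decides: '6' > '5' > '4' > '2', which puts 120 last in the descending sort.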
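The failure can be reproduced without Spark. The sketch below sorts the four JSON strings of lot 1 the way sort_array(..., false) treats an array<string> (plain string comparison, descending), and then by the numeric cnt. The regex-based `cntOf` is an illustrative helper, not part of the original code, and only works on flat JSON like these strings.

```scala
// The JSON strings that to_json produces for lot "1".
val lot1Json = List(
  """{"lot":"1","cnt":15}""", """{"lot":"1","cnt":120}""",
  """{"lot":"1","cnt":16}""", """{"lot":"1","cnt":14}"""
)

// Illustrative helper: extract the integer value of "cnt" with a regex.
val cntPattern = "\"cnt\":(\\d+)".r
def cntOf(json: String): Int =
  cntPattern.findFirstMatchIn(json).map(_.group(1).toInt).getOrElse(0)

// Descending string order: what sort_array(collect_list(v), false) does on array<string>.
val lexicographic = lot1Json.sorted(Ordering[String].reverse)
println(lexicographic.map(cntOf)) // List(16, 15, 14, 120)

// Descending numeric order on cnt: the order we actually want.
val numeric = lot1Json.sortBy(j => -cntOf(j))
println(numeric.map(cntOf)) // List(120, 16, 15, 14)
```

The first ordering matches the lot 1 row of the sort_array output shown later.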
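Approach 2: groupByKey + mapGroups + JSON.parseObject + sortBy (conclusion first: it works ✔️)
Use groupByKey + mapGroups: for each lot group, parse the sort field cnt out of every JSON string (JSON.parseObject comes from the fastjson library) and sort the group's JSON elements by that numeric value.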
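The function passed to mapGroups receives the key and a single-pass Iterator over that group's values. Below is a Spark-free sketch of that per-group step. To stay dependency-free it swaps fastjson's JSON.parseObject(j).getIntValue("cnt") for a hypothetical regex helper `cntOf`, which only handles flat JSON like the strings in this example.

```scala
// Hypothetical stand-in for JSON.parseObject(j).getIntValue("cnt").
val CntField = "\"cnt\":(-?\\d+)".r

def cntOf(json: String): Int =
  CntField.findFirstMatchIn(json).map(_.group(1).toInt)
    .getOrElse(throw new IllegalArgumentException(s"no cnt in $json"))

// Same shape as the mapGroups function: (key, Iterator[values]) => result.
// The iterator can only be traversed once, so it is materialized with toList before sorting.
def sortGroup(lot: String, jsons: Iterator[String]): (String, List[String]) =
  (lot, jsons.toList.sortBy(cntOf).reverse)

val (lot1, sortedLot1) = sortGroup("1", Iterator(
  """{"lot":"1","cnt":15}""", """{"lot":"1","cnt":120}""",
  """{"lot":"1","cnt":16}""", """{"lot":"1","cnt":14}"""
))
println(s"$lot1 -> $sortedLot1")
```

Sorting ascending and then calling reverse mirrors the original code; `sortBy(j => -cntOf(j))` would give the same descending order in one pass.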
// Assumes a SparkSession named `spark` is already in scope (e.g. spark-shell)
// and that fastjson (com.alibaba.fastjson) is on the classpath for JSON.parseObject.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.alibaba.fastjson.JSON
import spark.implicits._

val df = spark.sparkContext.parallelize(
  Seq(
    ("1", 15), ("1", 120), ("1", 16), ("1", 14),
    ("2", 17), ("2", 12), ("3", 13), ("3", 14)
  )
).toDF("lot", "cnt")

// Reference: the expected order within each lot (cnt descending), shown via row_number.
println("========= row_number ========")
val frame = df.withColumn("rk", row_number().over(Window.partitionBy($"lot").orderBy($"cnt".desc_nulls_last)))
frame.show(false)

// Approach 1: sort_array on an array<string> compares the JSON text, not the cnt value.
println("========= sort_array + collect_list========")
val frame1 = frame
  .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
  .groupBy("lot")
  .agg(sort_array(collect_list("v"), false).as("v1"))
frame1.printSchema()
frame1.show(false)

// Approach 2: group with groupByKey, then sort each group's JSON strings by the parsed cnt.
println("========= groupByKey + mapGroups========")
frame
  .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
  .groupByKey((row: Row) => row.getAs[String]("lot"))
  .mapGroups((k, vs) =>
    // vs is a single-pass Iterator[Row]; toList materializes the group before sorting.
    (k, vs.map(_.getAs[String]("v")).toList.sortBy(j => JSON.parseObject(j).getIntValue("cnt")).reverse)
  ).toDF("lot", "v")
  .show(false)
Output:
========= row_number ========
+---+---+---+
|lot|cnt|rk |
+---+---+---+
|3 |14 |1 |
|3 |13 |2 |
|1 |120|1 |
|1 |16 |2 |
|1 |15 |3 |
|1 |14 |4 |
|2 |17 |1 |
|2 |12 |2 |
+---+---+---+
========= sort_array + collect_list========
root
|-- lot: string (nullable = true)
|-- v1: array (nullable = true)
| |-- element: string (containsNull = true)
+---+-----------------------------------------------------------------------------------------+
|lot|v1 |
+---+-----------------------------------------------------------------------------------------+
|3 |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}] |
|1 |[{"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}, {"lot":"1","cnt":120}]|
|2 |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}] |
+---+-----------------------------------------------------------------------------------------+
========= groupByKey + mapGroups========
+---+-----------------------------------------------------------------------------------------+
|lot|v |
+---+-----------------------------------------------------------------------------------------+
|3 |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}] |
|1 |[{"lot":"1","cnt":120}, {"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}]|
|2 |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}] |
+---+-----------------------------------------------------------------------------------------+