For the dataset below, the following operations are required:

    import spark.implicits._  // assumes a SparkSession named spark is in scope (e.g. spark-shell); needed for toDF

    val df = spark.sparkContext.parallelize(
      Seq(
        ("1", 15), ("1", 120), ("1", 16), ("1", 14),
        ("2", 17), ("2", 12), ("3", 13), ("3", 14)
      )
    ).toDF("lot", "cnt")
    df.show(false)
    -------------------------------------------------------------------------------------
    +---+---+
    |lot|cnt|
    +---+---+
    |1  |15 |
    |1  |120|
    |1  |16 |
    |1  |14 |
    |2  |17 |
    |2  |12 |
    |3  |13 |
    |3  |14 |
    +---+---+

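1. Build a JSON string from each row;
2. Group by lot;
3. Merge the JSON strings of each group into an array;
4. Sort the elements of each array by the cnt field.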
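To make the target result concrete, here is a minimal, Spark-free sketch of the four steps on plain Scala collections; it is not the Spark code from this post. The case class Rec and the helper toJson are hypothetical (toJson builds JSON by string interpolation, which is acceptable only because lot and cnt never need escaping), and descending order is assumed to match the Spark code later in the post. The key idea is that the numeric cnt travels next to each JSON string, so step 4 sorts by a number rather than by the string.

```scala
// Spark-free sketch of the four steps on local collections.
// Assumptions: Rec and toJson are hypothetical helpers; order is descending by cnt.
object FourStepsSketch {
  final case class Rec(lot: String, cnt: Int)

  // Hypothetical helper: hand-built JSON, safe only because the values need no escaping
  def toJson(r: Rec): String = s"""{"lot":"${r.lot}","cnt":${r.cnt}}"""

  val rows: Seq[Rec] = Seq(
    Rec("1", 15), Rec("1", 120), Rec("1", 16), Rec("1", 14),
    Rec("2", 17), Rec("2", 12), Rec("3", 13), Rec("3", 14)
  )

  // Step 1: build the JSON string, but keep the numeric cnt alongside it
  val withJson: Seq[(String, Int, String)] = rows.map(r => (r.lot, r.cnt, toJson(r)))

  // Steps 2-4: group by lot, sort each group numerically by cnt (descending),
  // then keep only the JSON strings as the group's array
  val result: Map[String, List[String]] =
    withJson.groupBy(_._1).map { case (lot, group) =>
      lot -> group.sortBy(_._2)(Ordering[Int].reverse).map(_._3).toList
    }

  def main(args: Array[String]): Unit =
    result.toSeq.sortBy(_._1).foreach { case (lot, arr) =>
      println(s"$lot -> ${arr.mkString("[", ", ", "]")}")
    }
}
```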

Approach 1: to_json + groupBy + collect_list + sort_array (conclusion first: it does not work ❌)

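After collect_list, the array elements are JSON strings, so sort_array compares them lexicographically as strings rather than by the numeric value of cnt, and the order comes out wrong. For example, the strings for cnt = 120 and cnt = 14 share the prefix {"lot":"1","cnt":1, and because '2' < '4', the 120 entry is treated as smaller than the 14 entry.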
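The effect can be reproduced without Spark. The sketch below sorts the four JSON strings of lot 1 in descending string order, which is what sort_array(..., false) does with string elements. Treating Scala's default Ordering[String] as equivalent to Spark's string comparison is an assumption, but it holds for ASCII-only values like these.

```scala
// Descending string sort of JSON strings, mimicking sort_array(collect_list(v), false).
// Assumption: Scala's Ordering[String] matches Spark's comparison for these ASCII strings.
object LexicographicSortDemo {
  val group1: List[String] = List(
    """{"lot":"1","cnt":15}""", """{"lot":"1","cnt":120}""",
    """{"lot":"1","cnt":16}""", """{"lot":"1","cnt":14}"""
  )

  // Compares character by character: after the shared prefix {"lot":"1","cnt":1
  // the deciding characters are '6', '5', '4', '2', so the cnt = 120 entry ends up last
  val byString: List[String] = group1.sorted(Ordering[String].reverse)

  def main(args: Array[String]): Unit = byString.foreach(println)
}
```

The resulting order (16, 15, 14, 120) is the same wrong order that appears for lot 1 in the sort_array + collect_list output of this post.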

Approach 2: groupByKey + mapGroups + JSON.parseObject + sortBy (conclusion first: it works ✔️)

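Use groupByKey + mapGroups: within each group, parse the sort field (cnt) out of every JSON string and sort the group's JSON elements by that numeric value.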
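The per-group logic can be sketched without Spark as a plain function with the same shape as the mapGroups lambda: it takes the group key and an Iterator over the group's JSON strings, reads cnt back out of each string, and sorts in descending order. This is an assumption-laden stand-in, not the post's code: the regex CntField and the helpers cntOf and sortGroup are hypothetical and replace fastjson's JSON.parseObject(j).getIntValue("cnt"), handling only the flat, integer-valued cnt field that to_json produces here.

```scala
// Spark-free sketch of the function passed to mapGroups.
// Assumption: a regex stands in for fastjson's JSON.parseObject(j).getIntValue("cnt").
object SortGroupSketch {
  // Hypothetical: matches "cnt": <integer> in a flat JSON object
  private val CntField = """"cnt"\s*:\s*(-?\d+)""".r

  // Hypothetical helper: extract the numeric cnt from one JSON string
  def cntOf(json: String): Int =
    CntField.findFirstMatchIn(json) match {
      case Some(m) => m.group(1).toInt
      case None    => throw new IllegalArgumentException(s"no cnt field in: $json")
    }

  // Same shape as the mapGroups lambda: (key, values) => (key, values sorted by cnt, descending)
  def sortGroup(key: String, values: Iterator[String]): (String, List[String]) =
    (key, values.toList.sortBy(cntOf)(Ordering[Int].reverse))

  def main(args: Array[String]): Unit = {
    val (k, sorted) = sortGroup("1", Iterator(
      """{"lot":"1","cnt":15}""", """{"lot":"1","cnt":120}""",
      """{"lot":"1","cnt":16}""", """{"lot":"1","cnt":14}"""
    ))
    println(s"$k -> ${sorted.mkString("[", ", ", "]")}")
  }
}
```

Sorting with Ordering[Int].reverse gives the same descending result as the post's sortBy(...).reverse except for how equal cnt values are ordered relative to each other.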

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import com.alibaba.fastjson.JSON  // assumed: Alibaba fastjson, used to read cnt back out of each JSON string
    import spark.implicits._          // assumes a SparkSession named spark; provides toDF, $"..." and tuple encoders

    val df = spark.sparkContext.parallelize(
      Seq(
        ("1", 15), ("1", 120), ("1", 16), ("1", 14),
        ("2", 17), ("2", 12), ("3", 13), ("3", 14)
      )
    ).toDF("lot", "cnt")

    println("========= row_number ========")
    // Rank rows within each lot by cnt, descending (shows the expected order per group)
    val frame = df.withColumn("rk", row_number().over(Window.partitionBy($"lot").orderBy($"cnt".desc_nulls_last)))
    frame.show(false)

    println("========= sort_array + collect_list========")
    // Approach 1: the collected elements are JSON strings, so sort_array compares them as strings
    val frame1 = frame
      .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
      .groupBy("lot")
      .agg(sort_array(collect_list("v"), false).as("v1"))
    frame1.printSchema()
    frame1.show(false)

    println("========= groupByKey + mapGroups========")
    // Approach 2: in each group, parse cnt from every JSON string and sort numerically, descending
    frame
      .select($"lot", to_json(struct($"lot", $"cnt")).as("v"))
      .groupByKey((row: Row) => row.getAs[String]("lot"))
      .mapGroups((k, vs) =>
        (k, vs.map(_.getAs[String]("v")).toList.sortBy(j => JSON.parseObject(j).getIntValue("cnt")).reverse)
      ).toDF("lot", "v")
      .show(false)

Output:

    ========= row_number ========
    +---+---+---+
    |lot|cnt|rk |
    +---+---+---+
    |3  |14 |1  |
    |3  |13 |2  |
    |1  |120|1  |
    |1  |16 |2  |
    |1  |15 |3  |
    |1  |14 |4  |
    |2  |17 |1  |
    |2  |12 |2  |
    +---+---+---+
    ========= sort_array + collect_list========
    root
     |-- lot: string (nullable = true)
     |-- v1: array (nullable = true)
     |    |-- element: string (containsNull = true)
    +---+-----------------------------------------------------------------------------------------+
    |lot|v1                                                                                       |
    +---+-----------------------------------------------------------------------------------------+
    |3  |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}]                                             |
    |1  |[{"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}, {"lot":"1","cnt":120}]|
    |2  |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}]                                             |
    +---+-----------------------------------------------------------------------------------------+
    ========= groupByKey + mapGroups========
    +---+-----------------------------------------------------------------------------------------+
    |lot|v                                                                                        |
    +---+-----------------------------------------------------------------------------------------+
    |3  |[{"lot":"3","cnt":14}, {"lot":"3","cnt":13}]                                             |
    |1  |[{"lot":"1","cnt":120}, {"lot":"1","cnt":16}, {"lot":"1","cnt":15}, {"lot":"1","cnt":14}]|
    |2  |[{"lot":"2","cnt":17}, {"lot":"2","cnt":12}]                                             |
    +---+-----------------------------------------------------------------------------------------+