窗口函数可以使用户针对某个范围的数据进行聚合操作,如:

    • 累积和
    • 差值
    • 加权移动平均

    可以想象一个窗口在全量数据集上进行滑动,用户可以自定义在窗口中的操作,如下图所示。

    窗口函数 - 图1

    使用窗口函数,首先需要定义窗口,DataFrame 提供了 API 定义窗口,以及窗口中的计算逻辑,还是以学生成绩为例,现在需要得出每个学生单科最佳成绩以及成绩所在的年份,这个需求就要用到窗口中的 row_number 函数,row_number 函数可以根据窗口中的数据生成行号,首先来定义窗口:

    1. import org.apache.spark.sql.expressions.Window
    2. import org.apache.spark.sql.functions._
    3. val window = Window.partitionBy("name","subject").orderBy(desc("grade"))

    上面的代码定义了窗口的范围:按照每个人的姓名与科目的组合进行开窗,并控制了数据在窗口中的顺序:按照 grade 降序进行排序,row_number 函数就可以作用在这个窗口上,对每个人每个科目成绩赋予行号,代码如下:

    1. dfSG.select(col("name"), col("subject"), col("year"), col("grade"), row_number().over(window)).show()

    结果如图:

    窗口函数 - 图2

    最后只需要从这张表中过滤出 row_number 等于 1 的数据即可。

    此外,DataFrame 还提供了 rowsBetweenrangeBetween 来进一步定义窗口范围,其中 rowsBetween 是通过物理行号进行控制,rangeBetween 是通过逻辑条件来对窗口进行控制。

    1. // rangeBetween 窗口是当前行的 num 值 +2 到当前行的 num 值 +20 这个区间中的数据
    2. val windowSlide = Window
    3. .partitionBy("key")
    4. .orderBy("num")
    5. .rangeBetween(Window.currentRow + 2,Window.currentRow + 20)
    6. val dfWin = spark.read.json("json/window.json")
    7. dfWin
    8. .select(col("key"),sum("num").over(windowSlide))
    9. .sort("key")
    10. .show()
    11. // rowsBetween 通过行号来指定参与计算的物理窗口,窗口由当前行、当前行的前一行、当前行的后一行组成,窗口大小为 3
    12. val windowSlide = Window
    13. .partitionBy("key")
    14. .orderBy("num")
    15. .rowsBetween(Window.currentRow - 1,Window.currentRow + 1)
    16. dfWin
    17. .select(col("key"),sum("num").over(windowSlide))
    18. .sort("key")
    19. .show()

    在实际开发过程中,大量的需求都可以直接通过函数以及函数的组合完成,一般来说,函数的丰富程度往往超乎你的想象,所以在面临新需求时,建议首先查阅文档,看看有没有函数可以利用,如果实在不行,我们才会使用用户自定义函数(User Defined Function)。

    Spark SQL 的函数文档目前我没有发现特别全面的,所以我通常就会直接阅读源码,源码列出了所有的函数,如下:

    https://github.com/apache/spark/blob/6646b3e13e46b220a33b5798ef266d8a14f3c85b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala