一、聚合操作

聚合行为对于大数据计算中往往是基础操作,那么对于Spark这类针对大数据的计算引擎来说更为如此,为了
让读者能够理解并知晓如何使用其中的各类函数,下述将逐一进行介绍。

1. 聚合函数

count

最简单的聚合计算即计数操作,可以通过 count(*)count(1)count(col) 进行统计数据的计数,当然
针对null值的处理需要有注意的地方。例如,当执行 count(*) 时,spark会队null值进行计数,而当队某个列计
数时,则不会对null值进行计数。

  1. df.select(functions.count("salary")).show();
  2. df.selectExpr("count(salary)").show();

countDistinct

有些场景可能需要进行去重后的计数,当然在采用SQL语句时就是将 countdistinct 组合即可。而使用
函数则单独提供了这类函数,需要注意的是这类计数必须指定特定的列才有意义。

  1. df.select(functions.countDistinct("salary")).show();
  2. df.selectExpr("count(distinct *)").show();

approx_count_distinct

在某些场景下,精确的统计计数并不那么重要,某种精度的近似值也可以接受。此时就可以使用该函数,并
通过参数指定可容忍的最大误差。

  1. df.select(functions.approxCountDistinct("salary", 0.1)).show();
  2. df.selectExpr("approx_count_distinct(salary, 0.1)").show();

开发环境为JDK11下会运行错误,需要切换到JDK1.8环境

first&last

通过标题可以知道这两个函数是用于获取数据的第一个与最后一个,但是其顺序是基于DataFrame中行的顺序
而不是DataFrame中值的顺序。

  1. df.select(functions.first("salary"), functions.last("salary")).show();
  2. df.selectExpr("first(salary)", "last(salary)").show();

min&max

顾名思义,其提供了针对指定列计算其最小与最大的值。

  1. df.select(functions.min("salary"), functions.max("salary")).show();
  2. df.selectExpr("min(salary)", "max(salary)").show();

sum&sumDistinct

count 函数类型,前者用于计算总和,后者用于计算去重后的总和。

  1. df.select(functions.sum("salary")).show();
  2. df.selectExpr("sum(salary)").show();
  3. df.select(functions.sumDistinct("salary")).show();
  4. df.selectExpr("sum(distinct salary)").show();

avg&mean

其提供了计算平均值的函数,当然大多数集合函数都支持组合使用去重进行计算。

  1. df.select(functions.avg("salary")).show();
  2. df.selectExpr("avg(salary)").show();
  3. df.select(functions.mean("salary")).show();
  4. df.selectExpr("mean(salary)").show();

方差&标准差

考虑到均值计算时,我们必然会想到方差与标准差。方差其实就是各数据样本与均值之间的差值的平方
的平均值,标准差则是方差的平方根。而Spark支持样本标准差与总体标准差。如果使用 variance
stddev 函数默认是计算样本标准差与样本方差。

  1. df.select(functions.variance("salary")).show(); // 样本方差
  2. df.select(functions.var_samp("salary")).show(); // 同上
  3. df.selectExpr("variance(salary)").show(); // 同上
  4. df.selectExpr("var_samp(salary)").show(); // 同上
  5. df.select(functions.stddev("salary")).show(); // 样本标准差
  6. df.select(functions.stddev_samp("salary")).show(); // 同上
  7. df.selectExpr("stddev(salary)").show(); // 同上
  8. df.selectExpr("stddev_samp(salary)").show(); // 同上
  9. df.select(functions.var_pop("salary")).show(); // 总体方差
  10. df.selectExpr("var_pop(salary)").show(); // 同上
  11. df.select(functions.stddev_pop("salary")).show(); // 总体标准差
  12. df.selectExpr("stddev_pop(salary)").show(); // 同上

偏度系数&峰度系数

偏度系数(skewness)和峰度系数(kurtosis)都是对数据集中极端数据点的衡量指标。偏度系数衡量数据相对于
平均值的不对称程度,而峰度系数衡量数据分布形态的陡缓程度。

  1. df.select(functions.skewness("salary")).show(); // 偏度系数
  2. df.selectExpr("skewness(salary)").show(); // 同上
  3. df.select(functions.kurtosis("salary")).show(); // 峰度系数
  4. df.selectExpr("kurtosis(salary)").show(); // 同上

协方差&相关性

上述我们讨论了单列的概念,但是有的函数是去计算比较两个不同列的的值之间的相互关系。其中这两个函数为
covcorr 。她们分别用于计算协方差和相关性。相关性采用Pearson相关系数来衡量,范围是-1~+1。根 var
函数一样,协方差又分为样本协方差和总体协方差,因此在使用的时候需要指定,而相关性没有这个概念。

  1. df.select(functions.corr("salary", "count")).show(); // 相关性
  2. df.selectExpr("corr(salary, count)").show(); // 同上
  3. df.select(functions.covar_pop("salary", "count")).show(); // 总体协方差
  4. df.selectExpr("covar_pop(salary, count)").show(); // 同上
  5. df.select(functions.covar_samp("salary", "count")).show(); // 样本协方差
  6. df.selectExpr("covar_samp(salary, count)").show(); // 同上

输出复杂类型

除了可以针对数值类型进行聚合操作,还可以在其他复杂类型上进行操作,如收集某列上的值到一个list列表里,
或者将unique唯一值收集到一个set集合里。

  1. df.select(functions.collect_list("salary"), functions.collect_set("salary")).show();
  2. df.selectExpr("collect_list(salary)", "collect_set(salary)").show();

2. 分组

上述我们讲述了常用的聚合操作函数,但是在实际数据分析过程中往往是需要针对数据进行分组后在进行各类
聚合函数的计算,所以本章节将介绍各类分组的方式。

常规分组

在使用分组中我们不是将函数直接做为表达式传递到select中,而应该在agg函数中进行使用。

  1. df.groupBy("category").agg(functions.count("salary").alias("pre"), functions.expr("count(salary)")).orderBy(functions.col("pre").desc()).show();
  2. df.groupBy("category").sum("salary").show();

Map分组

读者可以通过 Map<String, String> 来实现按照Key指定的列进行Value的聚合函数操作,由于Java语言的特性,Key不能
存在重复,也就意味每个列只能进行一次聚合操作。

  1. df.groupBy("category").agg(new HashMap<String, String>() {{
  2. put("salary", "sum");
  3. put("count", "count");
  4. }}).show();

3. 窗口函数

在实际需求中我们往往需要计算每行数据的排名,或者本行数据累计前面数据的总值等各类聚合操作。为了实现这类需求
Spark提供了Window函数来帮助我们实现这一特性,而使用这一特性我们需要先定义窗口的分组,当然这里跟 group by
分组并不是一个概念,指定窗口滚动的策略,比如指定前面N行到后面N行,或者从第一行到本行等各种滚动策略,这些策略
将会影响聚合函数的数据操作的范围。
下面我们就定义了一个窗口,该窗口根据 category 分组,依据 salary 排序,且窗口的滚动策略采用第一行到本行。完成
了窗口的设定后我们就可以使用该窗口来组合各种聚合计算操作。

  1. WindowSpec windowSpec = Window.partitionBy("category")
  2. .orderBy(functions.col("salary"))
  3. .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
  4. Column salarySum = functions.sum("count").over(windowSpec);
  5. Column countRank = functions.avg("salary").over(windowSpec);

完成上述所有工作后,我们就可以跟获取列一样在正常的 select 中进行使用了,比如下述方式。

  1. df.select(functions.col("*"), salarySum, countRank).show();

4. 分组集

很多时候我们需要计算不同维度的聚合数据,通过传统的 group by 组合虽然可以解决这类需求,但是往往需要繁多的语句
才能解决。Spark为了解决这类需求,提供了分组集的功能,通过所提供的函数即可实现我们的诉求。 其中一种 GROUPING SETS
操作可以实现,但是其只能在SQL中进行使用,具体使用方式如下。

  1. SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
  2. GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
  3. ORDER BY CustomerId DESC, stockCode DESC

如果希望能够利用API进行操作可以利用 rollupcube 函数实现类似的功能。首先是 rollup 函数,该函数提供的维度
相比较于 cube 较低,如 namecategory 列进行多维聚合操作的情况下,仅计算 namecategory 的各种组合的聚
合计算,以及各种name 聚合计算结果,最后就是总计算结果。

  1. df.rollup("name", "category")
  2. .agg(new HashMap<String, String>() {{
  3. put("salary", "sum");
  4. put("count", "avg");
  5. }})
  6. .selectExpr("name", "category", "`sum(salary)` as total", "`avg(count)` as avg").show();

该函数无法实现全部的组合,那么我们就需要另一个函数 cube 来实现所有维度的组合计算。

  1. df.cube("name", "category")
  2. .agg(new HashMap<String, String>() {{
  3. put("salary", "sum");
  4. put("count", "avg");
  5. }})
  6. .selectExpr("name", "category", "`sum(salary)` as total", "`avg(count)` as avg").show();

因为组合存在各种层级,我们可以通过 grouping_id 函数进行输出,使用方式如下。

  1. df.cube("name", "category")
  2. .agg(functions.expr("grouping_id()"), functions.sum("salary"), functions.avg("count"))
  3. .orderBy(functions.expr("grouping_id()").desc())
  4. .selectExpr("name", "category", "`sum(salary)` as total", "`avg(count)` as avg", "`grouping_id()`").show();

透视转换

通过该函数可以根据某列中的不同行创建多个列,也就是我们所说的行转列,比如我们统计了每个地区的消费
金额,但是我们希望将各个地方做为列,最终拼接成为一行数据,那么就可以利用本章函数。

  1. df.groupBy("category").pivot("name").sum().show();

5. 自定义聚合函数

只能在scala与java中自定义聚合函数,自定义函数需要继承 UserDefinedAggregateFunction 类,具体实现如下。

  1. public static class BoolAnd extends UserDefinedAggregateFunction {
  2. /**
  3. * 指定UDAF中间结果
  4. */
  5. @Override
  6. public StructType bufferSchema() {
  7. return DataTypes.createStructType(Arrays.asList(
  8. DataTypes.createStructField("result", DataTypes.BooleanType, true)
  9. ));
  10. }
  11. /**
  12. * 指定结果的类型
  13. */
  14. @Override
  15. public DataType dataType() {
  16. return DataTypes.BooleanType;
  17. }
  18. /**
  19. * 指定UDAF对于某个输入是否会返回相同的结果
  20. */
  21. @Override
  22. public boolean deterministic() {
  23. return true;
  24. }
  25. /**
  26. * 用于生成聚合最终结果
  27. */
  28. @Override
  29. public Object evaluate(Row arg0) {
  30. return arg0.getBoolean(0);
  31. }
  32. /**
  33. * 初始化聚合缓冲区的初始值
  34. */
  35. @Override
  36. public void initialize(MutableAggregationBuffer arg0) {
  37. arg0.update(0, true);
  38. }
  39. /**
  40. * 指定输入的参数类型
  41. */
  42. @Override
  43. public StructType inputSchema() {
  44. return DataTypes.createStructType(Arrays.asList(
  45. DataTypes.createStructField("value", DataTypes.BooleanType, true)
  46. ));
  47. }
  48. /**
  49. * 如何合并两个聚合缓冲区
  50. * /
  51. @Override
  52. public void merge(MutableAggregationBuffer arg0, Row arg1) {
  53. arg0.update(0, (boolean)arg1.getAs(0));
  54. }
  55. /**
  56. * 如何根据给定行更新内部缓冲区
  57. * /
  58. @Override
  59. public void update(MutableAggregationBuffer arg0, Row arg1) {
  60. arg0.update(0, (boolean)arg0.getAs(0) && (boolean)arg1.getAs(0));
  61. }
  62. }

完成以上自定函数的编写后,我们就需要注册并使用该函数。

  1. session.udf().register("booland", new BoolAnd());
  2. df.selectExpr("booland(true)").show();

二、连接

实际开发过程中,往往需要进行多表的连接操作。而Spark提供了类似传统数据库的各类连接操作,其主要通过使用
连接函数中传入不同的连接类型参数进行控制,所以下述可以列举中各类方式。

  1. // 内连接
  2. empy.join(category, empy.col("category").equalTo(category.col("cag"))).show();
  3. empy.join(category, empy.col("category").equalTo(category.col("cag")), "inner").show();
  4. // 外连接
  5. empy.join(category, empy.col("category").equalTo(category.col("cag")), "outer").show();
  6. // 左外连接
  7. empy.join(category, empy.col("category").equalTo(category.col("cag")), "left_outer").show();
  8. // 右外连接
  9. empy.join(category, empy.col("category").equalTo(category.col("cag")), "right_outer").show();
  10. // 左半连接
  11. empy.join(category, empy.col("category").equalTo(category.col("cag")), "left_semi").show();
  12. // 左反连接
  13. empy.join(category, empy.col("category").equalTo(category.col("cag")), "left_anti").show();
  14. // 交叉连接
  15. empy.join(category, empy.col("category").equalTo(category.col("cag")), "left_anti").show();

内连接:匹配两个DataFrame中指定键相等的任意两行,并将其连接后返回;
外连接:即满足内连接的情况下也会将其他不匹配的进行数据,对应不存在的列以NULL填充;
左外连接:即满足内连接后,左侧即使不满足的数据均需要展示,不存在关联数据的列以NULL填充;
右外连接:即满足内连接后,右侧即使不满足的数据均需要展示,不存在关联数据的列以NULL填充;
左半连接:它只是查看左侧DataFrame的值是否存在于右侧DataFrame里,如果存在则在连接结果中保留;
左反连接:左反连接与左半连接相反;