# Preface

These notes summarize Chapter 7 of *Spark: The Definitive Guide*. Some of the code comes from the book; the rest is from my own experiments on a local machine.

# Definition of Aggregation

Aggregation is the act of collecting something together.

# Brief Overview

The main types of aggregation are:

- over the entire dataset (DataFrame)
- group-by aggregation
- window aggregation
- grouping sets
- rollup (hierarchical)
- cube (all possible combinations)

Grouping methods (groupBy, rollup, cube) return a RelationalGroupedDataset, which becomes a DataFrame again once an aggregation is applied.

```scala
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/all/*.csv")
  .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
```
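As a quick illustration (a sketch using the `df` defined above), `groupBy` alone yields a `RelationalGroupedDataset`; only applying an aggregation such as `count` turns it back into a DataFrame:

```scala
import org.apache.spark.sql.RelationalGroupedDataset

// groupBy alone returns a RelationalGroupedDataset, not a DataFrame
val grouped: RelationalGroupedDataset = df.groupBy("InvoiceNo")
// applying an aggregation produces a DataFrame again
grouped.count().show(5)
```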


# Aggregation Functions

Most aggregation functions are defined in `org.apache.spark.sql.functions`.

## count

```scala
import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show() // 541909
```

Note that counting a single column skips its null values, whereas `count("*")` counts rows even when they contain nulls.

## countDistinct

```scala
import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show() // 4070
```

<a name="Aggregation-approx_count_distinct"></a>
## approx_count_distinct

For large datasets an approximate distinct count is often sufficient and much faster; the second argument is the maximum estimation error allowed (the relative standard deviation):

```scala
import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364
```

<a name="Aggregation-firstandlast"></a>
## first and last
```scala
import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()
```

<a name="Aggregation-minandmax"></a>
## min and max
```scala
import org.apache.spark.sql.functions.{min, max}
df.select(min("Quantity"), max("Quantity")).show()
```

<a name="Aggregation-sum"></a>
## sum
```scala
import org.apache.spark.sql.functions.sum
df.select(sum("Quantity")).show() // 5176450
```

<a name="Aggregation-sumDistinct"></a>
## sumDistinct
```scala
import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show() // 29310
```

<a name="Aggregation-avg"></a>
## avg
```scala
import org.apache.spark.sql.functions.{sum, count, avg, expr}

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()
```

<a name="Aggregation-varianceandstandarddeviation"></a>
## variance and standard deviation
```scala
import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()
```

<a name="Aggregation-skewnessandkurtosis"></a>
## skewness and kurtosis
```scala
import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
```

<a name="Aggregation-covarianceandcorrelation"></a>
## covariance and correlation
```scala
import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()
```

<a name="Aggregation-aggregatingtocomplextypes"></a>
## aggregating to complex types
```scala
import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
```

<a name="Aggregation-Grouping"></a>
# Grouping
Grouped aggregation: the rows within each group go in, and one row per group comes out.
<a name="Aggregation-GroupingWithExpressions"></a>
## Grouping With Expressions
```scala
import org.apache.spark.sql.functions.{count, expr}
df.groupBy("InvoiceNo").agg(
  count("Quantity").alias("quan"),
  expr("count(Quantity)")).show()
```

<a name="Aggregation-GroupingWithMaps"></a>
## Grouping With Maps
```scala
df.groupBy("InvoiceNo").agg("Quantity" -> "avg", "Quantity" -> "stddev_pop").show()
```

<a name="Aggregation-WindowFunctions"></a>
# Window Functions
A window function computes a result for every input row over a frame of rows drawn from the row's partition, producing one output value per row.

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469216721-3b79da6f-5370-43cf-9af6-a8a551b11ce4.png#align=left&display=inline&height=366&margin=%5Bobject%20Object%5D&originHeight=366&originWidth=665&status=done&style=none&width=665)

Executing a window function requires two things:

- a window specification
- a window function, which takes all the rows in the frame as input and produces one output value. Spark currently supports three kinds of window functions:
   - aggregate functions
   - ranking functions
   - analytic functions
<a name="NjnjH"></a>
## Defining a window
A window specification has three parts:

- the partitioning (the `partitionBy` condition): the window only operates within its partition
- the ordering within a partition (the `orderBy` condition): how the rows in a partition are arranged
- the window frame (`rowsBetween`): which rows fall into the current row's frame

An example makes this concrete:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
```

The code above defines a window. First, `Window.partitionBy("CustomerId", "date")` puts rows with the same CustomerId and the same date into one partition. Next, `orderBy` sorts the rows within each partition by Quantity. Finally, `rowsBetween` defines the frame relative to each row; here it spans from the first row of the partition up to the current row. Numeric offsets also work: for example, `rowsBetween(-1, 0)` spans from the previous row to the current row.
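For comparison, a minimal sketch of such a sliding frame (the name `slidingSpec` is made up for illustration and is not used elsewhere in this post):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// frame covering only the previous row and the current row
val slidingSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(-1, 0)
```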
<a name="7YiRI"></a>
## Defining a window function
Spark currently supports three kinds of window functions:

- aggregate functions
- ranking functions
- analytic functions

Some examples:
<a name="Aggregation-aggregatefunction"></a>
### aggregate function
```scala
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
```

The code above yields the maximum Quantity within each row's window frame.
<a name="Aggregation-rankingfunction"></a>
### ranking function
```scala
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
```

- `dense_rank()` assigns ranks without gaps: tied rows share a rank and the next distinct value takes the next consecutive rank
- `rank()` assigns positional ranks: tied rows share a rank, and the ranks that follow skip ahead by the number of ties (see the sketch below)
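
A minimal sketch showing the difference on tied values (hypothetical data over a single unpartitioned window):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, rank}

// in spark-shell, spark.implicits._ is already in scope for toDF
val scores = Seq(100, 100, 90, 80).toDF("score")
val w = Window.orderBy(col("score").desc)

// rank:       1, 1, 3, 4  (gap after the tie)
// dense_rank: 1, 1, 2, 3  (no gap)
scores.select(col("score"),
  rank().over(w).alias("rank"),
  dense_rank().over(w).alias("dense_rank")).show()
```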
<a name="BEsmT"></a>
## A complete example
<a name="Aggregation-Scala"></a>
### Scala
```scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date",
  to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

import org.apache.spark.sql.expressions.Window
val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
```

<a name="Aggregation-SQL"></a>
### SQL
```sql
SELECT CustomerId, date, Quantity,
  rank(Quantity) OVER (PARTITION BY CustomerId, date
                       ORDER BY Quantity DESC NULLS LAST
                       ROWS BETWEEN
                         UNBOUNDED PRECEDING AND
                         CURRENT ROW) as rank,
  dense_rank(Quantity) OVER (PARTITION BY CustomerId, date
                             ORDER BY Quantity DESC NULLS LAST
                             ROWS BETWEEN
                               UNBOUNDED PRECEDING AND
                               CURRENT ROW) as dRank,
  max(Quantity) OVER (PARTITION BY CustomerId, date
                      ORDER BY Quantity DESC NULLS LAST
                      ROWS BETWEEN
                        UNBOUNDED PRECEDING AND
                        CURRENT ROW) as maxPurchase
FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId
```

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469216802-cd2a457e-fb94-4636-b605-982e3ead4dd3.png#align=left&display=inline&height=227&margin=%5Bobject%20Object%5D&originHeight=227&originWidth=649&status=done&style=none&width=649)
<a name="Aggregation-GroupingSets"></a>
# Grouping Sets
Grouping sets aggregate across one or more groupings in a single query; they are available only in SQL. For example, the following two queries produce equivalent results:

```sql
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
```

```sql
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
```

If we also want the total over the whole result set in addition to the per-(CustomerId, stockCode) statistics, grouping sets are required:

```sql
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode), ())
ORDER BY CustomerId DESC, stockCode DESC
```

Note that the source data must contain no nulls: otherwise null grouping keys would be indistinguishable from the nulls that the higher-level groupings introduce into the result.

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469216871-60ae41e8-b61b-40ba-bb8b-de0599feaf29.png#align=left&display=inline&height=113&margin=%5Bobject%20Object%5D&originHeight=113&originWidth=632&status=done&style=none&width=632)
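The queries above read from `dfNoNull`, which is not defined in this post; a minimal sketch, assuming it is simply `dfWithDate` with null rows removed:

```scala
// assumption: dfNoNull is dfWithDate minus any row containing a null
val dfNoNull = dfWithDate.na.drop("any")
dfNoNull.createOrReplaceTempView("dfNoNull")
```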
<a name="Aggregation-Rollups"></a>
## Rollups
`rollup` groups by each hierarchical prefix of the listed columns, for example:

```scala
import org.apache.spark.sql.functions.sum

// in spark-shell, spark.implicits._ is already in scope for toDF
val rolledUpDf = Seq(
    ("2020-01-01", "USA", "30"),
    ("2020-01-02", "USA", "70"),
    ("2020-01-01", "China", "90"))
  .toDF("Date", "Country", "Quantity")
  .rollup("Date", "Country")
  .agg(sum("Quantity"))
  .selectExpr("Date", "Country", "sum(Quantity) as totalQuantity")
rolledUpDf.show(false)
```

The code above aggregates at three levels; in the result, a null in a column marks the level that aggregates that column away (see the sketch after this list):

- () (the grand total)
- group by (Date)
- group by (Date, Country)
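
A quick way to inspect a single level is to filter on those null markers (a sketch):

```scala
// per-Date subtotals plus the grand total
rolledUpDf.where("Country IS NULL").show()
// only the grand-total row has a null Date
rolledUpDf.where("Date IS NULL").show()
```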

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469216936-a8f5a654-1cb4-41c7-b787-6d0b7641dd0d.png#align=left&display=inline&height=221&margin=%5Bobject%20Object%5D&originHeight=221&originWidth=292&status=done&style=none&width=292)
<a name="Aggregation-Cube"></a>
## Cube
`cube` groups by every possible combination of the listed columns:

```scala
val cubedDF = Seq(
    ("2020-01-01", "USA", "30"),
    ("2020-01-02", "USA", "70"),
    ("2020-01-01", "China", "90"))
  .toDF("Date", "Country", "Quantity")
  .cube("Date", "Country")
  .agg(sum("Quantity"))
  .selectExpr("Date", "Country", "sum(Quantity) as totalQuantity")
cubedDF.show(false)
```

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469217011-ec459bf6-de52-4fad-af74-d80012c828d5.png#align=left&display=inline&height=257&margin=%5Bobject%20Object%5D&originHeight=257&originWidth=285&status=done&style=none&width=285)
<a name="Aggregation-GroupingMetadata"></a>
## Grouping Metadata
`grouping_id()` labels each grouping level with a numeric ID. Each bit corresponds to one of the listed columns (the leftmost column is the most significant bit) and is set when that column is aggregated away: for `cube("Date", "Country")`, 0 means grouped by (Date, Country), 1 by (Date), 2 by (Country), and 3 is the grand total.

```scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}

val cubedDF = Seq(
    ("2020-01-01", "USA", "30"),
    ("2020-01-02", "USA", "70"),
    ("2020-01-01", "China", "90"))
  .toDF("Date", "Country", "Quantity")
  .cube("Date", "Country")
  .agg(grouping_id(), sum("Quantity"))
  .orderBy(expr("grouping_id()").desc)
cubedDF.show(false)
```

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469217083-8aa81a74-113b-4a76-92fa-95a92b44d42b.png#align=left&display=inline&height=253&margin=%5Bobject%20Object%5D&originHeight=253&originWidth=414&status=done&style=none&width=414)
<a name="Aggregation-Pivot"></a>
## Pivot
Pivot turns rows into columns: the distinct values of the pivoted column become new columns.

```scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
pivoted.where("date > '2011-12-05'").select("date", "USA_sum(Quantity)").show()
```

![](https://cdn.nlark.com/yuque/0/2021/png/12591877/1615469217154-b9e0d7e0-7e1d-417e-8aa2-27fdaa62823c.png#align=left&display=inline&height=143&margin=%5Bobject%20Object%5D&originHeight=143&originWidth=242&status=done&style=none&width=242)
<a name="Aggregation-User-DefinedAggregationFunctions"></a>
# User-Defined Aggregation Functions
A user-defined aggregation function (UDAF) must implement the following pieces:

- inputSchema
- bufferSchema
- deterministic
- initialize
- update
- merge
- evaluate

Example: an aggregate function that takes multiple boolean rows and returns whether they are all true.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class BoolAnd extends UserDefinedAggregateFunction {
  // schema of the input rows
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  // schema of the intermediate aggregation buffer
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  // type of the final result
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  // start from true, the identity element of logical AND
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

object BoolAnd {
  def main(args: Array[String]): Unit = {
    val ba = new BoolAnd
    val spark = SparkSession.builder().master("local").appName("yifan_spark_test2")
      .getOrCreate()
    spark.udf.register("booland", ba)
    spark.range(1)
      .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
      .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
      .select(ba(col("t")), expr("booland(f)"))
      .show()
  }
}
```
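Note: since Spark 3.0, `UserDefinedAggregateFunction` is deprecated in favor of `org.apache.spark.sql.expressions.Aggregator`. A minimal sketch of the same boolean-AND aggregate in that style (assuming Spark 3.0+; `BoolAndAgg` is a name made up here):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Aggregator[IN, BUF, OUT]: boolean input, boolean buffer, boolean output
object BoolAndAgg extends Aggregator[Boolean, Boolean, Boolean] {
  def zero: Boolean = true                                   // identity of AND
  def reduce(buf: Boolean, in: Boolean): Boolean = buf && in
  def merge(b1: Boolean, b2: Boolean): Boolean = b1 && b2
  def finish(buf: Boolean): Boolean = buf
  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

// register it like the UDAF above:
// spark.udf.register("booland", udaf(BoolAndAgg))
```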

# Q&A

- Which functions does Spark SQL call under the hood when executing an aggregation (groupBy)? (TODO)
- Which performs better, countDistinct or groupBy? (TODO)