This requires us to dig into Spark's core principles, keep trying each API and operator, experiment with different configuration parameters, and eventually find the best combination.
Example 🌰 1:
```scala
// Implementation 1: counter-example
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => {
    var fields = Seq[(String, Int)]()
    rows.map(row => {
      fields = fields :+ (row.getString(2), row.getInt(4))
    })
    fields
  }
}
```
One principle of the functional programming paradigm is to avoid side effects inside function bodies as much as possible, that is, a function modifying or mutating external state.
```scala
// Implementation 2: positive example
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) =>
    rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}
```
Example 🌰 2:
```scala
// Implementation 1: counter-example
def createInstance(factDF: DataFrame, startDate: String, endDate: String): DataFrame = {
  val instanceDF = factDF
    .filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
    .groupBy("dim1", "dim2", "dim3", "eventDate")
    .agg(sum("value") as "sum_value")
  instanceDF
}

pairDF.collect.foreach {
  case (startDate: String, endDate: String) =>
    val instance = createInstance(factDF, startDate, endDate)
    val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
    instance.write.parquet(outPath)
}
```
- collect pulls all results back to the Driver, so the Driver can become a single-point bottleneck
- A foreach over pairDF.collect re-scans the full factDF on every iteration

```scala
// Implementation 2: positive example
val instances = factDF
  .join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate") <= pairDF("endDate"))
  .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
  .agg(sum("value") as "sum_value")

instances.write.partitionBy("endDate", "startDate").parquet(rootPath)
```
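With the single join, factDF is scanned once instead of once per (startDate, endDate) pair, the shuffle distributes the per-pair work across executors rather than looping on the Driver, and `partitionBy("endDate", "startDate")` writes the same `endDate=/startDate=` directory layout that the loop produced by hand.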
<a name="JURK5"></a>
### Common optimizations
1. Persist RDDs that are used multiple times
2. Avoid shuffle operators as much as possible: reduceByKey, join (a Broadcast Join can be used instead), distinct, repartition
3. Prefer operators with map-side pre-aggregation: use reduceByKey instead of groupByKey
4. Use high-performance operators:<br />Filter + Coalesce, and mapPartitions instead of map<br />foreachPartitions instead of foreach
5. Broadcast large variables (a sketch of several of these points follows this list)
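A minimal sketch of points 1, 3, and 5, under stated assumptions: the names `words`, `largeDF`, `smallDimDF` and their paths are made up for illustration, and `sc`/`spark` are the usual SparkContext/SparkSession of a spark-shell session.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.storage.StorageLevel

// Hypothetical inputs: a large pair RDD and a small dimension table
val words: RDD[(String, Int)] = sc.textFile("/path/to/logs").flatMap(_.split(" ")).map((_, 1))
val largeDF: DataFrame = spark.read.parquet("/path/to/fact")
val smallDimDF: DataFrame = spark.read.parquet("/path/to/dim")

// 1. Persist an RDD that will be referenced more than once
words.persist(StorageLevel.MEMORY_AND_DISK)

// 3. reduceByKey pre-aggregates on the map side before the shuffle
val counts = words.reduceByKey(_ + _)
// groupByKey would ship every single record across the network first:
// val counts = words.groupByKey().mapValues(_.sum)

// 5. Broadcast the small table so the join does not shuffle largeDF
val joined = largeDF.join(broadcast(smallDimDF), Seq("dim1"))
```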
<a name="P8q16"></a>
### Save what you can, defer what you can
1. Push operations that cut down the amount of data scanned and processed as far upstream as possible
2. Try hard to eliminate Shuffle entirely, saving the overhead of writing data to disk and distributing it
3. If the Shuffle cannot be eliminated, defer the operations that involve a Shuffle to as late as possible
Example 🌰:
```scala
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
// Read the day's log file, deduplicate, and explode userInterestList
def createDF(rootPath: String, date: String): DataFrame = {
val path: String = rootPath + date
val df = spark.read.parquet(path)
.distinct
.withColumn("userInterest", explode(col("userInterestList")))
df
}
// Select fields, filter, deduplicate again, then union the per-day results
val distinctItems: DataFrame = dates.map{
case date: String =>
val df: DataFrame = createDF(rootPath, date)
.select("userId", "itemId", "userInterest", "accessFreq")
.filter("accessFreq in ('High', 'Medium')")
.distinct
df
}.reduce(_ union _)
```

After optimization:

```scala
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
val filePaths: List[String] = dates.map(rootPath + _)
/**
 * Schedule all files in a single read
 * Filter and prune columns first
 * Then explode userInterestList
 * Finally deduplicate once at the end
 */
val distinctItems = spark.read.parquet(filePaths: _*)
.filter("accessFreq in ('High', 'Medium'))")
.select("userId", "itemId", "userInterestList")
.withColumn("userInterest", explode(col("userInterestList")))
.select("userId", "itemId", "userInterest")
.distinct
```

Another use of a high-performance operator: with mapPartitions, an object that is expensive to construct (here, the MD5 instance inside Util) is built once per partition instead of once per record.

```scala
val ds: Dataset[(String, Int)] = df.mapPartitions(iterator => {
  val util = new Util() // Util only needs to be instantiated once per partition
  val res = iterator.map {
    case row => {
      val s: String = row.getString(0) + row.getString(1) + row.getString(2)
      val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
      (hashKey, row.getInt(3))
    }
  }
  res
})
import java.security.MessageDigest

class Util {
  val md5: MessageDigest = MessageDigest.getInstance("MD5")
  val sha256: MessageDigest = _ // other hash algorithms
}
```