This requires us to go deep into Spark's core principles, keep experimenting with each API and operator under different configuration parameters, and eventually find the best combination.

(Figure 1: Spark general tuning)

Example 1:

```scala
// Implementation 1: anti-pattern
import org.apache.spark.sql.Row

val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => {
    var fields = Seq[(String, Int)]()
    rows.map(row => {
      fields = fields :+ (row.getString(2), row.getInt(4))
    })
    fields
  }
}
```

One principle of the functional programming paradigm is to avoid side effects inside function bodies as much as possible, that is, to avoid modifying or mutating external state.

```scala
// Implementation 2: recommended
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) =>
    rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}
```

Example 2:

```scala
// Implementation 1: anti-pattern
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, sum}

def createInstance(factDF: DataFrame, startDate: String, endDate: String): DataFrame = {
  val instanceDF = factDF
    .filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
    .groupBy("dim1", "dim2", "dim3", "eventDate")
    .agg(sum("value") as "sum_value")
  instanceDF
}

pairDF.collect.foreach {
  case (startDate: String, endDate: String) =>
    val instance = createInstance(factDF, startDate, endDate)
    val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
    instance.write.parquet(outPath)
}
```
1. `collect` gathers all results onto the Driver, so the Driver can become a single-point bottleneck.
2. Looping with `foreach` over `pairDF.collect` re-scans the full `factDF` in every iteration.

```scala
// Implementation 2: recommended
val instances = factDF
  .join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate") <= pairDF("endDate"))
  .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
  .agg(sum("value") as "sum_value")

instances.write.partitionBy("endDate", "startDate").parquet(rootPath)
```

### Common practices

1. Persist RDDs/DataFrames that are reused multiple times (see the sketch after this list).
2. Avoid shuffle operators where possible: reduceByKey, join (consider a Broadcast Join instead), distinct, repartition.
3. Prefer operators with map-side pre-aggregation, e.g. reduceByKey instead of groupByKey.
4. Use high-performance operators: coalesce after filter; mapPartitions instead of map; foreachPartition instead of foreach.
5. Broadcast large variables.
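
Items 1, 2, 3, and 5 can be made concrete with a short sketch. This is illustrative only and not taken from the course code: `logsDF`, `smallDimDF`, `pairsRDD`, the column names, and the input paths are hypothetical, and a running `SparkSession` is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("tuning-sketch").getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

// 1. Persist a dataset that is used more than once, so it is computed only once
val logsDF = spark.read.parquet("/hypothetical/logs")
logsDF.persist(StorageLevel.MEMORY_AND_DISK)
val total  = logsDF.count()                              // first action materializes the cache
val errors = logsDF.filter($"level" === "ERROR").count() // reuses the cached data

// 2. Replace a shuffle join with a broadcast join when one side is small
val smallDimDF = spark.read.parquet("/hypothetical/dim") // hypothetical small dimension table
val joined = logsDF.join(broadcast(smallDimDF), Seq("dimKey"))

// 3. Prefer map-side pre-aggregation: reduceByKey combines values inside each
//    partition before the shuffle, groupByKey ships every record over the network
val pairsRDD = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums = pairsRDD.reduceByKey(_ + _)                   // instead of groupByKey().mapValues(_.sum)

// 5. Broadcast a large read-only variable once per executor instead of once per task
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val shifted = pairsRDD.map { case (k, v) => (k, v + lookup.value.getOrElse(k, 0)) }
```

The `broadcast()` hint asks Spark to ship the small table to every executor so the join needs no shuffle; likewise, `sc.broadcast` sends the lookup map once per executor rather than serializing it into every task closure.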
### Save what you can, defer what you can

1. Push operations that reduce the amount of data scanned and processed as early in the plan as possible.
2. Eliminate shuffles wherever you can, saving the cost of spilling data to disk and distributing it over the network.
3. If a shuffle cannot be removed, defer the operations that involve it to as late as possible.

Example:
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _

// Read the log files, deduplicate, and explode userInterestList
def createDF(rootPath: String, date: String): DataFrame = {
  val path: String = rootPath + date
  val df = spark.read.parquet(path)
    .distinct
    .withColumn("userInterest", explode(col("userInterestList")))
  df
}

// Select fields, filter, deduplicate again, then union the daily results
val distinctItems: DataFrame = dates.map {
  case date: String =>
    val df: DataFrame = createDF(rootPath, date)
      .select("userId", "itemId", "userInterest", "accessFreq")
      .filter("accessFreq in ('High', 'Medium')")
      .distinct
    df
}.reduce(_ union _)
```

After optimization:

```scala
val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
val rootPath: String = _
val filePaths: List[String] = dates.map(rootPath + _)

/**
 * Read all files in a single job,
 * filter and prune columns first,
 * then explode userInterestList,
 * and finally deduplicate once at the end.
 */
val distinctItems = spark.read.parquet(filePaths: _*)
  .filter("accessFreq in ('High', 'Medium')")
  .select("userId", "itemId", "userInterestList")
  .withColumn("userInterest", explode(col("userInterestList")))
  .select("userId", "itemId", "userInterest")
  .distinct
```
An example of "mapPartitions instead of map" from the common-practices list: the `Util` object that holds the hash algorithm is instantiated once per partition rather than once per record.

```scala
import java.security.MessageDigest
import org.apache.spark.sql.Dataset

class Util {
  val md5: MessageDigest = MessageDigest.getInstance("MD5")
  val sha256: MessageDigest = _ // other hash algorithms
}

// df: a DataFrame (defined elsewhere) with String columns at indices 0-2 and an Int column at index 3
val ds: Dataset[(String, Int)] = df.mapPartitions(iterator => {
  val util = new Util() // no need to instantiate Util for every record
  val res = iterator.map { row =>
    val s: String = row.getString(0) + row.getString(1) + row.getString(2)
    val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
    (hashKey, row.getInt(3))
  }
  res
})
```
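
The "foreachPartition instead of foreach" item works the same way for writes. The sketch below is illustrative only: `SinkClient`, `writeBatch`, and `close` are hypothetical stand-ins for a real external-store client, and it reuses the `df` assumed above.

```scala
import org.apache.spark.sql.Row

// Hypothetical client for an external store; writeBatch/close are stand-ins, not a real API
class SinkClient {
  def writeBatch(batch: Seq[String]): Unit = () // pretend this performs one bulk write
  def close(): Unit = ()
}

// Assumes the same `df` as above, with a String column at index 0
df.rdd.foreachPartition { rows: Iterator[Row] =>
  val client = new SinkClient()        // one connection per partition, not per record
  rows.grouped(1000).foreach { batch => // send records in batches
    client.writeBatch(batch.map(_.getString(0)))
  }
  client.close()
}
```

Creating the client inside the closure avoids serializing it from the Driver, and batching cuts the number of round trips to the sink from one per record to one per batch.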