Q1: Add a custom command to Spark SQL

Modify the grammar file `SqlBase.g4` to recognize the new statement. *(Screenshots of the grammar edits.)*
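The screenshots correspond to a handful of grammar edits. A minimal sketch, assuming the Spark 3.x `SqlBase.g4` layout (the exact placement and the surrounding alternatives are illustrative): add a `#showVersion`-labeled alternative to the `statement` rule and declare `VERSION` as a new, non-reserved keyword. The `#showVersion` label is what makes ANTLR generate a `ShowVersionContext` class and a `visitShowVersion` visitor hook, both used below:

```antlr
statement
    : /* ... existing alternatives ... */
    | SHOW VERSION                                                    #showVersion
    ;

// Declare the keyword token, and also add VERSION to the ansiNonReserved
// and nonReserved lists so it can still be used as an identifier.
VERSION: 'VERSION';
```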

Generate the parser code with ANTLR4
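In Spark's Maven build this is handled by the `antlr4-maven-plugin`; assuming the grammar sits in the `sql/catalyst` module, something like `build/mvn -pl sql/catalyst antlr4:antlr4` (or simply rebuilding the module) regenerates the parser code, including the `ShowVersionContext` class.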

Convert it to an Unresolved Logical Plan

1. In `SparkSqlParser.scala`, override the `visitShowVersion` method. Its parameter type, `ShowVersionContext`, is ANTLR-generated code:

```scala
override def visitShowVersion(ctx: ShowVersionContext): LogicalPlan = withOrigin(ctx) {
  ShowVersionCommand()
}
```

The command itself extends `LeafRunnableCommand` and returns a single-column result:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.types.StringType

case class ShowVersionCommand() extends LeafRunnableCommand {

  // The command outputs one string column named "version".
  override val output: Seq[Attribute] = Seq(AttributeReference("version", StringType)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sparkVersion = sparkSession.version
    val javaVersion = System.getProperty("java.version")
    val output = "Spark Version: %s, Java Version: %s".format(sparkVersion, javaVersion)
    Seq(Row(output))
  }
}
```

2. Build with `build/mvn clean package -DskipTests`; afterwards `SHOW VERSION` can be executed from the Spark shell.
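For example, from `spark-shell` after rebuilding (a minimal usage sketch):

```scala
// Runs the new command; prints a single "version" row.
spark.sql("SHOW VERSION").show(truncate = false)
```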
  2. <a name="Cet12"></a>
Q2.1: Construct a single SQL query that triggers all of the following optimizer rules:

- **CombineFilters**: merges adjacent Filter operators
- **CollapseProject**: merges adjacent SELECT operations (Project logical plans)
- **BooleanSimplification**: simplifies boolean expressions, e.g. removes `1 = 1`, simplifies `x + 1 < y + 1` to `x < y`, merges identical expressions, and eliminates `NOT`

```sql
-- CombineFilters: merges a11 > 1 with a1 > 10
-- Predicate pushdown: pushes a11 > 1 AND 1 = 1 down into the subquery,
--   next to a1 > 10
-- CollapseProject: collapses SELECT a1 + 1 AS a11, a2 and
--   SELECT a11, a2 + 1 AS a21 into SELECT a1 + 1 AS a11, a2 + 1 AS a21
-- BooleanSimplification: removes 1 = 1
SELECT a11, (a2 + 1) AS a21
FROM (
  SELECT (a1 + 1) AS a11, a2 FROM t1 WHERE a1 > 10
) WHERE a11 > 1 AND 1 = 1;
```
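To verify which rules fire, the plan can be inspected through the standard explain API (a sketch; it assumes a `SparkSession` named `spark` and a table `t1(a1 INT, a2 INT)`):

```scala
// Prints the parsed, analyzed, optimized, and physical plans; the optimized
// plan should show a single merged Filter and a single collapsed Project.
spark.sql("""
  SELECT a11, (a2 + 1) AS a21
  FROM (SELECT (a1 + 1) AS a11, a2 FROM t1 WHERE a1 > 10)
  WHERE a11 > 1 AND 1 = 1
""").explain(true)
```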


Q2.2: Construct a single SQL query that triggers all of the following optimizer rules:

  • ConstantFolding: SELECT 1+1 ===> SELECT 2
  • PushDownPredicates: predicate pushdown; pushes predicates as close to the data source as possible
  • ReplaceDistinctWithAggregate:

SELECT DISTINCT f1, f2 FROM t ===> SELECT f1, f2 FROM t GROUP BY f1, f2

  • ReplaceExceptWithAntiJoin:

SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
===>
SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1 <=> b1 AND a2 <=> b2

  • FoldablePropagation:

SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3
===>
SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now()

```sql
-- PushDownPredicates: pushes a1 > 5 AND 1 = 1 down into the subquery,
--   next to a2 = 10 AND 1 = 1
-- BooleanSimplification: removes 1 = 1
-- ReplaceExceptWithAntiJoin: rewrites EXCEPT as
--   DISTINCT a1, a2 ... LEFT ANTI JOIN ... ON ...
-- ReplaceDistinctWithAggregate: rewrites DISTINCT a1, a2, a3 as GROUP BY a1, a2, a3
-- FoldablePropagation: rewrites GROUP BY a1, a2, a3 as GROUP BY a1, a2, 'custom'
SELECT DISTINCT a1, a2, 'custom' a3
FROM (
  SELECT * FROM t1 WHERE a2 = 10 AND 1 = 1
) WHERE a1 > 5 AND 1 = 1
EXCEPT SELECT b1, b2, 1.0 b3 FROM t2 WHERE b2 = 10;
```
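As with Q2.1, the rewrite can be checked programmatically; a sketch assuming a `SparkSession` named `spark` and tables `t1(a1, a2)` and `t2(b1, b2)`:

```scala
// The optimized plan should contain a LeftAnti join instead of EXCEPT,
// an Aggregate instead of DISTINCT, and merged, simplified filters.
val optimized = spark.sql("""
  SELECT DISTINCT a1, a2, 'custom' a3
  FROM (SELECT * FROM t1 WHERE a2 = 10 AND 1 = 1)
  WHERE a1 > 5 AND 1 = 1
  EXCEPT SELECT b1, b2, 1.0 b3 FROM t2 WHERE b2 = 10
""").queryExecution.optimizedPlan
println(optimized.treeString)
```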

Q3: Implement a custom Catalyst optimizer rule (a silent rule)

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrites multiplications by the integer literal 1: `x * 1` and `1 * x` become `x`.
case class MultiplyOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(left, right, _)
        if right.isInstanceOf[Literal] && right.asInstanceOf[Literal].value.asInstanceOf[Int] == 1 =>
      left
    case Multiply(left, right, _)
        if left.isInstanceOf[Literal] && left.asInstanceOf[Literal].value.asInstanceOf[Int] == 1 =>
      right
  }
}

// Extension entry point that injects the rule into the optimizer.
class MultiplyOptimization extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session =>
      MultiplyOptimizationRule(session)
    }
  }
}

// Register via spark.sql.extensions and put the jar on Spark's classpath.
// Note: the config must name the extensions class, not the rule itself:
// bin/spark-sql --jars bigdata-camp.jar --conf spark.sql.extensions=MultiplyOptimization
```
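Alternatively, the extension can be attached when constructing the session, which is convenient for testing; a minimal sketch reusing the `MultiplyOptimization` class above (a table `t1` with an integer column `a1` is assumed to exist):

```scala
import org.apache.spark.sql.SparkSession

// withExtensions accepts any SparkSessionExtensions => Unit function,
// so an instance of MultiplyOptimization can be passed directly.
val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(new MultiplyOptimization)
  .getOrCreate()

// In the optimized plan, `a1 * 1` should be rewritten to just `a1`.
spark.sql("SELECT a1 * 1 FROM t1").explain(true)
```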