### Q1: Add a custom command to Spark SQL
- Modify SqlBase.g4: add a `SHOW VERSION` alternative (e.g. labeled `#showVersion`) under the `statement` rule, declaring the `VERSION` keyword if it does not exist yet.
- Regenerate the parser code with antlr4; this produces `ShowVersionContext` and the `visitShowVersion` callback.
- Convert the parse tree into an Unresolved Logical Plan:
- In SparkSqlParser.scala, override the visitShowVersion method; its parameter, ShowVersionContext, is the antlr-generated context class:
```scala
// In the AST builder (SparkSqlParser.scala): map the parse tree to a logical plan.
override def visitShowVersion(ctx: ShowVersionContext): LogicalPlan = withOrigin(ctx) {
  ShowVersionCommand()
}

// New command class, e.g. under org.apache.spark.sql.execution.command.
case class ShowVersionCommand() extends LeafRunnableCommand {
  override val output: Seq[Attribute] =
    Seq(AttributeReference("version", StringType)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sparkVersion = sparkSession.version
    val javaVersion = System.getProperty("java.version")
    val output = "Spark Version: %s, Java Version: %s".format(sparkVersion, javaVersion)
    Seq(Row(output))
  }
}
```
Build with `build/mvn clean package -DskipTests`; afterwards `SHOW VERSION` can be executed from the Spark shell.
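A quick smoke test through the SQL API; a minimal sketch, assuming a spark-shell launched from the rebuilt distribution:

```scala
// SHOW VERSION is the command added above; show() prints its single-row result.
spark.sql("SHOW VERSION").show(truncate = false)
// Expected shape: "Spark Version: <spark.version>, Java Version: <java.version>"
```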
### Q2.1: Construct one SQL statement that simultaneously triggers the following optimizer rules:
- **CombineFilters**: merges adjacent Filter operators
- **CollapseProject**: merges adjacent SELECT operations (Project logical plans)
- **BooleanSimplification**: simplifies boolean expressions,
  e.g. removes `1 = 1`, simplifies `x + 1 < y + 1` to `x < y`, merges identical expressions, eliminates `NOT`
```sql
-- CombineFilters: merges a11 > 1 with a1 > 10
-- PushDownPredicates: pushes a11 > 1 AND 1 = 1 down into the subquery,
--   next to a1 > 10
-- CollapseProject: collapses SELECT a1 + 1 AS a11, a2 and SELECT a11, a2 + 1 AS a21
--   into SELECT a1 + 1 AS a11, a2 + 1 AS a21
-- BooleanSimplification: removes 1 = 1
SELECT a11, (a2 + 1) AS a21
FROM (
  SELECT (a1 + 1) AS a11, a2 FROM t1 WHERE a1 > 10
) WHERE a11 > 1 AND 1 = 1;
```
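To confirm the rules actually fire, compare the analyzed plan with the optimized plan; a minimal sketch for spark-shell, assuming a hypothetical test view t1 with integer columns a1 and a2:

```scala
// Hypothetical in-memory stand-in for t1 (spark-shell pre-imports the implicits).
Seq((5, 1), (20, 2)).toDF("a1", "a2").createOrReplaceTempView("t1")

val df = spark.sql("""
  SELECT a11, (a2 + 1) AS a21
  FROM (SELECT (a1 + 1) AS a11, a2 FROM t1 WHERE a1 > 10)
  WHERE a11 > 1 AND 1 = 1
""")

// The analyzed plan still has two Filters, two Projects and the 1 = 1 predicate;
// the optimized plan should collapse to a single Project over a single Filter.
println(df.queryExecution.analyzed)
println(df.queryExecution.optimizedPlan)
```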
### Q2.2: Construct one SQL statement that simultaneously triggers the following optimizer rules:
- **ConstantFolding**: `SELECT 1+1` ===> `SELECT 2`
- **PushDownPredicates**: predicate pushdown; moves predicates as close to the data source as possible
- **ReplaceDistinctWithAggregate**:
  `SELECT DISTINCT f1, f2 FROM t` ===> `SELECT f1, f2 FROM t GROUP BY f1, f2`
- **ReplaceExceptWithAntiJoin**:
  `SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2`
  ===>
  `SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1 <=> b1 AND a2 <=> b2`
- **FoldablePropagation**:
  `SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3`
  ===>
  `SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now()`
```sql
-- PushDownPredicates: pushes a1 > 5 AND 1 = 1 down into the subquery,
--   next to a2 = 10 AND 1 = 1
-- BooleanSimplification: removes 1 = 1
-- ReplaceExceptWithAntiJoin: rewrites the EXCEPT as
--   DISTINCT a1, a2 .. LEFT ANTI JOIN .. ON ..
-- ReplaceDistinctWithAggregate: rewrites DISTINCT a1, a2, a3 as GROUP BY a1, a2, a3
-- FoldablePropagation: rewrites GROUP BY a1, a2, a3 as GROUP BY a1, a2, 'custom'
SELECT DISTINCT a1, a2, 'custom' a3
FROM (
  SELECT * FROM t1 WHERE a2 = 10 AND 1 = 1
) WHERE a1 > 5 AND 1 = 1
EXCEPT SELECT b1, b2, 1.0 b3 FROM t2 WHERE b2 = 10;
```
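To see exactly which rules rewrote this plan, the plan-change log can be enabled (Spark 3.1+); a minimal sketch, assuming views t1(a1, a2) and t2(b1, b2) exist:

```scala
// Log every optimizer rule that effectively changes the plan.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")

spark.sql("""
  SELECT DISTINCT a1, a2, 'custom' a3
  FROM (SELECT * FROM t1 WHERE a2 = 10 AND 1 = 1) WHERE a1 > 5 AND 1 = 1
  EXCEPT SELECT b1, b2, 1.0 b3 FROM t2 WHERE b2 = 10
""").queryExecution.optimizedPlan
// The log should include PushDownPredicates, BooleanSimplification,
// ReplaceExceptWithAntiJoin, ReplaceDistinctWithAggregate and FoldablePropagation.
```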
### Q3: Implement a custom Catalyst optimizer rule (a "silent" rule, i.e. injected through extensions rather than built into Spark)
```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.IntegerType

case class MultiplyOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  // Rewrite x * 1 and 1 * x to x. Matching on Literal(1, IntegerType) avoids the
  // unchecked asInstanceOf[Int] cast, which would throw on non-integer literals.
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(left, Literal(1, IntegerType), _) => left
    case Multiply(Literal(1, IntegerType), right, _) => right
  }
}

class MultiplyOptimization extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Register the rule in the optimizer's extended operator-optimization batch.
    extensions.injectOptimizerRule { session =>
      MultiplyOptimizationRule(session)
    }
  }
}
```
Submit it via `spark.sql.extensions`, with the jar placed on Spark's classpath. Note that the config must name the extensions class (`MultiplyOptimization`), not the rule class itself:

`bin/spark-sql --jars bigdata-camp.jar --conf spark.sql.extensions=MultiplyOptimization`
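For local testing without packaging a jar, the same extension can be attached programmatically; a minimal sketch, assuming a hypothetical test view v with one integer column a:

```scala
// Builder.withExtensions is the programmatic equivalent of spark.sql.extensions.
val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(new MultiplyOptimization)
  .getOrCreate()

import spark.implicits._
Seq(1, 2, 3).toDF("a").createOrReplaceTempView("v") // hypothetical test view

// After optimization, `a * 1` should be reduced to a bare reference to `a`.
println(spark.sql("SELECT a * 1 FROM v").queryExecution.optimizedPlan)
```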