1. SQL execution flow

Flink SQL Overview - Figure 1


Reference: https://zhuanlan.zhihu.com/p/157265381

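A standard Flink SQL statement goes through the following stages before it runs:
  1. SQL parsing: the Calcite parser turns the SQL text into an AST (SQL -> AST; the AST is a SqlNode tree)
  2. SqlNode validation: the Calcite validator checks syntax, expressions, and table information (SqlNode -> SqlNode)
  3. Semantic analysis: the SqlNode tree is converted into a RelNode tree, i.e. the logical plan (SqlNode -> RelNode)
  4. Optimization: the Calcite optimizer rewrites the plan, e.g. pruning and predicate pushdown (RelNode -> RelNode)
     Further reading: "Flink source code analysis: MiniBatch Aggregation optimization in Flink SQL"; "Flink getting-started guide, Chapter 4: the classic Group By + Agg operation in SQL" (Alibaba Cloud Developer Community)
  5. Physical plan generation: the logical plan is converted into a physical plan
  6. Code generation: operators, UDFs, and expressions are generated as code and wrapped into Transformations
  7. Scheduling and execution: Transformation -> StreamGraph -> JobGraph -> ExecutionGraph -> task execution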
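The optimization stage rewrites one RelNode tree into an equivalent but cheaper one. As a toy illustration of predicate pushdown, the sketch below uses hypothetical Python classes (`Scan`, `Project`, `Filter`); they are not Calcite's RelNode API, and the predicate language is deliberately reduced to `column > value`. The rule moves a filter below a projection so rows are dropped earlier, and `explain` prints the tree with Flink-style `+-` prefixes.

```python
from dataclasses import dataclass, replace
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    """Leaf node: read every row of a table (plays the role of LogicalTableScan)."""
    table: str


@dataclass(frozen=True)
class Project:
    """Keep only the listed columns (plays the role of LogicalProject)."""
    columns: Tuple[str, ...]
    input: "Node"


@dataclass(frozen=True)
class Filter:
    """Keep rows where `column > value` (a deliberately tiny predicate language)."""
    column: str
    value: float
    input: "Node"


Node = Union[Scan, Project, Filter]


def push_down_filters(node: Node) -> Node:
    """Return an equivalent tree where each Filter sits below a Project
    whenever that Project keeps the column the Filter reads."""
    if isinstance(node, Scan):
        return node
    child = push_down_filters(node.input)  # optimize the subtree first
    if isinstance(node, Filter) and isinstance(child, Project) and node.column in child.columns:
        # Filter(Project(x)) -> Project(Filter(x)): rows are discarded before projecting
        pushed = push_down_filters(Filter(node.column, node.value, child.input))
        return Project(child.columns, pushed)
    return replace(node, input=child)  # no rule fired: rebuild with the optimized child


def explain(node: Node, depth: int = 0) -> str:
    """Render the tree with '+-' prefixes and 3-space indentation per level."""
    prefix = "" if depth == 0 else "   " * (depth - 1) + "+- "
    if isinstance(node, Scan):
        return f"{prefix}Scan(table=[{node.table}])"
    if isinstance(node, Project):
        line = f"{prefix}Project(columns=[{', '.join(node.columns)}])"
    else:
        line = f"{prefix}Filter(where=[{node.column} > {node.value}])"
    return line + "\n" + explain(node.input, depth + 1)


if __name__ == "__main__":
    plan = Filter("price", 10.0, Project(("order_id", "price"), Scan("source_table")))
    print(explain(push_down_filters(plan)))
```

Running it prints `Project(columns=[order_id, price])`, then `+- Filter(where=[price > 10.0])`, then `   +- Scan(table=[source_table])`. The toy rule only handles projections that pass columns through unchanged; a real optimizer must also rewrite predicates across renamed or computed columns.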

Flink SQL Overview - Figure 2

2. Execution plan

  == Abstract Syntax Tree ==
  LogicalSink(table=[default_catalog.default_database.sink_table], fields=[order_id, count_result, sum_result, avg_result, min_result, max_result])
  +- LogicalProject(order_id=[$0], count_result=[CAST($1):BIGINT], sum_result=[$2], avg_result=[CAST($3):DOUBLE], min_result=[$4], max_result=[$5])
     +- LogicalAggregate(group=[{0}], count_result=[COUNT()], sum_result=[SUM($1)], avg_result=[AVG($1)], min_result=[MIN($1)], max_result=[MAX($1)])
        +- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

  == Optimized Physical Plan ==
  Sink(table=[default_catalog.default_database.sink_table], fields=[order_id, count_result, sum_result, avg_result, min_result, max_result])
  +- Calc(select=[order_id, CAST(count_result) AS count_result, sum_result, CAST(avg_result) AS avg_result, min_result, max_result])
     +- GroupAggregate(groupBy=[order_id], select=[order_id, COUNT(*) AS count_result, SUM(price) AS sum_result, AVG(price) AS avg_result, MIN(price) AS min_result, MAX(price) AS max_result])
        +- Exchange(distribution=[hash[order_id]])
           +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[order_id, price])

  == Optimized Execution Plan ==
  Sink(table=[default_catalog.default_database.sink_table], fields=[order_id, count_result, sum_result, avg_result, min_result, max_result])
  +- Calc(select=[order_id, CAST(count_result) AS count_result, sum_result, CAST(avg_result) AS avg_result, min_result, max_result])
     +- GroupAggregate(groupBy=[order_id], select=[order_id, COUNT(*) AS count_result, SUM(price) AS sum_result, AVG(price) AS avg_result, MIN(price) AS min_result, MAX(price) AS max_result])
        +- Exchange(distribution=[hash[order_id]])
           +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[order_id, price])
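In the physical plan, `Exchange(distribution=[hash[order_id]])` routes all rows with the same `order_id` to the same parallel subtask, and `GroupAggregate` then keeps one accumulator per key and emits updated results. The following is a single-process simulation of that pair, not Flink code: Python's built-in `hash` stands in for Flink's own key-group hashing, the names `exchange`, `GroupAggregate`, and `run` are hypothetical, and the operator is assumed to always emit update-before (`-U`) messages, which in Flink depends on what the downstream operators require.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Result = Tuple[int, int, float, float, float, float]  # order_id, count, sum, avg, min, max


def exchange(rows: List[Row], key: str, parallelism: int) -> List[List[Row]]:
    """Hash-partition rows so every row with the same key reaches the same subtask."""
    partitions: List[List[Row]] = [[] for _ in range(parallelism)]
    for row in rows:
        partitions[hash(row[key]) % parallelism].append(row)
    return partitions


class GroupAggregate:
    """One parallel subtask: an accumulator per order_id, emitting a changelog."""

    def __init__(self) -> None:
        # order_id -> [count, sum, min, max]; AVG is derived as sum / count
        self.state: Dict[int, list] = {}

    @staticmethod
    def _result(key: int, acc: list) -> Result:
        count, total, lo, hi = acc
        return (key, count, total, total / count, lo, hi)

    def process(self, row: Row) -> List[Tuple[str, Result]]:
        key, price = row["order_id"], row["price"]
        acc = self.state.get(key)
        if acc is None:
            acc = self.state[key] = [1, price, price, price]
            return [("+I", self._result(key, acc))]  # first row for this key: insert
        before = self._result(key, acc)  # snapshot the old result to retract it
        acc[0] += 1
        acc[1] += price
        acc[2] = min(acc[2], price)
        acc[3] = max(acc[3], price)
        return [("-U", before), ("+U", self._result(key, acc))]


def run(rows: List[Row], parallelism: int = 2) -> List[Tuple[str, Result]]:
    changelog: List[Tuple[str, Result]] = []
    for partition in exchange(rows, "order_id", parallelism):
        subtask = GroupAggregate()  # each subtask owns the state for its keys only
        for row in partition:
            changelog.extend(subtask.process(row))
    return changelog


if __name__ == "__main__":
    orders = [
        {"order_id": 1, "price": 10.0},
        {"order_id": 2, "price": 5.0},
        {"order_id": 1, "price": 30.0},
    ]
    for kind, result in run(orders):
        print(kind, result)
```

Because partitioning is by key, each subtask can aggregate independently: the second row for `order_id` 1 retracts `(1, 1, 10.0, 10.0, 10.0, 10.0)` and emits `(1, 2, 40.0, 20.0, 10.0, 30.0)`, while `order_id` 2 is handled by whichever subtask its hash selects.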

3. Viewing the generated code

Flink SQL Overview - Figure 3

Flink SQL Overview - Figure 4
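In the code generation stage, Flink's planner emits Java source for operators such as `Calc` and compiles it at runtime (it uses the Janino compiler for this), so each query gets operators specialized to its expressions instead of a generic interpreter. The sketch below is only an analogy in Python: the hypothetical helpers `generate_calc` and `compile_calc` build source text for a projection that mirrors the `Calc(select=[...])` node above, then compile it with the built-in `compile`/`exec`. It does not reproduce Flink's generated classes.

```python
from typing import Any, Dict, List, Tuple


def generate_calc(class_name: str, select: List[Tuple[str, str]]) -> str:
    """Emit source code for a projection operator.
    `select` pairs an output field name with a Python expression over `row`."""
    lines = [
        f"class {class_name}:",
        "    def process(self, row):",
        "        return (",
    ]
    for name, expr in select:
        lines.append(f"            {expr},  # {name}")  # one tuple element per output field
    lines.append("        )")
    return "\n".join(lines)


def compile_calc(class_name: str, source: str) -> Any:
    """Compile the generated source at runtime and instantiate the operator."""
    namespace: Dict[str, Any] = {}
    exec(compile(source, f"<generated {class_name}>", "exec"), namespace)
    return namespace[class_name]()


if __name__ == "__main__":
    # Mirrors Calc(select=[order_id, CAST(count_result) AS count_result, ...])
    select = [
        ("order_id", 'row["order_id"]'),
        ("count_result", 'int(row["count_result"])'),  # stands in for CAST(count_result)
        ("sum_result", 'row["sum_result"]'),
        ("avg_result", 'float(row["avg_result"])'),  # stands in for CAST(avg_result)
        ("min_result", 'row["min_result"]'),
        ("max_result", 'row["max_result"]'),
    ]
    source = generate_calc("GeneratedCalc", select)
    print(source)
    calc = compile_calc("GeneratedCalc", source)
    print(calc.process({"order_id": 1, "count_result": 2, "sum_result": 40.0,
                        "avg_result": 20, "min_result": 10.0, "max_result": 30.0}))
```

Printing `source` is the analogue of inspecting the generated code for a Flink job: the expressions from the plan appear inline in a dedicated class rather than being evaluated through a generic expression tree.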