1.根据SqlQueryExecution::analyzeQuery方法生成subPlan
2.根据获取到的subPlan信息,在SqlQueryExecution::planDistribution方法中调用DistributedExecutionPlanner的plan方法
/**
 * Produces the stage execution plan for the given plan root and builds the
 * query scheduler for it.
 *
 * <p>Sequence: mark the start of distributed planning, run
 * {@code DistributedExecutionPlanner.plan} (in SNAPSHOT or NORMAL mode
 * depending on the session), mark the end of planning, register a listener
 * that closes the split sources once the query is done, record the output
 * columns, create the scheduler and publish it through {@code queryScheduler}.
 * This method only builds the scheduler; it does not start scheduling.
 *
 * @param plan plan root whose root sub-plan is turned into stage execution plans
 */
private void planDistribution(PlanRoot plan)
{
    // Mark the start of the distributed-planning phase (it is timed by the state machine).
    stateMachine.beginDistributedPlanning();

    // Plan the execution on the active nodes.
    DistributedExecutionPlanner planner = new DistributedExecutionPlanner(splitManager, metadata);
    Session session = stateMachine.getSession();
    boolean snapshotEnabled = SystemSessionProperties.isSnapshotEnabled(session);

    // Snapshot: a UNION statement creates a situation where multiple table scan stages
    // communicate with the same ExchangeOperator. The taskCount on Markers coming from
    // those stages has to be adjusted for snapshots to succeed.
    // Key = stageId of a table scan stage; value = total number of table scan stages
    // running in parallel. Stays null when snapshots are disabled.
    Map<Integer, Integer> parallelSources = snapshotEnabled ? new HashMap<>() : null;

    StageExecutionPlan rootStagePlan;
    if (!snapshotEnabled) {
        // Regular path: turn the sub-plan into the corresponding stage execution plan.
        rootStagePlan = planner.plan(plan.getRoot(), session, NORMAL, null, 0, null);
    }
    else {
        // Snapshot path: planning differs when snapshots are enabled; see the "plan"
        // method for the difference between the modes.
        MarkerAnnouncer markerAnnouncer = splitManager.getMarkerAnnouncer(session);
        rootStagePlan = planner.plan(
                plan.getRoot(),
                session,
                SNAPSHOT,
                null,
                markerAnnouncer.currentSnapshotId(),
                parallelSources);
    }

    // Mark the end of the distributed-planning phase.
    stateMachine.endDistributedPlanning();

    // Ensure the split sources are closed once the query reaches a done state.
    stateMachine.addStateChangeListener(newState -> {
        if (newState.isDone()) {
            closeSplitSources(rootStagePlan);
        }
    });

    // If the query was canceled, skip creating the scheduler.
    if (stateMachine.isDone()) {
        return;
    }

    // Record the output fields: field names together with their types.
    stateMachine.setColumns(
            rootStagePlan.getFieldNames(),
            rootStagePlan.getFragment().getTypes());

    PartitioningHandle rootPartitioning = plan.getRoot()
            .getFragment()
            .getPartitioningScheme()
            .getPartitioning()
            .getHandle();
    OutputBuffers rootOutputBuffers = createInitialEmptyOutputBuffers(rootPartitioning)
            .withBuffer(OUTPUT_BUFFER_ID, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();

    // Build the stage execution objects (this doesn't schedule execution).
    SqlQueryScheduler newScheduler = createSqlQueryScheduler(
            stateMachine, locationFactory, rootStagePlan,
            nodePartitioningManager, nodeScheduler, remoteTaskFactory,
            stateMachine.getSession(), plan.isSummarizeTaskInfos(), scheduleSplitBatchSize,
            queryExecutor, schedulerExecutor, failureDetector,
            rootOutputBuffers, nodeTaskMap, executionPolicy,
            schedulerStats, dynamicFilterService, heuristicIndexerManager,
            snapshotManager, null, parallelSources);
    queryScheduler.set(newScheduler);

    // If the query was canceled during scheduler creation, abort the scheduler
    // directly since the state-change callback may have already fired.
    if (stateMachine.isDone()) {
        newScheduler.abort();
        queryScheduler.set(null);
    }
}