1. 通过 SqlQueryExecution::analyzeQuery 方法生成 subPlan
    2. 根据获取到的 subPlan 信息,在 SqlQueryExecution::planDistribution 方法中调用 DistributedExecutionPlanner 的 plan 方法

    1. private void planDistribution(PlanRoot plan) // builds the StageExecutionPlan for plan's root fragment, then creates and publishes the SqlQueryScheduler
    2. {
    3. // time distribution planning
    4. stateMachine.beginDistributedPlanning();
    5. // plan the execution on the active nodes
    6. DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata);
    7. StageExecutionPlan outputStageExecutionPlan;
    8. Session session = stateMachine.getSession();
    9. Map<Integer, Integer> parallelSources;
    10. if (SystemSessionProperties.isSnapshotEnabled(session)) {
    11. // Snapshot: UNION statement creates situation where multiple table scan stages communicate with the
    12. // same ExchangeOperator, we need to be aware of that in order to adjust the taskCount on Markers coming from
    13. // those table scan stages to have successful snapshots.
    14. // The key represents the table scan stage's stageId and value represents how many table scan stages
    15. // are in parallel in total.
    16. // The map is created empty here and handed to distributedPlanner.plan(...) below;
    17. // the same instance is later passed on to createSqlQueryScheduler(...).
    18. // NOTE(review): presumably plan(...) is what populates it -- confirm in DistributedExecutionPlanner.
    19. parallelSources = new HashMap<>();
    20. // Snapshot: need to plan different when snapshot is enabled.
    21. // See the "plan" method for difference between the different modes.
    22. MarkerAnnouncer announcer = splitManager.getMarkerAnnouncer(session);
    23. outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), session, SNAPSHOT, null, announcer.currentSnapshotId(), parallelSources);
    24. }
    25. else {
    26. parallelSources = null;
    27. // convert the sub-plan into the corresponding stage execution plan (NORMAL mode, no parallelSources map)
    28. outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), session, NORMAL, null, 0, null);
    29. }
    30. // end distributed planning
    31. stateMachine.endDistributedPlanning();
    32. // ensure split sources are closed once the query reaches a done state
    33. stateMachine.addStateChangeListener(state -> {
    34. if (state.isDone()) {
    35. closeSplitSources(outputStageExecutionPlan);
    36. }
    37. });
    38. // if query was canceled, skip creating scheduler
    39. if (stateMachine.isDone()) {
    40. return;
    41. }
    42. // record output fields: the field names together with the root fragment's types
    43. stateMachine.setColumns(outputStageExecutionPlan.getFieldNames(), outputStageExecutionPlan.getFragment().getTypes());
    44. PartitioningHandle partitioningHandle = plan.getRoot().getFragment().getPartitioningScheme().getPartitioning().getHandle();
    45. OutputBuffers rootOutputBuffers = createInitialEmptyOutputBuffers(partitioningHandle)
    46. .withBuffer(OUTPUT_BUFFER_ID, BROADCAST_PARTITION_ID)
    47. .withNoMoreBufferIds();
    48. // build the stage execution objects (this doesn't schedule execution)
    49. SqlQueryScheduler scheduler = createSqlQueryScheduler(
    50. stateMachine,
    51. locationFactory,
    52. outputStageExecutionPlan,
    53. nodePartitioningManager,
    54. nodeScheduler,
    55. remoteTaskFactory,
    56. stateMachine.getSession(),
    57. plan.isSummarizeTaskInfos(),
    58. scheduleSplitBatchSize,
    59. queryExecutor,
    60. schedulerExecutor,
    61. failureDetector,
    62. rootOutputBuffers,
    63. nodeTaskMap,
    64. executionPolicy,
    65. schedulerStats,
    66. dynamicFilterService,
    67. heuristicIndexerManager,
    68. snapshotManager,
    69. null,
    70. parallelSources);
    71. queryScheduler.set(scheduler);
    72. // if query was canceled during scheduler creation, abort the scheduler
    73. // directly since the callback may have already fired
    74. if (stateMachine.isDone()) {
    75. scheduler.abort();
    76. queryScheduler.set(null);
    77. }
    78. }