1.ConnectorTableMetadata

image.png

  1. private final SchemaTableName table;
  2. private final Optional<String> comment;
  3. private final List<ColumnMetadata> columns;
  4. private final Optional<List<ColumnMetadata>> immutableColumns;
  5. private final Map<String, Object> properties;
  6. private final Optional<Set<String>> nonInheritableProperties;

2.ColumnMetadata

image.png

  1. private final String name;
  2. private final Type type;
  3. private final boolean nullable;
  4. private final String comment;
  5. private final String extraInfo;
  6. private final boolean hidden;
  7. private final boolean required;
  8. private final Map<String, Object> properties;

3.TableMetadata

image.png

  1. private final CatalogName catalogName;
  2. private final ConnectorTableMetadata metadata;

4.CatalogName

image.png

  1. private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
  2. private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";
  3. private final String catalogName;

5.ColumnHandle

ColumnHandle是spi模块下的接口,不同类型的实现所保存的句柄信息不一样
image.png

5.1JdbcColumnHandle

以我们最熟悉的JDBC为例,查看对应的列信息
image.png

  1. private final String columnName;
  2. private final JdbcTypeHandle jdbcTypeHandle;
  3. private final Type columnType;
  4. private final boolean nullable;

5.1.1 Type

Type是一个接口,不同类型都要实现该接口,比如常见的:CharType,BigintType等
image.png

6.Field

字段信息保存的值
image.png

  1. private final Optional<QualifiedObjectName> originTable; // 该字段属于哪个catalog的schema下的表
  2. private final Optional<String> originColumnName; // 原始字段名称
  3. private final Optional<QualifiedName> relationAlias;
  4. private final Optional<String> name;
  5. private final Type type;
  6. private final boolean hidden;
  7. private final boolean aliased;

7.QualifiedObjectName

限定名,一般用于保存 catalogname.schemaname.object, 类似于:mysql.test.student,其中,mysql是catalogname,test是schemaname,student是object
image.png

  1. private final String catalogName; // catalogName
  2. private final String schemaName; // schemaName
  3. private final String objectName; // 对象名

8.QualifiedName

限定名
image.png

  1. // 用于保存小写格式的限定名,catalog.schema.table
  2. private final List<String> parts;
  3. // 保存原始的限定名
  4. private final List<Identifier> originalParts;

9.Scope

具有范围,作用域的意思
image.png

  1. private final Optional<Scope> parent;
  2. private final boolean queryBoundary;
  3. private final RelationId relationId;
  4. private final RelationType relation;
  5. private final Map<String, WithQuery> namedQueries;

10.RelationType

image.png

  1. // 可见字段
  2. private final List<Field> visibleFields;
  3. // 所有字段
  4. private final List<Field> allFields;
  5. // 字段与索引的映射关系
  6. private final Map<Field, Integer> fieldIndexes;

11.RelationPlanner

image.png

  1. private static final Logger LOG = Logger.get(RelationPlanner.class); // log日志
  2. private final Analysis analysis; // 保存分析过程的结果
  3. private final PlanSymbolAllocator planSymbolAllocator; // 标识分配器
  4. private final PlanNodeIdAllocator idAllocator; // 计划节点ID分配器
  5. private final Map<NodeRef<LambdaArgumentDeclaration>, Symbol> lambdaDeclarationToSymbolMap; // lambda声明到标识映射关系
  6. private final Metadata metadata; // 元数据
  7. private final TypeCoercion typeCoercion; // 类型强制转换
  8. private final Session session; // 会话
  9. private final SubqueryPlanner subqueryPlanner; // 子计划
  10. private final Map<QualifiedName, Integer> namedSubPlan; // 列名 与 索引映射关系
  11. private final UniqueIdAllocator uniqueIdAllocator; // 唯一分配器

12.PlanSymbolAllocator

  1. private final Map<Symbol, Type> symbols; //字段映射关系
  2. private int nextId; //

13.RelationPlan

  1. private final PlanNode root; // 计划节点
  2. // 来自root节点对应的标识号
  3. private final List<Symbol> fieldMappings; // for each field in the relation, the corresponding symbol from "root"
  4. // 范围
  5. private final Scope scope;

14.TableScanNode

TableScanNode继承自PlanNode节点
image.png

  1. private final TableHandle table; // 表句柄信息,包含了连接表的信息
  2. private final List<Symbol> outputSymbols;
  3. private final Map<Symbol, ColumnHandle> assignments; // symbol -> column
  4. private final TupleDomain<ColumnHandle> enforcedConstraint;
  5. private final Optional<RowExpression> predicate;
  6. private final boolean forDelete;
  7. private ReuseExchangeOperator.STRATEGY strategy;
  8. private UUID reuseTableScanMappingId;
  9. private RowExpression filterExpr;
  10. private Integer consumerTableScanNodeCount;

15.TableHandle

  1. private final CatalogName catalogName; // 属于哪个catalog
  2. private final ConnectorTableHandle connectorHandle; // 以及对应的表的连接信息
  3. private final ConnectorTransactionHandle transaction; //

16.Symbol

  1. private final String name;

17.ReuseExchangeOperator

  1. enum STRATEGY {
  2. REUSE_STRATEGY_DEFAULT, // 默认重用策略
  3. REUSE_STRATEGY_PRODUCER, // 生产者重用策略
  4. REUSE_STRATEGY_CONSUMER // 消费者重用策略
  5. }

18.RowExpression

RowExpression实现不同行表达式
image.png

19.PlanBuilder

  1. private final TranslationMap translations;
  2. private final List<Expression> parameters;
  3. private final PlanNode root;

20.TranslationMap

跟踪字段和表达式以及它们到当前计划中的符号的映射

  1. // all expressions are rewritten in terms of fields declared by this relation plan
  2. // 所有表达式都根据此关系计划声明的字段重写
  3. private final RelationPlan rewriteBase;
  4. private final Analysis analysis;
  5. private final Map<NodeRef<LambdaArgumentDeclaration>, Symbol> lambdaDeclarationToSymbolMap;
  6. // current mappings of underlying field -> symbol for translating direct field references
  7. // 基础字段的当前映射 -> 用于翻译直接字段引用的符号
  8. private final Symbol[] fieldSymbols;
  9. // current mappings of sub-expressions -> symbol
  10. private final Map<Expression, Symbol> expressionToSymbols = new HashMap<>();
  11. private final Map<Expression, Expression> expressionToExpressions = new HashMap<>();

21.CallExpression

  1. private final String displayName;
  2. private final FunctionHandle functionHandle;
  3. private final Type returnType;
  4. private final List<RowExpression> arguments;
  5. private final Optional<RowExpression> filter;

image.png

22.AggregationNode

  1. private final PlanNode source;
  2. private final Map<Symbol, Aggregation> aggregations;
  3. private final GroupingSetDescriptor groupingSets;
  4. private final List<Symbol> preGroupedSymbols;
  5. private final Step step;
  6. private final Optional<Symbol> hashSymbol;
  7. private final Optional<Symbol> groupIdSymbol;
  8. private final List<Symbol> outputs;

image.png

23.PlanOptimizers

PlanOptimizers 优化器集合

  1. private final List<PlanOptimizer> optimizers;
  2. private final RuleStatsRecorder ruleStats = new RuleStatsRecorder();
  3. private final OptimizerStatsRecorder optimizerStats = new OptimizerStatsRecorder();
  4. private final MBeanExporter exporter;

24.RuleStatsRecorder

规则状态记录器

  1. private final Map<Class<?>, RuleStats> stats = new HashMap<>(); // 标识一个类与规则状态映射关系

25.RuleStats

  1. private final AtomicLong hits = new AtomicLong();
  2. private final TimeDistribution time = new TimeDistribution(TimeUnit.MICROSECONDS);
  3. private final AtomicLong failures = new AtomicLong();

TimeDistribution 需要测试一下

26.OptimizerStatsRecorder

  1. private final Map<Class<?>, OptimizerStats> stats = new HashMap<>();

27.OptimizerStats

  1. private final AtomicLong failures = new AtomicLong();
  2. private final TimeDistribution time = new TimeDistribution(TimeUnit.MICROSECONDS);

28.PlanOptimizer

PlanOptimizer是一个接口,所有与优化相关的操作,都需要实现该接口的optimize方法
image.png

29.PlanFragment

  1. private final PlanFragmentId id; //
  2. private final PlanNode root;
  3. private final Map<Symbol, Type> symbols;
  4. private final PartitioningHandle partitioning;
  5. private final List<PlanNodeId> partitionedSources;
  6. private final Set<PlanNodeId> partitionedSourcesSet;
  7. private final List<Type> types;
  8. private final Set<PlanNode> partitionedSourceNodes;
  9. private final List<RemoteSourceNode> remoteSourceNodes;
  10. private final PartitioningScheme partitioningScheme;
  11. private final StageExecutionDescriptor stageExecutionDescriptor;
  12. private final StatsAndCosts statsAndCosts;
  13. private final Optional<String> jsonRepresentation;
  14. private final Optional<PlanFragmentId> feederCTEId;
  15. private final Optional<PlanNodeId> feederCTEParentId;

30.SqlStageExecution

  1. private static final Logger log = Logger.get(SqlStageExecution.class);
  2. private final StageStateMachine stateMachine; // 状态机
  3. private final RemoteTaskFactory remoteTaskFactory; // 任务工厂类
  4. private final NodeTaskMap nodeTaskMap; // 保存 node 与 task 的映射关系
  5. private final boolean summarizeTaskInfo; // 是否总结任务信息
  6. private final Executor executor; // 执行器
  7. private final FailureDetector failureDetector; // 失败探测器
  8. private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources; // 计划分段与源节点映射关系
  9. private final Map<InternalNode, Set<RemoteTask>> tasks = new ConcurrentHashMap<>(); // 节点 与 任务的映射关系, 每个节点所包含的任务个数
  10. @GuardedBy("this")
  11. private final AtomicInteger nextTaskId = new AtomicInteger(); // 下个taskId
  12. @GuardedBy("this")
  13. private final Set<TaskId> allTasks = newConcurrentHashSet(); // 所有的任务列表
  14. @GuardedBy("this")
  15. private final Set<TaskId> finishedTasks = newConcurrentHashSet(); // 已经完成的任务列表
  16. @GuardedBy("this")
  17. private final Set<TaskId> tasksWithFinalInfo = newConcurrentHashSet(); // 已经完成任务的最终消息
  18. @GuardedBy("this")
  19. private final AtomicBoolean splitsScheduled = new AtomicBoolean(); // 是否开启切片调度
  20. @GuardedBy("this")
  21. private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create(); // 源任务
  22. @GuardedBy("this")
  23. private final Set<PlanNodeId> completeSources = newConcurrentHashSet(); // 已经完成的 源节点
  24. @GuardedBy("this")
  25. private final Set<PlanFragmentId> completeSourceFragments = newConcurrentHashSet(); // 已经完成的计划分片
  26. private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>(); // 输出缓存
  27. private final ListenerManager<Set<Lifespan>> completedLifespansChangeListeners = new ListenerManager<>(); // 已经完成的 生命周期变化 监听器
  28. private final DynamicFilterService dynamicFilterService; // 动态过滤服务
  29. private final AtomicBoolean dynamicFilterSchedulingInfoPropagated = new AtomicBoolean(); // 传播的动态过滤器调度信息
  30. @GuardedBy("SqlStageExecution.class")
  31. public static Map<QueryId, List<UUID>> queryIdReuseTableScanMappingIdFinishedMap = new ConcurrentHashMap<>();
  32. private PlanNodeId parentId;
  33. private final QuerySnapshotManager snapshotManager;
  34. private boolean throttledSchedule;

31.QuerySnapshotManager

/*
- On the coordinator, it keeps track of snapshot status of all queries 在协调器节点, 它始终跟踪所有查询的快照状态
- On workers, it serves as a bridge between TaskSnapshotManager instances and the SnapshotStoreClient 在worker节点, 它作为 任务快照管理器实例 和 快照存储客户端 之间的连接桥梁
*/

32.NodeMap

  1. private final SetMultimap<HostAddress, InternalNode> nodesByHostAndPort;
  2. private final SetMultimap<InetAddress, InternalNode> nodesByHost;
  3. private final SetMultimap<NetworkLocation, InternalNode> workersByNetworkPath;
  4. private final Set<String> coordinatorNodeIds;

33.NodeTaskMap

  1. private static final Logger log = Logger.get(NodeTaskMap.class);
  2. private final ConcurrentHashMap<InternalNode, NodeTasks> nodeTasksMap = new ConcurrentHashMap<>(); // 节点与任务的关系
  3. private final FinalizerService finalizerService;

34.HttpRemoteTask

  1. private static final Logger log = Logger.get(HttpRemoteTask.class);
  2. private final TaskId taskId; // 任务的ID
  3. private final Session session; // 会话信息
  4. private final String nodeId; // nodeID
  5. private final PlanFragment planFragment; // 计划片段
  6. private final OptionalInt totalPartitions; // 所有分区数
  7. private final AtomicLong nextSplitId = new AtomicLong(); // 下一个split
  8. private final RemoteTaskStats stats; // 保存任务统计信息
  9. private final TaskInfoFetcher taskInfoFetcher; // 任务信息收集器, 包括任务失败,成功的状态, 链接信息
  10. private final ContinuousTaskStatusFetcher taskStatusFetcher; // 持续获取任务状态的收集器
  11. @GuardedBy("this")
  12. private Future<?> currentRequest; // 当前请求信息
  13. @GuardedBy("this")
  14. private long currentRequestStartNanos; // 当前请求时间
  15. @GuardedBy("this")
  16. //LinkedHashMultimap is used to preserve the order of insertion in addSplits.
  17. //It guarantees that TaskSources and their Splits will be sent in the order they're received.
  18. private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = LinkedHashMultimap.create(); // 保存已经添加的split集合
  19. @GuardedBy("this")
  20. private volatile int pendingSourceSplitCount; // 已经添加源split的个数
  21. @GuardedBy("this")
  22. private final SetMultimap<PlanNodeId, Lifespan> pendingNoMoreSplitsForLifespan = HashMultimap.create();
  23. @GuardedBy("this")
  24. // The keys of this map represent all plan nodes that have "no more splits".
  25. // The boolean value of each entry represents whether the "no more splits" notification is pending delivery to workers.
  26. // 这个map中的key表示所有已经"no more splits"(不会再有split)的计划节点
  27. // value的boolean表示"no more splits"通知是否还在等待发送给worker节点
  28. private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<>();
  29. @GuardedBy("this")
  30. private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>(); // 输出缓存
  31. private final FutureStateChange<?> whenSplitQueueHasSpace = new FutureStateChange<>(); //
  32. @GuardedBy("this")
  33. private boolean splitQueueHasSpace = true;
  34. @GuardedBy("this")
  35. private OptionalInt whenSplitQueueHasSpaceThreshold = OptionalInt.empty();
  36. private final boolean summarizeTaskInfo;
  37. private final HttpClient httpClient;
  38. private final Executor executor;
  39. private final ScheduledExecutorService errorScheduledExecutor; // 用于错误处理的调度执行器
  40. private final Codec<TaskInfo> taskInfoCodec; // 序列化任务信息
  41. private final Codec<TaskUpdateRequest> taskUpdateRequestCodec; // 序列化任务更新请求
  42. private final RequestErrorTracker updateErrorTracker; //
  43. private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
  44. private final AtomicBoolean sendPlan = new AtomicBoolean(true);
  45. private final PartitionedSplitCountTracker partitionedSplitCountTracker;
  46. private final AtomicBoolean aborting = new AtomicBoolean(false);
  47. private final AtomicBoolean abandoned = new AtomicBoolean(false);
  48. private final AtomicBoolean cancelledToResume = new AtomicBoolean(false);
  49. private final boolean isBinaryEncoding;
  50. private Optional<PlanNodeId> parent; // 是否有父节点

35.ScheduledSplit

  1. private final long sequenceId; // 顺序ID
  2. private final PlanNodeId planNodeId; // 节点的ID
  3. private final Split split; // 节点对应的split

36.TaskInfo

  1. private final TaskStatus taskStatus; // 任务状态数据
  2. private final DateTime lastHeartbeat; // 记录最后一次心跳的时间
  3. private final OutputBufferInfo outputBuffers; // 输出的缓存信息
  4. private final Set<PlanNodeId> noMoreSplits; // planNode集合
  5. private final TaskStats stats; // 任务统计信息
  6. private final boolean needsPlan; //

37.ContinuousTaskStatusFetcher

  1. private static final Logger log = Logger.get(ContinuousTaskStatusFetcher.class);
  2. private final TaskId taskId; // task ID
  3. private final Consumer<Throwable> onFail; // task失败进行的回调函数
  4. private final StateMachine<TaskStatus> taskStatus; // 任务状态机
  5. private final Codec<TaskStatus> taskStatusCodec; // 序列化任务状态信息
  6. private final Duration refreshMaxWait;
  7. private final Executor executor;
  8. private final HttpClient httpClient;
  9. private final RequestErrorTracker errorTracker;
  10. private final RemoteTaskStats stats;
  11. private final boolean isBinaryEncoding;
  12. private final AtomicLong currentRequestStartNanos = new AtomicLong();
  13. @GuardedBy("this")
  14. private boolean running;
  15. @GuardedBy("this")
  16. private ListenableFuture<BaseResponse<TaskStatus>> future;
  17. private final QuerySnapshotManager snapshotManager;

38.TaskInfoFetcher

用于收集worker任务信息。保存的信息与ContinuousTaskStatusFetcher类似

  1. private final TaskId taskId;
  2. private final Consumer<Throwable> onFail;
  3. private final StateMachine<TaskInfo> taskInfo;
  4. private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
  5. private final Codec<TaskInfo> taskInfoCodec;
  6. private final long updateIntervalMillis;
  7. private final AtomicLong lastUpdateNanos = new AtomicLong();
  8. private final ScheduledExecutorService updateScheduledExecutor;
  9. private final Executor executor;
  10. private final HttpClient httpClient;
  11. private final RequestErrorTracker errorTracker;
  12. private final boolean summarizeTaskInfo;
  13. @GuardedBy("this")
  14. private final AtomicLong currentRequestStartNanos = new AtomicLong();
  15. private final RemoteTaskStats stats;
  16. private final boolean isBinaryEncoding;
  17. @GuardedBy("this")
  18. private boolean running;