- 1.ConnectorTableMetadata
- 2.ColumnMetadata
- 3.TableMetadata
- 4.CatalogName
- 5.ColumnHandle
- 6.Field
- 7.QualifiedObjectName
- 8.QualifiedName
- 9.Scope
- 10.RelationType
- 11.RelationPlanner
- 12.PlanSymbolAllocator
- 13.RelationPlan
- 14.TableScanNode
- 15.TableHandle
- 16.Symbol
- 17.ReuseExchangeOperator
- 18.RowExpression
- 19.PlanBuilder
- 20.TranslationMap
- 21.CallExpression
- 22.AggregationNode
- 23.PlanOptimizers
- 24.RuleStatsRecorder
- 25.RuleStats
- 26.OptimizerStatsRecorder
- 27.OptimizerStats
- 28.PlanOptimizer
- 29.PlanFragment
- 30.SqlStageExecution
- 31.QuerySnapshotManager
- 32.NodeMap
- 33.NodeTaskMap
- 34.HttpRemoteTask
- 35.ScheduledSplit
- 36.TaskInfo
- 37.ContinuousTaskStatusFetcher
- 38.TaskInfoFetcher
1.ConnectorTableMetadata
private final SchemaTableName table;
private final Optional<String> comment;
private final List<ColumnMetadata> columns;
private final Optional<List<ColumnMetadata>> immutableColumns;
private final Map<String, Object> properties;
private final Optional<Set<String>> nonInheritableProperties;
2.ColumnMetadata
private final String name;
private final Type type;
private final boolean nullable;
private final String comment;
private final String extraInfo;
private final boolean hidden;
private final boolean required;
private final Map<String, Object> properties;
3.TableMetadata
private final CatalogName catalogName;
private final ConnectorTableMetadata metadata;
4.CatalogName
private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";
private final String catalogName;
5.ColumnHandle
ColumnHandle是spi模块下的接口,不同的实现类保存的句柄信息不一样
5.1JdbcColumnHandle
以我们最熟悉的JDBC为例,查看对应的列信息
private final String columnName;
private final JdbcTypeHandle jdbcTypeHandle;
private final Type columnType;
private final boolean nullable;
5.1.1 Type
Type是一个接口,不同的数据类型都要实现该接口,比如常见的CharType、BigintType等
6.Field
字段信息保存的值
private final Optional<QualifiedObjectName> originTable; // 该字段属于哪个catalog、哪个schema下的哪张表
private final Optional<String> originColumnName; // 原始字段名称
private final Optional<QualifiedName> relationAlias;
private final Optional<String> name;
private final Type type;
private final boolean hidden;
private final boolean aliased;
7.QualifiedObjectName
限定名,一般用于保存 catalogname.schemaname.object,例如 mysql.test.student,其中 mysql 是 catalogname,test 是 schemaname,student 是 object
private final String catalogName; // catalogName
private final String schemaName; // schemaName
private final String objectName; // 对象名
8.QualifiedName
限定名,由若干部分(parts)组成
// 用于保存小写格式的限定名,catalog.schema.table
private final List<String> parts;
// 保存原始的限定名
private final List<Identifier> originalParts;
9.Scope
具有范围,作用域的意思
private final Optional<Scope> parent;
private final boolean queryBoundary;
private final RelationId relationId;
private final RelationType relation;
private final Map<String, WithQuery> namedQueries;
10.RelationType
// 可见字段
private final List<Field> visibleFields;
// 所有字段
private final List<Field> allFields;
// 字段与索引的映射关系
private final Map<Field, Integer> fieldIndexes;
11.RelationPlanner
private static final Logger LOG = Logger.get(RelationPlanner.class); // log日志
private final Analysis analysis; // 保存分析过程的
private final PlanSymbolAllocator planSymbolAllocator; // 标识分配器
private final PlanNodeIdAllocator idAllocator; // 计划节点ID分配器
private final Map<NodeRef<LambdaArgumentDeclaration>, Symbol> lambdaDeclarationToSymbolMap; // lambda声明到标识映射关系
private final Metadata metadata; // 元数据
private final TypeCoercion typeCoercion; // 类型强制转换
private final Session session; // 会话
private final SubqueryPlanner subqueryPlanner; // 子计划
private final Map<QualifiedName, Integer> namedSubPlan; // 命名子计划: 限定名 与 整数值的映射关系
private final UniqueIdAllocator uniqueIdAllocator; // 唯一分配器
12.PlanSymbolAllocator
private final Map<Symbol, Type> symbols; //字段映射关系
private int nextId; //
13.RelationPlan
private final PlanNode root; // 计划节点
// 关系中的每个字段在root节点中对应的符号
private final List<Symbol> fieldMappings; // for each field in the relation, the corresponding symbol from "root"
// 范围
private final Scope scope;
14.TableScanNode
TableScanNode继承自PlanNode节点
private final TableHandle table; // 表句柄信息,包含了连接表的信息
private final List<Symbol> outputSymbols;
private final Map<Symbol, ColumnHandle> assignments; // symbol -> column
private final TupleDomain<ColumnHandle> enforcedConstraint;
private final Optional<RowExpression> predicate;
private final boolean forDelete;
private ReuseExchangeOperator.STRATEGY strategy;
private UUID reuseTableScanMappingId;
private RowExpression filterExpr;
private Integer consumerTableScanNodeCount;
15.TableHandle
private final CatalogName catalogName; // 属于哪个catalog
private final ConnectorTableHandle connectorHandle; // 以及对应的表的连接信息
private final ConnectorTransactionHandle transaction; //
16.Symbol
private final String name;
17.ReuseExchangeOperator
enum STRATEGY {
REUSE_STRATEGY_DEFAULT, // 默认重用策略
REUSE_STRATEGY_PRODUCER, // 生产者重用策略
REUSE_STRATEGY_CONSUMER // 消费者重用策略
}
18.RowExpression
RowExpression实现不同行表达式
19.PlanBuilder
private final TranslationMap translations;
private final List<Expression> parameters;
private final PlanNode root;
20.TranslationMap
跟踪字段和表达式以及它们到当前计划中的符号的映射
// all expressions are rewritten in terms of fields declared by this relation plan
// 所有表达式都根据此关系计划声明的字段重写
private final RelationPlan rewriteBase;
private final Analysis analysis;
private final Map<NodeRef<LambdaArgumentDeclaration>, Symbol> lambdaDeclarationToSymbolMap;
// current mappings of underlying field -> symbol for translating direct field references
// 基础字段的当前映射 -> 用于翻译直接字段引用的符号
private final Symbol[] fieldSymbols;
// current mappings of sub-expressions -> symbol
private final Map<Expression, Symbol> expressionToSymbols = new HashMap<>();
private final Map<Expression, Expression> expressionToExpressions = new HashMap<>();
21.CallExpression
private final String displayName;
private final FunctionHandle functionHandle;
private final Type returnType;
private final List<RowExpression> arguments;
private final Optional<RowExpression> filter;
22.AggregationNode
private final PlanNode source;
private final Map<Symbol, Aggregation> aggregations;
private final GroupingSetDescriptor groupingSets;
private final List<Symbol> preGroupedSymbols;
private final Step step;
private final Optional<Symbol> hashSymbol;
private final Optional<Symbol> groupIdSymbol;
private final List<Symbol> outputs;
23.PlanOptimizers
PlanOptimizers 优化器集合
private final List<PlanOptimizer> optimizers;
private final RuleStatsRecorder ruleStats = new RuleStatsRecorder();
private final OptimizerStatsRecorder optimizerStats = new OptimizerStatsRecorder();
private final MBeanExporter exporter;
24.RuleStatsRecorder
规则状态记录器
private final Map<Class<?>, RuleStats> stats = new HashMap<>(); // 标识一个类与规则状态映射关系
25.RuleStats
private final AtomicLong hits = new AtomicLong();
private final TimeDistribution time = new TimeDistribution(TimeUnit.MICROSECONDS);
private final AtomicLong failures = new AtomicLong();
TimeDistribution 需要测试一下
26.OptimizerStatsRecorder
private final Map<Class<?>, OptimizerStats> stats = new HashMap<>();
27.OptimizerStats
private final AtomicLong failures = new AtomicLong();
private final TimeDistribution time = new TimeDistribution(TimeUnit.MICROSECONDS);
28.PlanOptimizer
PlanOptimizer是一个接口,所有优化器都需要实现该接口,并实现其中的optimize方法
29.PlanFragment
private final PlanFragmentId id; //
private final PlanNode root;
private final Map<Symbol, Type> symbols;
private final PartitioningHandle partitioning;
private final List<PlanNodeId> partitionedSources;
private final Set<PlanNodeId> partitionedSourcesSet;
private final List<Type> types;
private final Set<PlanNode> partitionedSourceNodes;
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;
private final StatsAndCosts statsAndCosts;
private final Optional<String> jsonRepresentation;
private final Optional<PlanFragmentId> feederCTEId;
private final Optional<PlanNodeId> feederCTEParentId;
30.SqlStageExecution
private static final Logger log = Logger.get(SqlStageExecution.class);
private final StageStateMachine stateMachine; // 状态机
private final RemoteTaskFactory remoteTaskFactory; // 任务工厂类
private final NodeTaskMap nodeTaskMap; // 保存 node 与 task 的映射关系
private final boolean summarizeTaskInfo; // 是否总结任务信息
private final Executor executor; // 执行器
private final FailureDetector failureDetector; // 失败探测器
private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources; // 计划分段与源节点映射关系
private final Map<InternalNode, Set<RemoteTask>> tasks = new ConcurrentHashMap<>(); // 节点 与 任务的映射关系, 保存每个节点上的任务集合
@GuardedBy("this")
private final AtomicInteger nextTaskId = new AtomicInteger(); // 下个taskId
@GuardedBy("this")
private final Set<TaskId> allTasks = newConcurrentHashSet(); // 所有的任务列表
@GuardedBy("this")
private final Set<TaskId> finishedTasks = newConcurrentHashSet(); // 已经完成的任务列表
@GuardedBy("this")
private final Set<TaskId> tasksWithFinalInfo = newConcurrentHashSet(); // 已经拿到最终信息(final info)的任务列表
@GuardedBy("this")
private final AtomicBoolean splitsScheduled = new AtomicBoolean(); // 是否开启切片调度
@GuardedBy("this")
private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create(); // 源任务
@GuardedBy("this")
private final Set<PlanNodeId> completeSources = newConcurrentHashSet(); // 已经完成的 源节点
@GuardedBy("this")
private final Set<PlanFragmentId> completeSourceFragments = newConcurrentHashSet(); // 已经完成的计划分片
private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>(); // 输出缓存
private final ListenerManager<Set<Lifespan>> completedLifespansChangeListeners = new ListenerManager<>(); // 已经完成的 生命周期变化 监听器
private final DynamicFilterService dynamicFilterService; // 动态过滤服务
private final AtomicBoolean dynamicFilterSchedulingInfoPropagated = new AtomicBoolean(); // 传播的动态过滤器调度信息
@GuardedBy("SqlStageExecution.class")
public static Map<QueryId, List<UUID>> queryIdReuseTableScanMappingIdFinishedMap = new ConcurrentHashMap<>();
private PlanNodeId parentId;
private final QuerySnapshotManager snapshotManager;
private boolean throttledSchedule;
31.QuerySnapshotManager
/*
 * - On the coordinator, it keeps track of snapshot status of all queries 在协调器节点, 它跟踪所有查询的快照状态
 * - On workers, it serves as a bridge between TaskSnapshotManager instances and the SnapshotStoreClient 在worker节点, 它作为 TaskSnapshotManager 实例与 SnapshotStoreClient 之间的桥梁
 */
32.NodeMap
private final SetMultimap<HostAddress, InternalNode> nodesByHostAndPort;
private final SetMultimap<InetAddress, InternalNode> nodesByHost;
private final SetMultimap<NetworkLocation, InternalNode> workersByNetworkPath;
private final Set<String> coordinatorNodeIds;
33.NodeTaskMap
private static final Logger log = Logger.get(NodeTaskMap.class);
private final ConcurrentHashMap<InternalNode, NodeTasks> nodeTasksMap = new ConcurrentHashMap<>(); // 节点与任务的关系
private final FinalizerService finalizerService;
34.HttpRemoteTask
private static final Logger log = Logger.get(HttpRemoteTask.class);
private final TaskId taskId; // 任务的ID
private final Session session; // 会话信息
private final String nodeId; // nodeID
private final PlanFragment planFragment; // 计划片段
private final OptionalInt totalPartitions; // 所有分区数
private final AtomicLong nextSplitId = new AtomicLong(); // 下一个split
private final RemoteTaskStats stats; // 保存任务统计信息
private final TaskInfoFetcher taskInfoFetcher; // 任务信息收集器, 包括任务失败,成功的状态, 链接信息
private final ContinuousTaskStatusFetcher taskStatusFetcher; // 持续获取任务状态的收集器
@GuardedBy("this")
private Future<?> currentRequest; // 当前请求信息
@GuardedBy("this")
private long currentRequestStartNanos; // 当前请求时间
@GuardedBy("this")
//LinkedHashMultimap is used to preserve the order of insertion in addSplits.
//It guarantees that TaskSources and their Splits will be sent in the order they're received.
private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = LinkedHashMultimap.create(); // 保存已经添加的split集合
@GuardedBy("this")
private volatile int pendingSourceSplitCount; // 已经添加源split的个数
@GuardedBy("this")
private final SetMultimap<PlanNodeId, Lifespan> pendingNoMoreSplitsForLifespan = HashMultimap.create();
@GuardedBy("this")
// The keys of this map represent all plan nodes that have "no more splits".
// The boolean value of each entry represents whether the "no more splits" notification is pending delivery to workers.
// 这个map中的key表示所有已经"没有更多split"(no more splits)的计划节点
// value的boolean表示"no more splits"通知是否尚待发送给worker节点
private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<>();
@GuardedBy("this")
private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>(); // 输出缓存
private final FutureStateChange<?> whenSplitQueueHasSpace = new FutureStateChange<>(); //
@GuardedBy("this")
private boolean splitQueueHasSpace = true;
@GuardedBy("this")
private OptionalInt whenSplitQueueHasSpaceThreshold = OptionalInt.empty();
private final boolean summarizeTaskInfo;
private final HttpClient httpClient;
private final Executor executor;
private final ScheduledExecutorService errorScheduledExecutor; // 调度错误的执行器
private final Codec<TaskInfo> taskInfoCodec; // 序列化任务信息
private final Codec<TaskUpdateRequest> taskUpdateRequestCodec; // 序列化任务更新请求
private final RequestErrorTracker updateErrorTracker; //
private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
private final AtomicBoolean sendPlan = new AtomicBoolean(true);
private final PartitionedSplitCountTracker partitionedSplitCountTracker;
private final AtomicBoolean aborting = new AtomicBoolean(false);
private final AtomicBoolean abandoned = new AtomicBoolean(false);
private final AtomicBoolean cancelledToResume = new AtomicBoolean(false);
private final boolean isBinaryEncoding;
private Optional<PlanNodeId> parent; // 是否有父节点
35.ScheduledSplit
private final long sequenceId; // 顺序ID
private final PlanNodeId planNodeId; // 节点的ID
private final Split split; // 节点对应的split
36.TaskInfo
private final TaskStatus taskStatus; // 任务状态数据
private final DateTime lastHeartbeat; // 记录最后一次心跳的时间
private final OutputBufferInfo outputBuffers; // 输出的缓存信息
private final Set<PlanNodeId> noMoreSplits; // planNode集合
private final TaskStats stats; // 任务统计信息
private final boolean needsPlan; //
37.ContinuousTaskStatusFetcher
private static final Logger log = Logger.get(ContinuousTaskStatusFetcher.class);
private final TaskId taskId; // task ID
private final Consumer<Throwable> onFail; // task失败进行的回调函数
private final StateMachine<TaskStatus> taskStatus; // 任务状态机
private final Codec<TaskStatus> taskStatusCodec; // 序列化任务状态信息
private final Duration refreshMaxWait;
private final Executor executor;
private final HttpClient httpClient;
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final boolean isBinaryEncoding;
private final AtomicLong currentRequestStartNanos = new AtomicLong();
@GuardedBy("this")
private boolean running;
@GuardedBy("this")
private ListenableFuture<BaseResponse<TaskStatus>> future;
private final QuerySnapshotManager snapshotManager;
38.TaskInfoFetcher
用于收集worker任务信息。保存的信息与ContinuousTaskStatusFetcher类似
private final TaskId taskId;
private final Consumer<Throwable> onFail;
private final StateMachine<TaskInfo> taskInfo;
private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
private final Codec<TaskInfo> taskInfoCodec;
private final long updateIntervalMillis;
private final AtomicLong lastUpdateNanos = new AtomicLong();
private final ScheduledExecutorService updateScheduledExecutor;
private final Executor executor;
private final HttpClient httpClient;
private final RequestErrorTracker errorTracker;
private final boolean summarizeTaskInfo;
@GuardedBy("this")
private final AtomicLong currentRequestStartNanos = new AtomicLong();
private final RemoteTaskStats stats;
private final boolean isBinaryEncoding;
@GuardedBy("this")
private boolean running;