The SqlQueryExecution::start method:
@Override
public void start()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
try {
// transition to planning
if (!stateMachine.transitionToPlanning()) {
// query already started or finished
return;
}
// analyze the query and build the execution plan
PlanRoot plan = analyzeQuery();
try {
// handle cross-region dynamic filters
handleCrossRegionDynamicFilter(plan);
}
catch (Throwable e) {
// ignore any exception
log.warn("something unexpected happened.. cause: %s", e.getMessage());
}
// plan the distributed execution of the query
planDistribution(plan);
// transition to starting
if (!stateMachine.transitionToStarting()) {
// query already started or finished
return;
}
stateMachine.addStateChangeListener(state -> {
if (state == QueryState.RESUMING) {
// Snapshot: old stages/tasks have finished. Ready to resume.
try {
// resume the query, reusing the plan that was already built
resumeQuery(plan);
}
catch (Throwable e) {
fail(e);
throwIfInstanceOf(e, Error.class);
log.warn(e, "Encountered error while rescheduling query");
}
}
});
// if query is not finished, start the scheduler, otherwise cancel it
SqlQueryScheduler scheduler = queryScheduler.get();
if (!stateMachine.isDone()) {
scheduler.start();
}
}
catch (Throwable e) {
fail(e);
throwIfInstanceOf(e, Error.class);
log.warn(e, "Encountered error while scheduling query");
}
}
}
Scheduling starts in SqlQueryScheduler::start:
public void start()
{
if (started.compareAndSet(false, true)) {
executor.submit(this::schedule);
}
}
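The compareAndSet guard makes start() idempotent: only the first caller submits the scheduling loop, no matter how many threads race on it. Below is a minimal standalone sketch of that idiom (class and names are illustrative, not from the engine):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StartOnceDemo
{
    private static final AtomicBoolean started = new AtomicBoolean();

    private static void start(ExecutorService executor)
    {
        // only the first successful compareAndSet submits the scheduling loop
        if (started.compareAndSet(false, true)) {
            executor.submit(() -> System.out.println("schedule() running on " + Thread.currentThread().getName()));
        }
    }

    public static void main(String[] args)
            throws InterruptedException
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            start(executor); // submitted exactly once despite three calls
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}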
The real work happens in the private schedule method:
private void schedule()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
// stage ids that have already been observed as completed
Set<StageId> completedStages = new HashSet<>();
// create an execution schedule for the stages according to the configured execution policy
ExecutionSchedule executionSchedule = executionPolicy.createExecutionSchedule(stages.values());
// loop until the execution schedule reports that all stages are scheduled
while (!executionSchedule.isFinished()) {
// futures of stages that are currently blocked
List<ListenableFuture<?>> blockedStages = new ArrayList<>();
// schedule each stage that the execution policy considers ready
for (SqlStageExecution stage : executionSchedule.getStagesToSchedule()) {
if (isReuseTableScanEnabled(session) && !SqlStageExecution.getReuseTableScanMappingIdStatus(stage.getStateMachine())) {
continue;
}
stage.beginScheduling();
// Resource group: Check if memory usage within the current resource group has exceeded
// the configured limit. If yes, throttle further split scheduling.
// Throttle Logic: Wait for x seconds (wait time increases up to the max defined by THROTTLE_SLEEP_TIMER)
// and then let it schedule 10% of splits. (A standalone sketch of this back-off follows the method.)
if (!canScheduleMoreSplits()) {
try {
SECONDS.sleep(THROTTLE_SLEEP_TIMER[currentTimerLevel]);
}
catch (InterruptedException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "interrupted while sleeping");
}
currentTimerLevel = Math.min(currentTimerLevel + 1, THROTTLE_SLEEP_TIMER.length - 1);
stage.setThrottledSchedule(true);
}
else {
stage.setThrottledSchedule(false);
currentTimerLevel = 0;
}
// perform some scheduling work: look up this stage's scheduler and run one scheduling round
ScheduleResult result = stageSchedulers.get(stage.getStageId())
.schedule();
// modify parent and children based on the results of the scheduling
if (result.isFinished()) {
stage.schedulingComplete();
}
else if (!result.getBlocked().isDone()) {
blockedStages.add(result.getBlocked());
}
stageLinkages.get(stage.getStageId())
.processScheduleResults(stage.getState(), result.getNewTasks());
schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
if (result.getBlockedReason().isPresent()) {
switch (result.getBlockedReason().get()) {
case WRITER_SCALING:
// no-op
break;
case WAITING_FOR_SOURCE:
schedulerStats.getWaitingForSource().update(1);
break;
case SPLIT_QUEUES_FULL:
schedulerStats.getSplitQueuesFull().update(1);
break;
case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
case NO_ACTIVE_DRIVER_GROUP:
break;
default:
throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
}
}
}
// make sure to update stage linkage at least once per loop to catch async state changes (e.g., partial cancel)
for (SqlStageExecution stage : stages.values()) {
if (!completedStages.contains(stage.getStageId()) && stage.getState().isDone()) {
stageLinkages.get(stage.getStageId())
.processScheduleResults(stage.getState(), ImmutableSet.of());
completedStages.add(stage.getStageId());
}
}
// wait for a state change and then schedule again
if (!blockedStages.isEmpty()) {
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
}
for (ListenableFuture<?> blockedStage : blockedStages) {
blockedStage.cancel(true);
}
}
}
for (SqlStageExecution stage : stages.values()) {
StageState state = stage.getState();
// Snapshot: if state is resumable_failure, then state of stage and query will change soon again. Don't treat as an error.
if (state != SCHEDULED && state != RUNNING && !state.isDone() && state != RESUMABLE_FAILURE) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
}
}
}
catch (Throwable t) {
queryStateMachine.transitionToFailed(t);
throw t;
}
finally {
RuntimeException closeError = new RuntimeException();
for (StageScheduler scheduler : stageSchedulers.values()) {
try {
// Snapshot: when trying to reschedule, then don't close the scheduler (and more importantly, split sources in it)
QueryState state = queryStateMachine.getQueryState();
if (state != QueryState.RESCHEDULING && state != QueryState.RESUMING) {
scheduler.close();
}
}
catch (Throwable t) {
queryStateMachine.transitionToFailed(t);
// Self-suppression not permitted
if (closeError != t) {
closeError.addSuppressed(t);
}
}
}
// Snapshot: if resuming, notify the new scheduler so it can start scheduling new stages
schedulingFuture.set(null);
if (closeError.getSuppressed().length > 0) {
throw closeError;
}
}
}
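To make the throttle back-off referenced above concrete, here is a hypothetical standalone sketch of the escalation logic; the sleep values and class name are illustrative, not the engine's actual configuration:
public final class ThrottleBackoffDemo
{
    // illustrative values only; the real THROTTLE_SLEEP_TIMER lives in the scheduler
    private static final long[] THROTTLE_SLEEP_TIMER = {1, 2, 5, 10}; // seconds
    private static int currentTimerLevel;

    static long nextSleepSeconds(boolean canScheduleMoreSplits)
    {
        if (canScheduleMoreSplits) {
            // memory pressure cleared: stop throttling and reset the back-off level
            currentTimerLevel = 0;
            return 0;
        }
        long sleepSeconds = THROTTLE_SLEEP_TIMER[currentTimerLevel];
        // escalate toward the maximum sleep, exactly as the scheduling loop above does
        currentTimerLevel = Math.min(currentTimerLevel + 1, THROTTLE_SLEEP_TIMER.length - 1);
        return sleepSeconds;
    }

    public static void main(String[] args)
    {
        for (int i = 0; i < 6; i++) {
            System.out.println("blocked iteration " + i + " -> sleep " + nextSleepSeconds(false) + "s");
        }
    }
}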
StageScheduler is an interface; each implementation handles the scheduling of a particular kind of stage. A simplified sketch of the contract is shown below, and we then take SourcePartitionedScheduler as the example and walk through its schedule implementation.
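A minimal sketch of the StageScheduler contract, assuming the Presto-style shape of the interface (exact methods and defaults may differ between versions):
import java.io.Closeable;

public interface StageScheduler
        extends Closeable
{
    // perform one round of scheduling work and report what was scheduled and whether/why it blocked
    ScheduleResult schedule();

    @Override
    default void close()
    {
    }
}
SqlQueryScheduler keeps one such scheduler per stage (the stageSchedulers map used in the loop above) and calls schedule() repeatedly until the stage is fully scheduled.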
public PlanNodeId getPlanNodeId()
{
return partitionedNode;
}
/**
 * Obtains an instance of {@code SourcePartitionedScheduler} suitable for use as a
 * stage scheduler.
 * <p>
 * This returns an ungrouped {@code SourcePartitionedScheduler} that requires
 * minimal management from the caller, which is ideal for use as a stage scheduler.
 */
public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
SqlStageExecution stage,
PlanNodeId partitionedNode,
SplitSource splitSource,
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
Session session,
HeuristicIndexerManager heuristicIndexerManager,
Map<Integer, Integer> parallelSources)
{
// create the underlying source-partitioned scheduler
SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(stage, partitionedNode, splitSource,
splitPlacementPolicy, splitBatchSize, false, session, heuristicIndexerManager,
parallelSources == null ? 1 : parallelSources.getOrDefault(stage.getStageId().getId(), 1));
sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
sourcePartitionedScheduler.noMoreLifespans();
// wrap it as a StageScheduler: delegate schedule() and drain completed lifespans after each round
return new StageScheduler()
{
@Override
public ScheduleResult schedule()
{
ScheduleResult scheduleResult = sourcePartitionedScheduler.schedule();
sourcePartitionedScheduler.drainCompletedLifespans();
return scheduleResult;
}
@Override
public void close()
{
sourcePartitionedScheduler.close();
}
};
}
This is the entry point the stage-scheduling loop calls into. Stepping into SourcePartitionedScheduler::schedule (abridged):
@Override
public synchronized ScheduleResult schedule()
{
// iterate over the schedule groups (one per lifespan)
for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
// .....
// assign the splits with successful placements, creating tasks where needed
overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));
// ......
}
}
overallNewTasks accumulates every task created during scheduling. The assignSplits call is implemented as follows:
private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignment, Multimap<InternalNode, Lifespan> noMoreSplitsNotification)
{
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
// collect every node that receives splits or a no-more-splits notification
ImmutableSet<InternalNode> nodes = ImmutableSet.<InternalNode>builder()
.addAll(splitAssignment.keySet())
.addAll(noMoreSplitsNotification.keySet())
.build();
// iterate over each node
for (InternalNode node : nodes) {
// source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
.putAll(partitionedNode, splitAssignment.get(node)) // the splits assigned to this node
.build();
// the lifespans for which this node will receive no more splits
ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
if (noMoreSplitsNotification.containsKey(node)) {
noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
}
// node, splits and lifespans together determine the task to create or extend
newTasks.addAll(stage.scheduleSplits(
node,
splits,
noMoreSplits.build()));
}
return newTasks.build();
}
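Note how Guava's Multimap lets one plan node id carry many splits for a node. A tiny self-contained illustration (hypothetical names and values):
import com.google.common.collect.ImmutableMultimap;

public class SplitAssignmentDemo
{
    public static void main(String[] args)
    {
        // one plan node id maps to all splits placed on a given worker node
        ImmutableMultimap<String, String> splits = ImmutableMultimap.<String, String>builder()
                .putAll("tableScan-0", "split-1", "split-2", "split-3")
                .build();

        System.out.println(splits.get("tableScan-0")); // [split-1, split-2, split-3]
    }
}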
Digging further into SqlStageExecution::scheduleSplits:
public synchronized Set<RemoteTask> scheduleSplits(InternalNode node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
// if the stage is already done, return without scheduling anything; otherwise mark that splits have been scheduled
if (stateMachine.getState().isDone()) {
return ImmutableSet.of();
}
splitsScheduled.set(true);
checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");
// collects the tasks created by this call
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
RemoteTask task;
/**
 * 1. If this node does not yet have a remote task, create one.
 * 2. Otherwise, add the splits to the node's existing task.
 */
if (tasks == null) {
// The output buffer depends on the task id starting from 0 and being sequential, since each
// task is assigned a private buffer based on task id.
TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
task = scheduleTask(node, taskId, splits, OptionalInt.empty()); // this is where the remote task is actually created
newTasks.add(task);
}
else {
task = tasks.iterator().next();
task.addSplits(splits);
}
if (noMoreSplitsNotification.size() > 1) {
// The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
// If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
// These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
}
for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
task.noMoreSplits(entry.getKey(), entry.getValue());
}
return newTasks.build();
}
All task creation ends up in SqlStageExecution::scheduleTask:
private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits, OptionalInt totalPartitions)
{
checkArgument(!allTasks.contains(taskId), "A task with id %s already exists", taskId);
// is the snapshot feature enabled for this session?
if (SystemSessionProperties.isSnapshotEnabled(stateMachine.getSession())) {
// Snapshot: inform snapshot manager so it knows about all tasks,
// and can determine if a snapshot is complete for all tasks.
snapshotManager.addNewTask(taskId);
}
ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
initialSplits.putAll(sourceSplits);
/**
 * For every upstream source task that has not finished, create a remote split
 * pointing at that task's location and add it to this task's initial splits.
 */
sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
}
});
OutputBuffers outputBuffers = this.outputBuffers.get();
checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
// build the remote task through the factory
RemoteTask task = remoteTaskFactory.createRemoteTask(
stateMachine.getSession(),
taskId,
node,
stateMachine.getFragment(),
initialSplits.build(),
totalPartitions,
outputBuffers,
nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
summarizeTaskInfo,
Optional.ofNullable(parentId),
snapshotManager);
completeSources.forEach(task::noMoreSplits);
allTasks.add(taskId);
tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
nodeTaskMap.addTask(node, task);
task.addStateChangeListener(new StageTaskListener());
task.addFinalTaskInfoListener(this::updateFinalTaskInfo);
if (!stateMachine.getState().isDone()) {
task.start();
}
else {
// stage finished while we were scheduling this task, so abort it
task.abort();
}
return task;
}
Finally, RemoteTaskFactory::createRemoteTask performs the actual task creation; HttpRemoteTaskFactory implements it to build an HttpRemoteTask:
@Override
public RemoteTask createRemoteTask(Session session,
TaskId taskId,
InternalNode node,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
OptionalInt totalPartitions,
OutputBuffers outputBuffers,
PartitionedSplitCountTracker partitionedSplitCountTracker,
boolean summarizeTaskInfo,
Optional<PlanNodeId> parent,
QuerySnapshotManager snapshotManager)
{
return new HttpRemoteTask(session,
taskId,
node.getNodeIdentifier(),
locationFactory.createTaskLocation(node, taskId),
fragment,
initialSplits,
totalPartitions,
outputBuffers,
httpClient,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
maxErrorDuration,
taskStatusRefreshMaxWait,
taskInfoUpdateInterval,
summarizeTaskInfo,
taskStatusCodec,
taskInfoCodec,
taskUpdateRequestCodec,
partitionedSplitCountTracker,
stats,
isBinaryEncoding,
parent,
snapshotManager);
}
Inside the HttpRemoteTask constructor:
public HttpRemoteTask(Session session,
TaskId taskId,
String nodeId,
URI location,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OptionalInt totalPartitions,
OutputBuffers outputBuffers,
HttpClient httpClient,
Executor executor,
ScheduledExecutorService updateScheduledExecutor,
ScheduledExecutorService errorScheduledExecutor,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoUpdateInterval,
boolean summarizeTaskInfo,
Codec<TaskStatus> taskStatusCodec,
Codec<TaskInfo> taskInfoCodec,
Codec<TaskUpdateRequest> taskUpdateRequestCodec,
PartitionedSplitCountTracker partitionedSplitCountTracker,
RemoteTaskStats stats,
boolean isBinaryEncoding,
Optional<PlanNodeId> parent,
QuerySnapshotManager snapshotManager)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
requireNonNull(planFragment, "planFragment is null");
requireNonNull(totalPartitions, "totalPartitions is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
requireNonNull(executor, "executor is null");
requireNonNull(taskStatusCodec, "taskStatusCodec is null");
requireNonNull(taskInfoCodec, "taskInfoCodec is null");
requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
requireNonNull(stats, "stats is null");
requireNonNull(parent, "parent is null");
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.session = session;
this.nodeId = nodeId;
this.planFragment = planFragment;
this.totalPartitions = totalPartitions;
this.outputBuffers.set(outputBuffers);
this.httpClient = httpClient;
this.executor = executor;
this.errorScheduledExecutor = errorScheduledExecutor;
this.summarizeTaskInfo = summarizeTaskInfo;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
this.stats = stats;
this.isBinaryEncoding = isBinaryEncoding;
this.parent = parent;
for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
pendingSplits.put(entry.getKey(), scheduledSplit); // planNodeId -> ScheduledSplit(splitId, planNodeId, split)
}
// total number of pending splits across the partitioned sources
pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
.filter(initialSplits::containsKey)
.mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
.sum();
// initial (empty) buffer state for each declared output buffer
List<BufferInfo> bufferStates = outputBuffers.getBuffers()
.keySet().stream()
.map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
.collect(toImmutableList());
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
this::failTask,
initialTask.getTaskStatus(),
taskStatusRefreshMaxWait,
taskStatusCodec,
executor,
httpClient,
maxErrorDuration,
errorScheduledExecutor,
stats,
isBinaryEncoding,
snapshotManager);
this.taskInfoFetcher = new TaskInfoFetcher(
this::failTask,
initialTask,
httpClient,
taskInfoUpdateInterval,
taskInfoCodec,
maxErrorDuration,
summarizeTaskInfo,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
stats,
isBinaryEncoding);
// register a listener: clean up when the task reaches a terminal state, otherwise refresh split accounting
taskStatusFetcher.addStateChangeListener(newStatus -> {
TaskState state = newStatus.getState();
if (state.isDone()) {
cleanUpTask(state);
}
else {
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
});
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
}
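The pendingSourceSplitCount computation above is a plain stream over the initial splits: only plan nodes that are partitioned sources contribute to the count. A self-contained sketch with hypothetical values:
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.List;

public class SplitCountDemo
{
    public static void main(String[] args)
    {
        Multimap<String, String> initialSplits = ImmutableMultimap.<String, String>builder()
                .putAll("scan-0", "s1", "s2", "s3")      // partitioned source
                .putAll("remoteSource-1", "r1")          // not a partitioned source
                .build();
        List<String> partitionedSources = List.of("scan-0");

        int pendingSourceSplitCount = partitionedSources.stream()
                .filter(initialSplits::containsKey)
                .mapToInt(source -> initialSplits.get(source).size())
                .sum();

        System.out.println(pendingSourceSplitCount); // 3
    }
}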
After the remote task is created, SqlStageExecution::scheduleTask calls task.start() to launch it, which lands in HttpRemoteTask::start:
@Override
public void start()
{
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
scheduleUpdate();
taskStatusFetcher.start(); // starts continuous task status fetching
taskInfoFetcher.start(); // starts periodic task info fetching
}
}
HttpRemoteTask::scheduleUpdate then kicks off the first task update:
private void scheduleUpdate()
{
executor.execute(this::sendUpdate);
}
scheduleUpdate simply hands sendUpdate off to the executor; sendUpdate builds and sends the actual TaskUpdateRequest:
private synchronized void sendUpdate()
{
if (abandoned.get()) {
// Snapshot: Corresponding task has been canceled to resume. Stop any communication with it.
return;
}
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!needsUpdate.get() || taskStatus.getState().isDone()) {
return;
}
// if there is a request already running, wait for it to complete
if (this.currentRequest != null && !this.currentRequest.isDone()) {
return;
}
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}
List<TaskSource> sources = getSources();
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
// Snapshot: Add task instance id to all task related requests,
// so receiver can verify if the instance id matches
taskStatus.getTaskInstanceId(),
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
sources,
outputBuffers.get(),
totalPartitions,
parent);
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
.setUri(uriBuilder.build())
.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson))
// .setBodyGenerator(createBodyGenerator(updateRequest))
.build();
// choose the response handler based on whether binary (Smile) encoding is enabled
ResponseHandler responseHandler;
if (isBinaryEncoding) {
responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
}
else {
responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
}
updateErrorTracker.startRequest();
// send the update request asynchronously
ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
currentRequest = future;
currentRequestStartNanos = System.nanoTime();
// The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
// and does so without grabbing the instance lock.
needsUpdate.set(false);
// on a successful response, SimpleHttpResponseHandler invokes onSuccess; on failure, onFailure
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats), executor);
}
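SimpleHttpResponseHandler and UpdateResponseHandler are the engine's own classes; the pattern they rely on is Guava's FutureCallback, registered with Futures.addCallback. A standalone sketch of just that pattern (the response value here is simulated):
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

public class UpdateCallbackDemo
{
    public static void main(String[] args)
    {
        SettableFuture<String> future = SettableFuture.create();

        Futures.addCallback(future, new FutureCallback<String>()
        {
            @Override
            public void onSuccess(String taskInfo)
            {
                // in HttpRemoteTask this is where the returned TaskInfo would be processed
                System.out.println("update acknowledged: " + taskInfo);
            }

            @Override
            public void onFailure(Throwable t)
            {
                // transport or application error while updating the task
                System.out.println("update failed: " + t.getMessage());
            }
        }, MoreExecutors.directExecutor());

        future.set("TaskInfo{state=RUNNING}"); // simulate the HTTP response arriving
    }
}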