SqlQueryExecution::start方法

    1. /**
    2. * Runs the query start-up sequence: transition to planning, analyze the query, plan its
    3. * distribution, transition to starting, register a listener that resumes the query when it
    4. * enters RESUMING, and finally start the scheduler unless the query is already done.
    5. * Any Throwable is reported through fail(e); Errors are rethrown afterwards.
    6. */
    7. @Override
    8. public void start()
    9. {
    10. try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
    11. try {
    12. // transition to planning
    13. if (!stateMachine.transitionToPlanning()) {
    14. // query already started or finished
    15. return;
    16. }
    17. // analyze the query to obtain the plan
    18. PlanRoot plan = analyzeQuery();
    19. try {
    20. // handle the cross-region dynamic filter
    21. handleCrossRegionDynamicFilter(plan);
    22. }
    23. catch (Throwable e) {
    24. // best effort: a failure here must not fail the query, but pass the throwable to the
    25. // logger so the stack trace is not lost (e.getMessage() alone may even be null)
    26. log.warn(e, "something unexpected happened.. cause: %s", e.getMessage());
    27. }
    28. // plan distribution of query
    29. planDistribution(plan);
    30. // transition to starting
    31. if (!stateMachine.transitionToStarting()) {
    32. // query already started or finished
    33. return;
    34. }
    35. stateMachine.addStateChangeListener(state -> {
    36. if (state == QueryState.RESUMING) {
    37. // Snapshot: old stages/tasks have finished. Ready to resume.
    38. try {
    39. resumeQuery(plan);
    40. }
    41. catch (Throwable e) {
    42. fail(e);
    43. throwIfInstanceOf(e, Error.class);
    44. log.warn(e, "Encountered error while rescheduling query");
    45. }
    46. }
    47. });
    48. // start the scheduler only if the query is not already done
    49. SqlQueryScheduler scheduler = queryScheduler.get();
    50. if (!stateMachine.isDone()) {
    51. scheduler.start();
    52. }
    53. }
    54. catch (Throwable e) {
    55. fail(e);
    56. throwIfInstanceOf(e, Error.class);
    57. log.warn(e, "Encountered error while scheduling query");
    58. }
    59. }
    60. }

    image.png

    SqlQueryScheduler::start方法开始进行调度

    1. /**
    2. * Submits the scheduling loop to the executor. Only the first call that flips
    3. * {@code started} from false to true submits it; later calls do nothing.
    4. */
    5. public void start()
    6. {
    7. if (!started.compareAndSet(false, true)) {
    8. // an earlier call already submitted the scheduling loop
    9. return;
    10. }
    11. executor.submit(this::schedule);
    12. }

    start 方法内部其实是把 schedule 方法提交给 executor 异步执行:

    1. /**
    2. * Scheduling loop of the query. While the execution schedule is not finished, every stage it
    3. * returns is asked to schedule work; stage linkages and scheduler stats are updated from each
    4. * result, and the loop waits up to one second on blocked stages before iterating again.
    5. * Afterwards stage states are validated and, in the finally block, the stage schedulers are
    6. * closed (unless the query is RESCHEDULING or RESUMING) and schedulingFuture is completed.
    7. */
    8. private void schedule()
    9. {
    10. try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
    11. // ids of stages whose completion has already been pushed to the stage linkages
    12. Set<StageId> completedStages = new HashSet<>();
    13. // the execution policy builds the execution schedule over all stages
    14. ExecutionSchedule executionSchedule = executionPolicy.createExecutionSchedule(stages.values());
    15. // keep scheduling until the execution schedule reports that it is finished
    16. while (!executionSchedule.isFinished()) {
    17. // futures of the stages that reported themselves blocked in this iteration
    18. List<ListenableFuture<?>> blockedStages = new ArrayList<>();
    19. for (SqlStageExecution stage : executionSchedule.getStagesToSchedule()) {
    20. if (isReuseTableScanEnabled(session) && !SqlStageExecution.getReuseTableScanMappingIdStatus(stage.getStateMachine())) {
    21. continue;
    22. }
    23. stage.beginScheduling();
    24. // Resource group: Check if memory usage within the current resource group has exceeded
    25. // configured limit. If yes throttle further split scheduling.
    26. // Throttle Logic: Wait for x seconds (Wait time will increase till max as per THROTTLE_SLEEP_TIMER)
    27. // and then let it schedule 10% of splits.
    28. if (!canScheduleMoreSplits()) {
    29. try {
    30. SECONDS.sleep(THROTTLE_SLEEP_TIMER[currentTimerLevel]);
    31. }
    32. catch (InterruptedException e) {
    33. // restore the interrupt flag so code further up the stack can still observe the
    34. // interruption, and keep the original exception as the cause
    35. Thread.currentThread().interrupt();
    36. throw new PrestoException(GENERIC_INTERNAL_ERROR, "interrupted while sleeping", e);
    37. }
    38. currentTimerLevel = Math.min(currentTimerLevel + 1, THROTTLE_SLEEP_TIMER.length - 1);
    39. stage.setThrottledSchedule(true);
    40. }
    41. else {
    42. stage.setThrottledSchedule(false);
    43. currentTimerLevel = 0;
    44. }
    45. // perform some scheduling work: look up the stage's scheduler by stage id,
    46. // run it and keep the schedule result
    47. ScheduleResult result = stageSchedulers.get(stage.getStageId())
    48. .schedule();
    49. // modify parent and children based on the results of the scheduling
    50. if (result.isFinished()) {
    51. stage.schedulingComplete();
    52. }
    53. else if (!result.getBlocked().isDone()) {
    54. blockedStages.add(result.getBlocked());
    55. }
    56. stageLinkages.get(stage.getStageId())
    57. .processScheduleResults(stage.getState(), result.getNewTasks());
    58. schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
    59. if (result.getBlockedReason().isPresent()) {
    60. switch (result.getBlockedReason().get()) {
    61. case WRITER_SCALING:
    62. // no-op
    63. break;
    64. case WAITING_FOR_SOURCE:
    65. schedulerStats.getWaitingForSource().update(1);
    66. break;
    67. case SPLIT_QUEUES_FULL:
    68. schedulerStats.getSplitQueuesFull().update(1);
    69. break;
    70. case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
    71. case NO_ACTIVE_DRIVER_GROUP:
    72. break;
    73. default:
    74. throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
    75. }
    76. }
    77. }
    78. // make sure to update stage linkage at least once per loop to catch async state changes (e.g., partial cancel)
    79. for (SqlStageExecution stage : stages.values()) {
    80. if (!completedStages.contains(stage.getStageId()) && stage.getState().isDone()) {
    81. stageLinkages.get(stage.getStageId())
    82. .processScheduleResults(stage.getState(), ImmutableSet.of());
    83. completedStages.add(stage.getStageId());
    84. }
    85. }
    86. // wait for a state change and then schedule again
    87. if (!blockedStages.isEmpty()) {
    88. try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
    89. tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
    90. }
    91. for (ListenableFuture<?> blockedStage : blockedStages) {
    92. blockedStage.cancel(true);
    93. }
    94. }
    95. }
    96. for (SqlStageExecution stage : stages.values()) {
    97. StageState state = stage.getState();
    98. // Snapshot: if state is resumable_failure, then state of stage and query will change soon again. Don't treat as an error.
    99. if (state != SCHEDULED && state != RUNNING && !state.isDone() && state != RESUMABLE_FAILURE) {
    100. throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
    101. }
    102. }
    103. }
    104. catch (Throwable t) {
    105. queryStateMachine.transitionToFailed(t);
    106. throw t;
    107. }
    108. finally {
    109. // collects every failure raised while closing the stage schedulers
    110. RuntimeException closeError = new RuntimeException();
    111. for (StageScheduler scheduler : stageSchedulers.values()) {
    112. try {
    113. // Snapshot: when trying to reschedule, then don't close the scheduler (and more importantly, split sources in it)
    114. QueryState state = queryStateMachine.getQueryState();
    115. if (state != QueryState.RESCHEDULING && state != QueryState.RESUMING) {
    116. scheduler.close();
    117. }
    118. }
    119. catch (Throwable t) {
    120. queryStateMachine.transitionToFailed(t);
    121. // Self-suppression not permitted
    122. if (closeError != t) {
    123. closeError.addSuppressed(t);
    124. }
    125. }
    126. }
    127. // Snapshot: if resuming, notify the new scheduler so it can start scheduling new stages
    128. schedulingFuture.set(null);
    129. if (closeError.getSuppressed().length > 0) {
    130. throw closeError;
    131. }
    132. }
    133. }

    image.png
    StageScheduler 是一个接口,不同类型的阶段调度器各自实现它。下面以 SourcePartitionedScheduler 为例,看它是如何实现 schedule 方法的。
    image.png

    // Returns the partitionedNode plan node id held by this scheduler.
    1. public PlanNodeId getPlanNodeId()
    2. {
    3. return partitionedNode;
    4. }
    5. /**
    6. * Obtains an instance of {@code SourcePartitionedScheduler} suitable for use as a
    7. * stage scheduler.
    8. * <p>
    9. * This returns an ungrouped {@code SourcePartitionedScheduler} that requires
    10. * minimal management from the caller, which is ideal for use as a stage scheduler.
    11. * The single task-wide lifespan is started and closed off here, and the returned
    12. * wrapper drains completed lifespans after every {@code schedule()} call.
    13. */
    14. public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
    15. SqlStageExecution stage,
    16. PlanNodeId partitionedNode,
    17. SplitSource splitSource,
    18. SplitPlacementPolicy splitPlacementPolicy,
    19. int splitBatchSize,
    20. Session session,
    21. HeuristicIndexerManager heuristicIndexerManager,
    22. Map<Integer, Integer> parallelSources)
    23. {
    24. // falls back to 1 when no map is given or the stage id has no entry in it
    25. int parallelism = (parallelSources == null)
    26. ? 1
    27. : parallelSources.getOrDefault(stage.getStageId().getId(), 1);
    28. // create the underlying source scheduler
    29. SourcePartitionedScheduler delegate = new SourcePartitionedScheduler(
    30. stage,
    31. partitionedNode,
    32. splitSource,
    33. splitPlacementPolicy,
    34. splitBatchSize,
    35. false,
    36. session,
    37. heuristicIndexerManager,
    38. parallelism);
    39. delegate.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
    40. delegate.noMoreLifespans();
    41. // adapt the source scheduler to the StageScheduler interface
    42. return new StageScheduler()
    43. {
    44. @Override
    45. public ScheduleResult schedule()
    46. {
    47. ScheduleResult result = delegate.schedule();
    48. delegate.drainCompletedLifespans();
    49. return result;
    50. }
    51. @Override
    52. public void close()
    53. {
    54. delegate.close();
    55. }
    56. };
    57. }

    以上是部分入口代码,供 source 调度器进行调度时使用。
    image.png
    进入sourcePartitionedScheduler::schedule方法内

    // Excerpt of SourcePartitionedScheduler.schedule(); the omitted parts are marked with "...".
    1. @Override
    2. public synchronized ScheduleResult schedule()
    3. {
    4. // iterate over the schedule group of every lifespan
    5. for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
    6. // ... (omitted in this excerpt)
    7. // assign the splits with successful placements;
    8. // the tasks returned by assignSplits are collected into overallNewTasks
    9. overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));
    10. // ... (omitted in this excerpt)
    11. }
    12. }

    overallNewTasks 保存了本次调度新创建的所有 task,这些 task 由 assignSplits 方法返回,其具体实现如下所示:

    1. /**
    2. * Hands the split assignment and the no-more-splits notifications to the stage, one node at
    3. * a time, and returns every task the stage reports back from those calls.
    4. */
    5. private Set<RemoteTask> assignSplits(Multimap<InternalNode, Split> splitAssignment, Multimap<InternalNode, Lifespan> noMoreSplitsNotification)
    6. {
    7. // every node that appears in either of the two inputs
    8. ImmutableSet.Builder<InternalNode> affectedNodes = ImmutableSet.builder();
    9. affectedNodes.addAll(splitAssignment.keySet());
    10. affectedNodes.addAll(noMoreSplitsNotification.keySet());
    11. ImmutableSet.Builder<RemoteTask> createdTasks = ImmutableSet.builder();
    12. for (InternalNode node : affectedNodes.build()) {
    13. // source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
    14. // splits assigned to this node, keyed by the partitioned plan node
    15. ImmutableMultimap.Builder<PlanNodeId, Split> splitsForNode = ImmutableMultimap.builder();
    16. splitsForNode.putAll(partitionedNode, splitAssignment.get(node));
    17. // lifespans of this node that will receive no more splits
    18. ImmutableMultimap.Builder<PlanNodeId, Lifespan> finishedLifespans = ImmutableMultimap.builder();
    19. if (noMoreSplitsNotification.containsKey(node)) {
    20. finishedLifespans.putAll(partitionedNode, noMoreSplitsNotification.get(node));
    21. }
    22. // the stage schedules the splits on the node and returns any task it created for that
    23. Set<RemoteTask> tasksForNode = stage.scheduleSplits(node, splitsForNode.build(), finishedLifespans.build());
    24. createdTasks.addAll(tasksForNode);
    25. }
    26. return createdTasks.build();
    27. }

    进一步分析SqlStageExecution::scheduleSplits的方法

    1. /**
    2. * Assigns the given splits to the task on {@code node}, creating that task first if the node
    3. * has none yet, and forwards any no-more-splits notification to it.
    4. *
    5. * @param node node that should receive the splits
    6. * @param splits splits keyed by plan node id; every key must be a partitioned source of the fragment
    7. * @param noMoreSplitsNotification lifespans, keyed by plan node id, that will receive no more splits; at most one entry
    8. * @return the newly created task, or an empty set if the stage is done or the node already had a task
    9. * @throws UnsupportedOperationException if {@code noMoreSplitsNotification} has more than one entry
    10. */
    11. public synchronized Set<RemoteTask> scheduleSplits(InternalNode node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
    12. {
    13. requireNonNull(node, "node is null");
    14. requireNonNull(splits, "splits is null");
    15. requireNonNull(noMoreSplitsNotification, "noMoreSplitsNotification is null");
    16. // a stage that is already done accepts no more work: report no new tasks
    17. if (stateMachine.getState().isDone()) {
    18. return ImmutableSet.of();
    19. }
    20. // record that splits have been scheduled for this stage
    21. splitsScheduled.set(true);
    22. checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");
    23. // collects the task created by this call, if any
    24. ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
    25. Collection<RemoteTask> tasks = this.tasks.get(node);
    26. RemoteTask task;
    27. // no task on this node yet: create one with the splits;
    28. // otherwise add the splits to the node's existing task
    29. if (tasks == null) {
    30. // The output buffer depends on the task id starting from 0 and being sequential, since each
    31. // task is assigned a private buffer based on task id.
    32. TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
    33. task = scheduleTask(node, taskId, splits, OptionalInt.empty());
    34. newTasks.add(task);
    35. }
    36. else {
    37. task = tasks.iterator().next();
    38. task.addSplits(splits);
    39. }
    40. if (noMoreSplitsNotification.size() > 1) {
    41. // The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
    42. // If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
    43. // These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
    44. throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() <= 1");
    45. }
    46. for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
    47. task.noMoreSplits(entry.getKey(), entry.getValue());
    48. }
    49. return newTasks.build();
    50. }

    所有任务的创建都会调用到该方法:SqlStageExecution::scheduleTask

    // Creates the remote task for taskId on the given node, registers it with this stage's
    // bookkeeping (allTasks, tasks, nodeTaskMap) and listeners, then starts it, or aborts it if
    // the stage is already done.
    1. private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits, OptionalInt totalPartitions)
    2. {
    3. checkArgument(!allTasks.contains(taskId), "A task with id %s already exists", taskId);
    4. // only when the snapshot feature is enabled for this session
    5. if (SystemSessionProperties.isSnapshotEnabled(stateMachine.getSession())) {
    6. // Snapshot: inform snapshot manager so it knows about all tasks,
    7. // and can determine if a snapshot is complete for all tasks.
    8. snapshotManager.addNewTask(taskId);
    9. }
    10. ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
    11. initialSplits.putAll(sourceSplits);
    12. /**
    13. * For every source task that is not FINISHED, add a remote split (built from this task id
    14. * and the source task's self location) to the initial splits.
    15. */
    15. sourceTasks.forEach((planNodeId, task) -> {
    16. TaskStatus status = task.getTaskStatus();
    17. if (status.getState() != TaskState.FINISHED) {
    18. initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
    19. }
    20. });
    21. OutputBuffers outputBuffers = this.outputBuffers.get();
    22. checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
    23. // build the remote task through the factory
    24. RemoteTask task = remoteTaskFactory.createRemoteTask(
    25. stateMachine.getSession(),
    26. taskId,
    27. node,
    28. stateMachine.getFragment(),
    29. initialSplits.build(),
    30. totalPartitions,
    31. outputBuffers,
    32. nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
    33. summarizeTaskInfo,
    34. Optional.ofNullable(parentId),
    35. snapshotManager);
    36. completeSources.forEach(task::noMoreSplits);
    37. allTasks.add(taskId);
    38. tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
    39. nodeTaskMap.addTask(node, task);
    40. task.addStateChangeListener(new StageTaskListener());
    41. task.addFinalTaskInfoListener(this::updateFinalTaskInfo);
    42. if (!stateMachine.getState().isDone()) {
    43. task.start();
    44. }
    45. else {
    46. // stage finished while we were scheduling this task
    47. // so the freshly created task is aborted instead of started
    48. task.abort();
    49. }
    50. return task;
    51. }

    最终会调用 RemoteTaskFactory::createRemoteTask 方法来完成任务的创建。HttpRemoteTaskFactory 重写(@Override)了该方法,用于创建 HttpRemoteTask,具体代码实现如下:

    1. /**
    2. * Builds the {@code HttpRemoteTask} for {@code taskId} on {@code node}, combining the
    3. * caller's arguments with the clients, executors, codecs and settings held by this factory.
    4. */
    5. @Override
    6. public RemoteTask createRemoteTask(
    7. Session session,
    8. TaskId taskId,
    9. InternalNode node,
    10. PlanFragment fragment,
    11. Multimap<PlanNodeId, Split> initialSplits,
    12. OptionalInt totalPartitions,
    13. OutputBuffers outputBuffers,
    14. PartitionedSplitCountTracker partitionedSplitCountTracker,
    15. boolean summarizeTaskInfo,
    16. Optional<PlanNodeId> parent,
    17. QuerySnapshotManager snapshotManager)
    18. {
    19. String nodeIdentifier = node.getNodeIdentifier();
    20. return new HttpRemoteTask(
    21. // task identity and where it runs
    22. session, taskId, nodeIdentifier, locationFactory.createTaskLocation(node, taskId),
    23. // what the task executes
    24. fragment, initialSplits, totalPartitions, outputBuffers,
    25. // factory-held client and executors
    26. httpClient, executor, updateScheduledExecutor, errorScheduledExecutor,
    27. // factory-held timing settings, plus the caller's summarize flag
    28. maxErrorDuration, taskStatusRefreshMaxWait, taskInfoUpdateInterval, summarizeTaskInfo,
    29. // factory-held codecs
    30. taskStatusCodec, taskInfoCodec, taskUpdateRequestCodec,
    31. partitionedSplitCountTracker, stats, isBinaryEncoding, parent, snapshotManager);
    32. }

    HttpRemoteTask 构造函数中的具体实现方式如下:

    // Constructor: validates the arguments, stores them in fields, queues the initial splits as
    // pending, creates the task status / task info fetchers from an initial TaskInfo, and
    // registers a status listener that cleans up on a done state or otherwise refreshes the
    // partitioned split count and split queue space.
    1. public HttpRemoteTask(Session session,
    2. TaskId taskId,
    3. String nodeId,
    4. URI location,
    5. PlanFragment planFragment,
    6. Multimap<PlanNodeId, Split> initialSplits,
    7. OptionalInt totalPartitions,
    8. OutputBuffers outputBuffers,
    9. HttpClient httpClient,
    10. Executor executor,
    11. ScheduledExecutorService updateScheduledExecutor,
    12. ScheduledExecutorService errorScheduledExecutor,
    13. Duration maxErrorDuration,
    14. Duration taskStatusRefreshMaxWait,
    15. Duration taskInfoUpdateInterval,
    16. boolean summarizeTaskInfo,
    17. Codec<TaskStatus> taskStatusCodec,
    18. Codec<TaskInfo> taskInfoCodec,
    19. Codec<TaskUpdateRequest> taskUpdateRequestCodec,
    20. PartitionedSplitCountTracker partitionedSplitCountTracker,
    21. RemoteTaskStats stats,
    22. boolean isBinaryEncoding,
    23. Optional<PlanNodeId> parent,
    24. QuerySnapshotManager snapshotManager)
    25. {
    26. requireNonNull(session, "session is null");
    27. requireNonNull(taskId, "taskId is null");
    28. requireNonNull(nodeId, "nodeId is null");
    29. requireNonNull(location, "location is null");
    30. requireNonNull(planFragment, "planFragment is null");
    31. requireNonNull(totalPartitions, "totalPartitions is null");
    32. requireNonNull(outputBuffers, "outputBuffers is null");
    33. requireNonNull(httpClient, "httpClient is null");
    34. requireNonNull(executor, "executor is null");
    35. requireNonNull(taskStatusCodec, "taskStatusCodec is null");
    36. requireNonNull(taskInfoCodec, "taskInfoCodec is null");
    37. requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
    38. requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
    39. requireNonNull(stats, "stats is null");
    40. requireNonNull(parent, "parent is null");
    41. try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
    42. this.taskId = taskId;
    43. this.session = session;
    44. this.nodeId = nodeId;
    45. this.planFragment = planFragment;
    46. this.totalPartitions = totalPartitions;
    47. this.outputBuffers.set(outputBuffers);
    48. this.httpClient = httpClient;
    49. this.executor = executor;
    50. this.errorScheduledExecutor = errorScheduledExecutor;
    51. this.summarizeTaskInfo = summarizeTaskInfo;
    52. this.taskInfoCodec = taskInfoCodec;
    53. this.taskUpdateRequestCodec = taskUpdateRequestCodec;
    54. this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
    55. this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
    56. this.stats = stats;
    57. this.isBinaryEncoding = isBinaryEncoding;
    58. this.parent = parent;
    59. for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
    60. ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
    61. pendingSplits.put(entry.getKey(), scheduledSplit); // planNodeId --> split(splitId, plannodeId, Split)
    62. }
    63. // count the initial splits that belong to the fragment's partitioned sources
    64. pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
    65. .filter(initialSplits::containsKey)
    66. .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
    67. .sum();
    68. // one placeholder BufferInfo per output buffer id, used for the initial task info
    69. List<BufferInfo> bufferStates = outputBuffers.getBuffers()
    70. .keySet().stream()
    71. .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
    72. .collect(toImmutableList());
    73. TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
    74. this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
    75. this::failTask,
    76. initialTask.getTaskStatus(),
    77. taskStatusRefreshMaxWait,
    78. taskStatusCodec,
    79. executor,
    80. httpClient,
    81. maxErrorDuration,
    82. errorScheduledExecutor,
    83. stats,
    84. isBinaryEncoding,
    85. snapshotManager);
    86. this.taskInfoFetcher = new TaskInfoFetcher(
    87. this::failTask,
    88. initialTask,
    89. httpClient,
    90. taskInfoUpdateInterval,
    91. taskInfoCodec,
    92. maxErrorDuration,
    93. summarizeTaskInfo,
    94. executor,
    95. updateScheduledExecutor,
    96. errorScheduledExecutor,
    97. stats,
    98. isBinaryEncoding);
    99. // register a listener that is called back whenever the task status changes
    100. taskStatusFetcher.addStateChangeListener(newStatus -> {
    101. TaskState state = newStatus.getState();
    102. if (state.isDone()) {
    103. cleanUpTask(state);
    104. }
    105. else {
    106. partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
    107. updateSplitQueueSpace();
    108. }
    109. });
    110. partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
    111. updateSplitQueueSpace();
    112. }
    113. }

    创建完 remote task 之后继续往下看,可以看到随后调用了任务的 start 方法来启动任务。
    image.png

    最终调用的是 HttpRemoteTask::start 方法,具体代码实现如下:

    // Starts the remote task: triggers an update and starts the status and info fetchers.
    1. @Override
    2. public void start()
    3. {
    4. try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
    5. // to start we just need to trigger an update
    6. // (scheduleUpdate hands sendUpdate to the executor)
    7. scheduleUpdate();
    8. taskStatusFetcher.start(); // start the task status fetcher
    9. taskInfoFetcher.start(); // start the task info fetcher
    10. }
    11. }

    最终会调用 HttpRemoteTask::scheduleUpdate 方法开始任务的调度,具体代码如下:

    // Hands sendUpdate to the executor instead of running it on the calling thread.
    1. private void scheduleUpdate()
    2. {
    3. executor.execute(this::sendUpdate);
    4. }

    // Builds a TaskUpdateRequest from the current sources, output buffers and (optionally) the
    // plan fragment, and posts it asynchronously. Returns without sending when the task is
    // abandoned, needs no update, is done, already has a request in flight, or is being
    // rate-limited after errors.
    1. private synchronized void sendUpdate()
    2. {
    3. if (abandoned.get()) {
    4. // Snapshot: Corresponding task has been canceled to resume. Stop any communication with it.
    5. // (nothing is sent for an abandoned task)
    6. return;
    7. }
    8. TaskStatus taskStatus = getTaskStatus();
    9. // don't update if the task hasn't been started yet or if it is already finished
    10. // (needsUpdate is false, or the task state is done)
    11. if (!needsUpdate.get() || taskStatus.getState().isDone()) {
    12. return;
    13. }
    14. // if there is a request already running, wait for it to complete
    15. // (no second request is sent while one is in flight)
    16. if (this.currentRequest != null && !this.currentRequest.isDone()) {
    17. return;
    18. }
    19. // if throttled due to error, asynchronously wait for timeout and try again
    20. // (sendUpdate is re-run on the executor once the permit future completes)
    21. ListenableFuture<?> errorRateLimit = updateErrorTracker.acquireRequestPermit();
    22. if (!errorRateLimit.isDone()) {
    23. errorRateLimit.addListener(this::sendUpdate, executor);
    24. return;
    25. }
    26. List<TaskSource> sources = getSources();
    27. Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment) : Optional.empty();
    28. TaskUpdateRequest updateRequest = new TaskUpdateRequest(
    29. // Snapshot: Add task instance id to all task related requests,
    30. // so receiver can verify if the instance id matches
    31. taskStatus.getTaskInstanceId(),
    32. session.toSessionRepresentation(),
    33. session.getIdentity().getExtraCredentials(),
    34. fragment,
    35. sources,
    36. outputBuffers.get(),
    37. totalPartitions,
    38. parent);
    39. byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
    40. if (fragment.isPresent()) {
    41. stats.updateWithPlanBytes(taskUpdateRequestJson.length);
    42. }
    43. HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
    44. Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
    45. .setUri(uriBuilder.build())
    46. .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson))
    47. // .setBodyGenerator(createBodyGenerator(updateRequest))
    48. .build();
    49. // pick the response handler that matches the encoding: SMILE when isBinaryEncoding, otherwise JSON
    50. ResponseHandler responseHandler;
    51. if (isBinaryEncoding) {
    52. responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
    53. }
    54. else {
    55. responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
    56. }
    57. updateErrorTracker.startRequest();
    58. // send the request asynchronously
    59. ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
    60. currentRequest = future;
    61. currentRequestStartNanos = System.nanoTime();
    62. // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
    63. // and does so without grabbing the instance lock.
    64. needsUpdate.set(false);
    65. // the SimpleHttpResponseHandler wrapping UpdateResponseHandler is called back on the executor when the request succeeds or fails
    66. Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats), executor);
    67. }

    image.png