The previous sections described in detail how each type of task is scheduled. We saw that, regardless of the task type, everything ultimately goes through either scheduleTask(TaskId taskId, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits) to create a task, or HttpRemoteTask.addSplits(PlanNodeId sourceId, Iterable<Split> splits) to update one. This section therefore starts from these two methods and examines task execution in detail.

1. Task Creation

As shown earlier, for Coordinator-Only, Single, and Fixed distributions, tasks are created directly at scheduling time. Source-type tasks, however, schedule their splits in batches, so scheduleTask may be called multiple times. Each call therefore checks whether a task has already been created on the target node: if so, the existing task is updated; otherwise a new task is created. Creating a task essentially means invoking the HttpRemoteTask constructor to build an HttpRemoteTask object and then calling its start() method. start() in turn uses the httpClient embedded in HttpRemoteTask to issue a RESTful request to the TaskResource service on the chosen worker node, which starts a corresponding SqlTaskExecution on that node to process and compute the data. Task creation thus consists of two parts:

  1. On the RESTful client side, create an HttpRemoteTask object.
  2. On the RESTful resource side, create a SqlTaskExecution object.
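The create-or-update decision described above can be sketched with a minimal, self-contained model (the class and method names here are hypothetical illustrations, not Presto's actual API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one remote task per node, created on the first
// scheduling call and fed additional splits on later calls (as happens
// for Source-type stages that schedule splits in batches).
public class TaskSchedulerSketch {
    static class RemoteTask {
        int splitCount;
        void addSplits(int n) { splitCount += n; }
    }

    final Map<String, RemoteTask> tasksByNode = new ConcurrentHashMap<>();

    public RemoteTask scheduleTask(String nodeId, int splits) {
        // If a task already exists on this node, just feed it more splits;
        // otherwise create a new task for the node.
        RemoteTask task = tasksByNode.computeIfAbsent(nodeId, id -> new RemoteTask());
        task.addSplits(splits);
        return task;
    }

    public static void main(String[] args) {
        TaskSchedulerSketch scheduler = new TaskSchedulerSketch();
        scheduler.scheduleTask("worker-1", 3);  // first call creates the task
        scheduler.scheduleTask("worker-1", 2);  // second call updates it
        System.out.println(scheduler.tasksByNode.get("worker-1").splitCount); // prints 5
    }
}
```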

1) Client side

Creating a task on the client side takes two steps: constructing an HttpRemoteTask object and calling its start() method.

(1) Calling the constructor
createRemoteTask ultimately calls the HttpRemoteTask constructor, whose implementation is as follows:

    public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeId, Split> initialSplits,
            OptionalInt totalPartitions,
            OutputBuffers outputBuffers,
            HttpClient httpClient,
            Executor executor,
            ScheduledExecutorService updateScheduledExecutor,
            ScheduledExecutorService errorScheduledExecutor,
            Duration maxErrorDuration,
            Duration taskStatusRefreshMaxWait,
            Duration taskInfoUpdateInterval,
            boolean summarizeTaskInfo,
            Codec<TaskStatus> taskStatusCodec,
            Codec<TaskInfo> taskInfoCodec,
            Codec<TaskUpdateRequest> taskUpdateRequestCodec,
            PartitionedSplitCountTracker partitionedSplitCountTracker,
            RemoteTaskStats stats,
            boolean isBinaryEncoding,
            Optional<PlanNodeId> parent,
            QuerySnapshotManager snapshotManager)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(nodeId, "nodeId is null");
        requireNonNull(location, "location is null");
        requireNonNull(planFragment, "planFragment is null");
        requireNonNull(totalPartitions, "totalPartitions is null");
        requireNonNull(outputBuffers, "outputBuffers is null");
        requireNonNull(httpClient, "httpClient is null");
        requireNonNull(executor, "executor is null");
        requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        requireNonNull(stats, "stats is null");
        requireNonNull(parent, "parent is null");
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            this.taskId = taskId;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.totalPartitions = totalPartitions;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.stats = stats;
            this.isBinaryEncoding = isBinaryEncoding;
            this.parent = parent;
            for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
                pendingSplits.put(entry.getKey(), scheduledSplit); // planNodeId --> ScheduledSplit(splitId, planNodeId, split)
            }
            // count the total number of splits across all partitioned sources
            pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
                    .filter(initialSplits::containsKey)
                    .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
                    .sum();
            List<BufferInfo> bufferStates = outputBuffers.getBuffers()
                    .keySet().stream()
                    .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
                    .collect(toImmutableList());
            TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
                    this::failTask,
                    initialTask.getTaskStatus(),
                    taskStatusRefreshMaxWait,
                    taskStatusCodec,
                    executor,
                    httpClient,
                    maxErrorDuration,
                    errorScheduledExecutor,
                    stats,
                    isBinaryEncoding,
                    snapshotManager);
            this.taskInfoFetcher = new TaskInfoFetcher(
                    this::failTask,
                    initialTask,
                    httpClient,
                    taskInfoUpdateInterval,
                    taskInfoCodec,
                    maxErrorDuration,
                    summarizeTaskInfo,
                    executor,
                    updateScheduledExecutor,
                    errorScheduledExecutor,
                    stats,
                    isBinaryEncoding);
            // register a listener that is called back whenever the task status changes
            taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    cleanUpTask(state);
                }
                else {
                    partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
                    updateSplitQueueSpace();
                }
            });
            partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
            updateSplitQueueSpace();
        }
    }
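The state-change listener registered at the end of the constructor is the key piece: every status transition either triggers cleanup (terminal states) or refreshes the split accounting. A minimal sketch of this listener pattern, with illustrative names rather than Presto's actual classes, looks like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the addStateChangeListener pattern used by the status fetcher:
// listeners are invoked on every state transition, and a terminal ("done")
// state triggers task cleanup. Names are illustrative, not Presto's API.
public class StateMachineSketch {
    enum TaskState {
        RUNNING, FINISHED, FAILED;
        boolean isDone() { return this != RUNNING; }
    }

    private final List<Consumer<TaskState>> listeners = new ArrayList<>();

    void addStateChangeListener(Consumer<TaskState> listener) {
        listeners.add(listener);
    }

    void transitionTo(TaskState newState) {
        // fire all registered callbacks on every state change
        listeners.forEach(l -> l.accept(newState));
    }

    public static void main(String[] args) {
        StateMachineSketch task = new StateMachineSketch();
        task.addStateChangeListener(s -> {
            if (s.isDone()) {
                System.out.println("cleanUpTask(" + s + ")");
            }
            else {
                System.out.println("updateSplitQueueSpace()");
            }
        });
        task.transitionTo(TaskState.FINISHED); // prints cleanUpTask(FINISHED)
    }
}
```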

(2) Calling the start method
Once the HttpRemoteTask object has been created, its start() method is called to send a RESTful request to TaskResource, which creates a SqlTaskExecution object on the corresponding node to perform the actual split processing and computation. The relevant code is:

    private void scheduleUpdate()
    {
        executor.execute(this::sendUpdate);
    }

    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            // to start we just need to trigger an update
            scheduleUpdate();
            taskStatusFetcher.start(); // continuously fetches the task's status
            taskInfoFetcher.start(); // continuously fetches the task's info
        }
    }

As the code above shows, the real work of starting a task is concentrated in HttpRemoteTask::sendUpdate, which scheduleUpdate submits to the executor. Its core implementation is:

    private synchronized void sendUpdate()
    {
        if (abandoned.get()) {
            // Snapshot: the corresponding task has been canceled in order to resume;
            // stop any communication with it
            return;
        }
        TaskStatus taskStatus = getTaskStatus();
        // don't update if the task hasn't been started yet or if it is already finished
        if (!needsUpdate.get() || taskStatus.getState().isDone()) {
            return;
        }
        // if there is a request already running, wait for it to complete
        if (this.currentRequest != null && !this.currentRequest.isDone()) {
            return;
        }
        // if throttled due to error, asynchronously wait for timeout and try again
        ListenableFuture<?> errorRateLimit = updateErrorTracker.acquireRequestPermit();
        if (!errorRateLimit.isDone()) {
            errorRateLimit.addListener(this::sendUpdate, executor);
            return;
        }
        List<TaskSource> sources = getSources();
        Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment) : Optional.empty();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(
                // Snapshot: add the task instance id to all task-related requests,
                // so the receiver can verify whether the instance id matches
                taskStatus.getTaskInstanceId(),
                session.toSessionRepresentation(),
                session.getIdentity().getExtraCredentials(),
                fragment,
                sources,
                outputBuffers.get(),
                totalPartitions,
                parent);
        byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
        if (fragment.isPresent()) {
            stats.updateWithPlanBytes(taskUpdateRequestJson.length);
        }
        HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
        Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
                .setUri(uriBuilder.build())
                .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson))
                .build();
        // choose the response handler according to whether binary (SMILE) encoding is used
        ResponseHandler responseHandler;
        if (isBinaryEncoding) {
            responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
        }
        else {
            responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
        }
        updateErrorTracker.startRequest();
        // send the request
        ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
        currentRequest = future;
        currentRequestStartNanos = System.nanoTime();
        // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since the callback might change the flag value
        // and does so without grabbing the instance lock.
        needsUpdate.set(false);
        // on success, SimpleHttpResponseHandler's onSuccess runs; on failure, onFailure
        Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats), executor);
    }
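The guard clauses at the top of sendUpdate implement update coalescing: updates are skipped while the task is done, while nothing has changed, or while a request is still in flight, and the needsUpdate flag is cleared before the callback is attached. A minimal model of this guard logic (illustrative names, not the actual HttpRemoteTask code) is:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the guard chain at the top of sendUpdate: updates are
// coalesced through a needsUpdate flag and skipped while a request is
// already in flight. Illustrative model only.
public class SendUpdateGuardSketch {
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
    private boolean requestInFlight;
    private boolean taskDone;
    int requestsSent;

    synchronized void sendUpdate() {
        // same ordering of checks as described in the text above
        if (!needsUpdate.get() || taskDone) {
            return; // nothing to send, or task already finished
        }
        if (requestInFlight) {
            return; // wait for the running request; its callback re-triggers sendUpdate
        }
        requestInFlight = true;
        // clear the flag BEFORE any callback could set it again
        needsUpdate.set(false);
        requestsSent++;
    }

    public static void main(String[] args) {
        SendUpdateGuardSketch task = new SendUpdateGuardSketch();
        task.sendUpdate(); // sends one request
        task.sendUpdate(); // skipped: needsUpdate is false, request in flight
        System.out.println(task.requestsSent); // prints 1
    }
}
```

In the real code the in-flight request's completion callback sets needsUpdate back to true when more splits have arrived, which is why the flag must be cleared before the callback is registered.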

As this code shows, all the data the task will process is packaged into a list of TaskSource objects, where each TaskSource represents one data source of the task. A task's data sources fall into two categories: the output of an upstream stage, and a direct data source. For a stage-output source, the TaskSource wraps the PlanNodeId of that stage together with a list of ScheduledSplits generated from the task locations of that stage; for a direct source, it wraps the PlanNodeId of the data source together with a list of ScheduledSplits generated from all of that source's data splits. The getSources() method builds this list, wrapping both kinds of sources as TaskSource objects and merging them together:

    private synchronized List<TaskSource> getSources()
    {
        return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream())
                .filter(Objects::nonNull)
                .map(PlanNode::getId)
                .map(this::getSource)
                .filter(Objects::nonNull)
                .collect(toImmutableList());
    }
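The shape of this method, concatenating the two source-node streams and filtering out nodes that currently have nothing pending, can be illustrated with a self-contained sketch (the source names here are made up for the example):

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch of how getSources() merges the two kinds of sources (partitioned
// table-scan sources and remote/exchange sources) into one list, skipping
// nodes with nothing pending (modeled here as mapping to null).
public class GetSourcesSketch {
    public static void main(String[] args) {
        List<String> partitionedSources = List.of("tableScan-1");
        List<String> remoteSources = List.of("exchange-1", "exchange-2");

        List<String> merged = Stream.concat(partitionedSources.stream(), remoteSources.stream())
                .map(id -> id.equals("exchange-2") ? null : id) // simulate a source with no pending splits
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        System.out.println(merged); // prints [tableScan-1, exchange-1]
    }
}
```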

2) Resource side

As shown above, in Presto a task is actually started by the coordinator choosing a worker node (based on data locality and other conditions) and sending a POST request such as /v1/task/{taskId}?summarize to the RESTful service exposed by that worker's TaskResource class. This request starts a SqlTaskExecution object on the worker to carry out the task's computation.
When a Presto cluster starts, a PrestoServer process is launched on every worker. This process provides RESTful services that handle requests whose URIs are prefixed with /v1/task/, all of which are dispatched to the TaskResource class. TaskResource handles the task-creation request as follows:

    @POST
    @Path("{taskId}")
    @Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
    @Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
    public Response createOrUpdateTask(
            @PathParam("taskId") TaskId taskId,
            TaskUpdateRequest taskUpdateRequest,
            @Context UriInfo uriInfo)
    {
        try {
            requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
        }
        catch (Exception ex) {
            return Response.status(Status.BAD_REQUEST).build();
        }
        Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
        // create or update the task
        TaskInfo taskInfo = taskManager.updateTask(
                session,
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds(),
                taskUpdateRequest.getTotalPartitions(),
                taskUpdateRequest.getConsumerId(),
                taskUpdateRequest.getTaskInstanceId());
        if (taskInfo == null) {
            return Response.ok().entity(createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), taskUpdateRequest.getTaskInstanceId())).build();
        }
        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        return Response.ok().entity(taskInfo).build();
    }

The RESTful request for a new task is handled mainly by TaskManager::updateTask, implemented as follows:

    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, OptionalInt totalPartitions, Optional<PlanNodeId> consumer, String expectedTaskInstanceId)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(fragment, "fragment is null");
        requireNonNull(sources, "sources is null");
        requireNonNull(outputBuffers, "outputBuffers is null");
        // look up the SqlTask
        SqlTask sqlTask;
        if (isNullOrEmpty(expectedTaskInstanceId)) {
            sqlTask = tasks.getUnchecked(taskId);
        }
        else {
            sqlTask = currentTaskInstanceIds.get(expectedTaskInstanceId);
            if (sqlTask == null) {
                return null;
            }
        }
        if (resourceOvercommit(session)) {
            // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
            queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
        }
        sqlTask.recordHeartbeat();
        // finally delegate to SqlTask's updateTask method
        return sqlTask.updateTask(session, fragment, sources, outputBuffers, totalPartitions, consumer, cteCtx);
    }
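Note the instance-id check at the top: when the request carries an expected task instance id, the update is applied only to that exact task instance, and a stale or unknown id yields null (which createOrUpdateTask then turns into an aborted-task response). A tiny illustrative model of this lookup, not the actual TaskManager code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the instance-id verification in updateTask: a task is looked
// up by its instance id, and a stale id returns null so the caller can
// report the task as aborted. Names are illustrative.
public class InstanceIdCheckSketch {
    final Map<String, String> currentTaskInstanceIds = new ConcurrentHashMap<>();

    String lookup(String expectedInstanceId) {
        // returns null when the instance id no longer matches a live task
        return currentTaskInstanceIds.get(expectedInstanceId);
    }

    public static void main(String[] args) {
        InstanceIdCheckSketch manager = new InstanceIdCheckSketch();
        manager.currentTaskInstanceIds.put("instance-42", "task-1");
        System.out.println(manager.lookup("instance-42")); // prints task-1
        System.out.println(manager.lookup("instance-99")); // prints null
    }
}
```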

This in turn calls SqlTask::updateTask to update the task:

    public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, OptionalInt totalPartitions, Optional<PlanNodeId> consumer,
            Map<String, CommonTableExecutionContext> cteCtx)
    {
        try {
            // The LazyOutputBuffer does not support write methods, so the actual
            // output buffer must be established before drivers are created (e.g.
            // a VALUES query).
            outputBuffer.setOutputBuffers(outputBuffers);
            // assure the task execution is only created once
            SqlTaskExecution taskExecution;
            synchronized (this) {
                // is the task already complete?
                TaskHolder taskHolder = taskHolderReference.get();
                if (taskHolder.isFinished()) {
                    return taskHolder.getFinalTaskInfo();
                }
                taskExecution = taskHolder.getTaskExecution();
                // if taskExecution is null, create it
                if (taskExecution == null) {
                    checkState(fragment.isPresent(), "fragment must be present");
                    loadDCCatalogForUpdateTask(metadata, sources);
                    taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources, totalPartitions, consumer, cteCtx);
                    taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                    needsPlan.set(false);
                    isSnapshotEnabled = SystemSessionProperties.isSnapshotEnabled(session);
                }
            }
            if (taskExecution != null) {
                // merge all splits contained in sources into the split list the taskExecution must process
                taskExecution.addSources(sources);
            }
        }
        catch (Error e) {
            failed(e);
            throw e;
        }
        catch (RuntimeException e) {
            failed(e);
        }
        return getTaskInfo();
    }
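The essential pattern here is "create the execution exactly once, then merge sources on every update": the first update builds the SqlTaskExecution and publishes it atomically, while every update (first or later) adds its sources. A minimal sketch of this pattern, using a bare compareAndSet in place of the synchronized block (illustrative names, not the actual SqlTask code):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the create-once pattern in SqlTask.updateTask: the first
// update creates and publishes the execution; every update merges in
// its batch of sources. Illustrative model only.
public class CreateOnceSketch {
    static class TaskExecution {
        int sourceBatches;
        void addSources() { sourceBatches++; }
    }

    final AtomicReference<TaskExecution> executionRef = new AtomicReference<>();

    void updateTask() {
        TaskExecution execution = executionRef.get();
        if (execution == null) {
            // first update: create and publish the execution exactly once
            executionRef.compareAndSet(null, new TaskExecution());
            execution = executionRef.get();
        }
        execution.addSources(); // every update merges its sources in
    }

    public static void main(String[] args) {
        CreateOnceSketch task = new CreateOnceSketch();
        task.updateTask(); // creates the execution, adds batch 1
        task.updateTask(); // reuses the execution, adds batch 2
        System.out.println(task.executionRef.get().sourceBatches); // prints 2
    }
}
```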


A deeper analysis of SqlTaskExecutionFactory::create, which builds and returns the SqlTaskExecution, is deferred for now.

2. Updating a Task

3. Running a Task