The previous sections described in detail how each type of Task is scheduled. Whatever the type, a Task is ultimately created by calling scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits), or updated by calling HttpRemoteTask.addSplits(PlanNodeId sourceId, Iterable…
1. Creating a Task
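As the code above shows, Tasks of type Coordinator_Only, Single, and Fixed are created directly during Task scheduling. Source Tasks, however, schedule their Splits in batches and may therefore call scheduleTask several times. For this reason, scheduleTask first checks whether a Task already exists on the target Node: if it does, that Task is updated; otherwise a new Task is created. Creating a Task essentially means calling the HttpRemoteTask constructor to build an HttpRemoteTask object and then calling that object's start() method. start() ultimately uses the httpClient encapsulated in HttpRemoteTask to send a RESTful request to the TaskResource service on a specific Worker Node, which starts a corresponding SqlTaskExecution on that Worker to process and compute the data. Task creation therefore has two parts: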
- On the RESTful client side, an HttpRemoteTask object is created.
- On the RESTful resource side, a SqlTaskExecution object is created.
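The create-or-update decision described above (create a Task on a Node the first time, then only add Splits to it on later batches) can be sketched as follows. This is a simplified model, not the Presto code: `FakeRemoteTask` and `StageScheduler` are hypothetical stand-ins for HttpRemoteTask and the stage execution, and nodes and splits are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for HttpRemoteTask: it only records the splits it has received.
class FakeRemoteTask {
    final String nodeId;
    final List<String> splits = new ArrayList<>();
    boolean started;

    FakeRemoteTask(String nodeId, List<String> initialSplits) {
        this.nodeId = nodeId;
        this.splits.addAll(initialSplits);
    }

    void start() {
        started = true; // the real start() sends the first RESTful update to the worker
    }

    void addSplits(List<String> newSplits) {
        splits.addAll(newSplits); // the real addSplits() triggers another update request
    }
}

// Hypothetical stage-level scheduler: at most one task per node, created lazily.
class StageScheduler {
    private final Map<String, FakeRemoteTask> tasksByNode = new HashMap<>();
    int createdTasks;

    // Source stages may call this several times for the same node as split batches arrive.
    synchronized FakeRemoteTask scheduleTask(String nodeId, List<String> splits) {
        FakeRemoteTask task = tasksByNode.get(nodeId);
        if (task == null) {
            // No task on this node yet: create and start it with the initial splits.
            task = new FakeRemoteTask(nodeId, splits);
            task.start();
            tasksByNode.put(nodeId, task);
            createdTasks++;
        }
        else {
            // A task already exists on this node: just hand it the new batch.
            task.addSplits(splits);
        }
        return task;
    }
}
```

Calling `scheduleTask("node-1", List.of("s1", "s2"))` and then `scheduleTask("node-1", List.of("s3"))` returns the same task, now holding three splits, and only one task is created for that node.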
 
1) Client side
Creating a Task on the client side takes two steps: constructing an HttpRemoteTask object and calling its start() method. The relevant code is as follows:
(1) Calling the constructor
createRemoteTask ultimately calls the HttpRemoteTask constructor, which is implemented as follows:
```java
public HttpRemoteTask(Session session,
        TaskId taskId,
        String nodeId,
        URI location,
        PlanFragment planFragment,
        Multimap<PlanNodeId, Split> initialSplits,
        OptionalInt totalPartitions,
        OutputBuffers outputBuffers,
        HttpClient httpClient,
        Executor executor,
        ScheduledExecutorService updateScheduledExecutor,
        ScheduledExecutorService errorScheduledExecutor,
        Duration maxErrorDuration,
        Duration taskStatusRefreshMaxWait,
        Duration taskInfoUpdateInterval,
        boolean summarizeTaskInfo,
        Codec<TaskStatus> taskStatusCodec,
        Codec<TaskInfo> taskInfoCodec,
        Codec<TaskUpdateRequest> taskUpdateRequestCodec,
        PartitionedSplitCountTracker partitionedSplitCountTracker,
        RemoteTaskStats stats,
        boolean isBinaryEncoding,
        Optional<PlanNodeId> parent,
        QuerySnapshotManager snapshotManager)
{
    requireNonNull(session, "session is null");
    requireNonNull(taskId, "taskId is null");
    requireNonNull(nodeId, "nodeId is null");
    requireNonNull(location, "location is null");
    requireNonNull(planFragment, "planFragment is null");
    requireNonNull(totalPartitions, "totalPartitions is null");
    requireNonNull(outputBuffers, "outputBuffers is null");
    requireNonNull(httpClient, "httpClient is null");
    requireNonNull(executor, "executor is null");
    requireNonNull(taskStatusCodec, "taskStatusCodec is null");
    requireNonNull(taskInfoCodec, "taskInfoCodec is null");
    requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
    requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
    requireNonNull(stats, "stats is null");
    requireNonNull(parent, "parent is null");

    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        this.taskId = taskId;
        this.session = session;
        this.nodeId = nodeId;
        this.planFragment = planFragment;
        this.totalPartitions = totalPartitions;
        this.outputBuffers.set(outputBuffers);
        this.httpClient = httpClient;
        this.executor = executor;
        this.errorScheduledExecutor = errorScheduledExecutor;
        this.summarizeTaskInfo = summarizeTaskInfo;
        this.taskInfoCodec = taskInfoCodec;
        this.taskUpdateRequestCodec = taskUpdateRequestCodec;
        this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
        this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        this.stats = stats;
        this.isBinaryEncoding = isBinaryEncoding;
        this.parent = parent;

        for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
            ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
            // planNodeId -> ScheduledSplit(splitId, planNodeId, split)
            pendingSplits.put(entry.getKey(), scheduledSplit);
        }
        // Count the initial splits that belong to partitioned sources
        pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
                .filter(initialSplits::containsKey)
                .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
                .sum();

        List<BufferInfo> bufferStates = outputBuffers.getBuffers()
                .keySet().stream()
                .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
                .collect(toImmutableList());

        TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));

        this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
                this::failTask,
                initialTask.getTaskStatus(),
                taskStatusRefreshMaxWait,
                taskStatusCodec,
                executor,
                httpClient,
                maxErrorDuration,
                errorScheduledExecutor,
                stats,
                isBinaryEncoding,
                snapshotManager);

        this.taskInfoFetcher = new TaskInfoFetcher(
                this::failTask,
                initialTask,
                httpClient,
                taskInfoUpdateInterval,
                taskInfoCodec,
                maxErrorDuration,
                summarizeTaskInfo,
                executor,
                updateScheduledExecutor,
                errorScheduledExecutor,
                stats,
                isBinaryEncoding);

        // Register a listener that is called back whenever the task status changes
        taskStatusFetcher.addStateChangeListener(newStatus -> {
            TaskState state = newStatus.getState();
            if (state.isDone()) {
                cleanUpTask(state);
            }
            else {
                partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
                updateSplitQueueSpace();
            }
        });

        partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
        updateSplitQueueSpace();
    }
}
```
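The split bookkeeping in the constructor (numbering each initial split with `nextSplitId` and counting only the splits of partitioned sources) can be modeled with plain collections. In this sketch, plan node IDs and splits are strings, `ScheduledSplitModel` and `InitialSplitBookkeeping` are hypothetical, and a `Map<String, List<String>>` replaces Guava's Multimap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ScheduledSplit: a sequence id, the owning plan node, and the split.
final class ScheduledSplitModel {
    final long sequenceId;
    final String planNodeId;
    final String split;

    ScheduledSplitModel(long sequenceId, String planNodeId, String split) {
        this.sequenceId = sequenceId;
        this.planNodeId = planNodeId;
        this.split = split;
    }
}

// Hypothetical model of the constructor's split bookkeeping.
final class InitialSplitBookkeeping {
    private final AtomicLong nextSplitId = new AtomicLong();
    // planNodeId -> scheduled splits waiting to be sent to the worker
    final Map<String, List<ScheduledSplitModel>> pendingSplits = new LinkedHashMap<>();
    final int pendingSourceSplitCount;

    InitialSplitBookkeeping(Map<String, List<String>> initialSplits, Set<String> partitionedSources) {
        for (Map.Entry<String, List<String>> entry : initialSplits.entrySet()) {
            for (String split : entry.getValue()) {
                // Every split gets a unique, monotonically increasing id.
                ScheduledSplitModel scheduled =
                        new ScheduledSplitModel(nextSplitId.getAndIncrement(), entry.getKey(), split);
                pendingSplits.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(scheduled);
            }
        }
        // Only splits of partitioned sources are counted, mirroring the stream
        // over planFragment.getPartitionedSources() in the constructor.
        pendingSourceSplitCount = partitionedSources.stream()
                .filter(initialSplits::containsKey)
                .mapToInt(source -> initialSplits.get(source).size())
                .sum();
    }
}
```

With initial splits `{scan: [a, b], remote: [r]}` and partitioned sources `{scan}`, the pending count is 2, while all three splits still receive sequence ids 0, 1, and 2.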
(2) Calling start()
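Once the HttpRemoteTask object has been created, its start() method is called to send a RESTful request to TaskResource, which creates a SqlTaskExecution object on the corresponding node to perform the actual Split processing and computation. The relevant code is as follows: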
```java
private void scheduleUpdate()
{
    executor.execute(this::sendUpdate);
}

@Override
public void start()
{
    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        scheduleUpdate();
        taskStatusFetcher.start(); // fetches task status
        taskInfoFetcher.start();   // fetches task info
    }
}
```
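As the code shows, the main work of starting a Task goes through HttpRemoteTask::scheduleUpdate, which simply submits sendUpdate to the executor. The core of sendUpdate is implemented as follows: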
```java
private synchronized void sendUpdate()
{
    if (abandoned.get()) {
        // Snapshot: Corresponding task has been canceled to resume. Stop any communication with it.
        return;
    }

    TaskStatus taskStatus = getTaskStatus();
    // don't update if the task hasn't been started yet or if it is already finished
    if (!needsUpdate.get() || taskStatus.getState().isDone()) {
        return;
    }

    // if there is a request already running, wait for it to complete
    if (this.currentRequest != null && !this.currentRequest.isDone()) {
        return;
    }

    // if throttled due to error, asynchronously wait for timeout and try again
    ListenableFuture<?> errorRateLimit = updateErrorTracker.acquireRequestPermit();
    if (!errorRateLimit.isDone()) {
        errorRateLimit.addListener(this::sendUpdate, executor);
        return;
    }

    List<TaskSource> sources = getSources();

    Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment) : Optional.empty();
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(
            // Snapshot: Add task instance id to all task related requests,
            // so receiver can verify if the instance id matches
            taskStatus.getTaskInstanceId(),
            session.toSessionRepresentation(),
            session.getIdentity().getExtraCredentials(),
            fragment,
            sources,
            outputBuffers.get(),
            totalPartitions,
            parent);
    byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
    if (fragment.isPresent()) {
        stats.updateWithPlanBytes(taskUpdateRequestJson.length);
    }

    HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
    Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
            .setUri(uriBuilder.build())
            .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson))
            // .setBodyGenerator(createBodyGenerator(updateRequest))
            .build();

    // Pick the response handler that matches the encoding: binary (Smile) or JSON
    ResponseHandler responseHandler;
    if (isBinaryEncoding) {
        responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
    }
    else {
        responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
    }

    updateErrorTracker.startRequest();

    // Send the request asynchronously
    ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
    currentRequest = future;
    currentRequestStartNanos = System.nanoTime();

    // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
    // and does so without grabbing the instance lock.
    needsUpdate.set(false);

    // On success SimpleHttpResponseHandler.onSuccess is invoked; on failure, onFailure
    Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats), executor);
}
```
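The guards in sendUpdate (a `needsUpdate` flag, at most one request in flight, and clearing the flag before the completion callback is attached) form a small coalescing pattern: updates requested while a request is running are folded into the next request. The sketch below isolates just that pattern with `CompletableFuture`; `CoalescingUpdater` and its `send` supplier are hypothetical, and it assumes, as a simplification, that completing a request re-runs sendUpdate. Error throttling and HTTP details are omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of the sendUpdate coalescing logic (not the Presto class).
final class CoalescingUpdater {
    private final AtomicBoolean needsUpdate = new AtomicBoolean(false);
    private final Executor executor;
    private final Supplier<CompletableFuture<Void>> send; // hypothetical: performs one update request
    private CompletableFuture<Void> currentRequest;
    final AtomicInteger requestsSent = new AtomicInteger();

    CoalescingUpdater(Executor executor, Supplier<CompletableFuture<Void>> send) {
        this.executor = executor;
        this.send = send;
    }

    // Called whenever there is something new to send (e.g. new splits): mark dirty, schedule a send.
    void requestUpdate() {
        needsUpdate.set(true);
        executor.execute(this::sendUpdate);
    }

    synchronized void sendUpdate() {
        if (!needsUpdate.get()) {
            return; // nothing new since the last request
        }
        if (currentRequest != null && !currentRequest.isDone()) {
            return; // one request at a time; the completion callback will retry
        }
        requestsSent.incrementAndGet();
        CompletableFuture<Void> future = send.get();
        currentRequest = future;
        // Clear the flag BEFORE attaching the callback, so changes made later set it again.
        needsUpdate.set(false);
        // When the request finishes, send again if more changes arrived meanwhile.
        future.whenCompleteAsync((result, error) -> sendUpdate(), executor);
    }
}
```

Three `requestUpdate()` calls made while the first request is still running produce only one extra request once it completes, which is why the flag must be cleared before the callback can observe it.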
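As the code shows, all the data the current Task processes must be wrapped into a list of TaskSource objects, each TaskSource representing one data source of the Task. A Task's data sources fall into two kinds: the output of another Stage, and a direct data source. For a Stage-output source, TaskSource wraps the PlanNodeId of that Stage together with a list of ScheduledSplits generated from the locations of that Stage's Tasks; for a direct data source, TaskSource wraps the data source's PlanNodeId together with a list of ScheduledSplits generated from all of that source's data splits. The TaskSource list is produced by getSources(), which wraps both kinds of data source (Stage output and direct data source) as TaskSources and merges them into one list. Its code is as follows: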
```java
private synchronized List<TaskSource> getSources()
{
    return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream())
            .filter(Objects::nonNull)
            .map(PlanNode::getId)
            .map(this::getSource)
            .filter(Objects::nonNull)
            .collect(toImmutableList());
}
```
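The pipeline in getSources() (concatenate partitioned source nodes with remote source nodes, look up a source per node, drop nulls) can be modeled in isolation. In this hypothetical sketch, `TaskSourceModel` stands in for TaskSource, nodes are identified by string IDs, and `getSource` returns null when there are no pending splits for a node, which is an assumption of the model.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for TaskSource: a plan node id and the splits to send for it.
final class TaskSourceModel {
    final String planNodeId;
    final List<String> splits;

    TaskSourceModel(String planNodeId, List<String> splits) {
        this.planNodeId = planNodeId;
        this.splits = splits;
    }
}

// Hypothetical model of HttpRemoteTask.getSources().
final class SourceCollector {
    private final List<String> partitionedSourceNodes; // direct data sources fed by connector splits
    private final List<String> remoteSourceNodes;      // fed by upstream Stage task locations
    private final Map<String, List<String>> pendingSplits; // planNodeId -> splits not yet sent

    SourceCollector(List<String> partitionedSourceNodes, List<String> remoteSourceNodes,
            Map<String, List<String>> pendingSplits) {
        this.partitionedSourceNodes = partitionedSourceNodes;
        this.remoteSourceNodes = remoteSourceNodes;
        this.pendingSplits = pendingSplits;
    }

    // In this model, null means there is nothing to send for the node.
    private TaskSourceModel getSource(String planNodeId) {
        List<String> splits = pendingSplits.get(planNodeId);
        if (splits == null || splits.isEmpty()) {
            return null;
        }
        return new TaskSourceModel(planNodeId, List.copyOf(splits));
    }

    List<TaskSourceModel> getSources() {
        return Stream.concat(partitionedSourceNodes.stream(), remoteSourceNodes.stream())
                .filter(Objects::nonNull)
                .map(this::getSource)
                .filter(Objects::nonNull)
                .collect(Collectors.toUnmodifiableList());
    }
}
```

Because the two streams are concatenated, the resulting list keeps direct data sources first and Stage outputs after them, each in plan order.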
2) Resource side
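As described above, Presto starts a Task by having the Coordinator, based on data locality and similar criteria, send a POST request such as /v1/task/{taskId}?summarize to the RESTful service provided by the TaskResource class on a chosen Worker node. This RESTful request starts a SqlTaskExecution object on that Worker, which carries out the Task's computation.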
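To make the shape of that request concrete, the sketch below only builds (does not send) an equivalent POST with the JDK's `java.net.http` client. Presto itself uses the airlift HttpClient, so this is a swapped-in API; the worker address `http://worker-1:8080`, the task id `query_1.1.0`, the JSON body, and the helper `TaskUpdateRequestSketch` are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds POST {worker}/v1/task/{taskId}[?summarize] carrying a JSON body.
final class TaskUpdateRequestSketch {
    static HttpRequest buildUpdate(URI workerUri, String taskId, String jsonBody, boolean summarize) {
        // An absolute path resolved against the worker URI keeps the worker's scheme, host and port.
        String path = "/v1/task/" + taskId + (summarize ? "?summarize" : "");
        return HttpRequest.newBuilder(workerUri.resolve(path))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

Building the request for `http://worker-1:8080` and task `query_1.1.0` with `summarize = true` yields a POST to `http://worker-1:8080/v1/task/query_1.1.0?summarize`.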
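When a Presto cluster starts, a Presto Server process is started on every Worker. This process provides RESTful services, including handling requests whose URI has the prefix /v1/task/. All RESTful requests of the form /v1/task/* are handled by the TaskResource class. TaskResource's handler for the RESTful request that creates a Task is as follows: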
```java
@POST
@Path("{taskId}")
@Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId,
        TaskUpdateRequest taskUpdateRequest,
        @Context UriInfo uriInfo)
{
    try {
        requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
    }
    catch (Exception ex) {
        return Response.status(Status.BAD_REQUEST).build();
    }

    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
    // Create the task, or update it if it already exists
    TaskInfo taskInfo = taskManager.updateTask(session, // session
            taskId,                                     // task id
            taskUpdateRequest.getFragment(),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getTotalPartitions(),
            taskUpdateRequest.getConsumerId(),
            taskUpdateRequest.getTaskInstanceId());
    if (taskInfo == null) {
        return Response.ok().entity(createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), taskUpdateRequest.getTaskInstanceId())).build();
    }

    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }

    return Response.ok().entity(taskInfo).build();
}
```
The RESTful request for a new Task is handled mainly by TaskManager::updateTask, implemented as follows:
```java
@Override
public TaskInfo updateTask(Session session,
        TaskId taskId,
        Optional<PlanFragment> fragment,
        List<TaskSource> sources,
        OutputBuffers outputBuffers,
        OptionalInt totalPartitions,
        Optional<PlanNodeId> consumer,
        String expectedTaskInstanceId)
{
    requireNonNull(session, "session is null");
    requireNonNull(taskId, "taskId is null");
    requireNonNull(fragment, "fragment is null");
    requireNonNull(sources, "sources is null");
    requireNonNull(outputBuffers, "outputBuffers is null");

    // Look up the SqlTask
    SqlTask sqlTask;
    if (isNullOrEmpty(expectedTaskInstanceId)) {
        sqlTask = tasks.getUnchecked(taskId);
    }
    else {
        sqlTask = currentTaskInstanceIds.get(expectedTaskInstanceId);
        if (sqlTask == null) {
            return null;
        }
    }

    if (resourceOvercommit(session)) {
        // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
        queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
    }

    sqlTask.recordHeartbeat();
    // Delegate to SqlTask::updateTask
    return sqlTask.updateTask(session, fragment, sources, outputBuffers, totalPartitions, consumer, cteCtx);
}
```
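updateTask has two lookup paths: with no expected instance id, `tasks.getUnchecked(taskId)` returns the SqlTask and creates it on first access (Guava LoadingCache semantics); with an expected instance id, the task must already be registered under that id, or the method returns null and TaskResource answers with an aborted TaskInfo. The sketch below models both paths, using `ConcurrentHashMap.computeIfAbsent` in place of the LoadingCache; `TaskRegistry` and `TaskModel` are hypothetical, and registering the instance id at creation time is an assumption of the model.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SqlTask: each instance gets a fresh instance id in this model.
final class TaskModel {
    final String taskId;
    final String instanceId = UUID.randomUUID().toString();

    TaskModel(String taskId) {
        this.taskId = taskId;
    }
}

// Hypothetical model of the two lookup paths in updateTask.
final class TaskRegistry {
    private final Map<String, TaskModel> tasks = new ConcurrentHashMap<>();
    private final Map<String, TaskModel> currentTaskInstanceIds = new ConcurrentHashMap<>();

    // Like LoadingCache.getUnchecked: create the task the first time its id is seen.
    private TaskModel getOrCreate(String taskId) {
        return tasks.computeIfAbsent(taskId, id -> {
            TaskModel task = new TaskModel(id);
            currentTaskInstanceIds.put(task.instanceId, task);
            return task;
        });
    }

    // Returns null when the caller expects a task instance that is not (or no longer) registered.
    TaskModel lookup(String taskId, String expectedInstanceId) {
        if (expectedInstanceId == null || expectedInstanceId.isEmpty()) {
            return getOrCreate(taskId);
        }
        return currentTaskInstanceIds.get(expectedInstanceId);
    }
}
```

The instance-id check lets a worker reject requests aimed at a task instance it no longer holds instead of silently creating a new one.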
This ultimately calls SqlTask::updateTask to update the task:
```java
public TaskInfo updateTask(Session session,
        Optional<PlanFragment> fragment,
        List<TaskSource> sources,
        OutputBuffers outputBuffers,
        OptionalInt totalPartitions,
        Optional<PlanNodeId> consumer,
        Map<String, CommonTableExecutionContext> cteCtx)
{
    try {
        // The LazyOutput buffer does not support write methods, so the actual
        // output buffer must be established before drivers are created (e.g.
        // a VALUES query).
        outputBuffer.setOutputBuffers(outputBuffers);

        // assure the task execution is only created once
        SqlTaskExecution taskExecution;
        synchronized (this) {
            // is task already complete?
            TaskHolder taskHolder = taskHolderReference.get();
            if (taskHolder.isFinished()) {
                return taskHolder.getFinalTaskInfo();
            }
            taskExecution = taskHolder.getTaskExecution();
            // No SqlTaskExecution yet: create it from the plan fragment
            if (taskExecution == null) {
                checkState(fragment.isPresent(), "fragment must be present");
                loadDCCatalogForUpdateTask(metadata, sources);
                taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources, totalPartitions, consumer, cteCtx);
                taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                needsPlan.set(false);
                isSnapshotEnabled = SystemSessionProperties.isSnapshotEnabled(session);
            }
        }

        if (taskExecution != null) {
            // Merge all splits contained in the sources into the splits this task execution has to process
            taskExecution.addSources(sources);
        }
    }
    catch (Error e) {
        failed(e);
        throw e;
    }
    catch (RuntimeException e) {
        failed(e);
    }
    return getTaskInfo();
}
```
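SqlTask::updateTask guarantees that the SqlTaskExecution is created only once: creation happens inside `synchronized (this)`, the new execution is published through `taskHolderReference`, and every call, first or later, then hands its sources to that execution. A minimal model of the pattern follows; `OnceTask`, `Execution`, and `Holder` are hypothetical stand-ins for SqlTask, SqlTaskExecution, and TaskHolder, and the finished-task check and output-buffer setup are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the create-once logic in SqlTask.updateTask.
final class OnceTask {
    // Stand-in for SqlTaskExecution: collects the splits it is given.
    static final class Execution {
        final List<String> splits = new ArrayList<>();

        synchronized void addSources(List<String> sources) {
            splits.addAll(sources);
        }
    }

    // Stand-in for TaskHolder: holds null until the execution exists.
    static final class Holder {
        final Execution execution;

        Holder(Execution execution) {
            this.execution = execution;
        }
    }

    private final AtomicReference<Holder> holderReference = new AtomicReference<>(new Holder(null));
    final AtomicInteger executionsCreated = new AtomicInteger();

    Execution updateTask(Optional<String> fragment, List<String> sources) {
        Execution execution;
        synchronized (this) {
            Holder holder = holderReference.get();
            execution = holder.execution;
            if (execution == null) {
                // The plan fragment is required only for the update that creates the execution.
                if (fragment.isEmpty()) {
                    throw new IllegalStateException("fragment must be present");
                }
                execution = new Execution();
                executionsCreated.incrementAndGet();
                holderReference.compareAndSet(holder, new Holder(execution));
            }
        }
        // Outside the lock: hand the new splits to the (possibly pre-existing) execution.
        execution.addSources(sources);
        return execution;
    }
}
```

A first call with a fragment creates the execution; later calls without a fragment simply add splits to the same execution, matching the coordinator's behavior of sending the plan only once.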
Going one step further, SqlTaskExecutionFactory::create is called to create and return the SqlTaskExecution; for now…
2. Updating a Task
3. Running a Task
