The previous sections described in detail how each type of Task is scheduled. Whatever the type, scheduling ultimately either calls scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits) to create a Task, or calls HttpRemoteTask.addSplits(PlanNodeId sourceId, Iterable
1. Creating a Task
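As the code above shows, Tasks of type Coordinator_Only, Single and Fixed are created directly at the Task scheduling step. A Source Task, however, has its Splits scheduled in batches, so scheduleTask may be called for it several times. Each call to scheduleTask therefore checks whether a Task already exists on the target Node: if it does, the Task is updated; otherwise a new Task is created. Creating a Task mainly means calling the HttpRemoteTask constructor to build an HttpRemoteTask object and then calling that object's start() method. start() in turn uses the httpClient wrapped inside HttpRemoteTask to send a RESTful request to the TaskResource service on a specific Worker Node, which starts a corresponding SqlTaskExecution on that Worker Node to process data and compute. Task creation therefore has two parts: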
- The RESTful client side creates the HttpRemoteTask object.
- The RESTful resource side creates the SqlTaskExecution object.
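The create-or-update decision described above can be sketched as follows. This is a minimal illustration, not the scheduler's actual code: SketchRemoteTask and SketchStageScheduler are hypothetical stand-ins, nodes and splits are plain strings, and no HTTP request is sent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a remote task; not the real HttpRemoteTask.
class SketchRemoteTask {
    final String nodeId;
    final List<String> splits = new ArrayList<>();
    boolean started;

    SketchRemoteTask(String nodeId) {
        this.nodeId = nodeId;
    }

    void start() {
        started = true;
    }

    void addSplits(List<String> newSplits) {
        splits.addAll(newSplits);
    }
}

// Hypothetical scheduler that keeps at most one task per node.
class SketchStageScheduler {
    private final Map<String, SketchRemoteTask> tasksByNode = new HashMap<>();
    int tasksCreated;

    // Create the task the first time a node is used; later calls only add splits.
    SketchRemoteTask scheduleTask(String nodeId, List<String> splits) {
        SketchRemoteTask task = tasksByNode.get(nodeId);
        if (task == null) {
            task = new SketchRemoteTask(nodeId);
            task.addSplits(splits);
            tasksByNode.put(nodeId, task);
            tasksCreated++;
            task.start();
        }
        else {
            task.addSplits(splits);
        }
        return task;
    }
}
```

Calling scheduleTask twice for the same node returns the same task object, which is why a Source Task can receive its Splits in several batches.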
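1) Client side
Creating a Task on the client side takes two steps: constructing the HttpRemoteTask object and calling its start() method. The relevant code follows.
(1) Calling the constructor
createRemoteTask ultimately calls the HttpRemoteTask constructor, implemented as follows: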
public HttpRemoteTask(Session session,
TaskId taskId,
String nodeId,
URI location,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OptionalInt totalPartitions,
OutputBuffers outputBuffers,
HttpClient httpClient,
Executor executor,
ScheduledExecutorService updateScheduledExecutor,
ScheduledExecutorService errorScheduledExecutor,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoUpdateInterval,
boolean summarizeTaskInfo,
Codec<TaskStatus> taskStatusCodec,
Codec<TaskInfo> taskInfoCodec,
Codec<TaskUpdateRequest> taskUpdateRequestCodec,
PartitionedSplitCountTracker partitionedSplitCountTracker,
RemoteTaskStats stats,
boolean isBinaryEncoding,
Optional<PlanNodeId> parent,
QuerySnapshotManager snapshotManager)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
requireNonNull(planFragment, "planFragment is null");
requireNonNull(totalPartitions, "totalPartitions is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
requireNonNull(executor, "executor is null");
requireNonNull(taskStatusCodec, "taskStatusCodec is null");
requireNonNull(taskInfoCodec, "taskInfoCodec is null");
requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
requireNonNull(stats, "stats is null");
requireNonNull(parent, "parent is null");
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.session = session;
this.nodeId = nodeId;
this.planFragment = planFragment;
this.totalPartitions = totalPartitions;
this.outputBuffers.set(outputBuffers);
this.httpClient = httpClient;
this.executor = executor;
this.errorScheduledExecutor = errorScheduledExecutor;
this.summarizeTaskInfo = summarizeTaskInfo;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.updateErrorTracker = new RequestErrorTracker(taskId, location, maxErrorDuration, errorScheduledExecutor, "updating task");
this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
this.stats = stats;
this.isBinaryEncoding = isBinaryEncoding;
this.parent = parent;
for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
pendingSplits.put(entry.getKey(), scheduledSplit); // planNodeId --> split(splitId, plannodeId, Split)
}
// count the initial splits that belong to partitioned sources
pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
.filter(initialSplits::containsKey)
.mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
.sum();
// build an initial (empty) BufferInfo for every output buffer
List<BufferInfo> bufferStates = outputBuffers.getBuffers()
.keySet().stream()
.map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
.collect(toImmutableList());
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
this::failTask,
initialTask.getTaskStatus(),
taskStatusRefreshMaxWait,
taskStatusCodec,
executor,
httpClient,
maxErrorDuration,
errorScheduledExecutor,
stats,
isBinaryEncoding,
snapshotManager);
this.taskInfoFetcher = new TaskInfoFetcher(
this::failTask,
initialTask,
httpClient,
taskInfoUpdateInterval,
taskInfoCodec,
maxErrorDuration,
summarizeTaskInfo,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
stats,
isBinaryEncoding);
// register a listener that is called back whenever the task status changes
taskStatusFetcher.addStateChangeListener(newStatus -> {
TaskState state = newStatus.getState();
if (state.isDone()) {
cleanUpTask(state);
}
else {
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
});
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
}
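The split bookkeeping done in the constructor (assigning each initial Split a sequence id, and counting only the Splits that belong to partitioned sources) can be sketched in isolation. This is a simplified illustration: SketchPendingSplits and its nested Scheduled class are hypothetical, plan node ids and splits are strings, and a Map of Lists replaces the multimap used by the real class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

class SketchPendingSplits {
    // Hypothetical simplified ScheduledSplit: sequence id, plan node id, payload.
    static final class Scheduled {
        final long sequenceId;
        final String planNodeId;
        final String split;

        Scheduled(long sequenceId, String planNodeId, String split) {
            this.sequenceId = sequenceId;
            this.planNodeId = planNodeId;
            this.split = split;
        }
    }

    private final AtomicLong nextSplitId = new AtomicLong();
    final Map<String, List<Scheduled>> pendingSplits = new LinkedHashMap<>();
    final int pendingSourceSplitCount;

    SketchPendingSplits(Map<String, List<String>> initialSplits, Set<String> partitionedSources) {
        // Wrap every initial split with a monotonically increasing sequence id.
        for (Map.Entry<String, List<String>> entry : initialSplits.entrySet()) {
            for (String split : entry.getValue()) {
                pendingSplits.computeIfAbsent(entry.getKey(), key -> new ArrayList<>())
                        .add(new Scheduled(nextSplitId.getAndIncrement(), entry.getKey(), split));
            }
        }
        // Only splits of partitioned sources count towards the pending total.
        pendingSourceSplitCount = partitionedSources.stream()
                .filter(initialSplits::containsKey)
                .mapToInt(source -> initialSplits.get(source).size())
                .sum();
    }
}
```

With two splits for a partitioned source and one for a remote source, all three get sequence ids, but the pending count is two.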
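(2) Calling the start method
Once the HttpRemoteTask object has been created, its start() method is called to send a RESTful request to TaskResource. This creates a SqlTaskExecution object on the target node, which does the actual Split processing. The relevant code is: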
private void scheduleUpdate()
{
executor.execute(this::sendUpdate);
}
@Override
public void start()
{
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
scheduleUpdate();
taskStatusFetcher.start(); // task status fetcher
taskInfoFetcher.start(); // task info fetcher
}
}
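As the code above shows, most of the work of starting a Task happens in HttpRemoteTask::scheduleUpdate, which submits sendUpdate to the executor. The core implementation of sendUpdate is: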
private synchronized void sendUpdate()
{
if (abandoned.get()) {
// Snapshot: the corresponding task has been canceled so that it can be resumed. Stop any communication with it.
return;
}
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!needsUpdate.get() || taskStatus.getState().isDone()) {
return;
}
// if there is a request already running, wait for it to complete
if (this.currentRequest != null && !this.currentRequest.isDone()) {
return;
}
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}
List<TaskSource> sources = getSources();
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
// Snapshot: Add task instance id to all task related requests,
// so receiver can verify if the instance id matches
taskStatus.getTaskInstanceId(),
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
sources,
outputBuffers.get(),
totalPartitions,
parent);
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = setContentTypeHeaders(isBinaryEncoding, preparePost())
.setUri(uriBuilder.build())
.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(taskUpdateRequestJson))
// .setBodyGenerator(createBodyGenerator(updateRequest))
.build();
// choose the response handler: SMILE (binary) encoding or JSON
ResponseHandler responseHandler;
if (isBinaryEncoding) {
responseHandler = createFullSmileResponseHandler((SmileCodec<TaskInfo>) taskInfoCodec);
}
else {
responseHandler = createAdaptingJsonResponseHandler(unwrapJsonCodec(taskInfoCodec));
}
updateErrorTracker.startRequest();
// send the request asynchronously
ListenableFuture<BaseResponse<TaskInfo>> future = httpClient.executeAsync(request, responseHandler);
currentRequest = future;
currentRequestStartNanos = System.nanoTime();
// The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
// and does so without grabbing the instance lock.
needsUpdate.set(false);
// on a successful response SimpleHttpResponseHandler.onSuccess runs; on failure, onFailure runs
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri(), stats), executor);
}
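The guards at the top of sendUpdate (skip when nothing changed, skip while a request is in flight, clear the flag before attaching the callback) can be sketched on their own. This is an illustration under assumptions: SketchUpdateSender and markDirty are hypothetical names, the JDK's CompletableFuture replaces the ListenableFuture used in the real code, and the error rate limiting and the HTTP call itself are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class SketchUpdateSender {
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
    private CompletableFuture<String> currentRequest;
    int requestsSent;

    // Hypothetical hook: record that new state must be pushed to the worker.
    void markDirty() {
        needsUpdate.set(true);
    }

    // Returns the in-flight request that was started, or null if nothing was sent.
    synchronized CompletableFuture<String> sendUpdate() {
        if (!needsUpdate.get()) {
            return null; // nothing new to send
        }
        if (currentRequest != null && !currentRequest.isDone()) {
            return null; // a request is already running; its callback will retry
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        currentRequest = future;
        requestsSent++;
        // Clear the flag before attaching the callback, since the callback may set it again.
        needsUpdate.set(false);
        // Once the response arrives, try again in case something changed meanwhile.
        future.thenRun(this::sendUpdate);
        return future;
    }
}
```

The effect is that at most one update request is outstanding per task, and changes made while it is in flight are coalesced into the next request.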
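As sendUpdate shows, all the data the Task is to process is wrapped into a list of TaskSource objects, where each TaskSource represents one data source of the Task. A Task has two kinds of data source: the output of a Stage, and a direct data source. For a Stage output, TaskSource wraps the Stage's PlanNodeId together with the list of ScheduledSplits generated from the Task locations of that Stage. For a direct data source, TaskSource wraps the source's PlanNodeId together with the list of ScheduledSplits generated from all the data splits of that source. The list is built by getSources(), which wraps both kinds of data source in TaskSource objects and merges them. The method is: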
private synchronized List<TaskSource> getSources()
{
return Stream.concat(planFragment.getPartitionedSourceNodes().stream(), planFragment.getRemoteSourceNodes().stream())
.filter(Objects::nonNull)
.map(PlanNode::getId)
.map(this::getSource)
.filter(Objects::nonNull)
.collect(toImmutableList());
}
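The merge performed by getSources() can be tried out with plain collections. In this sketch a "task source" is just a string of the form planNodeId=[splits], SketchSources and toSource are hypothetical, and a source with no pending splits yields null, which is then filtered out, mirroring the second nonNull filter above.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SketchSources {
    // Concatenate partitioned (direct) sources and remote (Stage output) sources,
    // convert each to a task source, and drop the ones with nothing to send.
    static List<String> getSources(
            List<String> partitionedSourceIds,
            List<String> remoteSourceIds,
            Map<String, List<String>> pendingSplits)
    {
        return Stream.concat(partitionedSourceIds.stream(), remoteSourceIds.stream())
                .map(id -> toSource(id, pendingSplits.get(id)))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Hypothetical helper: returns null when there are no pending splits for the node.
    static String toSource(String planNodeId, List<String> splits) {
        if (splits == null || splits.isEmpty()) {
            return null;
        }
        return planNodeId + "=" + splits;
    }
}
```

Partitioned sources come first in the result because they are the first argument to Stream.concat.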
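2) Resource side
As shown above, Presto starts a Task from the Coordinator: based on conditions such as data locality, it sends a POST request of the form /v1/task/{taskId}?summarize to the RESTful service exposed by the TaskResource class on a specific Worker node. That request starts a SqlTaskExecution object on that Worker, which executes the Task's computation.
When a Presto cluster starts, every Worker launches a PrestoServer process that provides RESTful services for requests whose URI begins with /v1/task/. All requests of the form /v1/task/* are handled by the TaskResource class. Its handler for the Task creation request is: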
@POST
@Path("{taskId}")
@Produces({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, APPLICATION_JACKSON_SMILE})
public Response createOrUpdateTask(
@PathParam("taskId") TaskId taskId,
TaskUpdateRequest taskUpdateRequest,
@Context UriInfo uriInfo)
{
try {
requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
}
catch (Exception ex) {
return Response.status(Status.BAD_REQUEST).build();
}
Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
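// create the task, or update it if it already exists
TaskInfo taskInfo = taskManager.updateTask(
session, // session
taskId, // task id
taskUpdateRequest.getFragment(), // plan fragment (present only when the plan is sent)
taskUpdateRequest.getSources(), // task sources (splits)
taskUpdateRequest.getOutputIds(), // output buffers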
taskUpdateRequest.getTotalPartitions(),
taskUpdateRequest.getConsumerId(),
taskUpdateRequest.getTaskInstanceId());
if (taskInfo == null) {
return Response.ok().entity(createAbortedTaskInfo(taskId, uriInfo.getAbsolutePath(), taskUpdateRequest.getTaskInstanceId())).build();
}
if (shouldSummarize(uriInfo)) {
taskInfo = taskInfo.summarize();
}
return Response.ok().entity(taskInfo).build();
}
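The request this handler receives can be sketched from the client side. The example below only builds the request and does not send it. It uses the JDK's java.net.http API rather than the HTTP client used by HttpRemoteTask, assumes JSON rather than SMILE encoding, and the class name SketchTaskRequest, the worker address, and the body are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class SketchTaskRequest {
    // Build a POST to /v1/task/{taskId}?summarize on the given worker.
    static HttpRequest buildUpdateRequest(URI workerBase, String taskId, String bodyJson) {
        URI uri = workerBase.resolve("/v1/task/" + taskId + "?summarize");
        return HttpRequest.newBuilder()
                .uri(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(bodyJson, StandardCharsets.UTF_8))
                .build();
    }
}
```

The path segment after /v1/task/ is what @Path("{taskId}") binds to the taskId parameter, and the summarize query parameter is what shouldSummarize(uriInfo) inspects.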
The RESTful request for a new Task is handled mainly by TaskManager::updateTask, implemented as follows:
@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, OptionalInt totalPartitions, Optional<PlanNodeId> consumer, String expectedTaskInstanceId)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(fragment, "fragment is null");
requireNonNull(sources, "sources is null");
requireNonNull(outputBuffers, "outputBuffers is null");
// look up the SqlTask
SqlTask sqlTask;
if (isNullOrEmpty(expectedTaskInstanceId)) {
sqlTask = tasks.getUnchecked(taskId);
}
else {
sqlTask = currentTaskInstanceIds.get(expectedTaskInstanceId);
if (sqlTask == null) {
return null;
}
}
if (resourceOvercommit(session)) {
// TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
}
sqlTask.recordHeartbeat();
// finally delegate to SqlTask.updateTask
return sqlTask.updateTask(session, fragment, sources, outputBuffers, totalPartitions, consumer, cteCtx);
}
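The lookup at the start of this method has two paths: without an expected task instance id the task is fetched (and created on demand) by task id, while with one the task is looked up by instance id and a miss returns null. A rough sketch of that branching, with a hypothetical SketchTaskRegistry, strings in place of SqlTask, and ConcurrentHashMap in place of the loading cache:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SketchTaskRegistry {
    private final Map<String, String> tasksById = new ConcurrentHashMap<>();
    private final Map<String, String> tasksByInstanceId = new ConcurrentHashMap<>();

    // Hypothetical registration hook, used only by this sketch.
    void registerInstance(String instanceId, String task) {
        tasksByInstanceId.put(instanceId, task);
    }

    // Returns the task to update, or null when an unknown instance id is presented.
    String lookup(String taskId, String expectedInstanceId) {
        if (expectedInstanceId == null || expectedInstanceId.isEmpty()) {
            // No instance id given: get the task by id, creating it if needed.
            return tasksById.computeIfAbsent(taskId, id -> "task:" + id);
        }
        // Instance id given: only an exact match is accepted.
        return tasksByInstanceId.get(expectedInstanceId);
    }
}
```

A null result corresponds to the case where createOrUpdateTask answers with createAbortedTaskInfo instead of a live TaskInfo.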
This ends in a call to SqlTask::updateTask, which updates the task:
public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers, OptionalInt totalPartitions, Optional<PlanNodeId> consumer,
Map<String, CommonTableExecutionContext> cteCtx)
{
try {
// The LazyOutput buffer does not support write methods, so the actual
// output buffer must be established before drivers are created (e.g.
// a VALUES query).
outputBuffer.setOutputBuffers(outputBuffers);
// ensure the task execution is only created once
SqlTaskExecution taskExecution;
synchronized (this) {
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}
// reuse the existing execution, if there is one
taskExecution = taskHolder.getTaskExecution();
// taskExecution is null, so create it
if (taskExecution == null) {
checkState(fragment.isPresent(), "fragment must be present");
loadDCCatalogForUpdateTask(metadata, sources);
taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources, totalPartitions, consumer, cteCtx);
taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
needsPlan.set(false);
isSnapshotEnabled = SystemSessionProperties.isSnapshotEnabled(session);
}
}
if (taskExecution != null) {
// merge all splits contained in sources into the splits this task execution still has to process
taskExecution.addSources(sources);
}
}
catch (Error e) {
failed(e);
throw e;
}
catch (RuntimeException e) {
failed(e);
}
return getTaskInfo();
}
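The create-once behaviour of SqlTask::updateTask can be reduced to a small sketch: the first call must carry a plan fragment and creates the execution, and every call, first or later, adds its sources. SketchSqlTask and its nested Execution class are hypothetical, with strings standing in for fragments and sources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class SketchSqlTask {
    // Hypothetical stand-in for SqlTaskExecution.
    static final class Execution {
        final String fragment;
        final List<String> sources = new ArrayList<>();

        Execution(String fragment) {
            this.fragment = fragment;
        }

        synchronized void addSources(List<String> newSources) {
            sources.addAll(newSources);
        }
    }

    private final AtomicReference<Execution> executionRef = new AtomicReference<>();
    int executionsCreated;

    Execution updateTask(Optional<String> fragment, List<String> sources) {
        Execution execution;
        // Ensure the execution is created only once.
        synchronized (this) {
            execution = executionRef.get();
            if (execution == null) {
                if (!fragment.isPresent()) {
                    throw new IllegalStateException("fragment must be present");
                }
                execution = new Execution(fragment.get());
                executionRef.set(execution);
                executionsCreated++;
            }
        }
        // Every update, including the first, merges its sources into the execution.
        execution.addSources(sources);
        return execution;
    }
}
```

This is why the client only needs to send the plan fragment until the task exists: later updates can pass an empty fragment and still deliver new Splits.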
Looking further, SqlTaskExecutionFactory::create is called to create and return the SqlTaskExecution; its details are not covered for now.
2. Updating a Task
3. Running a Task