When a Flink job is submitted to a cluster, it is scheduled as a set of Tasks. In previous articles we have covered how Flink turns a user program into the execution graph used for scheduling, how computing resources are allocated to Tasks, and how Tasks exchange data with one another. In this article we follow the complete lifecycle of a Task to deepen our understanding of how Flink executes a job.
## Task and OperatorChain
In the earlier article on how the JobGraph is generated, we saw that Flink chains operators together wherever possible, forming an OperatorChain that corresponds to a single JobVertex.

The conditions under which two operators can be chained are checked in `StreamingJobGraphGenerator#isChainable`:
```java
class StreamingJobGraphGenerator {
	public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

		StreamOperator<?> headOperator = upStreamVertex.getOperator();
		StreamOperator<?> outOperator = downStreamVertex.getOperator();

		return downStreamVertex.getInEdges().size() == 1
				&& outOperator != null
				&& headOperator != null
				&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
				&& (edge.getPartitioner() instanceof ForwardPartitioner)
				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
				&& streamGraph.isChainingEnabled();
	}
}
```
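As a side note, these chaining conditions can be influenced from the DataStream API. A minimal sketch (the API calls are standard; the pipeline itself is made up for illustration):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// env.disableOperatorChaining(); // would disable chaining for the entire job

		env.fromElements("a", "b", "c")
			.map(String::toUpperCase)
			.startNewChain()            // start a new chain at this map
			.filter(s -> !s.isEmpty())
			.disableChaining()          // never chain this filter with its neighbors
			.print();

		env.execute("chaining demo");
	}
}
```

`startNewChain()` sets the operator's `ChainingStrategy` to `HEAD`, `disableChaining()` sets it to `NEVER`, and assigning a different `slotSharingGroup` also prevents chaining, since `isChainable` above requires the same slot sharing group.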
Inside the JobManager, the `JobGraph` is further transformed into the schedulable, parallelized `ExecutionGraph`: each `JobVertex` is expanded into parallel `ExecutionVertex` instances, one per parallel subtask, and every scheduling attempt of an `ExecutionVertex` corresponds to one `Execution`, i.e. one `Task` in a TaskManager. Consequently, the main processing logic of a running `Task` corresponds to one `OperatorChain`, which may contain several operators or just a single one.
<a name="db44d034"></a>
## Data Transfer Inside an OperatorChain
In earlier articles we looked at how different `Task`s exchange data through the network stack and how backpressure between `Task`s is handled. We now know that a single `Task` may itself contain multiple operators, whose main processing logic is implemented by user-defined functions (UDFs). So how does a record produced by an upstream operator reach the downstream operator inside the same chain? Since a `Task` runs as a single thread and the operators' logic executes one after another, the natural approach is to pass records along as plain method-call arguments. Let's see how Flink implements this.

First, consider the `Output` interface. It extends the `Collector` interface and receives the records emitted by an operator:
```java
public interface Output<T> extends Collector<T> {

	/**
	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
	 * operators.
	 *
	 * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
	 * timestamp will be emitted in the future.
	 */
	void emitWatermark(Watermark mark);

	/**
	 * Emits a record to the side output identified by the given {@link OutputTag}.
	 *
	 * @param record The record to collect.
	 */
	<X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

	void emitLatencyMarker(LatencyMarker latencyMarker);
}
```
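For reference, the user-facing counterpart of the `collect(OutputTag, record)` overload is the side-output API. A minimal sketch, assuming `stream` is an existing `DataStream<String>`:

```java
// Assumption: `stream` is a DataStream<String> defined elsewhere.
final OutputTag<String> lateTag = new OutputTag<String>("late") {};

SingleOutputStreamOperator<String> main = stream.process(new ProcessFunction<String, String>() {
	@Override
	public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
		if (value.startsWith("late")) {
			ctx.output(lateTag, value); // routed through Output#collect(OutputTag, record)
		} else {
			out.collect(value);         // main output: Output#collect(record)
		}
	}
});

DataStream<String> lateStream = main.getSideOutput(lateTag);
```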
Inside `OperatorChain` there is also a `WatermarkGaugeExposingOutput` interface extending `Output`; it additionally exposes a gauge for the current watermark:
```java
public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
	Gauge<Long> getWatermarkGauge();
}
```
Every `StreamOperator` has an `Output` member used to collect the records it has processed. In `StreamMap`, for example:
```java
public class StreamMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}
}
```
In `StreamFilter`:
```java
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		if (userFunction.filter(element.getValue())) {
			output.collect(element);
		}
	}
}
```
And in `StreamFlatMap`:
```java
public class StreamFlatMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
		super(flatMapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
		userFunction.flatMap(element.getValue(), collector);
	}
}
```
So how does the `Output` handle the records an operator emits? For that we need to look at the concrete implementations of `Output`.

`ChainingOutput`, an inner class of `OperatorChain`, implements `WatermarkGaugeExposingOutput`. It holds a `OneInputStreamOperator`, namely the operator directly downstream of the current one in the `OperatorChain`. When `ChainingOutput` receives a record emitted by the current operator, it simply calls the downstream operator's `processElement` method:
```java
class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
	protected final OneInputStreamOperator<T, ?> operator; // the downstream operator

	@Override
	public void collect(StreamRecord<T> record) {
		if (this.outputTag != null) {
			// we are only responsible for emitting to the main input
			return;
		}
		pushToOperator(record);
	}

	@Override
	public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
		// if an OutputTag is present, only forward the record when the tags match
		if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
			return;
		}
		pushToOperator(record);
	}

	protected <X> void pushToOperator(StreamRecord<X> record) {
		try {
			// we know that the given outputTag matches our OutputTag so the record
			// must be of the type that our operator expects.
			@SuppressWarnings("unchecked")
			StreamRecord<T> castRecord = (StreamRecord<T>) record;

			numRecordsIn.inc();
			// call the downstream operator's processElement method directly
			operator.setKeyContextElement1(castRecord);
			operator.processElement(castRecord);
		}
		catch (Exception e) {
			throw new ExceptionInChainedOperatorException(e);
		}
	}
}
```
Because `ChainingOutput` keeps a reference to the downstream `StreamOperator`, it hands the record over by passing the object reference directly. However, `ExecutionConfig` has a configuration option, `objectReuse`, and object reuse is disabled by default. When object reuse is not allowed, `CopyingChainingOutput` is used instead of `ChainingOutput`; as the name suggests, it copies each record before passing it to the downstream operator.

`BroadcastingOutputCollector` wraps a set of `Output`s, i.e. `Output<StreamRecord<T>>[] outputs`; when it receives a `StreamRecord`, it forwards it to all of its inner `Output`s. It is used when the current operator has multiple downstream operators, and it likewise has a copying counterpart, `CopyingBroadcastingOutputCollector`.

`DirectedOutput` uses `OutputSelector<OUT>[] outputSelectors` to select the target `Output`s to forward to; it is mainly used with split/select. `DirectedOutput` also has a corresponding `CopyingDirectedOutput`.

The operator at the end of an `OperatorChain` produces records that are consumed by other `Task`s, so its records must be written to a `ResultPartition`. For this, Flink provides `RecordWriterOutput`, which also implements `WatermarkGaugeExposingOutput` but emits the records it receives through a `RecordWriter`. `RecordWriter` is a wrapper around `ResultPartitionWriter` that serializes records into buffers.
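Whether the hand-off between chained operators copies records is therefore under user control. A minimal sketch of toggling it with the standard `ExecutionConfig` API:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// default: object reuse disabled -> CopyingChainingOutput copies each record
// before pushing it to the next chained operator
env.getConfig().disableObjectReuse();

// opt in to object reuse -> ChainingOutput passes the same record instance along;
// faster, but user functions must not cache or mutate the records they receive
env.getConfig().enableObjectReuse();
```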
<a name="18f3e5b9"></a>
## The Lifecycle of a Task
Let's now examine the lifecycle of a running Task in more detail.
<a name="9737f450"></a>
### Task Scheduling
After a `JobGraph` is submitted to the `JobMaster`, the `ExecutionGraph`, the graph actually used for scheduling, is generated first. The individual subtasks are then scheduled via the `ExecutionGraph#scheduleForExecution` method:
```java
class ExecutionGraph {
	public void scheduleForExecution() throws JobException {
		assertRunningInJobMasterMainThread();

		final long currentGlobalModVersion = globalModVersion;

		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
			final CompletableFuture<Void> newSchedulingFuture;

			switch (scheduleMode) {
				// schedule the subtasks
				case LAZY_FROM_SOURCES: // only sources are started; downstream subtasks are triggered by the sources
					newSchedulingFuture = scheduleLazy(slotProvider);
					break;

				case EAGER: // all subtasks are scheduled immediately; this is the mode used for streaming jobs
					newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
					break;

				default:
					throw new JobException("Schedule mode is invalid.");
			}

			if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
				schedulingFuture = newSchedulingFuture;
				newSchedulingFuture.whenComplete(
					(Void ignored, Throwable throwable) -> {
						if (throwable != null && !(throwable instanceof CancellationException)) {
							// only fail if the scheduling future was not canceled
							failGlobal(ExceptionUtils.stripCompletionException(throwable));
						}
					});
			} else {
				newSchedulingFuture.cancel(false);
			}
		}
		else {
			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
		}
	}
}
```
During scheduling, every subtask first has to request a slot from the `Scheduler` (see the earlier article on resource management); only after all subtasks to be scheduled have been assigned slots does the actual deployment begin:
```java
class ExecutionGraph {
	private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
		assertRunningInJobMasterMainThread();
		checkState(state == JobStatus.RUNNING, "job is not running currently");

		// collecting all the slots may resize and fail in that operation without slots getting lost
		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

		final Set<AllocationID> allPreviousAllocationIds =
			Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

		// allocate the slots (obtain all their futures)
		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
			// these calls are not blocking, they only return futures
			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
				slotProvider,
				queued,
				LocationPreferenceConstraint.ALL,
				allPreviousAllocationIds,
				timeout);

			allAllocationFutures.addAll(allocationFutures);
		}

		// this future is complete once all slot futures are complete.
		// the future fails once one slot future fails.
		// wait until all subtasks to be scheduled have been assigned resources
		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

		return allAllocationsFuture.thenAccept(
			(Collection<Execution> executionsToDeploy) -> {
				for (Execution execution : executionsToDeploy) {
					try {
						// deploy the Execution
						execution.deploy();
					} catch (Throwable t) {
						throw new CompletionException(
							new FlinkException(
								String.format("Could not deploy execution %s.", execution),
								t));
					}
				}
			})
			.exceptionally(......);
	}
}
```
An `Execution` represents one execution attempt of an `ExecutionVertex`. When it is scheduled, a task description, the `TaskDeploymentDescriptor`, is generated first. It contains the description of the inputs (`InputGateDeploymentDescriptor`), the description of the outputs (`ResultPartitionDeploymentDescriptor`), and the `TaskInformation` and `JobInformation` holding the runtime information of all operators running in this Task. The generated `TaskDeploymentDescriptor` is then submitted to the `TaskExecutor` via an RPC call:
```java
class Execution {
	public void deploy() throws JobException {
		......
		try {
			// race double check, did we fail/cancel and do we need to release the slot?
			if (this.state != DEPLOYING) {
				slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
				return;
			}

			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
				attemptId,
				slot,
				taskRestore,
				attemptNumber);

			// null taskRestore to let it be GC'ed
			taskRestore = null;

			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

			final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
				vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

			// We run the submission in the future executor so that the serialization of large TDDs does not block
			// the main thread and sync back to the main thread once submission is completed.
			CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
				.thenCompose(Function.identity())
				.whenCompleteAsync(
					(ack, failure) -> {
						// only respond to the failure case
						if (failure != null) {
							if (failure instanceof TimeoutException) {
								String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

								markFailed(new Exception(
									"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
										+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
							} else {
								markFailed(failure);
							}
						}
					},
					jobMasterMainThreadExecutor);
		}
		catch (Throwable t) {
			markFailed(t);
			ExceptionUtils.rethrow(t);
		}
	}
}
```
<a name="d7dc80d2"></a>
### Starting the Task Thread
When the `TaskDeploymentDescriptor` is submitted to the `TaskExecutor`, the `TaskExecutor` creates a `Task` object from it and performs some initialization in the constructor, such as creating the `InputGate`s from the `InputGateDeploymentDescriptor`s and the `ResultPartition`s from the `ResultPartitionDeploymentDescriptor`s.

`Task` implements the `Runnable` interface, and each Task runs in its own thread. Starting a Task involves the following steps:
1. State initialization: `ExecutionState.CREATED` -> `ExecutionState.DEPLOYING`
2. Task setup
  - create the classloader used for loading the user code
  - deserialize the `ExecutionConfig`, from which the information about all operators is obtained
  - register the `Task` with the network stack, allocating buffer pools for the `ResultPartition`s and `InputGate`s
  - initialize the user code and obtain the `AbstractInvokable` (`StreamTask`) instance via reflection
3. Task execution
  - state transition `ExecutionState.DEPLOYING` -> `ExecutionState.RUNNING`
  - call `AbstractInvokable.invoke()` to start the task

The code is as follows:
```java
class Task {
	/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	@Override
	public void run() {
		// ----------------------------
		//  Initial State transition
		// ----------------------------
		while (true) {
			ExecutionState current = this.executionState;
			if (current == ExecutionState.CREATED) {
				if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
					// success, we can start our work
					break;
				}
			}
			...... // handle other states
		}

		// all resource acquisitions and registrations from here on
		// need to be undone in the end
		Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
		AbstractInvokable invokable = null;

		try {
			......
			userCodeClassLoader = createUserCodeClassloader();
			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

			network.registerTask(this);

			for (ResultPartition partition : producedPartitions) {
				taskEventDispatcher.registerPartition(partition.getPartitionId());
			}

			Environment env = new RuntimeEnvironment(......);

			// now load and instantiate the task's invokable code
			// nameOfInvokableClass is the invokableClassName of the JobVertex;
			// every StreamNode gets a jobVertexClass property when it is added,
			// and for an operator chain this is the invokableClassName of the head operator,
			// see StreamingJobGraphGenerator.createChain.
			// The AbstractInvokable object is created via reflection; for streaming jobs
			// it is a subclass of StreamTask: SourceStreamTask, OneInputStreamTask, TwoInputStreamTask, etc.
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			// make sure, we enter the catch block if the task leaves the invoke() method due
			// to the fact that it has been canceled
			if (isCanceledOrFailed()) {
				throw new CancelTaskException();
			}

			// normal termination:
			// finish the produced partitions. if this fails, we consider the execution failed.
			for (ResultPartition partition : producedPartitions) {
				if (partition != null) {
					partition.finish();
				}
			}

			// try to mark the task as finished
			// if that fails, the task was canceled/failed in the meantime
			if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
				throw new CancelTaskException();
			}
		} catch (Throwable t) {
			......
		} finally {
			......
		}
	}
}
```
<a name="streamtask"></a>
### StreamTask
`AbstractInvokable` contains the main execution logic of a `Task` and is the base class of all executable tasks, in both streaming and batch mode. In streaming mode, all tasks inherit from `StreamTask`, whose subclasses include `SourceStreamTask`, `OneInputStreamTask`, `TwoInputStreamTask`, as well as `StreamIterationHead` and `StreamIterationTail`, which are used for iterative jobs.

Every `StreamNode` is given an associated `jobVertexClass` property when it is added to the `StreamGraph`; this property names the `StreamTask` type for that `StreamNode`. For an `OperatorChain`, the corresponding `StreamTask` is the one associated with its head operator.

The complete lifecycle of a `StreamTask` consists of:
- create the state backend, which provides state for all operators in the OperatorChain
- load all operators of the OperatorChain
- call `setup` on all operators
- perform task-specific initialization
- call `initializeState` on all operators to initialize their state
- call `open` on all operators
- process data in a loop in the `run` method
- call `close` on all operators
- call `dispose` on all operators
- perform common cleanup
- perform task-specific cleanup
The main code:
```java
abstract class StreamTask {
	@Override
	public final void invoke() throws Exception {
		boolean disposed = false;
		try {
			// -------- Initialize ---------
			// create the state backend
			stateBackend = createStateBackend();
			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

			// if the clock is not already set, then assign a default TimeServiceProvider
			if (timerService == null) {
				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
					"Time Trigger for " + getName(), getUserCodeClassLoader());

				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
			}

			// create the OperatorChain; this loads every operator and calls its setup method
			operatorChain = new OperatorChain<>(this, recordWriters);
			headOperator = operatorChain.getHeadOperator();

			// initialization specific to the concrete StreamTask subclass
			init();

			// save the work of reloading state, etc, if the task is already canceled
			if (canceled) {
				throw new CancelTaskException();
			}

			// we need to make sure that any triggers scheduled in open() cannot be
			// executed before all operators are opened
			synchronized (lock) {
				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.

				// initialize operator state
				initializeState();
				// open all operators
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			// start processing data; this is usually a loop
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			synchronized (lock) {
				// this is part of the main logic, so if this fails, the task is considered failed
				closeAllOperators();

				// make sure no new timers can come
				timerService.quiesce();

				// only set the StreamTask to not running after all operators have been closed!
				// See FLINK-7430
				isRunning = false;
			}

			// make sure all timers finish
			timerService.awaitPendingAfterQuiesce();

			// make sure all buffered data is flushed
			operatorChain.flushOutputs();

			// make an attempt to dispose the operators such that failures in the dispose call
			// still let the computation fail
			tryDisposeAllOperators();
			disposed = true;
		} finally {
			// cleanup
		}
	}
}
```
<a name="operatorchain"></a>
### OperatorChain
We have mentioned `OperatorChain` many times already; let's now see how it is loaded. The main logic lives in the constructor of `OperatorChain`:
```java
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {

	public OperatorChain(
			StreamTask<OUT, OP> containingTask,
			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {

		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
		final StreamConfig configuration = containingTask.getConfiguration();

		// the head operator
		headOperator = configuration.getStreamOperator(userCodeClassloader);

		// the configurations of all operators inside the OperatorChain;
		// we read the chained configs, and the order of record writer registrations by output name
		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

		// create the final output stream writers
		// we iterate through all the out edges from this job vertex and create a stream output
		// these are the external outputs; data transfer between operators inside the chain is not included
		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
		this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

		// from here on, we need to make sure that the output writers are shut down again on failure
		boolean success = false;
		try {
			// the RecordWriterOutputs for the external outputs
			for (int i = 0; i < outEdgesInOrder.size(); i++) {
				StreamEdge outEdge = outEdgesInOrder.get(i);

				RecordWriterOutput<?> streamOutput = createStreamOutput(
					recordWriters.get(i),
					outEdge,
					chainedConfigs.get(outEdge.getSourceId()),
					containingTask.getEnvironment());

				this.streamOutputs[i] = streamOutput;
				streamOutputMap.put(outEdge, streamOutput);
			}

			// we create the chain of operators and grab the collector that leads into the chain
			List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
			// this call recurses and creates the outputs of all operators inside the OperatorChain
			this.chainEntryPoint = createOutputCollector(
				containingTask,
				configuration,
				chainedConfigs,
				userCodeClassloader,
				streamOutputMap,
				allOps);

			if (headOperator != null) {
				// chainEntryPoint is the output of the head operator
				WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
				// call the head operator's setup method
				headOperator.setup(containingTask, configuration, output);

				headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
			}

			// add head operator to end of chain
			allOps.add(headOperator);

			this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

			success = true;
		}
		finally {
			// make sure we clean up after ourselves in case of a failure after acquiring
			// the first resources
			if (!success) {
				for (RecordWriterOutput<?> output : this.streamOutputs) {
					if (output != null) {
						output.close();
					}
				}
			}
		}
	}

	// create the output collector for an operator
	private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
			StreamTask<?, ?> containingTask,
			StreamConfig operatorConfig,
			Map<Integer, StreamConfig> chainedConfigs,
			ClassLoader userCodeClassloader,
			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
			List<StreamOperator<?>> allOperators) {
		List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

		// create collectors for the network outputs
		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
			@SuppressWarnings("unchecked")
			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

			allOutputs.add(new Tuple2<>(output, outputEdge));
		}

		// Create collectors for the chained outputs
		// these are the edges between operators inside the OperatorChain
		for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
			int outputId = outputEdge.getTargetId();
			StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

			// create the downstream operator of the current node and return the current node's output;
			// createChainedOperator calls createOutputCollector when creating the operator,
			// so a recursive call chain forms in which all operators and their outputs get created
			WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
				containingTask,
				chainedOpConfig,
				chainedConfigs,
				userCodeClassloader,
				streamOutputs,
				allOperators,
				outputEdge.getOutputTag());
			allOutputs.add(new Tuple2<>(output, outputEdge));
		}

		// if there are multiple outputs, or the outputs are directed, we need to
		// wrap them as one output
		List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);

		if (selectors == null || selectors.isEmpty()) {
			// simple path, no selector necessary
			// only one output
			if (allOutputs.size() == 1) {
				return allOutputs.get(0).f0;
			}
			else {
				// more than one output: wrap them in a BroadcastingOutputCollector
				// send to N outputs. Note that this includes the special case
				// of sending to zero outputs
				@SuppressWarnings({"unchecked", "rawtypes"})
				Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
				for (int i = 0; i < allOutputs.size(); i++) {
					asArray[i] = allOutputs.get(i).f0;
				}

				// This is the inverse of creating the normal ChainingOutput.
				// If the chaining output does not copy we need to copy in the broadcast output,
				// otherwise multi-chaining would not work correctly.
				if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
					return new CopyingBroadcastingOutputCollector<>(asArray, this);
				} else {
					return new BroadcastingOutputCollector<>(asArray, this);
				}
			}
		}
		else {
			// selector present, more complex routing necessary
			// wrap the outputs in a DirectedOutput
			// This is the inverse of creating the normal ChainingOutput.
			// If the chaining output does not copy we need to copy in the broadcast output,
			// otherwise multi-chaining would not work correctly.
			if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
				return new CopyingDirectedOutput<>(selectors, allOutputs);
			} else {
				return new DirectedOutput<>(selectors, allOutputs);
			}
		}
	}

	private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
			StreamTask<?, ?> containingTask,
			StreamConfig operatorConfig,
			Map<Integer, StreamConfig> chainedConfigs,
			ClassLoader userCodeClassloader,
			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
			List<StreamOperator<?>> allOperators,
			OutputTag<IN> outputTag) {
		// create the output that the operator writes to first. this may recursively create more operators
		WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
			containingTask,
			operatorConfig,
			chainedConfigs,
			userCodeClassloader,
			streamOutputs,
			allOperators);

		// now create the operator and give it the output collector to write its output to
		// the operator itself is read from the StreamConfig
		OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);

		chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);

		allOperators.add(chainedOperator);

		// here we create the output for the *upstream* operator:
		// the newly created operator is wrapped in the output handed to its predecessor,
		// so the predecessor's output can invoke the current operator directly
		WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
		if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
			currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
		}
		else {
			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
			currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
		}

		// wrap watermark gauges since registered metrics must be unique
		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);

		return currentOperatorOutput;
	}
}
```
The main logic here is to recursively create all the `StreamOperator`s inside the `OperatorChain` and to create an `Output` collector for each of them; with the earlier discussion of `Output` in mind, this should be easy to follow.
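To make the resulting wiring concrete, here is a minimal, self-contained sketch (plain Java, deliberately not Flink classes) of what the recursion produces for a chain `map -> filter -> network`: each operator's output holds the next operator, so emitting a record is just a nested method call:

```java
import java.util.function.Consumer;

public class ChainWiringSketch {
	// stand-in for Output<StreamRecord<T>>: receiving a record pushes it onward
	interface Out<T> extends Consumer<T> {}

	// stand-in for a chained operator: process a record, then emit to the downstream Out
	static <T, R> Out<T> operator(java.util.function.Function<T, R> fn, Out<R> downstream) {
		return record -> downstream.accept(fn.apply(record)); // like ChainingOutput.pushToOperator
	}

	public static void main(String[] args) {
		// the tail of the chain plays the role of RecordWriterOutput
		Out<String> networkOut = record -> System.out.println("to ResultPartition: " + record);

		// wiring happens back-to-front, like createOutputCollector/createChainedOperator
		Out<String> filterOut = record -> { if (!record.isEmpty()) networkOut.accept(record); };
		Out<String> mapOut = operator(String::toUpperCase, filterOut);

		// the head operator "collects" into mapOut: one plain call stack, no buffers
		mapOut.accept("hello"); // prints: to ResultPartition: HELLO
	}
}
```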
<a name="ec6c0069"></a>
### Different Kinds of StreamTask
The `init` and `run` methods of `StreamTask` are implemented by its subclasses. Below we look at `SourceStreamTask`, `OneInputStreamTask` and `TwoInputStreamTask`. `StreamIterationHead` and `StreamIterationTail`, which are used in iteration scenarios, are not covered here; they will be discussed later when we analyze the implementation of iterative jobs.
<a name="sourcestreamtask"></a>
#### SourceStreamTask
As the name suggests, `SourceStreamTask` produces the data for downstream tasks; it has no input and is only responsible for emitting records.
```java
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
	extends StreamTask<OUT, OP> {

	protected void init() {
		// we check if the source is actually inducing the checkpoints, rather
		// than the trigger
		SourceFunction<?> source = headOperator.getUserFunction();
		// if the user-provided SourceFunction is an ExternallyInducedSource,
		// a CheckpointTrigger has to be created and handed to the ExternallyInducedSource
		if (source instanceof ExternallyInducedSource) {
			externallyInducedCheckpoints = true;

			ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {

				@Override
				public void triggerCheckpoint(long checkpointId) throws FlinkException {
					// TODO - we need to see how to derive those. We should probably not encode this in the
					// TODO -   source's trigger message, but do a handshake in this task between the trigger
					// TODO -   message from the master, and the source's trigger notification
					final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
					final long timestamp = System.currentTimeMillis();

					final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

					try {
						SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions, false);
					}
					catch (RuntimeException | FlinkException e) {
						throw e;
					}
					catch (Exception e) {
						throw new FlinkException(e.getMessage(), e);
					}
				}
			};

			((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
		}
	}

	@Override
	protected void run() throws Exception {
		// for a source task, run simply calls the head operator's run method.
		// The head operator is a StreamSource, which eventually calls the run method of the
		// user-provided SourceFunction, typically a loop.
		// The head operator passes the produced records to the downstream operators through its Output.
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
}
```
<a name="oneinputstreamtask"></a>
#### OneInputStreamTask
The main execution logic of `OneInputStreamTask` is a loop that keeps calling `StreamInputProcessor#processInput()`. `StreamInputProcessor` reads records, watermarks and other messages from the input buffers, hands each record to the head operator via `streamOperator.processElement(record)`, and the results are then passed on to the downstream operators in turn.
```java
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
	public void init() throws Exception {
		// create a StreamInputProcessor
		StreamConfig configuration = getConfiguration();

		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
		int numberOfInputs = configuration.getNumberOfInputs();

		if (numberOfInputs > 0) {
			InputGate[] inputGates = getEnvironment().getAllInputGates();

			inputProcessor = new StreamInputProcessor<>(
					inputGates,
					inSerializer,
					this,
					configuration.getCheckpointMode(),
					getCheckpointLock(),
					getEnvironment().getIOManager(),
					getEnvironment().getTaskManagerInfo().getConfiguration(),
					getStreamStatusMaintainer(),
					this.headOperator,
					getEnvironment().getMetricGroup().getIOMetricGroup(),
					inputWatermarkGauge);
		}
		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
		// wrap watermark gauge since registered metrics must be unique
		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
	}

	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

		// keep calling StreamInputProcessor.processInput in a loop
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}
}

public class StreamInputProcessor<IN> {
	public boolean processInput() throws Exception {
		if (isFinished) {
			return false;
		}
		if (numRecordsIn == null) {
			try {
				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				numRecordsIn = new SimpleCounter();
			}
		}

		// although this is a while loop, each call processes only one record,
		// because a single record may span multiple buffers
		while (true) {
			if (currentRecordDeserializer != null) {
				// deserialize
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

				if (result.isBufferConsumed()) {
					// the data in the buffer has been fully consumed, recycle the buffer
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					// we obtained a complete record
					StreamElement recordOrMark = deserializationDelegate.getInstance();

					if (recordOrMark.isWatermark()) {
						// handle watermark
						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
						continue;
					} else if (recordOrMark.isStreamStatus()) {
						// handle stream status
						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
						continue;
					} else if (recordOrMark.isLatencyMarker()) {
						// handle latency marker
						synchronized (lock) {
							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
						}
						continue;
					} else {
						// now we can do the actual processing:
						// a normal record; calling the operator's processElement method
						// eventually invokes the user-defined function
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						// one record has been processed, end this call
						return true;
					}
				}
			}

			// fetch the next BufferOrEvent; this is a blocking call
			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
			if (bufferOrEvent != null) {
				if (bufferOrEvent.isBuffer()) {
					// for a buffer, determine which channel it belongs to and use that channel's
					// deserializer; different channels must not be mixed up during deserialization
					currentChannel = bufferOrEvent.getChannelIndex();
					currentRecordDeserializer = recordDeserializers[currentChannel];
					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
				}
				else {
					// Event received
					final AbstractEvent event = bufferOrEvent.getEvent();
					if (event.getClass() != EndOfPartitionEvent.class) {
						throw new IOException("Unexpected event: " + event);
					}
				}
			}
			else {
				// the upstream has finished
				isFinished = true;
				if (!barrierHandler.isEmpty()) {
					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
				}
				return false;
			}
		}
	}
}
```
#### TwoInputStreamTask
`TwoInputStreamTask` follows the same processing logic as `OneInputStreamTask`, except that records from the two upstream inputs are handled by calling `TwoInputStreamOperator.processElement1` and `TwoInputStreamOperator.processElement2`, respectively. We will not repeat the details here; a simplified sketch of the dispatch follows.
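For intuition only, here is a heavily simplified, hypothetical sketch of how such a two-input loop routes records (the real class is `StreamTwoInputProcessor`; the names and structure below are made up for the sketch, and in Flink the input index is derived from the channel a buffer was read from):

```java
// Simplified sketch, NOT the actual Flink implementation.
class TwoInputDispatchSketch<IN1, IN2> {

	interface TwoInputOperator<IN1, IN2> {
		void processElement1(IN1 record) throws Exception; // records from the first input
		void processElement2(IN2 record) throws Exception; // records from the second input
	}

	private final TwoInputOperator<IN1, IN2> operator;

	TwoInputDispatchSketch(TwoInputOperator<IN1, IN2> operator) {
		this.operator = operator;
	}

	// one step of the processing loop: route the record to the matching method
	@SuppressWarnings("unchecked")
	void processRecord(int inputIndex, Object record) throws Exception {
		if (inputIndex == 0) {
			operator.processElement1((IN1) record);
		} else {
			operator.processElement2((IN2) record);
		}
	}
}
```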
## Summary
A Task is the smallest unit of scheduling in Flink. This article briefly introduced the lifecycle of a Task and the basic pattern of its data processing. Through the layering StreamTask -> StreamOperator -> user-defined function, the user's processing logic is ultimately scheduled and executed.
-EOF-