
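When a Flink job is submitted to a cluster, it is scheduled as a set of Tasks. Earlier articles in this series covered how Flink builds the execution graphs used for scheduling from the user's program, how computing resources are allocated to Tasks, and how Tasks exchange data with each other. This article follows a single Task through its complete lifecycle to deepen our understanding of how Flink executes a job.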

## Task and OperatorChain

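As discussed in the earlier article on generating the `JobGraph`, Flink chains operators together wherever it can. Each resulting `OperatorChain` corresponds to one `JobVertex`.

Two operators can be chained only when all of the conditions checked by `isChainable` hold: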

```java
class StreamingJobGraphGenerator {
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }
}
```

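To make the conditions concrete, here is a minimal, self-contained sketch of the same predicate over simplified stand-in types. `ChainRules`, `Node`, and `Strategy` are hypothetical names used only for illustration; they are not Flink classes, and the real check operates on `StreamNode` and `StreamEdge`.

```java
import java.util.Objects;

/** Simplified stand-ins for Flink's graph types; all names here are hypothetical. */
final class ChainRules {
    enum Strategy { ALWAYS, HEAD, NEVER }

    /** The few properties of a stream node that the chaining check looks at. */
    static final class Node {
        final Strategy strategy;
        final int parallelism;
        final String slotSharingGroup;
        final int inEdgeCount;

        Node(Strategy strategy, int parallelism, String slotSharingGroup, int inEdgeCount) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Returns true only if every chaining condition holds for the edge up -> down. */
    static boolean isChainable(Node up, Node down, boolean forwardPartitioner, boolean chainingEnabled) {
        return down.inEdgeCount == 1                                    // single input
            && Objects.equals(up.slotSharingGroup, down.slotSharingGroup)
            && down.strategy == Strategy.ALWAYS                         // downstream allows chaining
            && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
            && forwardPartitioner                                       // no repartitioning on the edge
            && up.parallelism == down.parallelism
            && chainingEnabled;
    }
}
```

For example, a source with strategy `HEAD` and a map with strategy `ALWAYS` chain together only if they share parallelism and slot sharing group and are connected by a forward edge; changing any single condition breaks the chain.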
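In the JobManager, the `JobGraph` is further converted into the `ExecutionGraph`, its parallelized, schedulable form. Each `JobVertex` is expanded into parallel `ExecutionVertex` instances, one per parallel subtask of the `JobVertex`. Each scheduling attempt of an `ExecutionVertex` corresponds to an `Execution`, which is a `Task` in a TaskManager. The main processing logic of a running `Task` therefore corresponds to one `OperatorChain`, which may contain several operators or just one.
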
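<a name="db44d034"></a>
## [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#operatorchain-%E5%86%85%E9%83%A8%E7%9A%84%E6%95%B0%E6%8D%AE%E4%BC%A0%E9%80%92)Data passing inside an OperatorChain

Earlier articles described how different `Task`s exchange data through the network stack and analyzed how they handle backpressure. A single `Task` may also contain several operators, whose processing logic is mainly implemented by user-defined functions (UDFs). How, then, does a record processed by an upstream operator reach the downstream operator? Since a `Task` runs in its own thread and the operators' logic executes sequentially, the intuitive approach is to pass the data directly as a function-call argument. Let's see how Flink handles this internally.

First, look at the `Output` interface. `Output` extends `Collector` and receives the records emitted by an operator: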

```java
public interface Output<T> extends Collector<T> {
    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);

    /**
     * Emits a record to the side output identified by the given {@link OutputTag}.
     *
     * @param record The record to collect.
     */
    <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);

    void emitLatencyMarker(LatencyMarker latencyMarker);
}
```

`OperatorChain` also defines an interface `WatermarkGaugeExposingOutput`, which extends `Output` and mainly adds a method for reading the current watermark value:

```java
public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
    Gauge<Long> getWatermarkGauge();
}
```

Every `StreamOperator` holds an `Output` member that collects the records the operator has finished processing. For example, in `StreamMap`:

```java
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
```

In `StreamFilter`:

```java
public class StreamFilter<IN>
        extends AbstractUdfStreamOperator<IN, FilterFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (userFunction.filter(element.getValue())) {
            output.collect(element);
        }
    }
}
```

In `StreamFlatMap`:

```java
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}
```

So how does an `Output` handle the records an operator emits? To answer that, we need to look at its concrete implementations.

`ChainingOutput`, an inner class of `OperatorChain`, implements `WatermarkGaugeExposingOutput`. It holds a `OneInputStreamOperator`, namely the operator immediately downstream of the current operator within the `OperatorChain`. When `ChainingOutput` receives a record emitted by the current operator, it directly calls the downstream operator's `processElement` method:

```java
class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
    protected final OneInputStreamOperator<T, ?> operator; // the downstream operator

    @Override
    public void collect(StreamRecord<T> record) {
        if (this.outputTag != null) {
            // we are only responsible for emitting to the main input
            return;
        }
        pushToOperator(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        // with an OutputTag, the record is forwarded only if the tags match
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            return;
        }
        pushToOperator(record);
    }

    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            // call the downstream operator's processElement method directly
            operator.setKeyContextElement1(castRecord);
            operator.processElement(castRecord);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}
```

By holding a reference to the downstream `StreamOperator`, `ChainingOutput` hands the record object to the downstream operator by reference. However, `ExecutionConfig` has an `objectReuse` option, and object reuse is disabled by default. When object reuse is not allowed, `CopyingChainingOutput` is used instead of `ChainingOutput`. As the name suggests, it differs from `ChainingOutput` in that it copies each record before passing it downstream.

`BroadcastingOutputCollector` wraps a group of outputs, `Output<StreamRecord<T>>[] outputs`, and forwards every `StreamRecord` it receives to all of them. It is mainly used when the current operator has several downstream operators. There is a corresponding `CopyingBroadcastingOutputCollector`.

`DirectedOutput` chooses the target `Output`s based on `OutputSelector<OUT>[] outputSelectors` and is mainly used for split/select. It likewise has a counterpart, `CopyingDirectedOutput`.

Records processed by the operator at the end of an `OperatorChain` must be consumed by other `Task`s, so they need to be written to a `ResultPartition`. For this purpose Flink provides `RecordWriterOutput`, which also implements `WatermarkGaugeExposingOutput` but emits the records it receives through a `RecordWriter`. `RecordWriter` is a wrapper around `ResultPartitionWriter` that adds the ability to serialize records into buffers.

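The idea behind these `Output` variants can be illustrated with a small toy model. This is only a sketch under simplifying assumptions: `ChainingSketch` and its nested `Output` and `Operator` interfaces are hypothetical stand-ins, not Flink's types, and watermarks, side outputs, and metrics are left out.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

/** Toy model of operator chaining; every type here is a hypothetical stand-in. */
final class ChainingSketch {
    /** Receives the records an operator emits. */
    interface Output<T> { void collect(T record); }

    /** A one-input operator that pushes its results to an Output. */
    interface Operator<IN> { void processElement(IN record); }

    /** A map operator: applies f and forwards the result to its output. */
    static <IN, OUT> Operator<IN> map(Function<IN, OUT> f, Output<OUT> out) {
        return record -> out.collect(f.apply(record));
    }

    /** In the spirit of ChainingOutput: a plain method call into the downstream operator. */
    static <T> Output<T> chaining(Operator<T> downstream) {
        return downstream::processElement;
    }

    /** In the spirit of CopyingChainingOutput: copy the record, then call downstream. */
    static <T> Output<T> copyingChaining(Operator<T> downstream, UnaryOperator<T> copier) {
        return record -> downstream.processElement(copier.apply(record));
    }

    /** In the spirit of BroadcastingOutputCollector: forward to every wrapped output. */
    static <T> Output<T> broadcasting(List<Output<T>> outputs) {
        return record -> {
            for (Output<T> output : outputs) {
                output.collect(record);
            }
        };
    }
}
```

Chaining two `map` operators this way turns `head.processElement(x)` into nested method calls on the same thread, with no serialization and no queue in between; the copying variant trades that speed for isolation between operators.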
<a name="18f3e5b9"></a>
## [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#task-%E7%9A%84%E7%94%9F%E5%91%BD%E5%91%A8%E6%9C%9F)Task lifecycle

Next we analyze the lifecycle of a running Task in more detail.

<a name="9737f450"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6)Task scheduling

After the `JobGraph` is submitted to the `JobMaster`, an `ExecutionGraph` is generated first; this is the graph used for scheduling. The subtasks are then scheduled through the `ExecutionGraph#scheduleForExecution` method.

```java
class ExecutionGraph {
    public void scheduleForExecution() throws JobException {
        assertRunningInJobMasterMainThread();
        final long currentGlobalModVersion = globalModVersion;

        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            final CompletableFuture<Void> newSchedulingFuture;

            // schedule the tasks
            switch (scheduleMode) {
                case LAZY_FROM_SOURCES:
                    // only the sources are started; they notify the other subtasks
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;
                case EAGER:
                    // all subtasks are scheduled immediately; this is the mode used for streaming
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;
                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;
                newSchedulingFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    });
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }
}
```

During scheduling, every subtask first requests slot resources from the `Scheduler` (see the earlier article on resource management). Only after all subtasks to be scheduled have been assigned a slot does deployment actually begin:

```java
class ExecutionGraph {
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        assertRunningInJobMasterMainThread();
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures =
            new ArrayList<>(getNumberOfExecutionJobVertices());

        final Set<AllocationID> allPreviousAllocationIds =
            Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

        // allocate the slots (obtain all their futures)
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds,
                timeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        // wait until every subtask to be scheduled has been assigned resources
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture =
            FutureUtils.combineAll(allAllocationFutures);

        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        // start the Execution
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
            .exceptionally(……)
    }
}
```

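The "allocate everything first, then deploy" pattern can be sketched with the JDK's `CompletableFuture` alone. This is an illustration, not Flink's code: `EagerScheduleSketch` and its methods are hypothetical, slots are represented as plain strings, and `CompletableFuture.allOf` stands in for `FutureUtils.combineAll`. One behavioral difference is worth noting: `allOf` reports a failure only after all inputs have finished, whereas the comment in the Flink code says its combined future fails as soon as one slot future fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch of "allocate everything first, then deploy"; all names are hypothetical. */
final class EagerScheduleSketch {
    /** Completes with all slot names once every allocation future has completed. */
    static CompletableFuture<List<String>> combineAll(List<CompletableFuture<String>> slotFutures) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored -> {
            List<String> slots = new ArrayList<>();
            for (CompletableFuture<String> future : slotFutures) {
                slots.add(future.join()); // already complete here, so join() does not block
            }
            return slots;
        });
    }

    /** "Deploys" (here: records) the slots only after all allocations succeeded. */
    static CompletableFuture<Void> scheduleEager(
            List<CompletableFuture<String>> slotFutures, List<String> deployed) {
        return combineAll(slotFutures).thenAccept(slots -> deployed.addAll(slots));
    }
}
```

Nothing is deployed while even one allocation is still pending, which matches the all-or-nothing behavior that streaming jobs need: a pipeline with a missing subtask cannot make progress.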
An `Execution` is a single execution attempt of an `ExecutionVertex`. When it is scheduled, a description of the task, the `TaskDeploymentDescriptor`, is generated first. The `TaskDeploymentDescriptor` contains the input description `InputGateDeploymentDescriptor`, the output description `ResultPartitionDeploymentDescriptor`, and the `TaskInformation` and `JobInformation`, which hold the runtime information of all operators running in this Task. Once generated, the `TaskDeploymentDescriptor` is submitted to the `TaskExecutor` through an RPC call.

```java
class Execution {
    public void deploy() throws JobException {
        ……

        try {
            // race double check, did we fail/cancel and do we need to release the slot?
            if (this.state != DEPLOYING) {
                slot.releaseSlot(new FlinkException(
                    "Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
                return;
            }

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            // We run the submission in the future executor so that the serialization of large TDDs does not block
            // the main thread and sync back to the main thread once submission is completed.
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
}
```

<a name="d7dc80d2"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#%E5%90%AF%E5%8A%A8-task-%E7%BA%BF%E7%A8%8B)Starting the Task thread

After the `TaskDeploymentDescriptor` is submitted to the `TaskExecutor`, the `TaskExecutor` creates a `Task` object from it and performs some initialization in the constructor, such as creating the `InputGate`s from the `InputGateDeploymentDescriptor`s and the `ResultPartition`s from the `ResultPartitionDeploymentDescriptor`s.

`Task` implements the `Runnable` interface, and every Task runs in its own thread. Starting a Task involves the following steps:

1. Initialize the state: `ExecutionState.CREATED` -> `ExecutionState.DEPLOYING`
2. Set up the task
   - Create a class loader for loading user code
   - Deserialize the `ExecutionConfig`, from which the operator-related settings can be read
   - Register the `Task` with the network stack, which allocates buffer pools for the `ResultPartition`s and `InputGate`s
   - Initialize the user code: obtain the `AbstractInvokable` (`StreamTask`) instance through reflection
3. Run the task
   - State transition: `ExecutionState.DEPLOYING` -> `ExecutionState.RUNNING`
   - Call `AbstractInvokable.invoke()` to start the task

The code is as follows:

```java
class Task {
    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            }
            ...... // handle other states
        }

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            ......
            userCodeClassLoader = createUserCodeClassloader();
            final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

            network.registerTask(this);
            for (ResultPartition partition : producedPartitions) {
                taskEventDispatcher.registerPartition(partition.getPartitionId());
            }

            Environment env = new RuntimeEnvironment(.......);

            // now load and instantiate the task's invokable code
            // nameOfInvokableClass is the invokableClassName of the JobVertex.
            // Every StreamNode gets a jobVertexClass attribute when it is added;
            // for an operator chain it is the invokableClassName of the head operator,
            // see StreamingJobGraphGenerator.createChain.
            // The AbstractInvokable object is created through reflection.
            // For streaming jobs it is a subclass of StreamTask:
            // SourceStreamTask, OneInputStreamTask, TwoInputStreamTask, etc.
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            // make sure, we enter the catch block if the task leaves the invoke() method due
            // to the fact that it has been canceled
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }

            // normal termination
            // finish the produced partitions. if this fails, we consider the execution failed.
            for (ResultPartition partition : producedPartitions) {
                if (partition != null) {
                    partition.finish();
                }
            }

            // try to mark the task as finished
            // if that fails, the task was canceled/failed in the meantime
            if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                throw new CancelTaskException();
            }
        } catch (Throwable t) {
            ......
        } finally {
            ......
        }
    }
}
```

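The recurring `transitionState(from, to)` checks guard against a concurrent cancel or failure: each step proceeds only if the state is still the one it expects. A minimal sketch of that pattern, assuming a compare-and-set on an `AtomicReference`, could look as follows. `TaskStateSketch` and its reduced `State` enum are hypothetical; Flink's `ExecutionState` has more states and its error handling is more involved.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of guarded task state transitions; the class and enum are hypothetical. */
final class TaskStateSketch {
    enum State { CREATED, DEPLOYING, RUNNING, FINISHED, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Moves from expected to next only if nobody changed the state in between. */
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    /** May be called from another thread; terminal states are left untouched. */
    void cancel() {
        state.updateAndGet(s -> (s == State.FINISHED || s == State.FAILED) ? s : State.CANCELED);
    }

    State current() {
        return state.get();
    }

    /** Mirrors the shape of the run method: CREATED -> DEPLOYING -> RUNNING -> FINISHED. */
    void run(Runnable invokable) {
        if (!transitionState(State.CREATED, State.DEPLOYING)) {
            return; // canceled before the thread started
        }
        try {
            if (!transitionState(State.DEPLOYING, State.RUNNING)) {
                return; // canceled during setup
            }
            invokable.run();
            // fails silently if the task was canceled while running
            transitionState(State.RUNNING, State.FINISHED);
        } catch (Throwable t) {
            state.updateAndGet(s -> s == State.CANCELED ? s : State.FAILED);
        }
    }
}
```

Because every step is a conditional transition, a cancel that arrives at any point simply makes the next transition fail, and the task never reports `FINISHED` after it has been canceled.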
<a name="streamtask"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#streamtask)StreamTask

`AbstractInvokable` holds the main logic that a `Task` executes and is the base class of all executed tasks, in both streaming and batch mode. In streaming mode every task extends `StreamTask`; its subclasses include `SourceStreamTask`, `OneInputStreamTask`, `TwoInputStreamTask`, and, for iterations, `StreamIterationHead` and `StreamIterationTail`.

Every `StreamNode` is associated with a `jobVertexClass` attribute when it is added to the `StreamGraph`. This attribute is the `StreamTask` type for that `StreamNode`. For an `OperatorChain`, the `StreamTask` is the one that corresponds to its head operator.

The complete lifecycle of a `StreamTask` consists of:

- Create the state backend, which provides state for all operators in the OperatorChain
- Load all operators in the OperatorChain
- Call `setup` on every operator
- Task-specific initialization
- Call `initializeState` on every operator to initialize its state
- Call `open` on every operator
- Process data in a loop in the `run` method
- Call `close` on every operator
- Call `dispose` on every operator
- Common cleanup
- Task-specific cleanup

The main code is as follows:

```java
abstract class StreamTask {
    @Override
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            // -------- Initialize ---------
            // create the state backend
            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            // create the OperatorChain; this loads every operator and calls its setup method
            operatorChain = new OperatorChain<>(this, recordWriters);
            headOperator = operatorChain.getHeadOperator();

            // initialization specific to the concrete StreamTask subclass
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {
                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                // initialize state
                initializeState();
                // open
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            // start processing data; this is usually a loop
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        } finally {
            // cleanup
        }
    }
}
```

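The order of the lifecycle hooks is easier to see in a stripped-down form. The following sketch is an illustration only: `LifecycleSketch` and its `Operator` interface are hypothetical, the hooks are called in plain list order, and state backends, timers, locking, and cancellation checks are omitted.

```java
import java.util.List;

/** Sketch of the StreamTask lifecycle order; all names are hypothetical. */
final class LifecycleSketch {
    /** The hooks each operator exposes to the task. */
    interface Operator {
        void setup();
        void initializeState();
        void open();
        void close();
        void dispose();
    }

    /** Creates an operator that records each lifecycle call in the given log. */
    static Operator recording(String name, List<String> log) {
        return new Operator() {
            @Override public void setup() { log.add(name + ".setup"); }
            @Override public void initializeState() { log.add(name + ".initializeState"); }
            @Override public void open() { log.add(name + ".open"); }
            @Override public void close() { log.add(name + ".close"); }
            @Override public void dispose() { log.add(name + ".dispose"); }
        };
    }

    /** Drives all operators through the lifecycle around the processing loop. */
    static void invoke(List<Operator> chain, Runnable run) {
        chain.forEach(Operator::setup);               // happens while the chain is built
        try {
            chain.forEach(Operator::initializeState); // restore state before opening
            chain.forEach(Operator::open);
            run.run();                                // the data processing loop
            chain.forEach(Operator::close);           // only on a clean finish
        } finally {
            chain.forEach(Operator::dispose);         // always release resources
        }
    }
}
```

The key design point carried over from the code above is that `close` is part of the normal processing path, so a failure there fails the task, while `dispose` is cleanup that runs even when the processing loop throws.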
<a name="operatorchain"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#operatorchain)OperatorChain

`OperatorChain` has come up many times already. Let's now look at how an `OperatorChain` is loaded; the main logic lives in its constructor:

```java
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
    public OperatorChain(
            StreamTask<OUT, OP> containingTask,
            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters) {

        final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        final StreamConfig configuration = containingTask.getConfiguration();

        // head operator
        headOperator = configuration.getStreamOperator(userCodeClassloader);

        // the configs of all operators inside the OperatorChain
        // we read the chained configs, and the order of record writer registrations by output name
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

        // create the final output stream writers
        // we iterate through all the out edges from this job vertex and create a stream output
        // these are the external output edges; data passing between operators inside the chain is not included
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

        // from here on, we need to make sure that the output writers are shut down again on failure
        boolean success = false;
        try {
            // the RecordWriterOutputs for external output
            for (int i = 0; i < outEdgesInOrder.size(); i++) {
                StreamEdge outEdge = outEdgesInOrder.get(i);

                RecordWriterOutput<?> streamOutput = createStreamOutput(
                    recordWriters.get(i),
                    outEdge,
                    chainedConfigs.get(outEdge.getSourceId()),
                    containingTask.getEnvironment());

                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }

            // we create the chain of operators and grab the collector that leads into the chain
            List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            // this call recurses and creates an output for every operator inside the OperatorChain
            this.chainEntryPoint = createOutputCollector(
                containingTask,
                configuration,
                chainedConfigs,
                userCodeClassloader,
                streamOutputMap,
                allOps);

            if (headOperator != null) {
                // chainEntryPoint is the output of the head operator
                WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
                // call setup on the head operator
                headOperator.setup(containingTask, configuration, output);

                headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
            }

            // add head operator to end of chain
            allOps.add(headOperator);

            this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

            success = true;
        }
        finally {
            // make sure we clean up after ourselves in case of a failure after acquiring
            // the first resources
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }

    // creates the output collector
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

        // create collectors for the network outputs
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            @SuppressWarnings("unchecked")
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // Create collectors for the chained outputs
        // these are the edges between operators inside the OperatorChain
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

            // create the downstream node of the current node and return the current node's output.
            // createChainedOperator calls createOutputCollector to create the output of the operator it creates,
            // so the two methods recurse and every operator and its output gets created
            WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperators,
                outputEdge.getOutputTag());
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // if there are multiple outputs, or the outputs are directed, we need to
        // wrap them as one output
        List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);

        if (selectors == null || selectors.isEmpty()) {
            // simple path, no selector necessary
            // only one output
            if (allOutputs.size() == 1) {
                return allOutputs.get(0).f0;
            }
            else {
                // more than one output: wrap them in a BroadcastingOutputCollector
                // send to N outputs. Note that this includes the special case
                // of sending to zero outputs
                @SuppressWarnings({"unchecked", "rawtypes"})
                Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
                for (int i = 0; i < allOutputs.size(); i++) {
                    asArray[i] = allOutputs.get(i).f0;
                }

                // This is the inverse of creating the normal ChainingOutput.
                // If the chaining output does not copy we need to copy in the broadcast output,
                // otherwise multi-chaining would not work correctly.
                if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                    return new CopyingBroadcastingOutputCollector<>(asArray, this);
                } else {
                    return new BroadcastingOutputCollector<>(asArray, this);
                }
            }
        }
        else {
            // selector present, more complex routing necessary
            // wrap the outputs in a DirectedOutput

            // This is the inverse of creating the normal ChainingOutput.
            // If the chaining output does not copy we need to copy in the broadcast output,
            // otherwise multi-chaining would not work correctly.
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                return new CopyingDirectedOutput<>(selectors, allOutputs);
            } else {
                return new DirectedOutput<>(selectors, allOutputs);
            }
        }
    }

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperator<?>> allOperators,
            OutputTag<IN> outputTag) {
        // create the output that the operator writes to first. this may recursively create more operators
        // this is the output of the current operator
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators);

        // now create the operator and give it the output collector to write its output to
        // the current operator is read from the StreamConfig
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);

        chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);

        allOperators.add(chainedOperator);

        // this creates the output for the operator upstream of the current one:
        // the current operator is handed to the upstream operator's output,
        // so the upstream operator's output can call the current operator directly
        WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
        }

        // wrap watermark gauges since registered metrics must be unique
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
        chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);

        return currentOperatorOutput;
    }
}
```

The main logic here is to recursively create all `StreamOperator`s inside the `OperatorChain` and to create an `Output` collector for each of them. With the earlier description of `Output` in mind, this should be easy to follow.

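The mutual recursion is easier to follow in a reduced form. The sketch below is an assumption-laden illustration: `ChainBuilderSketch` and `Config` are hypothetical, records are plain `Integer`s, operators are simple functions, and an operator without chained outputs is assumed to write straight to a single network output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Sketch of building a chain tail-first by recursion; all names are hypothetical. */
final class ChainBuilderSketch {
    /** Per-operator config: the function to apply and the ids of chained downstream operators. */
    static final class Config {
        final UnaryOperator<Integer> function;
        final List<Integer> chainedOutputs;

        Config(UnaryOperator<Integer> function, List<Integer> chainedOutputs) {
            this.function = function;
            this.chainedOutputs = chainedOutputs;
        }
    }

    /** Creates the output of operator id by first creating all of its downstream operators. */
    static Consumer<Integer> createOutputCollector(
            int id, Map<Integer, Config> configs,
            Consumer<Integer> networkOutput, List<Integer> allOperators) {
        List<Consumer<Integer>> outputs = new ArrayList<>();
        for (int downstreamId : configs.get(id).chainedOutputs) {
            outputs.add(createChainedOperator(downstreamId, configs, networkOutput, allOperators));
        }
        if (outputs.isEmpty()) {
            return networkOutput;                 // end of the chain: write to the network
        }
        if (outputs.size() == 1) {
            return outputs.get(0);                // single downstream operator
        }
        return record -> outputs.forEach(output -> output.accept(record)); // broadcast
    }

    /** Creates operator id and returns the output that its upstream operator writes to. */
    static Consumer<Integer> createChainedOperator(
            int id, Map<Integer, Config> configs,
            Consumer<Integer> networkOutput, List<Integer> allOperators) {
        // recursion: the operator's own output (and everything downstream) is built first
        Consumer<Integer> operatorOutput = createOutputCollector(id, configs, networkOutput, allOperators);
        allOperators.add(id);                     // downstream operators end up registered first
        UnaryOperator<Integer> function = configs.get(id).function;
        // a direct call into this operator, in the spirit of ChainingOutput
        return record -> operatorOutput.accept(function.apply(record));
    }
}
```

Because each operator's output must exist before the operator itself is set up, operators are registered from the tail of the chain towards the head, which is why the constructor adds the head operator to the end of the list.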
<a name="ec6c0069"></a>
### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#%E5%87%A0%E7%B1%BB%E4%B8%8D%E5%90%8C%E7%9A%84-streamtask)The different kinds of StreamTask

The `init` and `run` methods of `StreamTask` are implemented by its subclasses. Below we focus on `SourceStreamTask`, `OneInputStreamTask`, and `TwoInputStreamTask`. `StreamIterationHead` and `StreamIterationTail`, which are used for iterations, are not covered here; they will be explained later when we analyze how iterative jobs are implemented.

<a name="sourcestreamtask"></a>
#### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#sourcestreamtask)SourceStreamTask

As the name suggests, `SourceStreamTask` produces data for downstream tasks. It therefore has no input and only emits records.


```java
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {

    protected void init() {
        // we check if the source is actually inducing the checkpoints, rather
        // than the trigger
        SourceFunction<?> source = headOperator.getUserFunction();
        // if the user's SourceFunction is an ExternallyInducedSource,
        // create a CheckpointTrigger and hand it to the ExternallyInducedSource
        if (source instanceof ExternallyInducedSource) {
            externallyInducedCheckpoints = true;

            ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {

                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    // TODO - we need to see how to derive those. We should probably not encode this in the
                    // TODO -   source's trigger message, but do a handshake in this task between the trigger
                    // TODO -   message from the master, and the source's trigger notification
                    final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
                    final long timestamp = System.currentTimeMillis();

                    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

                    try {
                        SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions, false);
                    }
                    catch (RuntimeException | FlinkException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new FlinkException(e.getMessage(), e);
                    }
                }
            };

            ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
        }
    }

    @Override
    protected void run() throws Exception {
        // for a source, this simply calls the run method of the head operator.
        // the head operator is a StreamSource, which ends up calling the run method
        // of the user's SourceFunction, usually a loop.
        // the head operator passes data to the downstream operators through its Output
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
}
```

<a name="oneinputstreamtask"></a>
#### [](https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#oneinputstreamtask)OneInputStreamTask

The main logic of `OneInputStreamTask` is a loop that keeps calling `StreamInputProcessor.processInput()`.

`StreamInputProcessor` reads records, watermarks, and other messages from the buffers, then calls `streamOperator.processElement(record)` to hand each record to the head operator, whose results are passed on to the downstream operators in turn.

```java
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    public void init() throws Exception {
        // create a StreamInputProcessor
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                inputGates,
                inSerializer,
                this,
                configuration.getCheckpointMode(),
                getCheckpointLock(),
                getEnvironment().getIOManager(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getStreamStatusMaintainer(),
                this.headOperator,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                inputWatermarkGauge);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        // call StreamInputProcessor.processInput in a loop
        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
}

public class StreamInputProcessor<IN> {
    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        // although this is a while loop, each call processes only one record;
        // the loop is needed because a single record may span several buffers
        while (true) {
            if (currentRecordDeserializer != null) {
                // deserialize
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    // all data in the buffer has been consumed, so return the buffer
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    // a complete record is available
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        // a regular record: call the operator's processing method,
                        // which ends up calling the user-defined function
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        // one record processed, this call is done
                        return true;
                    }
                }
            }

            // get the next BufferOrEvent; this call blocks
            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    // for a buffer, determine which channel it came from and use that channel's deserializer;
                    // data from different channels must not be mixed during deserialization
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                // the upstream has finished
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
}
```
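The "one record per call, possibly many buffers per record" structure of `processInput` can be reproduced in a few lines. The sketch below makes several simplifying assumptions that differ from Flink: `InputProcessorSketch` is a hypothetical class, there is a single input channel, records are newline-terminated strings rather than length-prefixed binary data, and fetching the next buffer does not block.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.function.Consumer;

/** Sketch of reading records that may span several buffers; all names are hypothetical. */
final class InputProcessorSketch {
    private final Queue<byte[]> buffers;       // stands in for the input gate
    private final Consumer<String> operator;   // stands in for the head operator
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    InputProcessorSketch(Queue<byte[]> buffers, Consumer<String> operator) {
        this.buffers = buffers;
        this.operator = operator;
    }

    /** Processes at most one record; returns false once the input is exhausted. */
    boolean processInput() {
        while (true) {
            String record = nextFullRecord();
            if (record != null) {
                operator.accept(record);
                return true;                    // one record per call
            }
            byte[] buffer = buffers.poll();     // non-blocking here, unlike the real input gate
            if (buffer == null) {
                return false;                   // upstream finished
            }
            pending.write(buffer, 0, buffer.length);
        }
    }

    /** Returns the next newline-terminated record, or null if none is complete yet. */
    private String nextFullRecord() {
        byte[] bytes = pending.toByteArray();
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                pending.reset();
                pending.write(bytes, i + 1, bytes.length - i - 1); // keep the remainder
                return new String(bytes, 0, i, StandardCharsets.UTF_8);
            }
        }
        return null;
    }
}
```

A task's `run` loop then reduces to `while (processor.processInput()) { }`: the inner loop pulls buffers until a full record is assembled, and the outer loop gives the task a chance to check for cancellation between records.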


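#### TwoInputStreamTask

`TwoInputStreamTask` works much like `OneInputStreamTask`, except that records from its two upstream inputs are handled by `TwoInputStreamOperator.processElement1` and `TwoInputStreamOperator.processElement2` respectively. We will not go into further detail here.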


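## Summary

A Task is the smallest unit of scheduling in Flink. This article gave a brief overview of the Task lifecycle and the basic pattern of data processing. Through the layering StreamTask -> StreamOperator -> user-defined function, the user's processing logic is eventually scheduled and executed.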

