In this post we analyze the initialization and execution logic of stream processing jobs in Flink.

AbstractInvokable

AbstractInvokable is the parent class of all tasks that run in the TaskManager. Reading upstream data, running the user's processing logic (map and filter operators, user-defined ProcessFunctions, and so on) and emitting the processed data downstream are all driven from this class's invoke method.

The two methods of AbstractInvokable related to task execution are listed below; a minimal sketch of a subclass follows these lists:

  • invoke method: the entry point that starts task execution. Concrete subclasses must override this method.
  • cancel method: called when the task is canceled or when the user terminates the job.

It has two implementation classes:

  • BatchTask: the base class of all batch processing Tasks.
  • StreamTask: the base class of all stream processing Tasks.
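
The following is a minimal sketch of what such a subclass looks like, assuming Flink 1.12-era APIs; the class name and loop body are invented for illustration (the real implementations are BatchTask, StreamTask and their subclasses):

    import org.apache.flink.runtime.execution.Environment;
    import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

    // Hypothetical example, not a real Flink task implementation.
    public class MyInvokable extends AbstractInvokable {

        private volatile boolean running = true;

        // tasks are instantiated through a constructor taking the Environment
        // (see loadAndInstantiateInvokable below)
        public MyInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            // entry point: read upstream data, run the user logic, emit results downstream
            while (running) {
                // stand-in for processing one unit of work
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            // called from another thread when the task is canceled; makes invoke() return
            running = false;
        }
    }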

With stream processing as our focus, the following sections look at the StreamTask class in detail.

Creating the AbstractInvokable

Before analyzing StreamTask we need to understand where and how it is created.
Looking at the processing logic of the Task thread, it is easy to see that the invokable field is initialized in Task's doRun method:

    // now load and instantiate the task's invokable code
    invokable = loadAndInstantiateInvokable(
            userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

This line uses the user-code classloader (userCodeClassLoader) to load the target class and creates the invokable object by calling the constructor whose only parameter is of type Environment.

    /**
     * Instantiates the given task invokable class, passing the given environment (and possibly
     * the initial task state) to the task's constructor.
     *
     * <p>The method will first try to instantiate the task via a constructor accepting both
     * the Environment and the TaskStateSnapshot. If no such constructor exists, and there is
     * no initial state, the method will fall back to the stateless convenience constructor that
     * accepts only the Environment.
     *
     * @param classLoader The classloader to load the class through.
     * @param className The name of the class to load.
     * @param environment The task environment.
     *
     * @return The instantiated invokable task object.
     *
     * @throws Throwable Forwards all exceptions that happen during initialization of the task.
     *     Also throws an exception if the task class misses the necessary constructor.
     */
    private static AbstractInvokable loadAndInstantiateInvokable(
            ClassLoader classLoader,
            String className,
            Environment environment) throws Throwable {

        final Class<? extends AbstractInvokable> invokableClass;
        try {
            // load the class named className with the given classloader
            // and cast it to a subclass of AbstractInvokable
            invokableClass = Class.forName(className, true, classLoader)
                    .asSubclass(AbstractInvokable.class);
        } catch (Throwable t) {
            throw new Exception("Could not load the task's invokable class.", t);
        }

        Constructor<? extends AbstractInvokable> statelessCtor;
        try {
            // look up the constructor that takes only an Environment
            statelessCtor = invokableClass.getConstructor(Environment.class);
        } catch (NoSuchMethodException ee) {
            throw new FlinkException("Task misses proper constructor", ee);
        }

        // instantiate the class
        try {
            //noinspection ConstantConditions  --> cannot happen
            // pass in the environment and create the new instance
            return statelessCtor.newInstance(environment);
        } catch (InvocationTargetException e) {
            // directly forward exceptions from the eager initialization
            throw e.getTargetException();
        } catch (Exception e) {
            throw new FlinkException("Could not instantiate the task's invokable class.", e);
        }
    }

StreamTask

The StreamTask class is the base class of all stream processing tasks. A Task is deployed and executed by the TaskManager and is the local unit of execution. Each Task contains one or more operators, and these operators belong to the same OperatorChain.
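
As a quick illustration (a made-up minimal job; whether operators are actually chained also depends on parallelism, partitioning and the chaining settings), a map followed by a filter is chainable, so both operators end up in one OperatorChain and run inside a single StreamTask:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // map and filter are chainable here (same parallelism, forward connection),
            // so they are merged into one OperatorChain and executed by one StreamTask
            env.fromElements(1, 2, 3, 4, 5)
                    .map(i -> i * 10)
                    .filter(i -> i > 20)
                    .print();

            env.execute("chaining example");
        }
    }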

The execution lifecycle of a StreamTask consists of:

  1. setInitialState: set the initial state of each operator. This corresponds to the initializeState method.
  2. Call the invoke method.

The logic inside the invoke method can be further broken down into:

  • Create the task-related configuration and create the OperatorChain.
  • Run the operators' setup logic.
  • Run the task-specific initialization logic.
  • Load and initialize the operators' state.
  • Call each operator's open method.
  • Run the data processing logic inside each operator.
  • Close the operators.
  • Dispose the operators.
  • Clean up the task.

Below we walk through the processing flow of the invoke method at the code level.

The invoke method

In this section we analyze invoke, the core execution logic of StreamTask. The method is shown below:

    /**
     * Starts the execution.
     *
     * <p>Must be overwritten by the concrete task implementation. This method
     * is called by the task manager when the actual execution of the task
     * starts.
     *
     * <p>All resources should be cleaned up when the method returns. Make sure
     * to guard the code with <code>try-finally</code> blocks where necessary.
     *
     * @throws Exception
     *     Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
     */
    public final void invoke() throws Exception {
        try {
            // run the preparation logic before the task starts executing
            beforeInvoke();

            // final check to exit early before starting to run
            // if the task has already been canceled, throw an exception and exit
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            // runs the task's main processing loop (the user's logic)
            runMailboxLoop();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            // check again: if the task has been canceled, throw an exception and exit
            if (canceled) {
                throw new CancelTaskException();
            }

            // run the post-invoke logic
            afterInvoke();
        }
        catch (Throwable invokeException) {
            failing = !canceled;
            try {
                cleanUpInvoke();
            }
            // TODO: investigate why Throwable instead of Exception is used here.
            catch (Throwable cleanUpException) {
                Throwable throwable = ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
                ExceptionUtils.rethrowException(throwable);
            }
            ExceptionUtils.rethrowException(invokeException);
        }
        // cleanup after invoke
        cleanUpInvoke();
    }

The beforeInvoke method

The beforeInvoke method mainly performs the task's initialization, including creating the OperatorChain and setting up the configuration for reading upstream data and emitting downstream output. The details are as follows:

    protected void beforeInvoke() throws Exception {
        disposedOperators = false;
        LOG.debug("Initializing {}.", getName());

        // create the OperatorChain
        // operator chaining is an optimization applied when the JobGraph is generated:
        // multiple eligible StreamNodes (i.e. data transformations) are merged into one chain
        // and scheduled into the same StreamTask for execution
        operatorChain = new OperatorChain<>(this, recordWriter);
        // get the first operator of the OperatorChain
        mainOperator = operatorChain.getMainOperator();

        // task specific initialization
        // init() is abstract; the concrete logic is implemented in the subclasses
        init();

        // save the work of reloading state, etc, if the task is already canceled
        if (canceled) {
            throw new CancelTaskException();
        }

        // -------- Invoke --------
        LOG.debug("Invoking {}", getName());

        // we need to make sure that any triggers scheduled in open() cannot be
        // executed before all operators are opened
        // task actions must run through the StreamTaskActionExecutor to avoid concurrency
        // issues that could interfere with checkpointing;
        // this executor is actually StreamTaskActionExecutor.IMMEDIATE, i.e. it runs in the current thread
        actionExecutor.runThrowing(() -> {
            // create a SequentialChannelStateReader to read the channel state saved at checkpoint time
            SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
            // TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative jobs
            // restore the ResultPartitionWriter state
            reader.readOutputData(getEnvironment().getAllWriters(), false);

            // initialize all operators in the OperatorChain by calling their
            // initializeState (state initialization) and open (setup) methods
            operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

            channelIOExecutor.execute(() -> {
                try {
                    // restore the InputGate state
                    reader.readInputData(getEnvironment().getAllInputGates());
                } catch (Exception e) {
                    asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
                }
            });

            for (InputGate inputGate : getEnvironment().getAllInputGates()) {
                // runs after the input gate state has been consumed
                inputGate
                    .getStateConsumedFuture()
                    .thenRun(() ->
                        // run in the task (mailbox) thread
                        mainMailboxExecutor.execute(
                            // request the partitions
                            inputGate::requestPartitions, "Input gate request partitions"));
            }
        });

        // mark the task as running
        isRunning = true;
    }

The runMailboxLoop method

The runMailboxLoop method starts the task's data input and processing logic:

    public void runMailboxLoop() throws Exception {
        mailboxProcessor.runMailboxLoop();
    }

The MailboxProcessor is created in StreamTask's constructor:

    this.mailboxProcessor = new MailboxProcessor(this::processInput,
            mailbox, actionExecutor);

mailboxProcessor.runMailboxLoop() keeps looping in the task thread: it first processes any pending mails (checkpoint triggers, timer callbacks and other control actions) and then runs the default action, which is the processInput method passed to the constructor above.
processInput reads data from upstream (through StreamTaskNetworkInput and the InputGate). That part of the logic is covered in the post "Flink 源码之节点间通信".
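
To make the idea concrete, here is a self-contained toy sketch of the mailbox pattern (this is not Flink's MailboxProcessor; all names are illustrative): control mails are drained first, then the default action, which stands in for processInput, runs once per iteration:

    import java.util.concurrent.ConcurrentLinkedQueue;

    // Toy illustration of the mailbox pattern used by StreamTask, not Flink code.
    public class MailboxLoopSketch {

        private final ConcurrentLinkedQueue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
        private volatile boolean running = true;
        private int processedRecords = 0;

        // other threads enqueue control actions here
        public void sendMail(Runnable mail) {
            mailbox.add(mail);
        }

        public void runMailboxLoop() {
            while (running) {
                // 1. drain all pending mails first, so control actions never starve
                Runnable mail;
                while ((mail = mailbox.poll()) != null) {
                    mail.run();
                }
                // 2. then run the default action once; in StreamTask this is processInput()
                defaultAction();
            }
        }

        private void defaultAction() {
            // stand-in for reading one record from the input and processing it
            processedRecords++;
            if (processedRecords >= 5) {
                running = false; // stop the toy loop after a few "records"
            }
        }

        public static void main(String[] args) {
            MailboxLoopSketch task = new MailboxLoopSketch();
            task.sendMail(() -> System.out.println("mail: trigger checkpoint"));
            task.runMailboxLoop();
            System.out.println("processed " + task.processedRecords + " records");
        }
    }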

The afterInvoke method

The afterInvoke method is shown below. In short, it performs the cleanup work after the task has finished executing, such as closing the operators.

    protected void afterInvoke() throws Exception {
        LOG.debug("Finished task {}", getName());
        getCompletionFuture().exceptionally(unused -> null).join();

        final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

        // close all operators in a chain effect way
        // close all operators in the OperatorChain,
        // calling each operator's close method from head to tail
        operatorChain.closeOperators(actionExecutor);

        // make sure no further checkpoint and notification actions happen.
        // at the same time, this makes sure that during any "regular" exit where still
        actionExecutor.runThrowing(() -> {
            // make sure no new timers can come
            // quiesce the timer service
            FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

            // let mailbox execution reject all new letters from this point
            // prepare to close the mailboxProcessor; no new mails are accepted
            mailboxProcessor.prepareClose();

            // only set the StreamTask to not running after all operators have been closed!
            // See FLINK-7430
            // mark the task as no longer running
            isRunning = false;
        });

        // processes the remaining mails; no new mails can be enqueued
        // drain the backlog of mails
        mailboxProcessor.drain();

        // make sure all timers finish
        // wait until all timers have finished
        timersFinishedFuture.get();

        LOG.debug("Closed operators for task {}", getName());

        // make sure all buffered data is flushed
        // flush all remaining buffered output data
        operatorChain.flushOutputs();

        // make an attempt to dispose the operators such that failures in the dispose call
        // still let the computation fail
        // dispose all operators in the OperatorChain (from head to tail)
        disposeAllOperators();
    }

Subclasses of StreamTask

StreamTask is the parent class of all stream processing tasks and is itself abstract. To handle the different kinds of StreamOperator, StreamTask has several different implementations. A few typical ones are:

  • OneInputStreamTask: handles OneInputStreamOperator, i.e. a StreamOperator with a single input stream.
  • TwoInputStreamTask: handles TwoInputStreamOperator, which has two input streams.
  • MultipleInputStreamTask: handles MultipleInputStreamOperator, which has multiple input streams.
  • SourceStreamTask: handles StreamSource, i.e. data sources.

Next we focus on how these subclasses implement the abstract methods.

OneInputStreamTask's init method

Its init method mainly creates the network input and output, and creates the inputProcessor that reads data from the network input, deserializes it and hands it to the output. Finally it sets up the data-flow metrics. The code and analysis are as follows:

    public void init() throws Exception {
        // get the stream configuration of the job
        StreamConfig configuration = getConfiguration();
        // number of network inputs
        int numberOfInputs = configuration.getNumberOfNetworkInputs();

        if (numberOfInputs > 0) {
            // create a CheckpointedInputGate;
            // this kind of InputGate owns a CheckpointBarrierHandler that handles incoming CheckpointBarriers
            CheckpointedInputGate inputGate = createCheckpointedInputGate();
            // metrics: counter for the number of incoming records
            Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);

            // create the StreamTaskNetworkOutput,
            // which forwards deserialized records into the task's processing pipeline
            DataOutput<IN> output = createDataOutput(numRecordsIn);
            // create the StreamTaskNetworkInput,
            // which wraps the CheckpointedInputGate, reads the raw network data and feeds the deserializer
            StreamTaskInput<IN> input = createTaskInput(inputGate);

            // read the input configuration;
            // if the input is required to be sorted, the data is grouped by key:
            // within a period of time the task only receives data of the same group,
            // so records of different groups do not interleave frequently
            if (configuration.shouldSortInputs()) {
                checkState(!configuration.isCheckpointingEnabled(), "Checkpointing is not allowed with sorted inputs.");
                input = wrapWithSorted(input);
            }

            // register the incoming record counter with the metrics system
            getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn);

            // create the inputProcessor:
            // it reads data from the network, deserializes it, hands it to the output,
            // and the output passes the deserialized records on to the OperatorChain
            inputProcessor = new StreamOneInputProcessor<>(
                input,
                output,
                operatorChain);
        }

        // watermark metrics
        mainOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

The creation of the CheckpointedInputGate is described in "Flink 源码之分布式快照"; refer to that post for details.

TwoInputStreamTask's init method

Its initialization is similar to OneInputStreamTask's, except that it needs to create two InputGates. TwoInputStreamTask corresponds to co-operators, i.e. operators with two input streams (for example CoFlatMap).
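
For context, a user program along the following lines (a minimal sketch; the sources and logic are invented for illustration) produces a TwoInputStreamOperator, which at runtime is executed by a TwoInputStreamTask:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class TwoInputExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> words = env.fromElements("a", "b", "c");
            DataStream<Integer> numbers = env.fromElements(1, 2, 3);

            // connect() creates a ConnectedStreams; the CoFlatMapFunction below becomes
            // a TwoInputStreamOperator executed by a TwoInputStreamTask
            words.connect(numbers)
                .flatMap(new CoFlatMapFunction<String, Integer, String>() {
                    @Override
                    public void flatMap1(String value, Collector<String> out) {
                        out.collect("string input: " + value);
                    }

                    @Override
                    public void flatMap2(Integer value, Collector<String> out) {
                        out.collect("int input: " + value);
                    }
                })
                .print();

            env.execute("two-input example");
        }
    }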

SourceStreamTask's init method

    protected void init() {
        // we check if the source is actually inducing the checkpoints, rather
        // than the trigger
        // get the SourceFunction that produces the source data
        SourceFunction<?> source = mainOperator.getUserFunction();
        // if the source implements this interface, it does not trigger a checkpoint when it
        // receives the trigger message from the CheckpointCoordinator;
        // instead, checkpoint triggering is driven by the input data
        if (source instanceof ExternallyInducedSource) {
            externallyInducedCheckpoints = true;

            // create the checkpoint trigger hook
            ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {

                @Override
                public void triggerCheckpoint(long checkpointId) throws FlinkException {
                    // TODO - we need to see how to derive those. We should probably not encode this in the
                    // TODO - source's trigger message, but do a handshake in this task between the trigger
                    // TODO - message from the master, and the source's trigger notification
                    final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
                        configuration.isExactlyOnceCheckpointMode(),
                        configuration.isUnalignedCheckpointsEnabled(),
                        configuration.getAlignmentTimeout());
                    final long timestamp = System.currentTimeMillis();

                    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

                    try {
                        SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false)
                            .get();
                    }
                    catch (RuntimeException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new FlinkException(e.getMessage(), e);
                    }
                }
            };

            ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
        }
        // register a gauge for the checkpoint start delay
        getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
    }

How StreamTask fetches data from upstream

The call chain through which StreamTask fetches data from upstream is:

  • StreamTask.processInput
  • inputProcessor.processInput
  • StreamTaskNetworkInput.emitNext
  • inputGate.pollNext
  • inputChannel.getNextBuffer

StreamTask obtains data from upstream Tasks through the InputGate. Each InputGate contains one or more InputChannels. Depending on whether the data has to travel over the network, InputChannels come in two kinds: RemoteInputChannel and LocalInputChannel. A RemoteInputChannel uses Netty to fetch data over the network from the upstream task's ResultSubPartition and is used when this task and the upstream task run on different cluster nodes. A LocalInputChannel is the opposite: it is used when this task and the upstream task run on the same node, so fetching the upstream data requires no network communication.
For a detailed analysis of this part, see "Flink 源码之节点间通信".
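
To make the roles concrete, here is a self-contained toy sketch of the gate/channel relationship (not Flink code; all names are illustrative): the gate multiplexes several channels, and each poll returns the next available buffer from one of them. In the real implementation a RemoteInputChannel is filled by Netty, while a LocalInputChannel reads the upstream ResultSubPartition in the same process:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    // toy stand-in for an InputChannel: a queue of buffers filled by some producer
    class ToyInputChannel {
        private final ArrayDeque<String> buffers = new ArrayDeque<>();

        void onDataReceived(String buffer) { // called by the "network" or a local producer
            buffers.add(buffer);
        }

        Optional<String> getNextBuffer() {
            return Optional.ofNullable(buffers.poll());
        }
    }

    // toy stand-in for an InputGate: multiplexes several channels
    class ToyInputGate {
        private final List<ToyInputChannel> channels = new ArrayList<>();

        ToyInputChannel addChannel() {
            ToyInputChannel channel = new ToyInputChannel();
            channels.add(channel);
            return channel;
        }

        // pollNext: return a buffer from whichever channel currently has data
        Optional<String> pollNext() {
            for (ToyInputChannel channel : channels) {
                Optional<String> buffer = channel.getNextBuffer();
                if (buffer.isPresent()) {
                    return buffer;
                }
            }
            return Optional.empty();
        }
    }

    public class ToyGateExample {
        public static void main(String[] args) {
            ToyInputGate gate = new ToyInputGate();
            ToyInputChannel remote = gate.addChannel(); // imagine this one is fed over the network
            ToyInputChannel local = gate.addChannel();  // imagine this one reads a local subpartition

            remote.onDataReceived("buffer-from-remote-node");
            local.onDataReceived("buffer-from-local-subpartition");

            Optional<String> next;
            while ((next = gate.pollNext()).isPresent()) {
                System.out.println("task consumed " + next.get());
            }
        }
    }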

Passing data to the OperatorChain

We start this part of the analysis from the processElement method of StreamTaskNetworkInput.
StreamTask's processInput method is the entry point of the data processing logic. It calls the method of the same name on StreamOneInputProcessor, which drives StreamTaskNetworkInput to keep pulling data from the InputGate in a loop. The fetched data is first handed to the deserializer, which turns the binary data into StreamRecord objects, and is then passed to the processElement method.
For the analysis of the logic above, see the "reading data" section of "Flink 源码之节点间通信".
Below is the processElement method. It is located in AbstractStreamTaskNetworkInput. The output parameter is actually a StreamTaskNetworkOutput object.

    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        // first check the element type: it may be a record, a watermark,
        // a latency marker or a stream status
        if (recordOrMark.isRecord()) {
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

StreamTaskNetworkOutput receives the deserialized records and forwards them to the first operator of the OperatorChain.

    /**
     * The network data output implementation used for processing stream elements
     * from {@link StreamTaskNetworkInput} in two input selective processor.
     */
    private static class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {

        // the OperatorChain's mainOperator (its first operator) is passed in at construction time
        private final TwoInputStreamOperator<?, ?, ?> operator;

        /** The function way is only used for frequent record processing as for JIT optimization. */
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;

        private final WatermarkGauge inputWatermarkGauge;

        /** The input index to indicate how to process elements by two input operator. */
        private final int inputIndex;

        private final Counter numRecordsIn;

        private final StreamStatusTracker statusTracker;

        private StreamTaskNetworkOutput(
                TwoInputStreamOperator<?, ?, ?> operator,
                ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer,
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge inputWatermarkGauge,
                StreamStatusTracker statusTracker,
                int inputIndex,
                Counter numRecordsIn) {
            super(streamStatusMaintainer);

            this.operator = checkNotNull(operator);
            this.recordConsumer = checkNotNull(recordConsumer);
            this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge);
            this.statusTracker = statusTracker;
            this.inputIndex = inputIndex;
            this.numRecordsIn = numRecordsIn;
        }

        // emit a record
        @Override
        public void emitRecord(StreamRecord<T> record) throws Exception {
            numRecordsIn.inc();
            recordConsumer.accept(record);
        }

        // emit a watermark
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (inputIndex == 0) {
                operator.processWatermark1(watermark);
            } else {
                operator.processWatermark2(watermark);
            }
        }

        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            final StreamStatus anotherStreamStatus;
            if (inputIndex == 0) {
                statusTracker.setFirstStatus(streamStatus);
                anotherStreamStatus = statusTracker.getSecondStatus();
            } else {
                statusTracker.setSecondStatus(streamStatus);
                anotherStreamStatus = statusTracker.getFirstStatus();
            }

            // check if we need to toggle the task's stream status
            if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                if (streamStatus.isActive()) {
                    // we're no longer idle if at least one input has become active
                    streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                } else if (anotherStreamStatus.isIdle()) {
                    // we're idle once both inputs are idle
                    streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                }
            }
        }

        // emit a latency marker, used to measure how long records take to travel through the whole Flink pipeline
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            if (inputIndex == 0) {
                operator.processLatencyMarker1(latencyMarker);
            } else {
                operator.processLatencyMarker2(latencyMarker);
            }
        }
    }

The logic of OperatorChain will be analyzed separately in a later post.