Flink中的任务是执行的基本单位。它是执行 算子的每个并行实例的位置。例如,并行度为5的 算子将使每个实例由单独的任务执行。
它StreamTask
是Flink流处理中所有不同任务子类型的基础。本文档StreamTask
介绍了生命周期中的不同阶段,并描述了代表这些阶段的主要方法。
简而言之,算子生命周期
因为任务是执行 算子的并行实例的实体,所以它的生命周期与 算子的生命周期紧密集成。因此,我们将简要提及代表算子生命周期的基本方法,然后再深入了解算子的生命周期StreamTask
。下面按照调用每个方法的顺序列出了该列表。假设 算子可以具有用户定义的函数(UDF),则在每个 算子方法下面,我们还会在它调用的UDF的生命周期中呈现(缩进)方法。如果 算子扩展了这些方法,则这些方法可用AbstractUdfStreamOperator
,这是执行UDF的所有 算子的基本类。
// initialization phase
OPERATOR::setup
UDF::setRuntimeContext
OPERATOR::initializeState
OPERATOR::open
UDF::open
// processing phase (called on every element/watermark)
OPERATOR::processElement
UDF::run
OPERATOR::processWatermark
// checkpointing phase (called asynchronously on every checkpoint)
OPERATOR::snapshotState
// termination phase
OPERATOR::close
UDF::close
OPERATOR::dispose
简而言之,setup()
调用它来初始化一些特定于算子的机制,例如它RuntimeContext
及其度量集合数据结构。在此之后,initializeState()
为算子提供其初始状态,并且该 open()
方法执行任何特定于算子的初始化,例如在该情况下打开用户定义的函数AbstractUdfStreamOperator
。
注意的initializeState()
同时包含它的初始执行过程中初始化 算子操作者的状态的逻辑(例如寄存器任何键入的状态),并且还从在发生故障后的检查点检索其状态的逻辑。有关此内容的更多信息,请参见本页的其余部
现在一切都已设置,算子已准备好处理传入的数据。传入数据元可以是以下之一:输入数据元,水印和检查点障碍。它们中的每一个都有一个特殊的数据元来处理它。数据元由processElement()
方法处理,水印由processWatermark()
和检查点障碍触发一个检查点,该检查点调用(异步)snapshotState()
方法,我们在下面描述。对于每个传入数据元,根据其类型调用上述方法之一。请注意,processElement()
它也是调用UDF逻辑的位置,例如map()
您的方法MapFunction
。
最后,在算子正常,无故障终止的情况下(例如,如果流是有限的并且到达其结束),close()
则调用该方法以执行算子逻辑所需的任何最终副本记录 算子操作(例如, 关闭任何连接)或者在算子执行期间打开的I / O流),dispose()
之后调用它以释放算子持有的任何资源(例如算子数据所持有的本机内存)。
在由于故障或由于手动取消而终止的情况下,执行直接跳转到dispose()
并且跳过算子在故障发生时所处的阶段与之间的任何中间阶段dispose()
。
检查点:snapshotState()
只要收到检查点障碍,就会调用算子的方法与上述其他方法异步。检查点在处理阶段执行,即在算子打开之后和关闭之前执行。此方法的职责是将算子的当前状态存储到指定的状态后台,当作业在失败后恢复执行时将从该后台检索该状态。下面我们将简要介绍Flink的检查点机制,有关Flink中检查点的原则的更详细讨论,请阅读相应的文档: Data Streaming Fault Tolerance。
任务生命周期
在简要介绍了算子的主要阶段之后,本节将更详细地描述任务如何在群集上执行期间调用相应的方法。这里描述的阶段的顺序主要包括在该类的invoke()
方法中StreamTask
。本文档的其余部分分为两个小节,一个描述在任务的常规无故障执行期间的阶段(参见正常执行),以及(更短的)一个描述在任务被取消时遵循的不同序列(请参阅“ 中断执行”,手动或由于某些其他原因,例如执行期间抛出的异常。
正常执行
执行任务直到完成而不被中断的步骤如下所示:
TASK::setInitialState
TASK::invoke
create basic utils (config, etc) and load the chain of operators
setup-operators
task-specific-init
initialize-operator-states
open-operators
run
close-operators
dispose-operators
task-specific-cleanup
common-cleanup
如上所示,在恢复任务配置并初始化一些重要的运行时参数之后,该任务的第一步是检索其初始的任务范围状态。这是在setInitialState()
,这在两种情况下尤其重要:
- 当任务从故障中恢复并从上一个成功检查点重新启动时
- 从保存点恢复时。
如果是第一次执行任务,则初始任务状态为空。
恢复任何初始状态后,任务进入其invoke()
方法。在那里,它首先通过调用setup()
它们中的每一个的方法来初始化参与本地计算的 算子,然后通过调用本地init()
方法来执行其任务特定的初始化。通过特定的任务,我们的意思是根据任务(类型SourceTask
,OneInputStreamTask
或TwoInputStreamTask
等),这个步骤可能会有所不同,但在任何情况下,这里是必要的任务范围内获取资源。作为示例,OneInputStreamTask
表示期望具有单个输入流的任务,将连接初始化为与本地任务相关的输入流的不同分区的位置。
获得必要的资源后,不同的算子和用户定义的函数就可以从上面检索的任务范围状态获取其各自的状态。这是在initializeState()
方法中完成的,该方法调用initializeState()
每个单独的 算子。每个有状态 算子都应该覆盖此方法,并且应该包含状态初始化逻辑,这两者都是第一次执行作业,也适用于任务从故障中恢复或使用保存点时的情况。
现在任务中的所有算子都已初始化,open()
每个单独的算子的openAllOperators()
方法都由该方法调用StreamTask
。此方法执行所有 算子操作初始化,例如使用计时器服务注册任何检索到的计时器。单个任务可能正在执行多个 算子,其中一个 算子消耗其前任的输出。在这种情况下,open()
从最后一个 算子(即其输出也是任务本身的输出的 算子)调用该方法到第一个 算子。这样做是为了当第一个算子开始处理任务的输入时,所有下游算子都准备好接收其输出。
注意任务中的连续算子从最后一个到第一个打开。
现在任务可以恢复执行,算子可以开始处理新的输入数据。这run()
是调用特定于任务的方法的位置。此方法将一直运行,直到没有更多输入数据(有限流),或任务被取消(手动或不手动)。这是 调用 算子特定processElement()
和processWatermark()
方法的位置。
在运行直到完成的情况下,即没有更多的输入数据要处理,在退出该run()
方法之后,任务进入其关闭过程。最初,定时器服务停止注册任何新的定时器(例如,从正在执行的触发定时器),清除所有尚未启动的定时器,并等待当前正在执行的定时器的完成。然后closeAllOperators()
尝试通过调用close()
每个 算子的方法来优雅地关闭计算中涉及的运算符。然后,刷新任何缓冲的输出数据,以便下游任务可以处理它们,最后任务尝试通过调用它来清除算子持有的所有资源。 dispose()
每个人的方法。打开不同的算子时,我们提到订单是从最后一个到第一个。结束以相反的方式发生,从头到尾。
注意任务中的连续算子从第一个到最后一个关闭。
最后,当所有算子都已关闭并释放所有资源时,任务会关闭其计时器服务,执行特定于任务的清理,例如清除其所有内部缓冲区,然后执行其通用任务清理,其中包括关闭所有 算子操作输出通道和清理任何输出缓冲区。
检查点:以前我们看到过,在initializeState()
从故障中恢复的过程中,任务及其所有算子和函数检索在故障之前的最后一个成功检查点期间持久保存到稳定存储的状态。Flink中的检查点是根据用户指定的间隔定期执行的,并且由与主任务线程不同的线程执行。这就是为什么它们不包含在任务生命周期的主要阶段中。简而言之,调用的特殊数据元CheckpointBarriers
由输入数据流中的作业的源任务定期注入,并随实际数据从源传递到接收器。源任务在运行模式后注入这些障碍,并假设为CheckpointCoordinator
也在运行。每当任务接收到这样的障碍时,它就会调度由检查点线程执行的任务,该线程调用 snapshotState()
任务中的 算子。在执行检查点时,任务仍然可以接收输入数据,但是数据被缓冲并且仅在检查点成功完成后处理并向下游发射。
执行中断
在前面的部分中,我们描述了一直运行到完成的任务的生命周期。如果任务在任何时候被取消,则正常执行被中断,从那一点开始执行的唯一 算子操作是定时器服务关闭,特定于任务的清理,算子的处理以及一般任务清理,如如上所述。