In this post we walk through the initialization and execution logic of Flink streaming jobs.
AbstractInvokable
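AbstractInvokable is the parent class of every task that runs in a TaskManager. Its invoke method carries out three things: reading data from upstream, running the user's processing logic (operators such as map and filter, user-written ProcessFunctions, and so on), and sending processed data downstream.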
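AbstractInvokable has two methods related to task execution:
- invoke: the entry point that starts task execution. Subclasses must override it.
- cancel: called when the task is canceled or the user terminates the job.
Its two main subclasses are:
- BatchTask: the base class of all batch tasks.
- StreamTask: the base class of all streaming tasks.
This post focuses on stream processing, so the rest of it examines StreamTask in detail.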
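Creating the AbstractInvokable
Before analyzing StreamTask, we first look at where and how it is created.
The Task thread's logic shows that its invokable field is initialized in Task's doRun method:
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(),
        nameOfInvokableClass, env);
This line uses the user-code class loader (userCodeClassLoader) to call the target class's constructor that takes a single Environment parameter, and that call creates the invokable object. The Javadoc below still describes a constructor that also accepts a TaskStateSnapshot, but the code only looks up the Environment constructor.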
/**
 * Instantiates the given task invokable class, passing the given environment (and possibly
 * the initial task state) to the task's constructor.
 *
 * <p>The method will first try to instantiate the task via a constructor accepting both
 * the Environment and the TaskStateSnapshot. If no such constructor exists, and there is
 * no initial state, the method will fall back to the stateless convenience constructor that
 * accepts only the Environment.
 *
 * @param classLoader The classloader to load the class through.
 * @param className The name of the class to load.
 * @param environment The task environment.
 *
 * @return The instantiated invokable task object.
 *
 * @throws Throwable Forwards all exceptions that happen during initialization of the task.
 *                   Also throws an exception if the task class misses the necessary constructor.
 */
private static AbstractInvokable loadAndInstantiateInvokable(
        ClassLoader classLoader,
        String className,
        Environment environment) throws Throwable {
    final Class<? extends AbstractInvokable> invokableClass;
    try {
        // Load the class named className through the given classloader and cast it to AbstractInvokable
        invokableClass = Class.forName(className, true, classLoader)
                .asSubclass(AbstractInvokable.class);
    } catch (Throwable t) {
        throw new Exception("Could not load the task's invokable class.", t);
    }
    Constructor<? extends AbstractInvokable> statelessCtor;
    try {
        // Look up the constructor that takes a single Environment argument
        statelessCtor = invokableClass.getConstructor(Environment.class);
    } catch (NoSuchMethodException ee) {
        throw new FlinkException("Task misses proper constructor", ee);
    }
    // instantiate the class
    try {
        //noinspection ConstantConditions  --> cannot happen
        // Pass in the environment and create the new object
        return statelessCtor.newInstance(environment);
    } catch (InvocationTargetException e) {
        // directly forward exceptions from the eager initialization
        throw e.getTargetException();
    } catch (Exception e) {
        throw new FlinkException("Could not instantiate the task's invokable class.", e);
    }
}
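The following self-contained Java sketch makes the reflection steps above concrete by repeating the same load-then-construct pattern. Environment, AbstractInvokable, DemoStreamTask, BadTask and InvokableLoader are simplified hypothetical stand-ins and not Flink's classes. The real parts are the JDK reflection calls (Class.forName, asSubclass, getConstructor, newInstance).

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

// Hypothetical stand-in for Flink's Environment.
class Environment {
    final String taskName;
    Environment(String taskName) { this.taskName = taskName; }
}

// Hypothetical stand-in for Flink's AbstractInvokable.
abstract class AbstractInvokable {
    protected final Environment env;
    protected AbstractInvokable(Environment env) { this.env = env; }
    abstract String invoke();
}

// Like StreamTask, this class exposes a public constructor taking only an Environment.
class DemoStreamTask extends AbstractInvokable {
    public DemoStreamTask(Environment env) { super(env); }
    String invoke() { return "running " + env.taskName; }
}

// This class lacks the (Environment) constructor, so it cannot be loaded as a task.
class BadTask extends AbstractInvokable {
    public BadTask() { super(null); }
    String invoke() { return "never"; }
}

class InvokableLoader {
    static AbstractInvokable load(ClassLoader loader, String className, Environment env)
            throws Throwable {
        final Class<? extends AbstractInvokable> clazz;
        try {
            // Load (and initialize) the class through the given loader, then check its type.
            clazz = Class.forName(className, true, loader).asSubclass(AbstractInvokable.class);
        } catch (Throwable t) {
            throw new Exception("Could not load the task's invokable class.", t);
        }
        final Constructor<? extends AbstractInvokable> ctor;
        try {
            // Only a public constructor with exactly one Environment parameter is accepted.
            ctor = clazz.getConstructor(Environment.class);
        } catch (NoSuchMethodException e) {
            throw new Exception("Task misses proper constructor", e);
        }
        try {
            return ctor.newInstance(env);
        } catch (InvocationTargetException e) {
            // Rethrow what the constructor itself threw, not the reflection wrapper.
            throw e.getTargetException();
        }
    }

    // Loads the given class and returns invoke()'s result, or the failure message.
    static String tryLoad(Class<?> taskClass) {
        try {
            return load(InvokableLoader.class.getClassLoader(), taskClass.getName(),
                    new Environment("map-1")).invoke();
        } catch (Throwable t) {
            return t.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryLoad(DemoStreamTask.class)); // running map-1
        System.out.println(tryLoad(BadTask.class));        // Task misses proper constructor
    }
}
```

The sketch unwraps InvocationTargetException for a reason. Without that step, the caller would see only the reflection wrapper and not the exception the task's constructor actually threw.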
StreamTask
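StreamTask is the base class of all streaming tasks. The TaskManager deploys and executes tasks, and a task is the local unit of execution. Each task contains one or more operators, and these operators belong to the same OperatorChain.
The execution lifecycle of a StreamTask consists of:
- setInitialState: sets the initial state of each operator. This corresponds to the initializeState method.
- Calling the invoke method.
The logic of invoke can be broken down further into:
- Creating the task's configuration and the OperatorChain.
- Running the operators' setup logic.
- Running task-specific initialization.
- Loading and initializing operator state.
- Calling each operator's open method.
- Running each operator's data processing logic.
- Closing the operators.
- Disposing of the operators.
- Cleaning up the task.
The invoke method
This section analyzes invoke, which holds the core execution logic of StreamTask: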
/**
 * Starts the execution.
 *
 * <p>Must be overwritten by the concrete task implementation. This method
 * is called by the task manager when the actual execution of the task
 * starts.
 *
 * <p>All resources should be cleaned up when the method returns. Make sure
 * to guard the code with <code>try-finally</code> blocks where necessary.
 *
 * @throws Exception
 *         Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
 */
public final void invoke() throws Exception {
    try {
        // Preparation before the task runs
        beforeInvoke();
        // final check to exit early before starting to run
        // If the task has been canceled, exit by throwing an exception
        if (canceled) {
            throw new CancelTaskException();
        }
        // let the task do its work
        // Run the task's processing logic, including user code
        runMailboxLoop();
        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        // Check again: if the task was canceled, exit by throwing an exception
        if (canceled) {
            throw new CancelTaskException();
        }
        // Post-processing after the task has run
        afterInvoke();
    }
    catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable = ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    // Clean up after a successful invoke
    cleanUpInvoke();
}
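The sketch below reproduces the control flow of invoke(). It focuses on one detail: when cleanUpInvoke() throws, that exception is attached to the original failure and does not replace it. InvokeSkeleton and its failInRun/failInCleanup switches are hypothetical. The JDK's addSuppressed stands in for ExceptionUtils.firstOrSuppressed, and for brevity the sketch catches Exception where the real code catches Throwable.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of StreamTask.invoke()'s control flow; hypothetical, not Flink code.
class InvokeSkeleton {
    static class CancelTaskException extends RuntimeException {}

    final List<String> steps = new ArrayList<>();
    volatile boolean canceled;
    boolean failing;
    boolean failInRun;     // simulate an exception in the main processing loop
    boolean failInCleanup; // simulate an exception during cleanup

    void beforeInvoke() { steps.add("beforeInvoke"); }

    void runMailboxLoop() {
        steps.add("runMailboxLoop");
        if (failInRun) throw new IllegalStateException("processing failed");
    }

    void afterInvoke() { steps.add("afterInvoke"); }

    void cleanUpInvoke() {
        steps.add("cleanUpInvoke");
        if (failInCleanup) throw new IllegalStateException("cleanup failed");
    }

    final void invoke() throws Exception {
        try {
            beforeInvoke();
            if (canceled) throw new CancelTaskException();
            runMailboxLoop();
            if (canceled) throw new CancelTaskException();
            afterInvoke();
        } catch (Exception invokeException) {
            failing = !canceled;
            try {
                cleanUpInvoke();
            } catch (Exception cleanUpException) {
                // Like firstOrSuppressed: the original failure stays primary and the
                // cleanup failure is attached to it as a suppressed exception.
                invokeException.addSuppressed(cleanUpException);
            }
            throw invokeException;
        }
        // Cleanup on the success path.
        cleanUpInvoke();
    }

    // Runs one task and summarizes the executed steps and the resulting failure, if any.
    static String run(boolean failInRun, boolean failInCleanup) {
        InvokeSkeleton task = new InvokeSkeleton();
        task.failInRun = failInRun;
        task.failInCleanup = failInCleanup;
        try {
            task.invoke();
            return String.join(",", task.steps);
        } catch (Exception e) {
            return String.join(",", task.steps) + " | " + e.getMessage()
                    + " (suppressed: " + e.getSuppressed().length + ")";
        }
    }
}
```

For example, run(true, true) reports the processing failure as the primary exception, with the cleanup failure attached as one suppressed exception.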
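The beforeInvoke method
beforeInvoke performs task initialization. This includes creating the OperatorChain and setting up the task's input from upstream and its output to downstream. In detail: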
protected void beforeInvoke() throws Exception {
    disposedOperators = false;
    LOG.debug("Initializing {}.", getName());
    // Create the OperatorChain.
    // Operator chaining is an optimization applied when the JobGraph is generated:
    // StreamNodes (data transformations) that satisfy the chaining conditions are merged
    // into one chain and scheduled to run in the same StreamTask
    operatorChain = new OperatorChain<>(this, recordWriter);
    // Get the main operator, i.e. the first operator of the OperatorChain
    mainOperator = operatorChain.getMainOperator();
    // task specific initialization
    // init() is abstract; each subclass implements its own task-specific initialization
    init();
    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }
    // -------- Invoke --------
    LOG.debug("Invoking {}", getName());
    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    // Task actions must run through the StreamTaskActionExecutor to avoid concurrent execution
    // that could interfere with checkpoints. For most tasks this executor is
    // StreamTaskActionExecutor.IMMEDIATE, which runs the action directly in the current thread
    actionExecutor.runThrowing(() -> {
        // Get the SequentialChannelStateReader, which reads the channel state saved in a checkpoint
        SequentialChannelStateReader reader = getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
        // TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative jobs
        // Restore the output-side (ResultPartitionWriter) channel state
        reader.readOutputData(getEnvironment().getAllWriters(), false);
        // Initialize all operators in the OperatorChain by calling their
        // initializeState (restores state) and open (operator initialization) methods
        operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
        channelIOExecutor.execute(() -> {
            try {
                // Restore the input-side (InputGate) channel state asynchronously
                reader.readInputData(getEnvironment().getAllInputGates());
            } catch (Exception e) {
                asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
            }
        });
        for (InputGate inputGate : getEnvironment().getAllInputGates()) {
            // Runs once this InputGate's channel state has been consumed
            inputGate
                    .getStateConsumedFuture()
                    .thenRun(() ->
                            // Execute in the task thread via the mailbox
                            mainMailboxExecutor.execute(
                                    // Request the upstream partitions
                                    inputGate::requestPartitions, "Input gate request partitions"));
        }
    });
    // Mark the task as running
    isRunning = true;
}
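The end of beforeInvoke() follows a specific ordering. Input channel state is restored on channelIOExecutor, and requestPartitions is enqueued into the mailbox only after a gate's state-consumed future completes. Plain CompletableFuture and a queue are enough to model this. The sketch is hypothetical: RestoreThenRequest and its log strings are made up, and it assumes a single input gate and a no-op state restore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: state is restored on an IO thread, and only afterwards is
// "request partitions" enqueued as a mail that the task thread executes.
class RestoreThenRequest {
    static String demo() {
        Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>(); // may be filled from any thread
        List<String> log = new ArrayList<>();                    // touched only by the task thread
        ExecutorService channelIOExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> stateConsumed = new CompletableFuture<>();

        // Once the gate's state has been consumed, enqueue the partition request as a mail
        // instead of calling it directly from the IO thread.
        CompletableFuture<Void> requestEnqueued =
                stateConsumed.thenRun(() -> mailbox.add(() -> log.add("requestPartitions")));

        log.add("initializeStateAndOpenOperators");
        // Restore the input channel state asynchronously (a no-op here), then signal completion.
        channelIOExecutor.execute(() -> stateConsumed.complete(null));
        try {
            // A real task would keep running its mailbox loop instead of blocking here.
            requestEnqueued.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            channelIOExecutor.shutdown();
        }

        // The task thread executes the mail, so requestPartitions runs on the task thread.
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return String.join(",", log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // initializeStateAndOpenOperators,requestPartitions
    }
}
```

Because the partition request goes through the mailbox, it always runs after the operators have been opened and on the same thread as the rest of the task's work.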
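The runMailboxLoop method
runMailboxLoop starts the task's input reading and data processing:
public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}
The MailboxProcessor is created in the StreamTask constructor:
this.mailboxProcessor = new MailboxProcessor(this::processInput,
        mailbox, actionExecutor);
mailboxProcessor.runMailboxLoop() is not a separate thread pool. It is a loop in the task thread that repeatedly calls processInput, the mailbox's default action. Between those calls it handles pending mails, which are actions submitted to the mailbox such as checkpoint triggers and timer callbacks.
processInput reads data from upstream (StreamTaskNetworkInput, InputGate). For this part, see the post "Flink 源码之节点间通信" (Flink source code: communication between nodes).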
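A toy, single-threaded loop can illustrate the mailbox idea described above. It alternates between running pending mails and executing one step of a default action. MiniMailboxProcessor, sendMail and endOfInput are hypothetical names and not Flink's MailboxProcessor API. In Flink, other threads can post mails to a thread-safe mailbox; this sketch leaves that part out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Single-threaded toy model of a mailbox loop; hypothetical, not Flink's MailboxProcessor.
class MiniMailboxProcessor {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Consumer<MiniMailboxProcessor> defaultAction; // plays the role of processInput
    private boolean running = true;

    MiniMailboxProcessor(Consumer<MiniMailboxProcessor> defaultAction) {
        this.defaultAction = defaultAction;
    }

    void sendMail(Runnable mail) { mailbox.add(mail); }

    // Called by the default action when there is no more input.
    void endOfInput() { running = false; }

    void runMailboxLoop() {
        while (running) {
            // First run every pending mail (e.g. a checkpoint trigger or a timer callback) ...
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // ... then execute one step of the default action.
            if (running) {
                defaultAction.accept(this);
            }
        }
    }

    static String demo() {
        List<String> log = new ArrayList<>();
        int[] next = {1};
        MiniMailboxProcessor processor = new MiniMailboxProcessor(p -> {
            int record = next[0]++;
            if (record > 3) { // only three input records in this example
                p.endOfInput();
                return;
            }
            log.add("record-" + record);
            if (record == 2) {
                // Simulates another component (e.g. the checkpoint trigger) posting a mail.
                p.sendMail(() -> log.add("checkpoint"));
            }
        });
        processor.runMailboxLoop();
        return String.join(",", log);
    }
}
```

The mail posted while record 2 is processed runs before record 3. Running these actions on the task thread is what lets checkpoints and timers avoid racing with record processing.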
afterInvoke
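afterInvoke, shown below, does the cleanup after the task has finished processing. It closes the operators, stops the timers, drains the mailbox, flushes the output and disposes of the operators.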
protected void afterInvoke() throws Exception {
    LOG.debug("Finished task {}", getName());
    getCompletionFuture().exceptionally(unused -> null).join();
    final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
    // close all operators in a chain effect way
    // Close all operators in the OperatorChain,
    // calling each operator's close method from head to tail
    operatorChain.closeOperators(actionExecutor);
    // make sure no further checkpoint and notification actions happen.
    // at the same time, this makes sure that during any "regular" exit where still
    actionExecutor.runThrowing(() -> {
        // make sure no new timers can come
        // Quiesce the timer service
        FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);
        // let mailbox execution reject all new letters from this point
        // Prepare to close the mailboxProcessor; no new mails are accepted
        mailboxProcessor.prepareClose();
        // only set the StreamTask to not running after all operators have been closed!
        // See FLINK-7430
        // Mark the task as no longer running
        isRunning = false;
    });
    // processes the remaining mails; no new mails can be enqueued
    // Process the mails still left in the mailbox
    mailboxProcessor.drain();
    // make sure all timers finish
    // Wait for all timers to finish
    timersFinishedFuture.get();
    LOG.debug("Closed operators for task {}", getName());
    // make sure all buffered data is flushed
    // Flush all buffered output data
    operatorChain.flushOutputs();
    // make an attempt to dispose the operators such that failures in the dispose call
    // still let the computation fail
    // Dispose of all operators in the OperatorChain, one by one from head to tail
    disposeAllOperators();
}
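The prepareClose()/drain() pair in afterInvoke() follows a common shutdown pattern: stop accepting new work, then drain what is already queued. Below is a minimal hypothetical sketch of that pattern. ClosableMailbox is not a Flink class, and representing rejection as a false return value is an assumption made for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of "stop accepting mail, then drain" as done in afterInvoke().
class ClosableMailbox {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean closing;

    // Rejection is modeled as a false return value here (an assumption for brevity).
    boolean put(Runnable mail) {
        if (closing) {
            return false;
        }
        queue.add(mail);
        return true;
    }

    void prepareClose() { closing = true; }

    // Runs every mail enqueued before prepareClose(); nothing new can arrive meanwhile.
    void drain() {
        Runnable mail;
        while ((mail = queue.poll()) != null) {
            mail.run();
        }
    }

    static String demo() {
        List<String> log = new ArrayList<>();
        ClosableMailbox mailbox = new ClosableMailbox();
        mailbox.put(() -> log.add("pending-timer"));    // enqueued before shutdown began
        mailbox.prepareClose();
        boolean accepted = mailbox.put(() -> log.add("late-checkpoint"));
        mailbox.drain();                                // runs only "pending-timer"
        log.add("late mail accepted=" + accepted);
        return String.join(",", log);
    }
}
```

Closing the mailbox to new mail before draining guarantees that drain() ends. Otherwise a mail could keep scheduling further mails forever.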
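Subclasses of StreamTask
StreamTask is the parent class of all streaming tasks and is itself abstract. StreamTask has several implementations, one for each kind of StreamOperator it has to run. Some typical ones are:
- OneInputStreamTask: runs a OneInputStreamOperator, i.e. a StreamOperator with a single input stream.
- TwoInputStreamTask: runs a TwoInputStreamOperator, which has two input streams.
- MultipleInputStreamTask: runs a MultipleInputStreamOperator, which has multiple input streams.
- SourceStreamTask: runs a StreamSource, i.e. a data source.
The init method of OneInputStreamTask
Its init method first creates the network input and output. It then creates an inputProcessor that reads data from the network input, deserializes it and passes it to the network output. Finally, it registers the input metrics. The annotated code is: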
public void init() throws Exception {
    // Get the stream configuration of this task
    StreamConfig configuration = getConfiguration();
    // Get the number of network inputs
    int numberOfInputs = configuration.getNumberOfNetworkInputs();
    if (numberOfInputs > 0) {
        // Create a CheckpointedInputGate.
        // It owns a CheckpointBarrierHandler that handles incoming CheckpointBarriers
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        // Metrics: set up the counter of incoming records
        Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
        // Create the StreamTaskNetworkOutput,
        // which passes deserialized data on to the task's processing logic
        DataOutput<IN> output = createDataOutput(numRecordsIn);
        // Create the StreamTaskNetworkInput.
        // It wraps the CheckpointedInputGate, reads the raw data received from the network
        // and hands it to the deserializer
        StreamTaskInput<IN> input = createTaskInput(inputGate);
        // Check the input configuration: if sorted inputs are required,
        // records are grouped by key, so the task receives all records of one key
        // together instead of interleaved with records of other keys
        if (configuration.shouldSortInputs()) {
            checkState(!configuration.isCheckpointingEnabled(), "Checkpointing is not allowed with sorted inputs.");
            input = wrapWithSorted(input);
        }
        // Register the incoming-record counter with the task's IO metric group
        getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(numRecordsIn);
        // Create the inputProcessor: it reads data from the network, deserializes it
        // and passes it to output, which hands it on to the OperatorChain
        inputProcessor = new StreamOneInputProcessor<>(
                input,
                output,
                operatorChain);
    }
    // Register the current input watermark gauge
    mainOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // wrap watermark gauge since registered metrics must be unique
    getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
The creation of the CheckpointedInputGate is covered in the post "Flink 源码之分布式快照" (Flink source code: distributed snapshots).
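init() wires together three roles: a network input, a network output that counts records, and a processor that moves one element per call. Simplified hypothetical types can sketch how they fit. Nothing here is Flink API: deserialization is reduced to decoding UTF-8 bytes, and the head operator is just a lambda.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the objects wired together in OneInputStreamTask.init(); all types are hypothetical.
class OneInputWiring {
    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    // Role of StreamTaskNetworkOutput: count each record, then pass it to the head operator.
    static class CountingOutput {
        long numRecordsIn;
        private final Consumer<String> headOperator;
        CountingOutput(Consumer<String> headOperator) { this.headOperator = headOperator; }
        void emitRecord(String record) {
            numRecordsIn++;
            headOperator.accept(record);
        }
    }

    // Role of StreamTaskNetworkInput: turn raw bytes into records ("deserialization").
    static class BytesInput {
        private final Iterator<byte[]> buffers;
        BytesInput(List<byte[]> buffers) { this.buffers = buffers.iterator(); }
        InputStatus emitNext(CountingOutput output) {
            if (!buffers.hasNext()) {
                return InputStatus.END_OF_INPUT;
            }
            output.emitRecord(new String(buffers.next(), StandardCharsets.UTF_8));
            return buffers.hasNext() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }
    }

    // Role of StreamOneInputProcessor: each processInput() call moves one element along.
    static class OneInputProcessor {
        private final BytesInput input;
        private final CountingOutput output;
        OneInputProcessor(BytesInput input, CountingOutput output) {
            this.input = input;
            this.output = output;
        }
        InputStatus processInput() { return input.emitNext(output); }
    }

    static String demo() {
        List<String> seenByOperator = new ArrayList<>();
        // The "head operator" upper-cases each record.
        CountingOutput output = new CountingOutput(r -> seenByOperator.add(r.toUpperCase()));
        List<byte[]> network = new ArrayList<>();
        for (String s : new String[] {"a", "b", "c"}) {
            network.add(s.getBytes(StandardCharsets.UTF_8));
        }
        OneInputProcessor processor = new OneInputProcessor(new BytesInput(network), output);
        while (processor.processInput() == InputStatus.MORE_AVAILABLE) {
            // keep pulling until the input reports END_OF_INPUT
        }
        return seenByOperator + " numRecordsIn=" + output.numRecordsIn;
    }
}
```

Because the counter sits in the output and not in the input, only records that actually reach the operator chain are counted.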
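The init method of TwoInputStreamTask
Its initialization is similar to that of OneInputStreamTask, except that it has to create two InputGates. TwoInputStreamTask runs co-operators, i.e. operators with two input streams (such as CoFlatMap).
The init method of SourceStreamTask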
protected void init() {
    // we check if the source is actually inducing the checkpoints, rather
    // than the trigger
    // Get the SourceFunction that produces the source's data
    SourceFunction<?> source = mainOperator.getUserFunction();
    // A source that implements ExternallyInducedSource does not start a checkpoint when the
    // CheckpointCoordinator sends a trigger message; instead, its input data triggers checkpoints
    if (source instanceof ExternallyInducedSource) {
        externallyInducedCheckpoints = true;
        // Create the checkpoint trigger hook
        ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {
            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                // TODO - we need to see how to derive those. We should probably not encode this in the
                // TODO - source's trigger message, but do a handshake in this task between the trigger
                // TODO - message from the master, and the source's trigger notification
                final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
                        configuration.isExactlyOnceCheckpointMode(),
                        configuration.isUnalignedCheckpointsEnabled(),
                        configuration.getAlignmentTimeout());
                final long timestamp = System.currentTimeMillis();
                final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
                try {
                    SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false)
                            .get();
                }
                catch (RuntimeException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new FlinkException(e.getMessage(), e);
                }
            }
        };
        ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
    }
    // Register the checkpoint start delay gauge
    getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
}
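The sketch below shows the inversion of control behind ExternallyInducedSource: the task hands the source a trigger hook, and the source decides from its own input when a checkpoint begins. MarkerSource, CheckpointTrigger and the "CHECKPOINT:<id>" marker format are hypothetical. The sketch also leaves out the CheckpointOptions/CheckpointMetaData construction that the real hook performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an externally induced checkpoint: the task installs a hook, and
// the source calls it when its own input says a checkpoint should start.
class InducedCheckpointDemo {
    // Stand-in for ExternallyInducedSource.CheckpointTrigger.
    interface CheckpointTrigger {
        void triggerCheckpoint(long checkpointId);
    }

    // Stand-in for a SourceFunction that implements ExternallyInducedSource.
    static class MarkerSource {
        private static final String MARKER = "CHECKPOINT:";
        private CheckpointTrigger trigger;
        final List<String> emitted = new ArrayList<>();

        void setCheckpointTrigger(CheckpointTrigger trigger) { this.trigger = trigger; }

        void run(List<String> input) {
            for (String event : input) {
                if (event.startsWith(MARKER)) {
                    // The checkpoint id comes from the input itself, not from the coordinator.
                    trigger.triggerCheckpoint(Long.parseLong(event.substring(MARKER.length())));
                } else {
                    emitted.add(event);
                }
            }
        }
    }

    static String demo() {
        List<String> log = new ArrayList<>();
        MarkerSource source = new MarkerSource();
        // Conceptually what SourceStreamTask.init() does: register a hook that starts a checkpoint.
        source.setCheckpointTrigger(id ->
                log.add("checkpoint " + id + " after " + source.emitted.size() + " records"));
        source.run(Arrays.asList("a", "b", "CHECKPOINT:7", "c"));
        return String.join(",", log);
    }
}
```

The checkpoint fires exactly between records "b" and "c", at the position the input marker dictates and not whenever a coordinator message happens to arrive.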
How StreamTask fetches data from upstream
StreamTask fetches data from upstream through the following call chain:
- StreamTask.processInput
- inputProcessor.processInput
- StreamTaskNetworkInput.emitNext
- inputGate.pollNext
- inputChannel.getNextBuffer
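StreamTask obtains data from upstream tasks through InputGates. Each InputGate contains one or more InputChannels. Depending on whether the data travels over the network, an InputChannel is either a RemoteInputChannel or a LocalInputChannel. A RemoteInputChannel uses Netty to fetch data over the network from the upstream task's ResultSubpartition. It is used when this task and the upstream task run on different nodes of the cluster. A LocalInputChannel covers the opposite case, where both tasks run on the same node, and fetching data from the upstream task involves no network communication.
For a detailed analysis of this logic, see the post "Flink 源码之节点间通信" (Flink source code: communication between nodes).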
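The following hypothetical sketch gives a rough mental model of the InputGate/InputChannel split. A gate pulls buffers from two kinds of channel: a local channel that reads the upstream subpartition queue directly, and a remote channel fed by a simulated network callback. MiniInputGate and its channel classes are illustrative only. The round-robin polling is a simplification, because Flink's input gate tracks which channels currently have data available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical model of an input gate reading from local and remote input channels.
class MiniInputGate {
    interface InputChannel {
        Optional<String> getNextBuffer();
    }

    // Like LocalInputChannel: reads the upstream subpartition's queue in the same process.
    static class LocalChannel implements InputChannel {
        private final Queue<String> upstreamSubpartition;
        LocalChannel(Queue<String> upstreamSubpartition) { this.upstreamSubpartition = upstreamSubpartition; }
        public Optional<String> getNextBuffer() { return Optional.ofNullable(upstreamSubpartition.poll()); }
    }

    // Like RemoteInputChannel: buffers arrive from the network (Netty in Flink) into a receive queue.
    static class RemoteChannel implements InputChannel {
        private final Queue<String> received = new ArrayDeque<>();
        void onBufferFromNetwork(String buffer) { received.add(buffer); }
        public Optional<String> getNextBuffer() { return Optional.ofNullable(received.poll()); }
    }

    private final List<InputChannel> channels;
    private int nextChannel; // round-robin position (a simplification)

    MiniInputGate(List<InputChannel> channels) { this.channels = channels; }

    // Returns the next buffer from any channel that has data, or empty if none has.
    Optional<String> pollNext() {
        for (int i = 0; i < channels.size(); i++) {
            InputChannel channel = channels.get(nextChannel);
            nextChannel = (nextChannel + 1) % channels.size();
            Optional<String> buffer = channel.getNextBuffer();
            if (buffer.isPresent()) {
                return buffer;
            }
        }
        return Optional.empty();
    }

    static String demo() {
        Queue<String> upstreamSubpartition = new ArrayDeque<>(Arrays.asList("L1", "L2"));
        RemoteChannel remote = new RemoteChannel();
        remote.onBufferFromNetwork("R1");
        MiniInputGate gate = new MiniInputGate(
                Arrays.<InputChannel>asList(new LocalChannel(upstreamSubpartition), remote));
        List<String> consumed = new ArrayList<>();
        Optional<String> buffer;
        while ((buffer = gate.pollNext()).isPresent()) {
            consumed.add(buffer.get());
        }
        return String.join(",", consumed);
    }
}
```

From the gate's point of view, both channel kinds look the same. Only where the buffers come from differs.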
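Passing data to the OperatorChain
We start the analysis of this logic with the processElement method of StreamTaskNetworkInput.
The processInput method of StreamTask is the entry point for data processing. It calls the method of the same name on StreamOneInputProcessor, which has StreamTaskNetworkInput repeatedly pull data from the InputGate. The data first goes to the deserializer, which turns the binary data into StreamElement objects: records (StreamRecord), watermarks and other control elements. Each element is then handed to processElement.
For the analysis of this logic, see the "reading data" section of the post "Flink 源码之节点间通信" (Flink source code: communication between nodes).
Below is the processElement method, defined in AbstractStreamTaskNetworkInput. Its output parameter is actually the StreamTaskNetworkOutput object.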
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // First check the element type: a record, a watermark, a latency marker or a stream status
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
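A small type switch can mimic the dispatch in processElement(). This hypothetical sketch simplifies the valve on purpose. It forwards the minimum watermark across all input channels whenever that minimum advances. It ignores stream status (idle channels), which the real statusWatermarkValve also takes into account. The element types and the String-list output are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy version of processElement(): dispatch on the element type and route watermarks through
// a simplified valve (minimum over channels, no stream-status handling). Types are hypothetical.
class ElementDispatch {
    interface StreamElement {}

    static final class Record implements StreamElement {
        final String value;
        Record(String value) { this.value = value; }
    }

    static final class Watermark implements StreamElement {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    static final class LatencyMarker implements StreamElement {
        final long markedTime;
        LatencyMarker(long markedTime) { this.markedTime = markedTime; }
    }

    static final class Valve {
        private final long[] channelWatermarks;
        private long lastEmitted = Long.MIN_VALUE;

        Valve(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        }

        // Forward a watermark only when the minimum over all channels advances.
        void inputWatermark(Watermark wm, int channel, List<String> output) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], wm.timestamp);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > lastEmitted) {
                lastEmitted = min;
                output.add("watermark " + min);
            }
        }
    }

    static void processElement(StreamElement element, int lastChannel, Valve valve, List<String> output) {
        if (element instanceof Record) {
            output.add("record " + ((Record) element).value);
        } else if (element instanceof Watermark) {
            valve.inputWatermark((Watermark) element, lastChannel, output);
        } else if (element instanceof LatencyMarker) {
            output.add("latency " + ((LatencyMarker) element).markedTime);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    static String demo() {
        List<String> output = new ArrayList<>();
        Valve valve = new Valve(2);
        processElement(new Record("a"), 0, valve, output);
        processElement(new Watermark(10), 0, valve, output); // channel 1 has no watermark yet
        processElement(new Watermark(5), 1, valve, output);  // min(10, 5) = 5 is forwarded
        processElement(new LatencyMarker(42), 1, valve, output);
        return String.join(",", output);
    }
}
```

Records and latency markers pass straight through. A watermark is forwarded only after every channel has reported one, because the task's event time cannot move past its slowest input.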
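StreamTaskNetworkOutput receives the deserialized elements and forwards them to the first operator of the OperatorChain. The version shown below is the one used by the two-input processor. It uses inputIndex to route watermarks and latency markers to the operator's first or second input, and it tracks the stream status of both inputs.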
/**
 * The network data output implementation used for processing stream elements
 * from {@link StreamTaskNetworkInput} in two input selective processor.
 */
private static class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {
    // Set at construction time to the OperatorChain's mainOperator, i.e. its first operator
    private final TwoInputStreamOperator<?, ?, ?> operator;
    /** The function way is only used for frequent record processing as for JIT optimization. */
    private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
    private final WatermarkGauge inputWatermarkGauge;
    /** The input index to indicate how to process elements by two input operator. */
    private final int inputIndex;
    private final Counter numRecordsIn;
    private final StreamStatusTracker statusTracker;
    private StreamTaskNetworkOutput(
            TwoInputStreamOperator<?, ?, ?> operator,
            ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer,
            StreamStatusMaintainer streamStatusMaintainer,
            WatermarkGauge inputWatermarkGauge,
            StreamStatusTracker statusTracker,
            int inputIndex,
            Counter numRecordsIn) {
        super(streamStatusMaintainer);
        this.operator = checkNotNull(operator);
        this.recordConsumer = checkNotNull(recordConsumer);
        this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge);
        this.statusTracker = statusTracker;
        this.inputIndex = inputIndex;
        this.numRecordsIn = numRecordsIn;
    }
    // Emit a record
    @Override
    public void emitRecord(StreamRecord<T> record) throws Exception {
        numRecordsIn.inc();
        recordConsumer.accept(record);
    }
    // Emit a watermark
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
        inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        if (inputIndex == 0) {
            operator.processWatermark1(watermark);
        } else {
            operator.processWatermark2(watermark);
        }
    }
    @Override
    public void emitStreamStatus(StreamStatus streamStatus) {
        final StreamStatus anotherStreamStatus;
        if (inputIndex == 0) {
            statusTracker.setFirstStatus(streamStatus);
            anotherStreamStatus = statusTracker.getSecondStatus();
        } else {
            statusTracker.setSecondStatus(streamStatus);
            anotherStreamStatus = statusTracker.getFirstStatus();
        }
        // check if we need to toggle the task's stream status
        if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
            if (streamStatus.isActive()) {
                // we're no longer idle if at least one input has become active
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (anotherStreamStatus.isIdle()) {
                // we're idle once both inputs are idle
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }
    }
    // Emit a latency marker; latency markers measure how long records take to travel through the Flink pipeline
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        if (inputIndex == 0) {
            operator.processLatencyMarker1(latencyMarker);
        } else {
            operator.processLatencyMarker2(latencyMarker);
        }
    }
}
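A small hypothetical model can reproduce two behaviors of this class. First, inputIndex selects which operator method receives an element. Second, emitStreamStatus() derives the task's status from both inputs: active if either input is active, idle only when both are idle. Output, StatusTracker and StatusMaintainer stand in for StreamTaskNetworkOutput, StreamStatusTracker and StreamStatusMaintainer; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two-input StreamTaskNetworkOutput; all types here are hypothetical.
class TwoInputStatusDemo {
    enum StreamStatus { ACTIVE, IDLE }

    static class TwoInputOperator {
        final List<String> log = new ArrayList<>();
        void processWatermark1(long ts) { log.add("wm1=" + ts); }
        void processWatermark2(long ts) { log.add("wm2=" + ts); }
    }

    // Shared by both outputs; stands in for StreamStatusTracker.
    static class StatusTracker {
        StreamStatus first = StreamStatus.ACTIVE;
        StreamStatus second = StreamStatus.ACTIVE;
    }

    // Stands in for StreamStatusMaintainer: holds the status of the whole task.
    static class StatusMaintainer {
        StreamStatus status = StreamStatus.ACTIVE;
    }

    static class Output {
        private final TwoInputOperator operator;
        private final StatusTracker tracker;
        private final StatusMaintainer maintainer;
        private final int inputIndex;

        Output(TwoInputOperator operator, StatusTracker tracker, StatusMaintainer maintainer, int inputIndex) {
            this.operator = operator;
            this.tracker = tracker;
            this.maintainer = maintainer;
            this.inputIndex = inputIndex;
        }

        // inputIndex decides which of the operator's two inputs receives the watermark.
        void emitWatermark(long ts) {
            if (inputIndex == 0) {
                operator.processWatermark1(ts);
            } else {
                operator.processWatermark2(ts);
            }
        }

        void emitStreamStatus(StreamStatus status) {
            StreamStatus other;
            if (inputIndex == 0) {
                tracker.first = status;
                other = tracker.second;
            } else {
                tracker.second = status;
                other = tracker.first;
            }
            if (status != maintainer.status) {
                if (status == StreamStatus.ACTIVE) {
                    maintainer.status = StreamStatus.ACTIVE; // one active input is enough
                } else if (other == StreamStatus.IDLE) {
                    maintainer.status = StreamStatus.IDLE;   // idle only when both inputs are idle
                }
            }
        }
    }

    static String demo() {
        TwoInputOperator operator = new TwoInputOperator();
        StatusTracker tracker = new StatusTracker();
        StatusMaintainer maintainer = new StatusMaintainer();
        Output input1 = new Output(operator, tracker, maintainer, 0);
        Output input2 = new Output(operator, tracker, maintainer, 1);
        List<String> log = operator.log;
        input1.emitWatermark(100);
        input2.emitWatermark(90);
        input1.emitStreamStatus(StreamStatus.IDLE);
        log.add("task=" + maintainer.status); // still ACTIVE: input 2 is active
        input2.emitStreamStatus(StreamStatus.IDLE);
        log.add("task=" + maintainer.status); // IDLE: both inputs are idle
        input2.emitStreamStatus(StreamStatus.ACTIVE);
        log.add("task=" + maintainer.status); // ACTIVE again
        return String.join(",", log);
    }
}
```

Sharing one tracker between the two outputs is what lets each output see the other input's last status.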
The OperatorChain logic will be analyzed separately in a later post.