Flink JobMaster
调度作业的 Task
到 TaskManager
,所有 Task
启动后成功,进入执行状态,则整个作业进入执行状态.
从外部数据源开始读取数据,数据在 Flink Task DAG
中流转,处理完毕后,写出到外部存储.
作业执行图
经过 Flink 的多层 Graph
转换之后,作业进入调度阶段,开始分发任务执行,最终会在集群中形成 执行 Graph
核心对象有
Task / ResultPartition & ResultSubPartition / InputGate & InputChannel
核心对象
输入处理器 <StreamInputProcessor
>
输入处理器是 StreamInputProcessor
, 是对 StreamTask
中读取数据行为的抽象,在其实现中要完成数据的读取、处理、输出给下游的过程,StreamOneInputProcessor
: 用在 OneInputStreamTask
中 , 只有 1 个上游输入StreamTwoInputProcessor
: 用在 TwoInputStreamTask
中, 有 2 个上游输入
Task 输入 <StreamTaskInput
>
StreamTaskInput
负责Task
输入,是StreamTask
的数据输入的抽象
对于 Flink 中的 StreamTask
而言, 数据读取的行为有两种
StreamTaskNetworkInput
(从上游 Task
获取数据,使用 InputGate
作为底层读取数据 )StreamTaskSourceInput
(从外部数据源获取数据,使用 SourceFunction
读取数据, 交给下游的 Task
)
Task 输出 <StreamTaskNetworkOutput
>
StreamTaskNetworkOutput
负责Task
输出,是StreamTask
的数据输出的抽象
StreamTaskNetworkOutput
只是负责将数据交给算子来进行处理,实际的数据写出是在算子层上执行的StreamTaskNetworkOutput
也有对应于单流和双流输入的两种实现,作为私有的内部类定义在 OneInputStreamTask
和 StreamTwoInputProcessor
中
结果分区 <ResultPartition
>
ResultPartition
(结果分区) , 用来表示作业的单个 Task
产生的数据
ResultPartition
是运行时的实体, 与 ExecutionGraph
中的中间结果分区对应 (IntermediateResultPartition
)
一个 ResultPartition
是一组 Buffer
实例, ResultPartition
由 ResultSubPartition
组成-
ResultSubPartition
用来进一步将ResultPartition
进行切分- 切分成 (与下游子任务的并行度/数据分发模式 决定) 数量的
ResultSubPartition
下游子任务消费上游子任务产生的 ResultPartition
, 在实际请求的时候
- 是向上游请求
ResultSubPartition
- 并不是请求整个
ResultPartition
- 请求的方式有远程请求和本地请求两种
ResultPartition
对于流上的作业而言
ResultPartition
在作业的执行过程中会一直存在
对于批而言
- 上游
Task
输出ResultPartition
- 下游
Task
消费上游的ResultPartition
消费完毕之后 - 上游的
ResultPartition
就没有什么用了,需要进行资源回收,所以 Flink 增加了ReleaseOnConsumptionResultPartition
结果子分区 <ResultSubPartition
>
ResultSubPartition
(结果子分区) ,结果子分区是结果分区的一部分,负责存储实际的 Buffer
ResultPartition
是一个 Task
的输出,上游 Task
跟 下游 Task
之间的数据交换经常是一对多的关系,所以 Flink 在输出端将 ResultPartition
进行了切分. (下游如果有5个 Task
, ResultPartition
就会生成5个 ResultSubPartition
)
结果子分区有 PipelinedSubpartition
跟 BoundedBlockingSubpartition
(流/批)
PipelinedSubpartition
PipelinedSubpartition
是纯内存型的结果子分区,只能被消费一次
当向 PipelinedSubpartition
中添加一个完成的 BufferConsumer
或者添加下一个 BufferConsumer
时 (默认前一个 BufferConsumer
是完成的), 会通知 PipelinedSubpartitionView
新数据到达,可以消费了
BoundedBlockingSubpartition
用作对批处理 Task
的计算结果的数据存储,其行为是阻塞式的,需要等待上游所有的数据处理完毕,然后下游才开始消费数据,可以消费 1 次 或者多次
- 这种
ResultSubPartition
有多种存储形式,可以保存在文件中或者内存映射文件中
BoundedBlockingSubpartition
不是线程安全的, Flink 网络栈是单线程模型,添加、刷新、完成都由单个Writer
完成
- 如果在写入阶段需要
release
, 则同样由该Writer
的线程负责执行,所以能确保安全在实现中,支持多个并发的
Reader
但是需要确保Reader
是单线程的
ResultSubPartition
是持有数据的容器,其中的 BoundedData
属性是实际的数据操作接口,BoundedData
提供了几种不同的实现
有限数据集
BoundedData
定义了存储和读取批处理中建计算结果数据集的阻塞式接口, BoundedBlockingSubPartition
使用 BoundedData
接口来实现中间结果集的访问BoundedData
有 3 个不同的实现,分别对应于不同的 Buffer
的存储方式.
FileChannelBoundedData
- 使用
Java NIO
的FileChannel
写入数据和读取文件
FileChannelMemoryMappedBoundedData
- 使用
FileChannel
写入数据到文件,使用内存映射文件读取数据
MemoryMappedBoundedData
- 使用内存映射文件写入、读取, 全都是内存操作
内存映射文件 (Memory-Mapped File
) 是将一段虚拟内存逐步映射于一个文件,使得应用程序处理文件如同访问主内存(但在真正使用到这些数据前却不会消耗物理内存,也不会有读写磁盘的操作),内存映射文件与磁盘的真正交互由操作系统负责
Java.io
基于流的文件读写
- 读文件会产生2次数据复制
- 首先是从硬盘复制到操作系统内核
- 然后从操作系统内核复制到用户态的应用程序
- 写文件也类似
Java.nio
- 一般情况下只有一次复制
- 内存分配在操作系统内核
- 应用程序访问的就是操作系统的内核内存空间
- 比基于流进行文件读写快几个数量级
FileChannel
是直接读写文件,与内存映射文件相比
FileChannel
对小文件的读写效率高- 内存映射大文件则是针对大文件效率高
输入网关
InputGate
—-> 输入网关,是 Task
的输入数据的封装,和 JobGraph
中的 JobEdge
一一对应,对应于上游的 ResultPartition
InputGate
中负责实际数据消费的是 InputChannel
, 是 InputChannel
的容器,用于读取中间结果(IntermediateResult
) 在并行执行时由上游 Task
产生的一个或多个结果分区 (ResultPartition
)
SingleInputGate
是消费 ResultPartition
的实体,对应于一个 IntermediateResult
.
UnionInputGate
主要充当 InputGate
容器的角色,将多个 InputGate
联合起来,当做一个 InputGate
.
- 一般是对应于上游的多个输出类型相同的
IntermediateResult
,对应于多个上游的IntermediateResult
.
InputGateWithMetrics
本质上来说就是一个 InputGate
+ 监控统计,统计 InputGate
读取的数据量,单位为 Byte
Flink
作业在集群中并行执行时,其并行状态下的逻辑关系,中间结果集会按照并行度切分为与并行度个数相同的结果分区,每个结果分区又进一步切分为一个或者多个结果子分区
作业的并行度为 2, 有两个 map
字任务分切产生数据,生成两个 ResultPartition
(ResultSubPartition1
, ResultSubPartition2
)
每个 ResultPartition
又被切分为两个 ResultSubPartition
ResultSubPartition
的数量与下游Reduce
任务的子任务个数相同
每个 Reduce
的子任务都有一个 InputGate
- 负责从上游的所有
ResultPartition
中获取该子任务所需要的ResultSubPartition
输入通道
InputChannel
—-> 输入通道
每个 InputGate
会包含一个以上的 InputChannel
, 和 ExecutionEdge
一一对应,也和结果子分区一对一相连,一个 InputChannel
接收一个结果子分区的输出
RemoteInputChannel
- 对应于远程的结果子分区的输入通道,用来表示跨网络的数据交换,底层基于
Netty
LocalInputChannel
- 对应于本地结果子分区的输入通道,用来在本地线程内不同线程之间的数据交换
UnknownInputChannel
- 一种位于占位目的的输入通道,需要占位通道是因为暂未确定向对于
Task
生产者的位置,再确定上游Task
位置之后- 如果位于不同的
TaskManager
则替换为RemoteInputChannel
- 如果位于相同的
TaskManager
则替换为LocalInputChannel
- 如果位于不同的
Task
之前的 Task
执行模型依赖于锁机制,可能导致多个潜在的线程并发访问其内部状态
- 比如事件处理以及检查点的触发线程.
当前,他们都通过一个全局锁 (检查点锁) 来保证彼此互斥,这种机制有一些劣势
锁对象必须在类的各种互斥访问的代码段中进行传递
- 代码可读性很差
- 使用不当或者漏用容易造成难以定位的问题
设计不够优雅,锁对象暴露给了面向用户的 API (
SourceContext
)
新的执行模型 类似与基于 (Mailbox
) 单线程执行模型,取代现有的多线程模型
- 所有的并发操作都通过队列进行排队 (
Mailbox
),单线程 (Mailbox
线程) 依次处理,避免并发操作
Task 处理数据
- 启动
Task
进入执行状态,开始读取数据,当有可消费的数据时,则持续读取数据
/ StreamTask /
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
InputStatus status = inputProcessor.processInput();
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
StreamInputProcessor
是数据读取,处理,输出的高层逻辑的载体
- 由其负责触发数据时读取,并交给算子处理,然后输出
/ StreamOneInputProcessor /
@Override
public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
endOfInputAware.endInput(input.getInputIndex() + 1);
}
return status;
}
StreamInputProcessor
实际上将具体的数据读取工作交给了StreamTaskInput
- 当读取了完整的记录之后就开始向下游发送数据
在发送数据的过程中,调用算子进行数据的处理** ```java public StreamOneInputProcessor(
StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {
this.input = checkNotNull(input);
this.output = checkNotNull(output);
this.endOfInputAware = checkNotNull(endOfInputAware);
}
**
```java
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
DeserializationResult result;
try {
result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
} catch (IOException e) {
throw new IOException(
String.format("Can't get next record for channel %s", lastChannel), e);
}
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint
// mode
if (bufferOrEvent.get().isBuffer()) {
processBuffer(bufferOrEvent.get());
} else {
processEvent(bufferOrEvent.get());
return InputStatus.MORE_AVAILABLE;
}
} else {
if (checkpointedInputGate.isFinished()) {
checkState(
checkpointedInputGate.getAvailableFuture().isDone(),
"Finished BarrierHandler should be available");
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
- 读取到完成数据记录之后,根据其类型进行不同的逻辑处理
/ StreamTaskNetworkInput /
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
if (recordOrMark.isRecord()) {
output.emitRecord(recordOrMark.asRecord());
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(
recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(
recordOrMark.asStreamStatus(),
flattenedChannelIndices.get(lastChannel),
output);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
- 读取到数据,对于数据记录(
StreamRecord
) ,会在算子中包装用户的业务逻辑,即时使用了DataStream
编写的UDF
/ OneInputStreamTask /
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
operator.processElement(record);
}
- 进入算子内部,由算子去执行用户编写的业务逻辑 (以
FlatMap
为例)
/ StreamFlatMap /
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
- 在算子中处理完毕,数据要交给下一个算子或者
Task
进行计算,此时就会涉及 3 种 算子之间数据传递的情形
OperatorChain
内部的数据传递,发生OperatorChain
所在本地线程内- 同一个
TaskManager
不同Task
之间传递数据,发生在同一个JVM
的不同线程之间 - 不同
TaskManager
的Task
之间传递, 跨JVM
的数据传递,需要使用跨网络的通信,即便TaskManager
位于同一个物理机上,也会使用网络协议进行数据传递
Task 处理 Watermark
Task
处理 Watermark
的时候分为两种
- 一种是在
OneInputStreamOperator
(单流输入算子) - 一种是在
TwoInputStreamOperator
(双流输入算子)
单流输入逻辑比较简单,如果有定时器服务,则判断是否触发计算,并将 Watermark
发往下游
/ AbstractStreamOperator /
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
双流输入从上游两个算子中接收到两个 Watermark
,input1Watermark1
表示第一个输入流的 Watermark
,input1Watermark2
表示第二个输入流的 Watermark
,选择其中较小的那一个 Math.min(input1Watermark, input2Watermark)
作为当前的 Watermark
, 之后的处理逻辑与单流输入一致.
/ StreamTwoInputProcessor /
@Override
public void emitWatermark(Watermark watermark) throws Exception {
inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
if (inputIndex == 0) {
operator.processWatermark1(watermark);
} else {
operator.processWatermark2(watermark);
}
}
/ AbstractStreamOperator /
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
processWatermark(new Watermark(combinedWatermark));
}
}
Task 处理 StreamStatus
StreamStatus
是 StreamElement
的一种,用来标识 Task
是活动状态还是空闲状态
当 SourceStreamTask
或一般的 StreamTask
处理空间状态 (IDLE
) , 不会向下游发送数据或者 Watermark
时,就向下游发送 StreamStatus.IDLE
状态告知下游,依次向下传递
当恢复向下游发送数据或者 Watermark
前,首先发送 StreamStatus.ACTIVE
状态告知下游
- 如何判断 是 IDLE 还是 ACTIVE 状态
StreamStatus
状态变化在 SourceFunction
中产生
SourceTask
如果读取不到输入数据,则认为是IDLE
状态,如果Kafka Consumer
未分配到数据分区 (Partition
),则不会读取数据- 如果需要重新读取数据,则认为是
Active
状态
/ FlinkKafkaConsumerBase /
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
当 StreamTask
的所有上游 Task
全部处理 IDLE
状态的时候,认为这个 StreamTask
处于 IDLE
状态
只要有一个上游的 SourceTask
是 Active
状态,StreamTask
就是 Active
状态
- StreamStatus 对 WaterMark 的影响
由于 SourceTask
保证在 IDLE
状态和 Active
状态之间不会发送数据元素,所以 StreamTask
可以在不需要检查当前状态的情况下安全地处理和传播收到数据元素.
由于在 DataFlow
的任何中间节点都可能产生 watermark
, 所以当前 StreamTask
在发送 WaterMark
之前必须检查当前算子的状态,如果当前的状态是 IDLE
,则 Watermark
阻塞不会向下游发送.
如果 StreamTask
有多个上游输入
- 上有输入的
Watermark
状态为IDLE
- 恢复到
Active
状态,但是其Watermark
落后与当前算子的最小Watermark
,此时需要忽略这个特殊的Watermark
- 在判断是否需要向前推进
Watermark
和向下游发送的时候,这个特殊的Watermark
不起作用
- 在判断是否需要向前推进
Task 处理 LatencyMarker
LatencyMarker
用来近似估计数据从读取到写出之间的延迟,但是并不包含计算的延迟.
在算子中只能将数据记录交给 UDF
执行,所以收到 LatencyMarker
就直接交给下游了.
/ AbstractStreamOperator /
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// all operators are tracking latencies
this.latencyStats.reportLatency(marker);
// everything except sinks forwards latency markers
this.output.emitLatencyMarker(marker);
}