Flink JobMaster 调度作业的 TaskTaskManager,所有 Task 启动后成功,进入执行状态,则整个作业进入执行状态.
从外部数据源开始读取数据,数据在 Flink Task DAG 中流转,处理完毕后,写出到外部存储.
image.png


作业执行图

image.png

经过 Flink 的多层 Graph 转换之后,作业进入调度阶段,开始分发任务执行,最终会在集群中形成 执行 Graph
核心对象有

  • Task / ResultPartition & ResultSubPartition / InputGate & InputChannel

核心对象

输入处理器 <StreamInputProcessor>

输入处理器是 StreamInputProcessor , 是对 StreamTask 中读取数据行为的抽象,在其实现中要完成数据的读取、处理、输出给下游的过程,
image.png
StreamOneInputProcessor : 用在 OneInputStreamTask 中 , 只有 1 个上游输入
StreamTwoInputProcessor : 用在 TwoInputStreamTask 中, 有 2 个上游输入

image.png


Task 输入 <StreamTaskInput >

  • StreamTaskInput 负责 Task 输入,是 StreamTask 的数据输入的抽象

image.png
对于 Flink 中的 StreamTask 而言, 数据读取的行为有两种

StreamTaskNetworkInput (从上游 Task 获取数据,使用 InputGate 作为底层读取数据 )
StreamTaskSourceInput (从外部数据源获取数据,使用 SourceFunction 读取数据, 交给下游的 Task)


Task 输出 <StreamTaskNetworkOutput >

  • StreamTaskNetworkOutput 负责 Task 输出,是 StreamTask 的数据输出的抽象

image.png
StreamTaskNetworkOutput 只是负责将数据交给算子来进行处理,实际的数据写出是在算子层上执行的
StreamTaskNetworkOutput 也有对应于单流和双流输入的两种实现,作为私有的内部类定义在 OneInputStreamTaskStreamTwoInputProcessor


结果分区 <ResultPartition >

ResultPartition (结果分区) , 用来表示作业的单个 Task 产生的数据

ResultPartition 是运行时的实体, 与 ExecutionGraph 中的中间结果分区对应 (IntermediateResultPartition)

一个 ResultPartition 是一组 Buffer 实例, ResultPartitionResultSubPartition 组成-

  • ResultSubPartition 用来进一步将 ResultPartition 进行切分
  • 切分成 (与下游子任务的并行度/数据分发模式 决定) 数量的 ResultSubPartition

下游子任务消费上游子任务产生的 ResultPartition , 在实际请求的时候

  • 是向上游请求 ResultSubPartition
  • 并不是请求整个 ResultPartition
  • 请求的方式有远程请求和本地请求两种

ResultPartition
对于流上的作业而言

  • ResultPartition 在作业的执行过程中会一直存在

对于批而言

  • 上游 Task 输出 ResultPartition
  • 下游 Task 消费上游的 ResultPartition 消费完毕之后
  • 上游的 ResultPartition 就没有什么用了,需要进行资源回收,所以 Flink 增加了 ReleaseOnConsumptionResultPartition

结果子分区 <ResultSubPartition >

ResultSubPartition (结果子分区) ,结果子分区是结果分区的一部分,负责存储实际的 Buffer

ResultPartition 是一个 Task 的输出,上游 Task 跟 下游 Task 之间的数据交换经常是一对多的关系,所以 Flink 在输出端将 ResultPartition 进行了切分. (下游如果有5个 Task , ResultPartition 就会生成5个 ResultSubPartition )

image.png
结果子分区有 PipelinedSubpartitionBoundedBlockingSubpartition (流/批)

PipelinedSubpartition
PipelinedSubpartition 是纯内存型的结果子分区,只能被消费一次
当向 PipelinedSubpartition 中添加一个完成的 BufferConsumer 或者添加下一个 BufferConsumer 时 (默认前一个 BufferConsumer 是完成的), 会通知 PipelinedSubpartitionView 新数据到达,可以消费了

BoundedBlockingSubpartition
用作对批处理 Task 的计算结果的数据存储,其行为是阻塞式的,需要等待上游所有的数据处理完毕,然后下游才开始消费数据,可以消费 1 次 或者多次

  • 这种 ResultSubPartition 有多种存储形式,可以保存在文件中或者内存映射文件中

BoundedBlockingSubpartition 不是线程安全的, Flink 网络栈是单线程模型,添加、刷新、完成都由单个 Writer 完成

  • 如果在写入阶段需要 release , 则同样由该 Writer 的线程负责执行,所以能确保安全

在实现中,支持多个并发的 Reader 但是需要确保 Reader 是单线程的

ResultSubPartition 是持有数据的容器,其中的 BoundedData 属性是实际的数据操作接口,BoundedData 提供了几种不同的实现


有限数据集

BoundedData 定义了存储和读取批处理中建计算结果数据集的阻塞式接口, BoundedBlockingSubPartition 使用 BoundedData 接口来实现中间结果集的访问
BoundedData 有 3 个不同的实现,分别对应于不同的 Buffer 的存储方式.
image.png

FileChannelBoundedData

  • 使用 Java NIOFileChannel 写入数据和读取文件

FileChannelMemoryMappedBoundedData

  • 使用 FileChannel 写入数据到文件,使用内存映射文件读取数据

MemoryMappedBoundedData

  • 使用内存映射文件写入、读取, 全都是内存操作

内存映射文件 (Memory-Mapped File) 是将一段虚拟内存逐步映射于一个文件,使得应用程序处理文件如同访问主内存(但在真正使用到这些数据前却不会消耗物理内存,也不会有读写磁盘的操作),内存映射文件与磁盘的真正交互由操作系统负责

Java.io 基于流的文件读写

  • 读文件会产生2次数据复制
    • 首先是从硬盘复制到操作系统内核
    • 然后从操作系统内核复制到用户态的应用程序
  • 写文件也类似

Java.nio

  • 一般情况下只有一次复制
  • 内存分配在操作系统内核
  • 应用程序访问的就是操作系统的内核内存空间
  • 比基于流进行文件读写快几个数量级

FileChannel 是直接读写文件,与内存映射文件相比

  • FileChannel 对小文件的读写效率高
  • 内存映射大文件则是针对大文件效率高

输入网关

InputGate —-> 输入网关,是 Task 的输入数据的封装,和 JobGraph 中的 JobEdge 一一对应,对应于上游的 ResultPartition

InputGate 中负责实际数据消费的是 InputChannel , 是 InputChannel 的容器,用于读取中间结果(IntermediateResult) 在并行执行时由上游 Task 产生的一个或多个结果分区 (ResultPartition)

image.png

SingleInputGate 是消费 ResultPartition 的实体,对应于一个 IntermediateResult .

UnionInputGate 主要充当 InputGate 容器的角色,将多个 InputGate 联合起来,当做一个 InputGate.

  • 一般是对应于上游的多个输出类型相同的 IntermediateResult ,对应于多个上游的 IntermediateResult.

InputGateWithMetrics 本质上来说就是一个 InputGate + 监控统计,统计 InputGate 读取的数据量,单位为 Byte
image.png

Flink 作业在集群中并行执行时,其并行状态下的逻辑关系,中间结果集会按照并行度切分为与并行度个数相同的结果分区,每个结果分区又进一步切分为一个或者多个结果子分区
image.png
作业的并行度为 2, 有两个 map 字任务分切产生数据,生成两个 ResultPartition (ResultSubPartition1 , ResultSubPartition2)

每个 ResultPartition 又被切分为两个 ResultSubPartition

  • ResultSubPartition 的数量与下游 Reduce 任务的子任务个数相同

每个 Reduce 的子任务都有一个 InputGate

  • 负责从上游的所有 ResultPartition 中获取该子任务所需要的 ResultSubPartition

输入通道

InputChannel —-> 输入通道
每个 InputGate 会包含一个以上的 InputChannel , 和 ExecutionEdge 一一对应,也和结果子分区一对一相连,一个 InputChannel 接收一个结果子分区的输出
image.png

RemoteInputChannel

  • 对应于远程的结果子分区的输入通道,用来表示跨网络的数据交换,底层基于 Netty

LocalInputChannel

  • 对应于本地结果子分区的输入通道,用来在本地线程内不同线程之间的数据交换

UnknownInputChannel

  • 一种位于占位目的的输入通道,需要占位通道是因为暂未确定向对于 Task 生产者的位置,再确定上游 Task 位置之后
    • 如果位于不同的 TaskManager 则替换为 RemoteInputChannel
    • 如果位于相同的 TaskManager 则替换为 LocalInputChannel

Task

之前的 Task 执行模型依赖于锁机制,可能导致多个潜在的线程并发访问其内部状态

  • 比如事件处理以及检查点的触发线程.

当前,他们都通过一个全局锁 (检查点锁) 来保证彼此互斥,这种机制有一些劣势

  • 锁对象必须在类的各种互斥访问的代码段中进行传递

    • 代码可读性很差
    • 使用不当或者漏用容易造成难以定位的问题
  • 设计不够优雅,锁对象暴露给了面向用户的 API (SourceContext)

新的执行模型 类似与基于 (Mailbox) 单线程执行模型,取代现有的多线程模型

  • 所有的并发操作都通过队列进行排队 (Mailbox),单线程 (Mailbox 线程) 依次处理,避免并发操作

Task 处理数据

  1. 启动 Task 进入执行状态,开始读取数据,当有可消费的数据时,则持续读取数据

/ StreamTask /

  1. protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
  2. InputStatus status = inputProcessor.processInput();
  3. if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
  4. return;
  5. }
  6. if (status == InputStatus.END_OF_INPUT) {
  7. controller.allActionsCompleted();
  8. return;
  9. }
  10. CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
  11. MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
  12. assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
  13. }
  1. StreamInputProcessor 是数据读取,处理,输出的高层逻辑的载体
  • 由其负责触发数据时读取,并交给算子处理,然后输出

/ StreamOneInputProcessor /

  1. @Override
  2. public InputStatus processInput() throws Exception {
  3. InputStatus status = input.emitNext(output);
  4. if (status == InputStatus.END_OF_INPUT) {
  5. endOfInputAware.endInput(input.getInputIndex() + 1);
  6. }
  7. return status;
  8. }
  1. StreamInputProcessor 实际上将具体的数据读取工作交给了 StreamTaskInput
  • 当读取了完整的记录之后就开始向下游发送数据
  • 在发送数据的过程中,调用算子进行数据的处理** ```java public StreamOneInputProcessor(

    1. StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {
    2. this.input = checkNotNull(input);
    3. this.output = checkNotNull(output);
    4. this.endOfInputAware = checkNotNull(endOfInputAware);

    }

**
```java
    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {

        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                DeserializationResult result;
                try {
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    processElement(deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }

            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    processBuffer(bufferOrEvent.get());
                } else {
                    processEvent(bufferOrEvent.get());
                    return InputStatus.MORE_AVAILABLE;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }
  1. 读取到完成数据记录之后,根据其类型进行不同的逻辑处理

/ StreamTaskNetworkInput /

    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()) {
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(
                    recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(
                    recordOrMark.asStreamStatus(),
                    flattenedChannelIndices.get(lastChannel),
                    output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
  1. 读取到数据,对于数据记录(StreamRecord) ,会在算子中包装用户的业务逻辑,即时使用了DataStream 编写的 UDF

/ OneInputStreamTask /

@Override
 public void emitRecord(StreamRecord<IN> record) throws Exception {
            numRecordsIn.inc();
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        }
  1. 进入算子内部,由算子去执行用户编写的业务逻辑 (以FlatMap 为例)

/ StreamFlatMap /

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
  1. 在算子中处理完毕,数据要交给下一个算子或者 Task 进行计算,此时就会涉及 3 种 算子之间数据传递的情形
  • OperatorChain 内部的数据传递,发生 OperatorChain 所在本地线程内
  • 同一个 TaskManager 不同 Task 之间传递数据,发生在同一个 JVM 的不同线程之间
  • 不同 TaskManagerTask 之间传递, 跨 JVM 的数据传递,需要使用跨网络的通信,即便 TaskManager 位于同一个物理机上,也会使用网络协议进行数据传递

Task 处理 Watermark

Task 处理 Watermark 的时候分为两种

  • 一种是在 OneInputStreamOperator (单流输入算子)
  • 一种是在 TwoInputStreamOperator (双流输入算子)

单流输入逻辑比较简单,如果有定时器服务,则判断是否触发计算,并将 Watermark 发往下游
/ AbstractStreamOperator /

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

双流输入从上游两个算子中接收到两个 Watermark ,input1Watermark1 表示第一个输入流的 Watermark ,input1Watermark2 表示第二个输入流的 Watermark,选择其中较小的那一个 Math.min(input1Watermark, input2Watermark) 作为当前的 Watermark , 之后的处理逻辑与单流输入一致.

/ StreamTwoInputProcessor /

        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (inputIndex == 0) {
                operator.processWatermark1(watermark);
            } else {
                operator.processWatermark2(watermark);
            }
        }

/ AbstractStreamOperator /

  public void processWatermark1(Watermark mark) throws Exception {
        input1Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        input2Watermark = mark.getTimestamp();
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            processWatermark(new Watermark(combinedWatermark));
        }
    }

Task 处理 StreamStatus

StreamStatusStreamElement 的一种,用来标识 Task 是活动状态还是空闲状态

SourceStreamTask 或一般的 StreamTask 处理空间状态 (IDLE) , 不会向下游发送数据或者 Watermark 时,就向下游发送 StreamStatus.IDLE 状态告知下游,依次向下传递
当恢复向下游发送数据或者 Watermark 前,首先发送 StreamStatus.ACTIVE 状态告知下游

  • 如何判断 是 IDLE 还是 ACTIVE 状态

StreamStatus 状态变化在 SourceFunction 中产生

  • SourceTask 如果读取不到输入数据,则认为是 IDLE 状态,如果 Kafka Consumer 未分配到数据分区 (Partition),则不会读取数据
  • 如果需要重新读取数据,则认为是 Active 状态

/ FlinkKafkaConsumerBase /

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

StreamTask 的所有上游 Task 全部处理 IDLE 状态的时候,认为这个 StreamTask 处于 IDLE 状态
只要有一个上游的 SourceTaskActive 状态,StreamTask 就是 Active 状态

  • StreamStatus 对 WaterMark 的影响

由于 SourceTask 保证在 IDLE 状态和 Active 状态之间不会发送数据元素,所以 StreamTask 可以在不需要检查当前状态的情况下安全地处理和传播收到数据元素.
由于在 DataFlow 的任何中间节点都可能产生 watermark , 所以当前 StreamTask 在发送 WaterMark 之前必须检查当前算子的状态,如果当前的状态是 IDLE ,则 Watermark 阻塞不会向下游发送.
如果 StreamTask 有多个上游输入

  • 上有输入的 Watermark 状态为 IDLE
  • 恢复到 Active 状态,但是其 Watermark 落后与当前算子的最小 Watermark,此时需要忽略这个特殊的 Watermark
    • 在判断是否需要向前推进 Watermark 和向下游发送的时候,这个特殊的 Watermark 不起作用

Task 处理 LatencyMarker

LatencyMarker 用来近似估计数据从读取到写出之间的延迟,但是并不包含计算的延迟.
在算子中只能将数据记录交给 UDF 执行,所以收到 LatencyMarker 就直接交给下游了.
/ AbstractStreamOperator /

    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        // all operators are tracking latencies
        this.latencyStats.reportLatency(marker);

        // everything except sinks forwards latency markers
        this.output.emitLatencyMarker(marker);
    }