image.png数据流元素

image.png

StreamRecord

  • 表示数据流中一条记录(或者叫做一个事件),也叫作数据记录
  • 包含

    • 数据的值本身
    • 时间戳 (可选) ```java /* The actual value held by this record. / private T value;

      /* The timestamp of the record. / private long timestamp;

      /* Flag whether the timestamp is actually set. / private boolean hasTimestamp;

  1. <a name="Debfe"></a>
  2. #### LatencyMarker
  3. - 用来近似估计延迟,LatencyMarker 在 Source 中创建,并向下游发送,绕过业务处理逻辑,在 Sink 节点使用 LatencyMarker 估计在整个 DAG 图中流花费的时间,用来近似的评估总体上的处理逻辑
  4. - 包含
  5. - 周期性地在数据源算子中创造出来的时间戳
  6. - 算子编号
  7. - 数据源算子所在的 Task 编号
  8. ```java
  9. /** The time the latency mark is denoting. */
  10. private final long markedTime;
  11. private final OperatorID operatorId;
  12. private final int subtaskIndex;

Watermark

  • WaterMark 是一个时间戳,用来告诉算子所有时间早于等于 WaterMark 的事件或记录都已经到达,不会再有比 WaterMark 更早的记录,算子可以根据 WaterMark 触发窗口的计算.清理资源等

      /** The timestamp of the watermark in milliseconds. */
      private final long timestamp;
    

    StreamStatus

  • 用来通知 Task 是否会继续接收到上游的记录或者 WaterMark. StreamStatus在数据源算子中生成,向下游沿着 Dataflow 传播.

  • StreamStatus 表示两种状态
    • IDLE 空闲状态
    • ACTIVE 活动状态
      public static final int IDLE_STATUS = -1;
      public static final int ACTIVE_STATUS = 0;
      

Transformation

    protected String name; // 转换器的名称,主要用于可视化
    private String uid; // 用户指定的UID,目的在Job重启时再次分配与之前相同的uid,可以持久保存状态
    protected long bufferTimeout = -1; // 超时时间
    private int parallelism; // 并行度
    protected final int id; // 跟属性uid 无关,生成方式是基于一个静态累加器
    private String userProvidedNodeHash;
    protected TypeInformation<T> outputType; // 输出类型 用来进行序列化数据
    private String slotSharingGroup; // 给当前的 Transformation 设置 Slot共享组

物理 Transformation

image.png
SourceTransformation

  • 从数据源读取数据的 Transformation ,是 Flink 作业的起点, SourceTransformation 只有下游 Transformation,没有上游输入 Transformation
  • 一个作业可以有多个 SourceTransformation, 从多个数据源读取数据[多流Join,维表Join,BroadcastState]

    SinkTransformation

  • 将数据写入到外部存储的 Transformation ,是 Flink 作业的终点, SinkTransformation 只有上游 Transformation,下游就是外部存储了

  • 一个作业可以有多个 SinkTransformation

    OneInputTransformation

  • 只接收一个输入流 同样需要 inputoperator 参数

    TwoInputTransformation

  • 接收两个输入流作为输入

image.png

SideOutputTransformation

  • 旁路输出中转换而来
  • 上游的 SideOutputTransformation 分流给下游的多个 SideOutputTransformation ,每一个 OutPut 通过 OutPutTag 进行标识.

    SplitTransformation

  • 按照条件切分数据流,该转换用于将一个流拆分成多个流(通过OutputSelector 来达到这个目的)

    SelectTransformation

  • SplitTransformation 配合使用,用来在下游选择 SplitTransformation 切分的数据流

    PartitionTransformation

  • 该转换器用于改变输入元素的分区

    UnionTransformation

  • 合并转换器,该转换器用于将多个输入 StreamTransformation 进行合并,因此该转换器接收 StreamTransformation 的集合,其名称在内部也被固定为 Union

  • Union运算要求上游输入的数据的结构必须是完全相同的

    FeedbackTransformation

  • 该转换器用于表示Flink DAG中的一个反馈点feedback point)。所谓反馈点,可用于连接一个或者多个StreamTransformation,这些StreamTransformation被称为反馈边(feedback edges)。处于反馈点下游的operation将可以从反馈点和反馈边获得元素输入。

    CoFeedbackTransformation

  • 某种程度上跟FeedbackTransformation类似。feedback元素的类型不需要跟上游的StreamTransformation元素的类型一致,因为CoFeedbackTransformation之后只允许跟TwoInputTransformations。上游的StreamTransformation将会连接到TwoInputTransformations第一个输入,而feedback edge将会连接到其第二个输入。因此上游的StreamTransformation其实是跟CoFeedbackTransformation无关的,它跟TwoInputTransformation有关。

  • 上游的StreamTransformationCoFeedbackTransformation无关,从CoFeedbackTransformation构造器需要的参数就可以看出来。通常,其他的StreamTransformation的实现都需要传入上游的StreamTransformation作为其输入。但CoFeedbackTransformation却没有,它只需要上游的并行度:parallelism。另外一个需要的参数是feedbackType

image.png
image.png


算子

所有算子都包含了

  • 生命周期管理

    • setup : 初始化环境,时间服务,注册监控等
    • open : 当前操作由各个具体的算子负责实现,包含了算子的初始化逻辑
    • close : 所有的数据处理完毕之后关闭算子,此时需要确保将所有的缓存数据向下发送
    • dispose : 算子生命周期的最后阶段执行,此时算子已经关闭,停止处理数据,进行资源的释放
  • 状态与容错管理

    • 算子负责状态管理,提供状态存储,触发检查点的时候,保存状态快照,并将快照异步保存到外部的分布式存储
    • 当作业执行失败时从保存的快照中恢复状态
  • 数据处理

    • 算子对数据的处理,不仅会进行数据记录的处理,同时也会提供对 WaterMark 和 Laten-cyMarker 的处理
    • 算子按照单流输入 和 双流输入 走对应的接口
      • OneInputStreamOperator
      • TwoInputStreamOperator
    • 算子融合优化策略

OneInputStreamOperator

@PublicEvolving
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
    void processElement(StreamRecord<IN> element) throws Exception;
    void processWatermark(Watermark mark) throws Exception;
    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

TwoInputStreamOperator

@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    void processElement1(StreamRecord<IN1> element) throws Exception;
    void processElement2(StreamRecord<IN2> element) throws Exception;
    void processWatermark1(Watermark mark) throws Exception;
    void processWatermark2(Watermark mark) throws Exception;
    void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;
    void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
}

image.png
数据处理

  • OneInputStreamOperator
  • TwoInputStreamOperator

生命周期,状态容错

  • AbstractStreamOperator 抽象类
  • AbstractUdfStreamOperator 实现类

DataStream 的算子 从行为上来说分4类

  • 单流输入算子

image.png

  • 双流输入算子

image.png

  • 数据源算子

image.png

  • 异步算子

image.png


Blink Join 算子

image.png

  • HashJoinParameter
    • InnerHashJoinOperator
    • BuildOuterHashJoinOperator
    • BuildLeftSemiOrAntiHashJoinOperator
    • ProbeOuterHashJoinOperator
    • FullOuterHashJoinOperator
    • AntiHashJoinOperator
    • SemiHashJoinOperator

      Temporal Join

image.png

  • TemporalProcessTimeJoinOperator 基于处理时间
  • TemporalRowTimeJoinOperator 基于事件时间

    Sort算子

    image.png

  • 排序算子,分别为流批两种,排序是一种全局排序,对于流批而言无法共用相同的算子实现

  • 流上包括 (支持时间+其他排序字段)

    • RowTimeSortOperator 事件时间
    • TemporalProcessTimeJoinOperator 处理时间

      OverWindow算子

      image.png
  • BufferDataOverWindowOperator

    • Over 开窗运算经常需要用当前数据跟之前N条数据一起计算,所以需要采用将之前的数据缓存起来的方式,在内存不足的情况下自动溢出到磁盘
  • NonBufferOverWindowOperator

    • 该算子应用于 rank 等不需要跟之前 N 条数据一起计算的开窗运算,无需缓存数据,可以提高计算效率

      Window 算子

      image.png
  • AggregateWindowOperator

    • 使用普通聚合函数 (UDAF) 的窗口算子
  • TableAggregateWindowOperator

    • 使用表聚合函数 (UDTAF) 的窗口算子

      Watermark算子

      image.png
  • WatermarkAssignerOperator

    • 从数据元素中提取时间戳,周期的生成 Watermark
  • RowTimeMiniBatchAssginerOperator
    • 用在 mini-batch 模式下,依赖上游的 Watermark ,基于事件时间周期性地生成 Watermark
  • ProcTimeMiniBatchAssignerOperator

    • 用在 mini-batch 模式下,基于处理时间周期性地生成 Watermark,不依赖上游

      Mini-batch 算子

      image.png
  • Mini-batch 算子用微批来提升计算效率,提高吞吐量. 使用Java 的Map 来缓存数据,Map的Key 与 State 的Key保持一致,在进行聚合运算的时候可以批量操作,避免每一条数据都访问State.

  • MapBundleOperator
    • 应用于未按照 Key 分组的数据流
  • KeyedMapBundleOperator

    • 应用于按照Key分组的数据流 <=> KeyedStream

      批上算子

  • SortOperator 实现批上的全局数据排序

  • SortLimitOperator 实现批上的带Limit排序
  • LimitOperator 实现批上的Limit 语义
  • RankOperator 实现批上的 TopN 语义

其他模块算子

  • ProcessOperator | KeyedProcessOperator
  • CEPOperator

函数体系

  • SourceFunction
    • 无上游算子,SourceFunction直接从外部数据存储读取数据
  • SinkFunction
    • 无下游算子,SinkFunction直接将数据写入到外部存储
  • 一般 Function (UDF)
    • 一般的 UDF 函数用在作业的中间处理步骤中,一般UDF 有上游算子,也有下游算子
    • 一般算子分为:
      • 单流输入Function
      • 双流输入Function

函数层次

Flink 提供的函数体系从接口的层级来看,从高阶 Function 到 低阶 Function

  • 无状态Function
    • UDF 接口 (MapFunction)
  • RichFunction
    • UDF接口 + 状态 + 生命周期 (RichFunction)
  • ProcessFunction
    • UDF接口 + 状态 + 生命周期 + 触发器 (JoinFunction)
  • RichFunction 相比于 无状态Function 有两方面的增强
    • 增加了 open 和 close 方法来管理 Function 的生命周期,在 open 方法中执行初始化,在Function 停止时,在 Close 方法中 执行清理,释放占用资源等
    • 增加了 getRuntimeContext 和 setRuntimeContext , RichFunction 能获取到执行时作业级别的参数信息

处理函数

处理函数 (ProcessFunction) 可以访问流应用程序所有 基本构建块

  • 事件 (数据流元素)
  • 状态 (容错和一致性)
  • 定时器 (事件时间和处理时间)

image.png

  • KeyedProcessFunction 与 Non-KeyedProcessFunction 的区别是
    • KeyedProcessFunction 只能应用于 KeyedStream
  • ProcessFunction 与 CoProcessFunction 区别是
    • CoProcessFunction 是双流输入
    • ProcessFunction 是单流输入
  • CoProcessFunction 实现双流 Join

    • 即时双流 Join

      1. 创建1个State对象
      2. 接收到输入流1 事件后更新 State
      3. 接收到输入流2 事件后 遍历 State,根据 Join 条件进行匹配,将匹配后的结果发送到下游
    • 延迟双流 Join

    • 在流数据里,数据可能是乱系的,数据会延迟到达,并且为了提供处理效率,使用小批量计算模式,而不是每个事件都触发一次 Join 计算
      1. 创建2个State对象,分别缓存输入流1和输入流2的事件
      2. 创建一个定时器,等待数据的到达,定时延迟触发Join计算
      3. 接收到输入流1事件后更新 State
      4. 接收到输入流2事件后更新 State
      5. 定时器遍历 State1 和 State2,根据Join 条件进行匹配,将匹配后的结果发送到下游
  • 延迟计算

    • 窗口计算 Watermark 和 Window 就是所谓的窗口计算(WindowOperator)
    • 使用Window 暂存数据,使用Watermark 触发 Window 计算
    • **InternalTimerServiceImpl**

      public void advanceWatermark(long time) throws Exception {
         currentWatermark = time;
      
         InternalTimer<K, N> timer;
      
         while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
             eventTimeTimersQueue.poll();
             keyContext.setCurrentKey(timer.getKey());
             triggerTarget.onEventTime(timer);
         }
      }
      

      广播函数

      image.png

  • processElement() 和 processBroadcastElement() 方法区别在于:

    • processElement() 只能使用只读的上下文 ReadOnlyContext
    • processBroadcastElement() 支持读写的上下文 Context

      异步函数

      image.png
  • 异步函数的抽象类 RichAsyncFunction 实现 AsyncFunction接口,继承 AbstractRichFunction 获取了生命周期管理和 FunctionContext 的访问能力

  • 异步调用行为将结果封装到 ResultFuture 中,同时提供了调用超时的处理,防止不释放资源
  • AsyncFunction
  • RichAsyncFunction
  • AbstractRichFunction ```java @PublicEvolving public interface AsyncFunction extends Function, Serializable { void asyncInvoke(IN input, ResultFuture resultFuture) throws Exception; default void timeout(IN input, ResultFuture resultFuture) throws Exception {
      resultFuture.completeExceptionally(
          new TimeoutException("Async function call has timed out."));
    
    }

}

<a name="RjQrz"></a>
#### 数据源函数
![image.png](https://cdn.nlark.com/yuque/0/2020/png/667110/1605667569098-ae056046-31b3-48a8-842f-4b2f18295641.png#align=left&display=inline&height=613&margin=%5Bobject%20Object%5D&name=image.png&originHeight=613&originWidth=1284&size=62134&status=done&style=none&width=1284)

- SourceFunction
   - 专门用来从外部存储读取数据,SourceFunction 是 Flink 的起点,一个作业可以有多个起点,即读取多个数据源的数据
- 一般使用 RichSourceFunction | RichParallelSourceFunction
   - 这两个 抽象类 通过继承 AbstractRichFunction 获得了 Function的生命周期,访问 RuntimeContext 的能力
   - RichSourceFunction 是不可以并行的,并行度限定为1,超过1则会报错
   - RichParallelSourceFunction 是可以并行的,并行度可以按照需求设定,并没有限制
- SourceFunction 有几个比较关键的行为
> 1. 生命周期管理: open | close | cancle 生命周期方法中包含 相应的 初始化\清理
> 1. 读取数据: 持续的从外部存储读取数据
> 1. 向下游发送数据
> 1. 发送 Watermark : 生成 Watermark 向下游发送
> 1. 空闲状态标记: 如果读不到数据,则将该 Task 标记为空闲,向下游发送 Status 阻止 Watermark 向下游传播/

```java
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();

    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {
        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        @PublicEvolving
        void emitWatermark(Watermark mark);

        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}
  • StreamSourceContext 中提供了两种不同类型的SourceContext的实例方法,从整体上按带不带时间分为两种

    • NonTimestampContext: 为所有元素赋予-1 作为时间戳,意味永远不会向下游发送 Watermark (Processing Time)
    • WatermarkContext
      1. 负责管理当前的 StreamStatus,确保 StreamStatus 向下游传递
      2. 负责空闲检测的逻辑,当超过设定事件间隔而没有收到数据或者 Watermark 时,认为 Task 处于空闲状态
  • WatermarkContext 有两个实现类

    • AutomaticWatermarkContext
      • 使用摄入时间(Ingestion Time)的时候,AutomaticWatermarkContext 会自动生成 Watermark
      • 启动 WatermarkEmittingTask 向下游发送 Watermark,使用一个定时器 其触发时间= (作业启动的时刻+Watermark周期x n),一旦启动,WatermarkEmittingTask 会持续的自动注册定时器,向下游发送 Watermark
    • ManualWatermarkContext
      • 使用事件时间(Event Time)的时候,ManualWatermarkContext 不会产生 Watermark,而是向下游发送透传上游的 Watermark

        输出函数

        image.png
  • SinkFunction

  • RichSinkFunction
  • TwoPhaseCommitSinkFunction

    检查点函数

  • 支持函数级别状态的保存和恢复的函数

  • CheckpointedFunction

    • 当保存状态之后,snapshotState() 会被调用,用于备份保存状态到外部存储
    • 当恢复状态的时候,其initializeState() 方法负责初始化 State,执行从上一个检查点恢复状态的逻辑
      public interface CheckpointedFunction {
      void snapshotState(FunctionSnapshotContext context) throws Exception;
      void initializeState(FunctionInitializationContext context) throws Exception;
      }
      
  • ListCheckpointed

    • 该接口的行为 跟 Checkpoint 行为类似,除了提供状态管理之外,修改作业并行度的时候,还提供了状态重分布的支持
      public interface ListCheckpointed<T extends Serializable> {
      List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
      void restoreState(List<T> state) throws Exception;
      }
      

      数据分区

      image.png
      • CustomPartitionerWrapper

      • ForwardPartitioner : 数据直接传递给下游
  • ShufflePartitioner : 随机将元素进行分区,可以确保下游的Task 能够均匀的获取数据

  • RebalancePartitioner : 以Round-robin的方式为每个元素分配分区,确保下游的Task可以均匀的获取到数据

  • RescalingPartitioner :

  • BroadcastPartitioner : 广播给所有分区

  • KeyGroupStreamPartitioner : 该分词器不是提供给用户的,KeyedStream在构建Transformation时,默认使用KeyedGroup分区形式,从而在底层上支持作业 Resscale 功能

  • GlobalPartitioner :

连接器

分布式ID

image.png

  • 在分布式计算中,Flink 对所有需要进行唯一标识的组件,对象提供了抽象类 AbstractID,因为需要跨网络进行传输,所以实现了 Serializable 接口,需要比较唯一标识是否相同 所以也实现了 Comparable 接口 .
    • Flink 作业
    • 资源管理
    • 作业管理器
    • 资源管理器
    • TaskManager