状态使用

状态的使用主要分两部分:实现有状态算子和配置。

实现有状态算子

从使用角度讲可以把状态分为系统状态和自定义状态。系统状态是Flink算子自带的,比如Window内置的状态存有相关数据供触发计算时计算(比如窗口时间到了会触发聚合操作)。
下面将详细讲述用户自定义状态。

状态类型

Flink内置了多种不同类型的状态,可供用户按需使用。
下图是各种状态的继承关系图。
image.png

image.png

总结一下上图:

  1. ListState是Keyed State和 Operator State共有。
  2. BroadcastState是Operator State独有。
  3. 其他都是Keyed独有。

使用实例介绍

Keyed State示例

https://github.com/deadwind4/flink-training/blob/master/src/main/java/me/training/flink/state/Average.java
上诉代码中map回调函数相当于作用当前记录。
也就是说每条记录过来只会修改所对应的key的sum值(ValueState)。不同key之间彼此是隔离的。

Operator State示例

三个要点:

  1. 使用Operator State要实现CheckpointedFunction或ListCheckpointed接口。
  2. 这两个接口都有一个共性就是都要实现,做Checkpoint和从Checkpoint中恢复的两个逻辑。
  3. Opertator State全部都是类似List的类型,目的是扩容之后可以把List中的每个元素均匀分配给所有并行算子。

    实现CheckpointedFunction接口

    https://github.com/deadwind4/flink-training/blob/master/src/main/java/me/training/flink/state/BufferingSink.java

    实现ListCheckpointed接口

    https://github.com/deadwind4/flink-training/blob/master/src/main/java/me/training/flink/state/CounterSource.java

配置

StateBackend(状态后端)选择

不同后端的区别如下图。状态后端主要在做Checkpoint时快照存储的方式不同,存储状态时差别不大。RocksDB的Keyed State存在RocksDB,防止Keyed State过大。
image.png

设置Time-To-Live (TTL)

为了防止Keyed State随时间增长过于巨大,所以引入TTL机制,设置时间定期清理状态。
设置实例代码如下:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.common.time.Time;
  4. StateTtlConfig ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build();
  9. ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
  10. stateDescriptor.enableTimeToLive(ttlConfig);

配置除了时间外还包含3个选项:

  • UpdateType:设置刷新类型。OnReadAndWrite选项是指Read、Write、Create都刷新倒计时。
  • StateVisibility:配置过期但还没被清除的数据是否可见。由于清理过期数据是惰性的,不是过期了马上就没,可能到定期清理时才会消失。为了满足业务需求,所以添加这个选项。
  • CleanupStrategies:清理策略。
    • cleanupFullSnapshot策略内存中状态并不会被清理,只是做Checkpoint的快照时快照清理后变小了。
    • cleanupIncrementally策略特定条件触发后检查N个状态,如过期则触发清理。
    • cleanupRocksdbCompactFilter策略在RocksDB定期压缩时清理。

image.png

额外知识点:

TTL时间设置的源码加入了整数溢出保护。具体如下。

  1. public static boolean expired(long ts, long ttl, long currentTimestamp) {
  2. return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
  3. }
  4. private static long getExpirationTimestamp(long ts, long ttl) {
  5. long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
  6. return ts + ttlWithoutOverflow;
  7. }

特殊的广播状态

作用,把数据流甲的每个数据广播到流乙。
广播状态是Operator state的一种,但用法特殊一点。
使用广播状态只需要三步。

  1. 创建BroadcastStream。
  2. 调用connect()与事实数据流DataStream连接。
  3. 调用process()使用ProcessFunction实现接收到广播的处理逻辑。

应用场景:动态更新配置。向甲数据流发送配置,广播到乙数据流,乙数据流根据配置修改代码逻辑。
//TODO 示例

状态故障恢复

概述

状态故障恢复使用Checkpoint机制。核心思想就是定期保存数据,故障后从保存的数据中恢复曾经的状态。
这种机制必须借助外部持久化存储系统。
需要两部分配备持久化存储:

  1. 数据源(Source)是持久化系统,意味着可重放(例如Kafka)。
  2. 状态定期存储到持久化系统(例如HDFS)。

将整个数据流划分成一个又一个阶段

故障恢复的两种模式

exactly-once:准确恢复状态值,不多不少。
at-least-once:恢复状态值后可能会变多。
at-least-once存在的意义是很多场景并不需要计算的数值非常准确。例如,一些海量日志处理,一段时间内报警10001次和10002次数量级是一样对最终的业务没有影响。使用at-least-once可以降低延迟。具体降低延迟的原理后面讲述。

默认Checkpoint会异步做State快照。

配置注意事项

config``.``setMinPauseBetweenCheckpoints``(``500``);
这个选项是设置两次Checkpoint见最小的时间间隔。为了防止单次Checkpoint时间过长,刚做完,就开始做下次Checkpoint,导致Flink处理数据效率下降,甚至压根就不处理,一直在做无意义的Checkpoint。
而且如果不配置这个值,会导致做检查点的时间重叠,会出现并发做多个。
示意图如下,绿框代表做Checkpoint的时间,空白代表正常处理数据的时间。
image.png

集群扩容再分配

总体核心思想就是均匀分配,根据线程数取模。只是两种Operator State和Keyed State取模细节不一样,核心原理是一样的。

Keyed State再分配

Key Groups重新均匀分配到所有并行算子上。
Key Groups详细介绍

Operator State再分配(Redistribute)

两种方式
Even-split redistribution:
重启后均匀分配到每个算子。
Union redistribution:
重启后每个算子获得所有状态。这个不造咋用就不能乱用,会导致每个节点的State线性增长,撑爆内存。

广播状态每个算子都一样所以再分配就在所有算子里都复制一遍。