Flink 的 State 是为了让流处理有记忆,以便获取之前的结果和重启恢复
- State 是缓存运算的中间结果
- Checkpoint 是存储中间状态,用于失败重启
- Savepoint 是存储程序快照,用于系统升级
1. State
Keyed State
public class KeyedStateWithValueState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),Tuple2.of(2L, 2L), Tuple2.of(2L, 4L), Tuple2.of(2L, 6L));dataStreamSource.keyBy(0).flatMap(new CountWindowAverageWithValueState()).print();env.execute("TestStatefulApi");}}class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {ValueState<Tuple2<Long, Long>> countAndSum;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 获取 ValueState 的描述,ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>("average", Types.TUPLE(Types.LONG, Types.LONG));// 可以设置过期时间 descriptor.enableTimeToLive(ttlConfig);countAndSum = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {// 拿到当前的 key 的状态值Tuple2<Long, Long> currentState = countAndSum.value();if (currentState == null) {currentState = Tuple2.of(0L, 0L);}currentState.f0 += 1;currentState.f1 += element.f1;countAndSum.update(currentState);if (currentState.f0 >= 3) {double avg = (double) currentState.f1 / currentState.f0;out.collect(Tuple2.of(element.f0, avg));countAndSum.clear();}}}
Operated State
Backend
State 可以存在三种地方
内存,MemoryStateBackend。默认情况下,state 存在 TaskManager 的堆内存中
- 缺点:只能保存数据量小的状态、状态数据有可能会丢失
- 优点:开发测试很方便
HDFS,FsStateBackend。state 存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
- 缺点:状态大小受TaskManager内存限制(默认支持5M)
- 优点:状态访问速度很快、状态信息不会丢失
- 用于: 生产,也可存储状态数据量大的情况
RocksDB,RocksDBStateBackend。状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中 checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
- 缺点:状态访问速度有所下降
- 优点:可以存储超大量的状态信息、状态信息不会丢失
- 用于: 生产,可以存储超大量的状态信息
StateBackend配置方式
(1)单任务调整
修改当前任务代码env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));或者new MemoryStateBackend()或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
(2)全局调整
修改flink-conf.yamlstate.backend: filesystemstate.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
2. Checkpoint
checkpoint 目的是为了让 flink 任务失败后自动重启,第一步开启 checkpoint,第二步设置重启侧露
1.开启 checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//--------------------重要参数------------------//默认情况 checkpoint 没有启用// state 比较大,可以设置 5 分钟env.enableCheckpointing(10000);// 高级选项:// 设置模式为exactly-once (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】env.getCheckpointConfig().setCheckpointTimeout(60000);//--------------------其他参数------------------// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 同一时间只允许进行一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的 Checkpoint【详细解释见备注】env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
2.设置重启策略
//失败后尝试重启, 重启 5 次,每次间隔 10秒钟env.setRestartStrategy(RestartStrategies.fixedDelayRestart(// 尝试重启的次数5,// 间隔Time.of(10, TimeUnit.SECONDS)));
3. savepoint
checkPoint 应用定时触发,用于保存状态,会过期,内部应用失败重启的时候使用。
savePoint 用户手动执行,是指向Checkpoint的指针,不会过期,在升级的情况下使用。
- 在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定 Savepoint的位置 state.savepoints.dir: hdfs://namenode:9000/flink/savepoints - 触发一个savepoint【直接触发或者在cancel的时候触发】
停止程序:bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn 模式需要指定-yid参数】 - 从指定的savepoint启动job bin/flink run -s savepointPath [runArgs]
