Flink 的 State 是为了让流处理有记忆,以便获取之前的结果和重启恢复

  • State 是缓存运算的中间结果
  • Checkpoint 是存储中间状态,用于失败重启
  • Savepoint 是存储程序快照,用于系统升级

1. State

Keyed State

  1. public class KeyedStateWithValueState {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
  5. Tuple2.of(2L, 2L), Tuple2.of(2L, 4L), Tuple2.of(2L, 6L));
  6. dataStreamSource
  7. .keyBy(0)
  8. .flatMap(new CountWindowAverageWithValueState())
  9. .print();
  10. env.execute("TestStatefulApi");
  11. }
  12. }
  13. class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
  14. ValueState<Tuple2<Long, Long>> countAndSum;
  15. @Override
  16. public void open(Configuration parameters) throws Exception {
  17. super.open(parameters);
  18. // 获取 ValueState 的描述,
  19. ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>("average", Types.TUPLE(Types.LONG, Types.LONG));
  20. // 可以设置过期时间 descriptor.enableTimeToLive(ttlConfig);
  21. countAndSum = getRuntimeContext().getState(descriptor);
  22. }
  23. @Override
  24. public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
  25. // 拿到当前的 key 的状态值
  26. Tuple2<Long, Long> currentState = countAndSum.value();
  27. if (currentState == null) {
  28. currentState = Tuple2.of(0L, 0L);
  29. }
  30. currentState.f0 += 1;
  31. currentState.f1 += element.f1;
  32. countAndSum.update(currentState);
  33. if (currentState.f0 >= 3) {
  34. double avg = (double) currentState.f1 / currentState.f0;
  35. out.collect(Tuple2.of(element.f0, avg));
  36. countAndSum.clear();
  37. }
  38. }
  39. }

Operated State

Backend

State 可以存在三种地方

  1. 内存,MemoryStateBackend。默认情况下,state 存在 TaskManager 的堆内存中

    • 缺点:只能保存数据量小的状态、状态数据有可能会丢失
    • 优点:开发测试很方便
  2. HDFS,FsStateBackend。state 存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)

    • 缺点:状态大小受TaskManager内存限制(默认支持5M)
    • 优点:状态访问速度很快、状态信息不会丢失
    • 用于: 生产,也可存储状态数据量大的情况
  3. RocksDB,RocksDBStateBackend。状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中 checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)

    • 缺点:状态访问速度有所下降
    • 优点:可以存储超大量的状态信息、状态信息不会丢失
    • 用于: 生产,可以存储超大量的状态信息

StateBackend配置方式

(1)单任务调整

  1. 修改当前任务代码
  2. env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
  3. 或者new MemoryStateBackend()
  4. 或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

(2)全局调整

  1. 修改flink-conf.yaml
  2. state.backend: filesystem
  3. state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
  4. 注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

2. Checkpoint

checkpoint 目的是为了让 flink 任务失败后自动重启,第一步开启 checkpoint,第二步设置重启侧露

1.开启 checkpoint

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. //--------------------重要参数------------------
  3. //默认情况 checkpoint 没有启用
  4. // state 比较大,可以设置 5 分钟
  5. env.enableCheckpointing(10000);
  6. // 高级选项:
  7. // 设置模式为exactly-once (这是默认值)
  8. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  9. // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
  10. env.getCheckpointConfig().setCheckpointTimeout(60000);
  11. //--------------------其他参数------------------
  12. // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
  13. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  14. // 同一时间只允许进行一个检查点
  15. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  16. // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的 Checkpoint【详细解释见备注】
  17. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.设置重启策略

  1. //失败后尝试重启, 重启 5 次,每次间隔 10秒钟
  2. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3. // 尝试重启的次数
  4. 5,
  5. // 间隔
  6. Time.of(10, TimeUnit.SECONDS)
  7. ));

3. savepoint

checkPoint 应用定时触发,用于保存状态,会过期,内部应用失败重启的时候使用。
savePoint 用户手动执行,是指向Checkpoint的指针,不会过期,在升级的情况下使用。

  1. 在flink-conf.yaml中配置Savepoint存储位置
    不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定 Savepoint的位置 state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
  2. 触发一个savepoint【直接触发或者在cancel的时候触发】
    停止程序:bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn 模式需要指定-yid参数】
  3. 从指定的savepoint启动job bin/flink run -s savepointPath [runArgs]