API 迁移指南

从Flink 1.3+ 到 Flink 1.7

TypeSerializer 的变化

这部分主要与实现TypeSerializer接口来自定义序列化的用户有关。

原来的 TypeSerializerConfigSnapshot 抽象接口被弃用了, 并且将在将来完全删除,取而代之的是新的 TypeSerializerSnapshot. 详情请参考 Migrating from deprecated serializer snapshot APIs before Flink 1.7

从 Flink 1.2 迁移到 Flink 1.3

自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。

TypeSerializer 接口变化

这主要适用于自定义 TypeSerializer接口的用户

从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 序列化升级和兼容性

ProcessFunctionRichFunction

从Flink 1.2, ProcessFunction 引入,并有了多种实现例如 RichProcessFunction 。 从Flink 1.3,开始RichProcessFunction 被移除了, 现在 ProcessFunction 始终是 RichFunction 并且可以访问运行时上下文。

Flink CEP 库API更改

Flink 1.3中的CEP库新增了许多新函数,请参阅 CEP迁移文档

Flink core 中移除了Logger的依赖

Flink1.3以后,用户可以选用自己期望的日志框架了,Flink移除了日志记录框架的依赖。

实例和快速入门的demo已经指定了日志记录器,不会有问题,其他项目,请确保添加日志依赖,比如Maven的 pom.xml,中需要增加

  1. <dependency>
  2. <groupId>org.slf4j</groupId>
  3. <artifactId>slf4j-log4j12</artifactId>
  4. <version>1.7.7</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>log4j</groupId>
  8. <artifactId>log4j</artifactId>
  9. <version>1.2.17</version>
  10. </dependency>

从 Flink 1.1 到 Flink 1.2的迁移

正如 状态文档中所说,Flink 有两种状态: keyednon-keyed 状态 (也被称作 operator 状态). 这两种类型都可用于 算子和用户定义的函数。文档将指导您完成从Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些改变涉及到Flink 1.1中对齐窗口操作的弃用。 (请参阅 时间对齐窗口算子Aligned Processing Time Window Operators).

迁移有两个目标:

  1. 引入Flink1.2中引入的新函数,比如自适应(rescaling)

  2. 确保新Flink 1.2作业能够从Flink 1.1的保存点恢复执行

按照本指南操作可以把正在运行的Flink1.1作业迁移到Flink1.2中。前提需要在Flink1.1中使用 保存点 并把保存点作为Flink1.2作业的起点。这样,Flink1.2就可以从之前中断的位置恢复执行了。

用户函数示例

本文档其余部分使用 CountMapperBufferingSink 函数作为示例。第一个函数是 keyed 状态,第二个不是 s non-keyed 状态,Flink 1.1中上述两个函数的代码如下:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
  2. private transient ValueState<Integer> counter;
  3. private final int numberElements;
  4. public CountMapper(int numberElements) {
  5. this.numberElements = numberElements;
  6. }
  7. @Override
  8. public void open(Configuration parameters) throws Exception {
  9. counter = getRuntimeContext().getState(
  10. new ValueStateDescriptor<>("counter", Integer.class, 0));
  11. }
  12. @Override
  13. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  14. int count = counter.value() + 1;
  15. counter.update(count);
  16. if (count % numberElements == 0) {
  17. out.collect(Tuple2.of(value.f0, count));
  18. counter.update(0); // reset to 0
  19. }
  20. }
  21. }
  22. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  23. Checkpointed<ArrayList<Tuple2<String, Integer>>> {
  24. private final int threshold;
  25. private ArrayList<Tuple2<String, Integer>> bufferedElements;
  26. BufferingSink(int threshold) {
  27. this.threshold = threshold;
  28. this.bufferedElements = new ArrayList<>();
  29. }
  30. @Override
  31. public void invoke(Tuple2<String, Integer> value) throws Exception {
  32. bufferedElements.add(value);
  33. if (bufferedElements.size() == threshold) {
  34. for (Tuple2<String, Integer> element: bufferedElements) {
  35. // send it to the sink
  36. }
  37. bufferedElements.clear();
  38. }
  39. }
  40. @Override
  41. public ArrayList<Tuple2<String, Integer>> snapshotState(
  42. long checkpointId, long checkpointTimestamp) throws Exception {
  43. return bufferedElements;
  44. }
  45. @Override
  46. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  47. bufferedElements.addAll(state);
  48. }
  49. }

CountMapper 是一个按表格分组输入(word, 1)RichFlatMapFunction , 函数为每个传入的key保存一个计数器 (ValueState&lt;Integer&gt; counter) 并且 ,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。

BufferingSink 是一个 SinkFunction 接收方 ( CountMapper可能的输出) ,直到达到用户定义的最终状态之前会一直缓存数据,这可以避免频繁的对数据库或者是存储系统的操作,通常这些操作都是比较耗时或开销比较大的。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements) 列表会被定期被检查点保存。

状态 API 迁移

要使用Flink 1.2的新函数,应修改上面的代码来完成新的状态抽象。完成这些更改后,就可以实现作业的并行度的修改(向上或向下扩展),并确保新版本的作业将从之前作业停止的位置开始执行。

Keyed State: 需要注意的是,如果代码中只有keyed state,那么Flink1.1的代码也适用于1.2版本,并且完全支持新函数和向下兼容。可以仅针代码格式进行更改,但这只是风格/习惯问题。

综上所述,本章我们重点阐述 non-keyed state的迁移

自适应和新状态抽象

第一个修改是 Checkpointed&lt;T extends Serializable&gt; 接口有了新的实现。在 Flink 1.2中,有状态函数可以实现更通用的 CheckpointedFunction 接口或 ListCheckpointed&lt;T extends Serializable&gt; 接口 ,和之前版本的 Checkpointed 类似。

在这两种情况中,非键合状态预期是一个 可序列化List ,对象彼此独立,这样可以在自适应的时候重新分配,意味着, 这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行度为1的 BufferingSink(test1, 2)(test2, 2)两个数据,当并行度增加到2时, (test1, 2) 可能在task 0中,而 (test2, 2) 可能在task 1中。

更详细的信息可以参阅状态文档.

ListCheckpointed

ListCheckpointed 接口需要实现两个方法:

  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

它的语义和之前的 Checkpointed 接口类似,唯一区别是,现在 snapshotState() 返回的是检查点对象列表, 如前所述, restoreState 必须在恢复的时候,处理这个列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE)snapshotState()。 更新的代码 BufferingSink 如下:

  1. public class BufferingSinkListCheckpointed implements
  2. SinkFunction<Tuple2<String, Integer>>,
  3. ListCheckpointed<Tuple2<String, Integer>>,
  4. CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  5. private final int threshold;
  6. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  7. private List<Tuple2<String, Integer>> bufferedElements;
  8. public BufferingSinkListCheckpointed(int threshold) {
  9. this.threshold = threshold;
  10. this.bufferedElements = new ArrayList<>();
  11. }
  12. @Override
  13. public void invoke(Tuple2<String, Integer> value) throws Exception {
  14. this.bufferedElements.add(value);
  15. if (bufferedElements.size() == threshold) {
  16. for (Tuple2<String, Integer> element: bufferedElements) {
  17. // send it to the sink
  18. }
  19. bufferedElements.clear();
  20. }
  21. }
  22. @Override
  23. public List<Tuple2<String, Integer>> snapshotState(
  24. long checkpointId, long timestamp) throws Exception {
  25. return this.bufferedElements;
  26. }
  27. @Override
  28. public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
  29. if (!state.isEmpty()) {
  30. this.bufferedElements.addAll(state);
  31. }
  32. }
  33. @Override
  34. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  35. // this is from the CheckpointedRestoring interface.
  36. this.bufferedElements.addAll(state);
  37. }
  38. }

更新后的函数也实现了 CheckpointedRestoring 接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。

CheckpointedFunction

CheckpointedFunction 接口也需要实现这两个方法。

  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

在Flink 1.1中, 检查点执行会调用snapshotState() 方法,但是现在当用户每次初始化自定义函数时,会调用 initializeState() (对应 restoreState()) ,而不是在恢复的情况下调用,鉴于此, initializeState() 不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。实现了 CheckpointedFunction 接口的 BufferingSink 代码如下所示

  1. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  2. CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  3. private final int threshold;
  4. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  5. private List<Tuple2<String, Integer>> bufferedElements;
  6. public BufferingSink(int threshold) {
  7. this.threshold = threshold;
  8. this.bufferedElements = new ArrayList<>();
  9. }
  10. @Override
  11. public void invoke(Tuple2<String, Integer> value) throws Exception {
  12. bufferedElements.add(value);
  13. if (bufferedElements.size() == threshold) {
  14. for (Tuple2<String, Integer> element: bufferedElements) {
  15. // send it to the sink
  16. }
  17. bufferedElements.clear();
  18. }
  19. }
  20. @Override
  21. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  22. checkpointedState.clear();
  23. for (Tuple2<String, Integer> element : bufferedElements) {
  24. checkpointedState.add(element);
  25. }
  26. }
  27. @Override
  28. public void initializeState(FunctionInitializationContext context) throws Exception {
  29. checkpointedState = context.getOperatorStateStore().
  30. getSerializableListState("buffered-elements");
  31. if (context.isRestored()) {
  32. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  33. bufferedElements.add(element);
  34. }
  35. }
  36. }
  37. @Override
  38. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  39. // this is from the CheckpointedRestoring interface.
  40. this.bufferedElements.addAll(state);
  41. }
  42. }

initializeState 方法是需要传入 FunctionInitializationContext,用于初始化non-keyed 状态的 “容器”,容器的类型是 ListState,供 non-keyed 状态的对象被检查点存储时使用:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

初始化之后,调用 isRestored() 方法可以获取当前是否在恢复。如果是 true, 表示正在恢复。

正如下面的代码所示,在状态初始化期间恢复 BufferingSinkListState 中保存的变量可以被 snapshotState()使用, ListState 会清除掉之前检查点存储的对象,然后存储当前检查点的对象。

当然, keyed 状态也可以在 initializeState() 方法中初始化, 可以使用 FunctionInitializationContext 来完成初始化,而不是使用Flink1.1中的 RuntimeContext,如果CheckpointedFunction 要在 CountMapper 中使用该接口,则可以不使用 open() 方法, snapshotState()initializeState() 方法如下所示:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
  2. implements CheckpointedFunction {
  3. private transient ValueState<Integer> counter;
  4. private final int numberElements;
  5. public CountMapper(int numberElements) {
  6. this.numberElements = numberElements;
  7. }
  8. @Override
  9. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  10. int count = counter.value() + 1;
  11. counter.update(count);
  12. if (count % numberElements == 0) {
  13. out.collect(Tuple2.of(value.f0, count));
  14. counter.update(0); // reset to 0
  15. }
  16. }
  17. @Override
  18. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  19. // all managed, nothing to do.
  20. }
  21. @Override
  22. public void initializeState(FunctionInitializationContext context) throws Exception {
  23. counter = context.getKeyedStateStore().getState(
  24. new ValueStateDescriptor<>("counter", Integer.class, 0));
  25. }
  26. }

请注意, snapshotState() 方法为空,因为Flink本身负责在检查点时就会存储Keys化的对象

向后兼容Flink 1.1

到目前为止,我们已经了解如何修改函数来引入Flink 1.2的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。

答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,什么都不需要做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须像上面代码一样实现 CheckpointedRestoring 接口,还有个办法,需要熟悉Flink1.1的 restoreState()Checkpointed 接口,然后修改 BufferingSink, restoreState() 方法完成和之前一样的功能。

时间对齐窗口算子

在Flink 1.1中,只有在没有指定的触发器的时, timeWindow() 才会实例化特殊的数据类型 WindowOperator。 它可以是 AggregatingProcessingTimeWindowOperatorAccumulatingProcessingTimeWindowOperator.。这两个算子都可以称之为时间对齐窗口算子,因为它们假定输入数据是按顺序到达,当在处理时间中操作时,这是有效的,元素到达窗口操作时获得时间。算子仅限于使用内存状态,并且优化了用于存储数据的元素结构。

在Flink 1.2中,不推荐使用对齐窗口算子,并且所有窗口算子操作都通过泛型WindowOperator来实现。迁移不需要更改Flink 1.1作业的代码,因为Flink将读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator,并使用通用的 WindowOperator

虽然是已经弃用这个方法,但是在Flink 1.2 中仍然是可以使用的,通过特殊的 WindowAssigners 可以实现这个目的。 SlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindows assigners,分别是滑动窗口和滚动窗口,使用对齐窗口的Flink 1.2作业必须是一项新作业,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。

注意时间对齐窗口算子不提供自适应 而且 不向下兼容 Flink 1.1.

在Flink 1.2中使用对齐窗口 算子的代码如下所示:

  1. // for tumbling windows
  2. DataStream<Tuple2<String, Integer>> window1 = source
  3. .keyBy(0)
  4. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  5. .apply(your-function)
  6. // for sliding windows
  7. DataStream<Tuple2<String, Integer>> window1 = source
  8. .keyBy(0)
  9. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  10. .apply(your-function)
  1. // for tumbling windows val window1 = source
  2. .keyBy(0)
  3. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  4. .apply(your-function)
  5. // for sliding windows val window2 = source
  6. .keyBy(0)
  7. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  8. .apply(your-function)