译者:flink.sojb.cn

从Flink 1.2迁移到Flink 1.3

自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。

TypeSerializer 界面变化

这主要适用TypeSerializer于为其状态实施自定义的用户。

从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 处理序列化程序升级和兼容性

ProcessFunction 总是一个 RichFunction

在Flink 1.2中,引入了ProcessFunction其丰富的变体RichProcessFunction。自Flink 1.3以来,RichProcessFunction已被删除,ProcessFunction现在始终RichFunction可以访问生命周期方法和运行时上下文。

Flink CEP库API更改

Flink 1.3中的CEP库附带了许多新函数,这些函数导致API发生了一些变化。有关详细信息,请访问CEP迁移文档

从Flink核心工件中删除了Logger依赖项

在Flink 1.3中,为了确保用户可以使用他们自己的自定义日志记录框架,核心Flink工件现在可以清除特定的记录器依赖项。

示例和快速入门原型已经指定了记录器,不应受到影响。对于其他自定义项目,请确保添加记录器依赖项。例如,在Maven中pom.xml,您可以添加:

  1. <dependency>
  2. <groupId>org.slf4j</groupId>
  3. <artifactId>slf4j-log4j12</artifactId>
  4. <version>1.7.7</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>log4j</groupId>
  8. <artifactId>log4j</artifactId>
  9. <version>1.2.17</version>
  10. </dependency>

从Flink 1.1迁移到Flink 1.2

状态文档中所述,Flink有两种类型的状态:被Keys化状态 和非被Keys化状态(也称为算子状态)。这两种类型都可用于 算子和用户定义的函数。本文档将指导您完成将Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些更改涉及Flink 1.1中对齐窗口 算子的弃用(请参阅对齐处理时间窗口 算子)。

迁移过程将有两个目标:

  1. 允许您的函数利用Flink 1.2中引入的新函数,例如重新缩放,

  2. 确保您的新Flink 1.2作业能够从其Flink 1.1前身生成的保存点恢复执行。

按照本指南中的步骤 算子操作后,您可以将正在运行的作业从Flink 1.1迁移到Flink 1.2,只需在Flink 1.1作业中使用保存点并将其作为起点提供给Flink 1.2作业。这将允许Flink 1.2作业从其Flink 1.1前任中断的位置恢复执行。

示例用户函数

作为本文档其余部分的运行示例,我们将使用CountMapperBufferingSink 函数。第一个是具有被Keys化状态的函数的示例,而第二个是具有非被Keys化状态的函数。Flink 1.1中上述两个函数的代码如下:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
  2. private transient ValueState<Integer> counter;
  3. private final int numberElements;
  4. public CountMapper(int numberElements) {
  5. this.numberElements = numberElements;
  6. }
  7. @Override
  8. public void open(Configuration parameters) throws Exception {
  9. counter = getRuntimeContext().getState(
  10. new ValueStateDescriptor<>("counter", Integer.class, 0));
  11. }
  12. @Override
  13. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  14. int count = counter.value() + 1;
  15. counter.update(count);
  16. if (count % numberElements == 0) {
  17. out.collect(Tuple2.of(value.f0, count));
  18. counter.update(0); // reset to 0
  19. }
  20. }
  21. }
  22. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  23. Checkpointed<ArrayList<Tuple2<String, Integer>>> {
  24. private final int threshold;
  25. private ArrayList<Tuple2<String, Integer>> bufferedElements;
  26. BufferingSink(int threshold) {
  27. this.threshold = threshold;
  28. this.bufferedElements = new ArrayList<>();
  29. }
  30. @Override
  31. public void invoke(Tuple2<String, Integer> value) throws Exception {
  32. bufferedElements.add(value);
  33. if (bufferedElements.size() == threshold) {
  34. for (Tuple2<String, Integer> element: bufferedElements) {
  35. // send it to the sink
  36. }
  37. bufferedElements.clear();
  38. }
  39. }
  40. @Override
  41. public ArrayList<Tuple2<String, Integer>> snapshotState(
  42. long checkpointId, long checkpointTimestamp) throws Exception {
  43. return bufferedElements;
  44. }
  45. @Override
  46. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  47. bufferedElements.addAll(state);
  48. }
  49. }

CountMapper是一个RichFlatMapFunction假定按表格分组的输入流 (word, 1)。该函数为每个传入的键(ValueState&lt;Integer&gt; counter)保存一个计数器,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。

BufferingSink是一个SinkFunction接收元件(的潜在的输出CountMapper),并直到达到某个用户指定的阈值,将它们发射到最终水槽之前缓冲它们。这是避免对数据库或外部存储系统进行许多昂贵调用的常用方法。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements)中,该列表定期检查点。

状态API迁移

要利用Flink 1.2的新函数,应修改上面的代码以使用新的状态抽象。完成这些更改后,您将能够更改作业的并行度(向上或向下扩展),并确保您的新版本的作业将从其前任停止的位置开始。

Keys状态:在深入研究迁移过程的细节之前需要注意的是,如果您的函数只有被Keys化状态,那么Flink 1.1的完全相同的代码也适用于Flink 1.2,完全支持新函数和完全向后兼容性。可以仅针对更好的代码组织进行更改,但这仅仅是一种风格问题。

如上所述,本节的其余部分重点介绍非被Keys化状态

重新缩放和新状态抽象

第一个修改是从旧Checkpointed&lt;T extends Serializable&gt;状态接口到新状态接口的转换。在Flink 1.2中,有状态函数可以实现更通用的CheckpointedFunction 接口或ListCheckpointed&lt;T extends Serializable&gt;接口,它在语义上更接近旧接口 Checkpointed

在这两种情况中,非键合状态被预期是一个List序列化的对象,彼此独立的,因而在重新缩放获再分配。换句话说,这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行性1 BufferingSink 包含数据元的检查点状态,(test1, 2)并且(test2, 2)当将并行性增加到2时,(test1, 2)可能最终在任务0中,而(test2, 2)将进入任务1。

有关被Keys化状态和非被Keys化状态重新缩放的原则的更多详细信息,请参阅状态文档

ListCheckpointed

ListCheckpointed接口需要实现两种方法:

  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

它们的语义与旧Checkpointed界面中的对应物相同。唯一的区别是现在snapshotState()应该将对象列表返回到检查点,如前所述,并且 restoreState必须在恢复时处理此列表。如果状态不是重新分区,可以随时返回Collections.singletonList(MY_STATE)snapshotState()。更新的代码BufferingSink 包括在下面:

  1. public class BufferingSinkListCheckpointed implements
  2. SinkFunction<Tuple2<String, Integer>>,
  3. ListCheckpointed<Tuple2<String, Integer>>,
  4. CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  5. private final int threshold;
  6. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  7. private List<Tuple2<String, Integer>> bufferedElements;
  8. public BufferingSinkListCheckpointed(int threshold) {
  9. this.threshold = threshold;
  10. this.bufferedElements = new ArrayList<>();
  11. }
  12. @Override
  13. public void invoke(Tuple2<String, Integer> value) throws Exception {
  14. this.bufferedElements.add(value);
  15. if (bufferedElements.size() == threshold) {
  16. for (Tuple2<String, Integer> element: bufferedElements) {
  17. // send it to the sink
  18. }
  19. bufferedElements.clear();
  20. }
  21. }
  22. @Override
  23. public List<Tuple2<String, Integer>> snapshotState(
  24. long checkpointId, long timestamp) throws Exception {
  25. return this.bufferedElements;
  26. }
  27. @Override
  28. public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
  29. if (!state.isEmpty()) {
  30. this.bufferedElements.addAll(state);
  31. }
  32. }
  33. @Override
  34. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  35. // this is from the CheckpointedRestoring interface.
  36. this.bufferedElements.addAll(state);
  37. }
  38. }

如代码所示,更新的函数也实现了CheckpointedRestoring接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。

CheckpointedFunction

CheckpointedFunction接口需要再次执行两种方法:

  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

与在Flink 1.1中一样,snapshotState()每当执行检查点时都会调用它,但是每次初始化用户定义的函数时,都会调用initializeState()它(它的对应部分restoreState()),而不是仅在我们从故障中恢复的情况下。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。CheckpointedFunction接口的实现 BufferingSink如下所示。

  1. public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
  2. CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
  3. private final int threshold;
  4. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  5. private List<Tuple2<String, Integer>> bufferedElements;
  6. public BufferingSink(int threshold) {
  7. this.threshold = threshold;
  8. this.bufferedElements = new ArrayList<>();
  9. }
  10. @Override
  11. public void invoke(Tuple2<String, Integer> value) throws Exception {
  12. bufferedElements.add(value);
  13. if (bufferedElements.size() == threshold) {
  14. for (Tuple2<String, Integer> element: bufferedElements) {
  15. // send it to the sink
  16. }
  17. bufferedElements.clear();
  18. }
  19. }
  20. @Override
  21. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  22. checkpointedState.clear();
  23. for (Tuple2<String, Integer> element : bufferedElements) {
  24. checkpointedState.add(element);
  25. }
  26. }
  27. @Override
  28. public void initializeState(FunctionInitializationContext context) throws Exception {
  29. checkpointedState = context.getOperatorStateStore().
  30. getSerializableListState("buffered-elements");
  31. if (context.isRestored()) {
  32. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  33. bufferedElements.add(element);
  34. }
  35. }
  36. }
  37. @Override
  38. public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
  39. // this is from the CheckpointedRestoring interface.
  40. this.bufferedElements.addAll(state);
  41. }
  42. }

initializeState把参数作为一个FunctionInitializationContext。这用于初始化非被Keys化状态“容器”。这是一个类型的容器,ListState其中非被Keys化状态对象将在检查点存储:

this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");

在初始化容器之后,我们使用isRestored()上下文的方法来检查我们是否在失败后恢复。如果是这样true我们正在恢复,则应用恢复逻辑。

如修改的代码所示,在状态初始化期间恢复的BufferingSink这个ListState被保存在类变量中以供将来使用snapshotState()。在那里,ListState被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。

作为旁注,被Keys化状态也可以在initializeState()方法中初始化。这可以使用FunctionInitializationContext给定的参数来完成,而不是RuntimeContextFlink 1.1的情况。如果CheckpointedFunction要在CountMapper示例中使用该接口,则open()可以删除旧方法,new snapshotState()initializeState()方法如下所示:

  1. public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
  2. implements CheckpointedFunction {
  3. private transient ValueState<Integer> counter;
  4. private final int numberElements;
  5. public CountMapper(int numberElements) {
  6. this.numberElements = numberElements;
  7. }
  8. @Override
  9. public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
  10. int count = counter.value() + 1;
  11. counter.update(count);
  12. if (count % numberElements == 0) {
  13. out.collect(Tuple2.of(value.f0, count));
  14. counter.update(0); // reset to 0
  15. }
  16. }
  17. @Override
  18. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  19. // all managed, nothing to do.
  20. }
  21. @Override
  22. public void initializeState(FunctionInitializationContext context) throws Exception {
  23. counter = context.getKeyedStateStore().getState(
  24. new ValueStateDescriptor<>("counter", Integer.class, 0));
  25. }
  26. }

请注意,该snapshotState()方法为空,因为Flink本身负责在检查点时SNAPSHOT托管的被Keys化状态。

向后兼容Flink 1.1

到目前为止,我们已经看到如何修改我们的函数以利用Flink 1.2引入的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。

答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,您必须什么都不做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须实现CheckpointedRestoring接口,如上面的代码所示。这有一个方法,熟悉Flink 1.1 restoreState()的旧Checkpointed接口。如修改后的代码所示BufferingSink,该restoreState()方法与其前身相同。

对齐处理时间窗口 算子

在Flink 1.1中,只有在没有指定的逐出器或触发器的处理时间上运行timeWindow()被Key化的数据流上的命令才会实例化特殊类型WindowOperator。这可以是一个AggregatingProcessingTimeWindowOperator或一个AccumulatingProcessingTimeWindowOperator。这两个 算子都被称为对齐窗口 算子,因为它们假设它们的输入数据元按顺序到达。这在处理时间内有效,因为数据元在到达窗口算子时获得挂钟时间的时间戳。这些 算子仅限于使用内存状态后台,并且具有优化的数据结构,用于存储利用有序输入数据元到达的每窗口数据元。

在Flink 1.2中,不推荐使用对齐的窗口 算子,并且所有窗口 算子操作都通过泛型 WindowOperator。此迁移不需要更改Flink 1.1作业的代码,因为Flink将透明地读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式WindowOperator,并使用通用的WindowOperator

注意尽管已弃用,但您仍然可以使用Flink 1.2中的对齐窗口 算子,通过特殊WindowAssigners介绍来实现此目的。这些assigners是 SlidingAlignedProcessingTimeWindowsTumblingAlignedProcessingTimeWindowsassigners,滑动和翻滚分别窗口。使用对齐窗口的Flink 1.2作业必须是一项新工作,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。

注意对齐的窗口 算子不提供重新缩放函数,也不提供与Flink 1.1的向后兼容性

在Flink 1.2中使用对齐窗口 算子的代码如下所示:

  1. // for tumbling windows
  2. DataStream<Tuple2<String, Integer>> window1 = source
  3. .keyBy(0)
  4. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  5. .apply(your-function)
  6. // for sliding windows
  7. DataStream<Tuple2<String, Integer>> window1 = source
  8. .keyBy(0)
  9. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  10. .apply(your-function)
  1. // for tumbling windows val window1 = source
  2. .keyBy(0)
  3. .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
  4. .apply(your-function)
  5. // for sliding windows val window2 = source
  6. .keyBy(0)
  7. .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
  8. .apply(your-function)