从Flink 1.2迁移到Flink 1.3
自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。
TypeSerializer
界面变化
这主要适用TypeSerializer
于为其状态实施自定义的用户。
从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 处理序列化程序升级和兼容性。
ProcessFunction
总是一个 RichFunction
在Flink 1.2中,引入了ProcessFunction
其丰富的变体RichProcessFunction
。自Flink 1.3以来,RichProcessFunction
已被删除,ProcessFunction
现在始终RichFunction
可以访问生命周期方法和运行时上下文。
Flink CEP库API更改
Flink 1.3中的CEP库附带了许多新函数,这些函数导致API发生了一些变化。有关详细信息,请访问CEP迁移文档。
从Flink核心工件中删除了Logger依赖项
在Flink 1.3中,为了确保用户可以使用他们自己的自定义日志记录框架,核心Flink工件现在可以清除特定的记录器依赖项。
示例和快速入门原型已经指定了记录器,不应受到影响。对于其他自定义项目,请确保添加记录器依赖项。例如,在Maven中pom.xml
,您可以添加:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
从Flink 1.1迁移到Flink 1.2
如状态文档中所述,Flink有两种类型的状态:被Keys化状态 和非被Keys化状态(也称为算子状态)。这两种类型都可用于 算子和用户定义的函数。本文档将指导您完成将Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些更改涉及Flink 1.1中对齐窗口 算子的弃用(请参阅对齐处理时间窗口 算子)。
迁移过程将有两个目标:
允许您的函数利用Flink 1.2中引入的新函数,例如重新缩放,
确保您的新Flink 1.2作业能够从其Flink 1.1前身生成的保存点恢复执行。
按照本指南中的步骤 算子操作后,您可以将正在运行的作业从Flink 1.1迁移到Flink 1.2,只需在Flink 1.1作业中使用保存点并将其作为起点提供给Flink 1.2作业。这将允许Flink 1.2作业从其Flink 1.1前任中断的位置恢复执行。
示例用户函数
作为本文档其余部分的运行示例,我们将使用CountMapper
和BufferingSink
函数。第一个是具有被Keys化状态的函数的示例,而第二个是具有非被Keys化状态的函数。Flink 1.1中上述两个函数的代码如下:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
}
这CountMapper
是一个RichFlatMapFunction
假定按表格分组的输入流 (word, 1)
。该函数为每个传入的键(ValueState<Integer> counter
)保存一个计数器,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。
的BufferingSink
是一个SinkFunction
接收元件(的潜在的输出CountMapper
),并直到达到某个用户指定的阈值,将它们发射到最终水槽之前缓冲它们。这是避免对数据库或外部存储系统进行许多昂贵调用的常用方法。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements
)中,该列表定期检查点。
状态API迁移
要利用Flink 1.2的新函数,应修改上面的代码以使用新的状态抽象。完成这些更改后,您将能够更改作业的并行度(向上或向下扩展),并确保您的新版本的作业将从其前任停止的位置开始。
Keys状态:在深入研究迁移过程的细节之前需要注意的是,如果您的函数只有被Keys化状态,那么Flink 1.1的完全相同的代码也适用于Flink 1.2,完全支持新函数和完全向后兼容性。可以仅针对更好的代码组织进行更改,但这仅仅是一种风格问题。
如上所述,本节的其余部分重点介绍非被Keys化状态。
重新缩放和新状态抽象
第一个修改是从旧Checkpointed<T extends Serializable>
状态接口到新状态接口的转换。在Flink 1.2中,有状态函数可以实现更通用的CheckpointedFunction
接口或ListCheckpointed<T extends Serializable>
接口,它在语义上更接近旧接口 Checkpointed
。
在这两种情况中,非键合状态被预期是一个List
的序列化的对象,彼此独立的,因而在重新缩放获再分配。换句话说,这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行性1 BufferingSink
包含数据元的检查点状态,(test1, 2)
并且(test2, 2)
当将并行性增加到2时,(test1, 2)
可能最终在任务0中,而(test2, 2)
将进入任务1。
有关被Keys化状态和非被Keys化状态重新缩放的原则的更多详细信息,请参阅状态文档。
ListCheckpointed
该ListCheckpointed
接口需要实现两种方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
它们的语义与旧Checkpointed
界面中的对应物相同。唯一的区别是现在snapshotState()
应该将对象列表返回到检查点,如前所述,并且 restoreState
必须在恢复时处理此列表。如果状态不是重新分区,可以随时返回Collections.singletonList(MY_STATE)
的snapshotState()
。更新的代码BufferingSink
包括在下面:
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
如代码所示,更新的函数也实现了CheckpointedRestoring
接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。
CheckpointedFunction
该CheckpointedFunction
接口需要再次执行两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
与在Flink 1.1中一样,snapshotState()
每当执行检查点时都会调用它,但是每次初始化用户定义的函数时,都会调用initializeState()
它(它的对应部分restoreState()
),而不是仅在我们从故障中恢复的情况下。鉴于此,initializeState()
不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。CheckpointedFunction
接口的实现 BufferingSink
如下所示。
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
在initializeState
把参数作为一个FunctionInitializationContext
。这用于初始化非被Keys化状态“容器”。这是一个类型的容器,ListState
其中非被Keys化状态对象将在检查点存储:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
在初始化容器之后,我们使用isRestored()
上下文的方法来检查我们是否在失败后恢复。如果是这样true
,即我们正在恢复,则应用恢复逻辑。
如修改的代码所示,在状态初始化期间恢复的BufferingSink
这个ListState
被保存在类变量中以供将来使用snapshotState()
。在那里,ListState
被清除由先前的检查点包含的所有对象,然后填充我们要设置检查点新的。
作为旁注,被Keys化状态也可以在initializeState()
方法中初始化。这可以使用FunctionInitializationContext
给定的参数来完成,而不是RuntimeContext
Flink 1.1的情况。如果CheckpointedFunction
要在CountMapper
示例中使用该接口,则open()
可以删除旧方法,new snapshotState()
和initializeState()
方法如下所示:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
}
请注意,该snapshotState()
方法为空,因为Flink本身负责在检查点时SNAPSHOT托管的被Keys化状态。
向后兼容Flink 1.1
到目前为止,我们已经看到如何修改我们的函数以利用Flink 1.2引入的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。
答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,您必须什么都不做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须实现CheckpointedRestoring
接口,如上面的代码所示。这有一个方法,熟悉Flink 1.1 restoreState()
的旧Checkpointed
接口。如修改后的代码所示BufferingSink
,该restoreState()
方法与其前身相同。
对齐处理时间窗口 算子
在Flink 1.1中,只有在没有指定的逐出器或触发器的处理时间上运行时,timeWindow()
被Key化的数据流上的命令才会实例化特殊类型WindowOperator
。这可以是一个AggregatingProcessingTimeWindowOperator
或一个AccumulatingProcessingTimeWindowOperator
。这两个 算子都被称为对齐窗口 算子,因为它们假设它们的输入数据元按顺序到达。这在处理时间内有效,因为数据元在到达窗口算子时获得挂钟时间的时间戳。这些 算子仅限于使用内存状态后台,并且具有优化的数据结构,用于存储利用有序输入数据元到达的每窗口数据元。
在Flink 1.2中,不推荐使用对齐的窗口 算子,并且所有窗口 算子操作都通过泛型 WindowOperator
。此迁移不需要更改Flink 1.1作业的代码,因为Flink将透明地读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式WindowOperator
,并使用通用的WindowOperator
。
注意尽管已弃用,但您仍然可以使用Flink 1.2中的对齐窗口 算子,通过特殊WindowAssigners
介绍来实现此目的。这些assigners是 SlidingAlignedProcessingTimeWindows
和TumblingAlignedProcessingTimeWindows
assigners,滑动和翻滚分别窗口。使用对齐窗口的Flink 1.2作业必须是一项新工作,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。
注意对齐的窗口 算子不提供重新缩放函数,也不提供与Flink 1.1的向后兼容性。
在Flink 1.2中使用对齐窗口 算子的代码如下所示:
// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)
// for tumbling windows val window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows val window2 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)