从Flink 1.3+ 到 Flink 1.7
TypeSerializer 的变化
这部分主要与实现TypeSerializer
接口来自定义序列化的用户有关。
原来的 TypeSerializerConfigSnapshot
抽象接口被弃用了, 并且将在将来完全删除,取而代之的是新的 TypeSerializerSnapshot
. 详情请参考 Migrating from deprecated serializer snapshot APIs before Flink 1.7。
从 Flink 1.2 迁移到 Flink 1.3
自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。
TypeSerializer
接口变化
这主要适用于自定义 TypeSerializer
接口的用户
从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 序列化升级和兼容性
ProcessFunction
是 RichFunction
从Flink 1.2, ProcessFunction
引入,并有了多种实现例如 RichProcessFunction
。 从Flink 1.3,开始RichProcessFunction
被移除了, 现在 ProcessFunction
始终是 RichFunction
并且可以访问运行时上下文。
Flink CEP 库API更改
Flink 1.3中的CEP库新增了许多新函数,请参阅 CEP迁移文档 。
Flink core 中移除了Logger的依赖
Flink1.3以后,用户可以选用自己期望的日志框架了,Flink移除了日志记录框架的依赖。
实例和快速入门的demo已经指定了日志记录器,不会有问题,其他项目,请确保添加日志依赖,比如Maven的 pom.xml
,中需要增加
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
从 Flink 1.1 到 Flink 1.2的迁移
正如 状态文档中所说,Flink 有两种状态: keyed 和 non-keyed 状态 (也被称作 operator 状态). 这两种类型都可用于 算子和用户定义的函数。文档将指导您完成从Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些改变涉及到Flink 1.1中对齐窗口操作的弃用。 (请参阅 时间对齐窗口算子Aligned Processing Time Window Operators).
迁移有两个目标:
引入Flink1.2中引入的新函数,比如自适应(rescaling)
确保新Flink 1.2作业能够从Flink 1.1的保存点恢复执行
按照本指南操作可以把正在运行的Flink1.1作业迁移到Flink1.2中。前提需要在Flink1.1中使用 保存点 并把保存点作为Flink1.2作业的起点。这样,Flink1.2就可以从之前中断的位置恢复执行了。
用户函数示例
本文档其余部分使用 CountMapper
和 BufferingSink
函数作为示例。第一个函数是 keyed 状态,第二个不是 s non-keyed 状态,Flink 1.1中上述两个函数的代码如下:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void open(Configuration parameters) throws Exception {
counter = getRuntimeContext().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
Checkpointed<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private ArrayList<Tuple2<String, Integer>> bufferedElements;
BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public ArrayList<Tuple2<String, Integer>> snapshotState(
long checkpointId, long checkpointTimestamp) throws Exception {
return bufferedElements;
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
bufferedElements.addAll(state);
}
}
CountMapper
是一个按表格分组输入(word, 1)
的 RichFlatMapFunction
, 函数为每个传入的key保存一个计数器 (ValueState<Integer> counter
) 并且 ,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。
BufferingSink
是一个 SinkFunction
接收方 ( CountMapper
可能的输出) ,直到达到用户定义的最终状态之前会一直缓存数据,这可以避免频繁的对数据库或者是存储系统的操作,通常这些操作都是比较耗时或开销比较大的。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements
) 列表会被定期被检查点保存。
状态 API 迁移
要使用Flink 1.2的新函数,应修改上面的代码来完成新的状态抽象。完成这些更改后,就可以实现作业的并行度的修改(向上或向下扩展),并确保新版本的作业将从之前作业停止的位置开始执行。
Keyed State: 需要注意的是,如果代码中只有keyed state,那么Flink1.1的代码也适用于1.2版本,并且完全支持新函数和向下兼容。可以仅针代码格式进行更改,但这只是风格/习惯问题。
综上所述,本章我们重点阐述 non-keyed state的迁移
自适应和新状态抽象
第一个修改是 Checkpointed<T extends Serializable>
接口有了新的实现。在 Flink 1.2中,有状态函数可以实现更通用的 CheckpointedFunction
接口或 ListCheckpointed<T extends Serializable>
接口 ,和之前版本的 Checkpointed
类似。
在这两种情况中,非键合状态预期是一个 可序列化 的 List
,对象彼此独立,这样可以在自适应的时候重新分配,意味着, 这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行度为1的 BufferingSink
有 (test1, 2)
和 (test2, 2)
两个数据,当并行度增加到2时, (test1, 2)
可能在task 0中,而 (test2, 2)
可能在task 1中。
更详细的信息可以参阅状态文档.
ListCheckpointed
ListCheckpointed
接口需要实现两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
它的语义和之前的 Checkpointed
接口类似,唯一区别是,现在 snapshotState()
返回的是检查点对象列表, 如前所述, restoreState
必须在恢复的时候,处理这个列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE)
的 snapshotState()
。 更新的代码 BufferingSink
如下:
public class BufferingSinkListCheckpointed implements
SinkFunction<Tuple2<String, Integer>>,
ListCheckpointed<Tuple2<String, Integer>>,
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSinkListCheckpointed(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
this.bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public List<Tuple2<String, Integer>> snapshotState(
long checkpointId, long timestamp) throws Exception {
return this.bufferedElements;
}
@Override
public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
更新后的函数也实现了 CheckpointedRestoring
接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。
CheckpointedFunction
CheckpointedFunction
接口也需要实现这两个方法。
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
在Flink 1.1中, 检查点执行会调用snapshotState()
方法,但是现在当用户每次初始化自定义函数时,会调用 initializeState()
(对应 restoreState()
) ,而不是在恢复的情况下调用,鉴于此, initializeState()
不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。实现了 CheckpointedFunction
接口的 BufferingSink
代码如下所示
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore().
getSerializableListState("buffered-elements");
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
@Override
public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
// this is from the CheckpointedRestoring interface.
this.bufferedElements.addAll(state);
}
}
initializeState
方法是需要传入 FunctionInitializationContext
,用于初始化non-keyed 状态的 “容器”,容器的类型是 ListState
,供 non-keyed 状态的对象被检查点存储时使用:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
初始化之后,调用 isRestored()
方法可以获取当前是否在恢复。如果是 true
, 表示正在恢复。
正如下面的代码所示,在状态初始化期间恢复 BufferingSink
, ListState
中保存的变量可以被 snapshotState()
使用, ListState
会清除掉之前检查点存储的对象,然后存储当前检查点的对象。
当然, keyed 状态也可以在 initializeState()
方法中初始化, 可以使用 FunctionInitializationContext
来完成初始化,而不是使用Flink1.1中的 RuntimeContext
,如果CheckpointedFunction
要在 CountMapper
中使用该接口,则可以不使用 open()
方法, snapshotState()
和 initializeState()
方法如下所示:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
implements CheckpointedFunction {
private transient ValueState<Integer> counter;
private final int numberElements;
public CountMapper(int numberElements) {
this.numberElements = numberElements;
}
@Override
public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
int count = counter.value() + 1;
counter.update(count);
if (count % numberElements == 0) {
out.collect(Tuple2.of(value.f0, count));
counter.update(0); // reset to 0
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
counter = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("counter", Integer.class, 0));
}
}
请注意, snapshotState()
方法为空,因为Flink本身负责在检查点时就会存储Keys化的对象
向后兼容Flink 1.1
到目前为止,我们已经了解如何修改函数来引入Flink 1.2的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。
答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,什么都不需要做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须像上面代码一样实现 CheckpointedRestoring
接口,还有个办法,需要熟悉Flink1.1的 restoreState()
和Checkpointed
接口,然后修改 BufferingSink
, restoreState()
方法完成和之前一样的功能。
时间对齐窗口算子
在Flink 1.1中,只有在没有指定的触发器的时, timeWindow()
才会实例化特殊的数据类型 WindowOperator
。 它可以是 AggregatingProcessingTimeWindowOperator
或 AccumulatingProcessingTimeWindowOperator
.。这两个算子都可以称之为时间对齐窗口算子,因为它们假定输入数据是按顺序到达,当在处理时间中操作时,这是有效的,元素到达窗口操作时获得时间。算子仅限于使用内存状态,并且优化了用于存储数据的元素结构。
在Flink 1.2中,不推荐使用对齐窗口算子,并且所有窗口算子操作都通过泛型WindowOperator
来实现。迁移不需要更改Flink 1.1作业的代码,因为Flink将读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator
,并使用通用的 WindowOperator
。
虽然是已经弃用这个方法,但是在Flink 1.2 中仍然是可以使用的,通过特殊的 WindowAssigners
可以实现这个目的。 SlidingAlignedProcessingTimeWindows
和 TumblingAlignedProcessingTimeWindows
assigners,分别是滑动窗口和滚动窗口,使用对齐窗口的Flink 1.2作业必须是一项新作业,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。
注意时间对齐窗口算子不提供自适应 而且 不向下兼容 Flink 1.1.
在Flink 1.2中使用对齐窗口 算子的代码如下所示:
// for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)
// for tumbling windows val window1 = source
.keyBy(0)
.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
.apply(your-function)
// for sliding windows val window2 = source
.keyBy(0)
.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
.apply(your-function)