Custom Serialization for Managed State 用于托管状态的自定义序列化

此页面是针对需要为其状态使用自定义序列化的用户提供的指南,包括如何提供自定义状态序列化程序以及实现允许状态架构演变的序列化程序的指南和最佳实践。

如果您只是使用Flink自己的序列化程序,则此页面是不相关的,可以忽略。

Using custom state serializers 使用自定义状态序列化程序

注册受管理的运算符或键状态时,需要StateDescriptor 来指定状态的名称,以及有关状态类型的信息。类型信息由flink的类型序列化框架为该状态创建适当的序列化程序。

还可以完全绕过这一点,让flink使用您自己的自定义串行器序列化受管理的状态,简单地通过直接实例化StateDescriptor 和您自己的 TypeSerializer 实现:

  1. public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
  2. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  3. new ListStateDescriptor<>(
  4. "state-name",
  5. new CustomTypeSerializer());
  6. checkpointedState = getRuntimeContext().getListState(descriptor);
  1. class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
  2. val descriptor = new ListStateDescriptor[(String, Integer)](
  3. "state-name",
  4. new CustomTypeSerializer)
  5. )
  6. checkpointedState = getRuntimeContext.getListState(descriptor)

State serializers and schema evolution 状态序列化程序和架构演进

本节解释与状态序列化和模式演化相关的面向用户的抽象,以及关于Flink如何与这些抽象交互的必要的内部细节。

从保存点恢复时,flink允许更改用于读取和写入先前注册状态的Serializer,以便用户不会锁定到任何特定的序列化架构中。当恢复状态时,将为状态注册新的序列化程序(即,使用 StateDescriptor 来访问还原作业中的状态的序列化程序)。这个新的串行化器可以具有比先前串行化器的模式不同的模式。因此,当实现状态序列化程序时,除了读/写数据的基本逻辑之外,要记住的另一个重要问题是如何在将来改变序列化模式。

说到 schema,在这种情况下,该术语在引用状态类型的 data model 和状态类型的 serialized binary format 之间是可互换的。一般而言,模式可以更改几种情况:

  1. 状态类型的数据模式已经发展,即从用作状态的POJO中添加或删除字段。
  2. 一般来说,在对数据模式进行更改后,序列化程序的序列化格式将需要升级。
  3. 序列化程序的配置已更改。

为了使新的执行具有关于 written schema OF状态的信息并检测模式是否已改变,在获取操作者的状态的保存点时,需要将状态串行化器的 snapshot of与状态字节一起写入。这被抽象化了下一个小节中解释的TypeSerializerSnapshot

The TypeSerializerSnapshot abstraction “TypeSerializerSnapail”抽象

  1. public interface TypeSerializerSnapshot<T> {
  2. int getCurrentVersion();
  3. void writeSnapshot(DataOuputView out) throws IOException;
  4. void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
  5. TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
  6. TypeSerializer<T> restoreSerializer();
  7. }
  1. public abstract class TypeSerializer<T> {
  2. // ...
  3. public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
  4. }

串行器 TypeSerializerSnapshot 是一个时间点信息,它作为关于状态串行器的写模式的唯一的事实源,以及任何附加的信息,这些信息必须恢复与给定的时间点相同的串行器。在恢复时应该写入和读取的逻辑,因为序列化快照是在writeSnapshotreadSnapshot方法中定义的。

请注意,快照本身的写入模式也可能需要随着时间的推移而更改(例如,当您希望向快照中添加更多有关序列化程序的信息时)。为了方便这一点,快照是版本化的,在getCurrentVersion方法中定义了当前版本号。在还原时,当从保存点读取序列化程序快照时,将把写入快照的架构的版本提供给 readSnapshot 方法,以便读取实现能够处理不同的版本。

在恢复时,检测新序列化程序的模式是否已更改的逻辑应该在 resolveSchemaCompatibility方法中实现。当在已还原的操作符的执行中使用新的序列化器再次注册前一个已注册状态时,将通过此方法将新的序列化程序提供给前一个序列化程序的快照。此方法返回一个 TypeSerializerSchemaCompatibility ,它表示兼容性解析的结果,可以是以下内容之一:

  1. TypeSerializerSchemaCompatibility.compatibleAsIs(): 这导致新的串行器兼容,这意味着新的串行器与先前的串行器具有相同的模式。可以在 resolveSchemaCompatibility方法中重新配置新的串行器,使其兼容。
  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration(): 该结果信号使得新的串行化器具有不同的串行化模式,并且可以通过使用先前的串行器(其识别旧模式)将字节读入到状态对象,然后用新的串行器(其识别新模式)将该对象重写回字节来从旧模式迁移。
  3. TypeSerializerSchemaCompatibility.incompatible(): 此结果表明新序列化程序具有不同的序列化架构,但无法从旧模式迁移。

最后一个细节是在需要迁移的情况下如何获得前一个序列化程序。序列化程序的“TypeSerializerSnapail”的另一个重要作用是它作为一个工厂来恢复以前的序列化程序。更具体地说, TypeSerializerSnapshot 应该实现restoreSerializer方法,以实例化一个序列化器实例,该实例识别上一个序列化程序的模式和配置,因此可以安全地读取由前一个序列化程序编写的数据。

How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions flink如何与“排版器”和“排版快照”抽象进行交互抽象

为了总结,本节将总结Flink,或更具体而言,状态后端如何与抽象进行交互。根据状态后端,交互略有不同,但这与状态串行器及其串行器快照的实现是正交的。

Off-heap state backends (e.g. RocksDBStateBackend) 堆外状态后端(例如“RocksDBStateBackend”)

  1. Register new state with a state serializer that has schema A 使用具有架构的状态序列化程序注册新状态
    • 用于该状态的注册的TypeSerializer 被用于在每个状态访问上的读/写状态。
    • 状态以架构写入模式 A.
  2. Take a savepoint 获取保存点
    • 序列化程序快照是通过TypeSerializer#snapshotConfiguration方法提取的。
    • 序列化器快照被写入保存点以及已经序列化的状态字节(带有schemaA)。
  3. 还原后的执行使用具有架构的新状态序列化程序重新访问已恢复的状态字节。B
    • 还原以前的状态序列化程序的快照。
    • Tate字节在还原时不会反序列化,只会加载回状态后端(因此,仍然处于schemaA)中。
    • 在接收到新的序列化程序后,将通过TypeSerializer#resolveSchemaCompatibility将其提供给还原后的前一个序列化程序的快照,以检查模式兼容性。
  4. Migrate state bytes in backend from schema A to schema B 从模式迁移后端的状态字节
    • 如果兼容性解决方案反映了架构已更改并可能迁移,则执行架构迁移。识别架构A的上一个状态串行器将从串行器快照通过“TypeEriizerSnapshot#RestorReservalizer()”获得,并用于将状态字节反序列化为对象,这些对象又再次与新的序列化程序重新写入,新的序列化程序会识别架构B来完成迁移。在处理继续之前,被访问的状态的所有条目都被一起迁移到一起。
    • 如果解析表明不兼容,则状态访问将失败,但有异常。

Heap state backends (e.g. MemoryStateBackend, FsStateBackend) 堆状态后端(例如“MemoryStateBackend”、“FsStateBackend”)

  1. 向具有的状态序列化程序注册新状态
    • 注册的“TypeSerializer”由州后端维护。
  2. 接受保存点,使用schemaA
    • 序列化程序快照是通过‘TypeSeriizer#快照配置’方法提取的。
    • 序列化程序快照将写入保存点。
    • 状态对象现在被序列化到保存点,在schemaa中写入。
  3. 在还原时,将状态反序列化为堆中的对象
    • 还原以前的状态序列化程序的快照。
    • 上一个序列化程序识别SCHEMAA,它通过‘TypeSerializerSnapshot#RestoreSeriizer()’从序列化器快照中获得,并用于将状态字节反序列化为对象。
    • 从现在开始,所有的州都已经被反序列化了。
  4. 恢复的执行使用具有schemab的新的状态序列化程序重新访问先前的状态
    • 在接收到新的序列化程序后,将通过“TypeSeriizer#解析式模式兼容性”将其提供给还原后的前一个序列化程序的快照,以检查模式兼容性。
    • 如果需要迁移的兼容性检查信号,在此情况下,由于堆后端,所有状态都已被反序列化为对象。
    • 如果解析表明不兼容,则状态访问将失败,但有异常。
  5. 接受另一个保存点,使用schemaB
    • 与步骤2相同,但现在状态字节都在schemaB中。

Implementation notes and best practices 执行说明和最佳做法

1. Flink通过用类名实例化序列化器快照来还原它们。

序列化程序的快照是如何序列化注册状态的唯一真实来源,它是在保存点中读取状态的入口点。为了能够恢复和访问以前的状态,必须能够还原前一个状态序列化程序的快照。

flink通过首先用其ClassName(与快照字节一起编写)实例化“TypeEriizerSnapshot”来恢复序列化程序快照。因此,为避免受到意外的ClassName更改或实例化失败的影响,“TypeLiizerSnapshot”类应:

  • 避免被实现为匿名类或嵌套类,
  • 具有用于实例化的公共零构造函数。

2. 避免在不同的Serializer上共享相同的“TypeEriizerSnapshot”类

由于架构兼容性检查通过串行器快照,因此具有多个Serializer返回相同的“TypeEriizerSnapshot”类,因为它们的快照会使“TypeEriizerSnapshot#ResolvescheCompatibility”和“TypeEriizerSnapshot#ResReservisionalizer()”方法的实现复杂化。

这也是一个错误的关注点分离;一个序列化程序的序列化模式、配置以及如何恢复它,应该合并到它自己的专用`TypeSerializerSnapail‘类中。

3. 对于包含嵌套序列化程序的序列化程序,请使用“CompositeSerializerSnapail”实用程序

在有些情况下,TypeSerializer 依赖其他嵌套的TypeSerializer;例如Flink的TupleSerializer,其中为元组字段配置了嵌套的TypeSerializer。在这种情况下,大多数外部序列化程序的快照还应该包含嵌套序列化器的快照。

CompositeSerializerSnapshot可专门用于此场景。它封装了解析复合序列化程序的总体模式兼容性检查结果的逻辑。关于应该如何使用它的示例,可以参考Flink的ListSerializerSnapshot实现。

Migrating from deprecated serializer snapshot APIs before Flink 1.7 在flink1.7之前从不建议使用的序列化程序快照API迁移1.7

本节是在FLink1.7之前存在的来自Serializer和Serializer快照的API迁移指南。

在Flink 1.7之前,序列化器快照被实现为TypeSerializerConfigSnapshot (现在已不再推荐,最终将被删除,由新的“TypeSerializerSnapail”接口完全取代)。此外,序列化程序模式兼容性检查的责任存在于在TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)方法中实现的“TypeSerializer”中。

新旧抽象之间的另一个主要区别是,不推荐的TypeSerializerConfigSnapshot 不具有实例化前一个序列化程序的能力。因此,在序列化程序仍然返回‘ TypeSerializerConfigSnapshot子类作为其快照的情况下,序列化程序实例本身将始终被写入使用Java序列化的保存点,以便在还原时可以使用以前的序列化程序。这是非常不可取的,因为还原作业是否成功容易受到上一个序列化程序类的可用性的影响,或者通常情况下,序列化程序实例是否可以使用Java序列化在还原时读取。这意味着您的状态仅限于相同的序列化程序,并且一旦您想升级序列化程序类或执行架构迁移,可能会出现问题。

为了便于将来的验证,并具有迁移状态序列化器和模式的灵活性,强烈建议从旧的抽象迁移。这样做的步骤如下:

  1. 实现一个新的子类TypeSerializerSnapshot。这将是序列化程序的新快照。
  2. 返回新的TypeSerializerSnapshot作为序列化程序的快照,在TypeSerializer#snapshotConfiguration()方法中。
  3. 从在flink1.7之前存在的savepoint还原作业,然后再次获取保存点。请注意,在此步骤中,序列化程序中的旧的 TypeSerializerConfigSnapshot必须仍然存在于类路径中,并且必须不删除TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) 方法的实现。此过程的目的是用新实现的序列化程序的 TypeSerializerConfigSnapshot替换旧存储点中写入的 TypeSerializerSnapshot
  4. 一旦您使用了flink1.7的保存点,则保存点将包含作为状态序列化程序快照的“TypeerializerSnapshot”,并且序列化程序实例将不再写入保存点。在这一点上,现在可以安全地删除旧的抽象的所有实现(删除旧的“类型EriizerConfigSnapshot”实现),这将是串行器的“TypeErierConfigure#Ensure兼容性(类型EriizerConfigSnapshot)”。