作业图和函数序列化

作业图及其函数必须是可序列化的。

  1. Exception in thread "main" **org.apache.flink.api.common.InvalidProgramException: Task not serializable at**
  2. org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
  3. Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
  1. Caused by: org.apache.flink.api.common.InvalidProgramException:
  2. The implementation of the ProcessFunction is not serializable.
  3. The object probably contains or references non serializable fields.

实现分布式功能的最佳实践:

  • 使它们独立于外部变量/对象
    • 可以将配置信息传递给构造函数
  • 使用在open方法中初始化的暂态变量(而不是在构造函数中)

因为作业将在分布式集群中执行,所以作业图及其调用的函数必须是可序列化的。如果失败,您将看到类似如上所显示的错误。
这里的最佳实践是实现函数,使它们不包含对外部变量或对象的引用。
如果函数确实包含对自身外部对象的引用,Flink将不得不尝试序列化所有相关的传递闭包。这通常会引入一些无法序列化的内容。

举个例子

  1. public static void main(String[] args) throws Exception {
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. env.fromSource(...)
  4. ...
  5. .sinkTo(new MyCustomSink()); // this sink has to be serialized
  6. env.execute();
  7. }
  1. // don't do this
  2. public class MyCustomSink<T> extends RichSinkFunction<T> {
  3. private final Connection connection;
  4. MyCustomSink() {
  5. this.connection = ...
  6. }
  7. }
  1. // do this instead
  2. public class MyCustomSink<T> extends RichSinkFunction<T> {
  3. private transient Connection connection;
  4. @Override
  5. public void open() {
  6. this.connection = ...
  7. }
  8. }

当主方法被执行时,MyCustomSink的一个实例被创建。这个实例将被序列化。通过左侧显示的接收器的实现,序列化的接收器对象将必须包括Connection对象。但是,它可能包含对某些不可序列化的对象的引用,例如HttpRequestExecutor。右边显示的实现通过让接收器的每个并行实例独立创建自己的Connection对象来避免这个问题。

数据序列化

image.png
数据序列化可能发生在三个不同的地方。这些位置中的每一个都可以独立调整。

  • 输入序列化
    • 与外部系统(例如Kafka)的读写交互消息序列化/反序列化
    • 用户定义的格式(来源和水槽中)

image.png

  • 数据处理链路序列化
    • flink任务之间交换的记录的序列化
    • 在一个链的操作符之间使用TypeSerializer#copy()

image.png

  • 状态序列化
    • 状态对象的序列化/反序列化
      • RocksDB:每个状态都接触
      • 基于堆栈的状态后端:on checkpoint
    • 与带状态描述符的有数据处理链路列化器一样
      • new ValueStateDescriptor<>(“myState”, MyState.class)

在处理Flink管理(键控/非键控)状态时,无论是在访问还是在检查点上。
image.png

数据处理链路和状态序列化

Flink的目标是能够自动处理任何类型的数据。
输入/出序列化是由用户手动定义的(在创建源和接收时),链路和状态序列化是由Flink控制的。通常情况下,数据类型由Flink自动处理,大多数用例应该可以开箱即用。但是,对于故障排除或性能调优,我们将在本会话的其余部分研究Flink的类型系统的细节。

Flink的类型系统

  • Flink有其自己的序列化器,用于
    • 基本类型
      • 字符串,长,整型,布尔型
      • 数组
    • 复合类型
      • 元组
      • pojo
      • Scala Case类
  • 如果使用flink-avro,AVRO类Flink原生支持序列化/反序列化
  • 否则Flink落回Kryo ser / de
    • 也可以使用Apache Thrift和谷歌Protobuf类型

Avro: 对 org.apache.flink: flink-Avro 的依赖项应该添加到作业的 pom.xml 文件中。
您也可以使用 Row 或 BinaryRowData,但是这些更常用于 Table 和 SQLAPI。

Flink 如何确定要使用的序列化程序

  1. 隐式地ーー从用户函数的输入/输出中的类型

    1. env.fromElements(1, 2, 3, 4) // out: Integer
    2. .map(x -> 2L * x) // in: Integer, out: Long
    3. .print(); // in: Long
  2. 显式-在实现ResultTypeQueryable时通过getProducedType()

    1. public class MyRichMapFunction extends RichMapFunction<Integer, Long> implements ResultTypeQueryable<Long> {
    2. @Override public Long map(Integer x) { return 2L * x; }
    3. @Override public TypeInformation<Long> getProducedType() { return Types.LONG; }
    4. }
  3. 显式-通过SingleOutputStreamOperator#Returns()

    1. env.fromElements(1, 2, 3, 4).returns(Types.Integer)
    2. .map(x -> 2L * x).returns(Types.LONG)
    3. .print();

    Flink会尽最大努力识别用户作业给出的任何类型。
    但有时,由于Java类型擦除(后面会讲),无法为其内部序列化程序提取适当的TypeInformation。在这种情况下,您可以通过以下方式手动提供类型:

  • 正在实现ResultTypeQueryable,或…
  • 调用SingleOutputStreamOperator#Returns()
    • 请注意,这提供了一些方便的重载,可以将它们的参数包装到TypeInformation实例中。

Flink状态序列化

序列化信息通过状态描述符显式提供。

  1. ValueStateDescriptor<Integer> stateDescriptor =
  2. new ValueStateDescriptor<>("value", Types.INT); // TypeInformation<Integer>
  3. myState = getRuntimeContext().getState(stateDescriptor);

通常有三个重载使用:

  • 指定序列化器

    1. public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) { /* ... */ }
  • TypeInformation 实例

    1. public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) { /* ... */ }
  • Flink对给定类实例的类型推断

    1. public ValueStateDescriptor(String name, Class<T> typeClass) { /* ... */ }

    类型推断的限制

    尝试这个写法 ```java env.fromElements(1, 2, 3, 4) .map(x -> new MyEvent<>(2L * x)) .print();

env.execute();

  1. ```java
  2. public class MyEvent<T> {
  3. public T value;
  4. public MyEvent() { }
  5. public MyEvent(T x) { this.value = x; }
  6. }

失败的原因是:

  1. Exception in thread "main" ...InvalidTypesException:
  2. The return type of function '...' could not be determined automatically, due to type erasure...

上面示了一个例子,其中用户需要手动提供类型信息,以便使作业运行。
原因是 Java 只在编译时检查类型约束,在运行时从元素中丢弃特定的类型信息: 在这种情况下,在运行时,只能看到 MyEvent < object > ,我们需要通过手动告诉 Flink 细节,通过:

  • ResultTypeQueryable#getProducedType() or
  • SingleOutputStreamOperator#returns()

    手动提供 TypeInformation

    来自classes类的 flink 类型推断

    1. ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("value", TypeInformation.of(Long.class))

    用于捕获泛型类型的 TypeHint

    1. stream.map(x -> new MyEvent<>(2L * x)).returns(TypeInformation.of(new TypeHint<MyEvent<Long>>(){}))

    Types 类中带有内置序列化程序的常见类型

    1. Types.LONG Types.LIST Types.LOCAL_DATE_TIME // ...

    还提供了对复合类型的更多控制,例如:

    1. env.fromElements(Row.of(1, "first"), Row.of(2, "second"))
    2. .returns(Types.ROW_NAMED(new String[] { "id", "txt" }, Types.INT, Types.STRING));

    如果您需要(或想要)手动提供类型信息,您可以用几种不同的格式提供它。所有这些最终都将包装在一个TypeInformation中,它将公开Flink的所有类型详细信息。

  • Class< T > ,如果类型信息可以从中推断出来

    • (在上一节的场景中,这不会有任何帮助!)
    • 这通常在状态描述符中使用。
    • 您还可以提供一个特定的类,否则 flink 只能看到基类。
  • TypeHint < t > 匿名类

    • 这样,flink 的类型推断也可以捕获泛型参数!
  • TypeInformation信< T> 子类型

    • 您可以使用 Types 类中的方便类型定义。、
    • 复合类型可以通过所有细节指定,例如在与 Table API 交互时。这些代码嵌入如下代码:
      1. Table table = tEnv.fromDataStream(
      2. env.fromElements(Row.of(1, "first"), Row.of(2, "second"))
      3. .returns(Types.ROW_NAMED(
      4. new String[] { "id", "txt" },
      5. Types.INT,
      6. Types.STRING)),
      7. Schema.newBuilder()
      8. .column("id", DataTypes.INT())
      9. .column("payload", DataTypes.STRING())
      10. .build());
      您还可以创建自己的自定义子类型(后面将作为序列化性能调优的一部分介绍)。

Avro序列化

  • SpecificRecord
    • 自动检测是否在使用或扩展SpecificRecordBase
  • GenericRecord

    • 如果提供了模式(手动),则可以使用,例如
      1. Schema schema = ...
      2. DataStream<GenericRecord> sourceStream = env
      3. .addSource(new AvroGenericSource())
      4. .returns(new GenericRecordAvroTypeInfo(schema));
      如果你在 Flink 中使用 SpecificRecord 而不是 SpecificRecordBase,它将不会标识为 Avro 类型,而 Flink 将使用 Kryo 进行反/序列化。详情请参阅 FLINK-12577
      注意 Avro 的 Schema 类是不可序列化的,这在这里显示的代码中不是问题,但是如果通过 getProducedType ()处理,可能会有所不同。但是,您可以在 Schema 和 String 之间来回转换,并且只在初始化时进行一次转换。
  • 将 flink 的 PojoSerializer 交换为 Avro 的基于反射的序列化

    1. env.getConfig().enableForceAvro();

    强制 Flink 对 pojo 使用 Avro 的基于反射的序列化器通常是一个遗留选项,在 Avro 序列化没有得到很好集成时添加了这个选项。目前,似乎只有强制使用 Avro 增强的模式演化功能才有意义(见下一节) ,但是会带来严重的性能损失。

状态模式演化研究现状

  • Avro 类型的模式演进
    • 可以根据 Avro 规范进化模式
    • 可以在 GenericRecord 和代码生成的 SpecificRecords 之间进行交换
    • 不能更改生成的 SpecificRecord 类的命名空间
  • POJO 模式演进
    • 可以添加/删除字段,但不能更改类型
  • 如果想要状态的可进化模式,请避免使用 Kryo
    • env.getConfig().disableGenericTypes() 将禁用 Kryo 回退
  • 尚不支持: Rows (用于表)
  • 不支持: Scala case classes

状态迁移是从一个版本的快照迁移状态,并在数据模式更改后将该状态加载到下一个版本的过程。使用内置的序列化器,状态迁移将被自动处理。
如果状态迁移失败,那么模式更改就是不兼容的,并且作业也会失败。
请参阅 FLINK-10896 了解将模式演化支持扩展到其他类型的状态。对于 Scala case classes,还不清楚这是否可行。Avro 的特性:
http://avro.apache.org/docs/1.8.2/spec.html#Schema+Resolution