注意这篇文档里提到的type的意思是 类的类型。flink按照类的类型将class分为几种type.

画板

flink 会尽可能推断类型信息,flink 对类型信息了解的越多,序列化就越高效,并且flink内部的内存布局也就越好,更节约内存空间。并且将用户从序列化框架中解放出来。

Tuple类型

多字段复合类型,flink提供了tuple1-tuple25, tuple内部可叠加嵌套tuple

Pojo

flink pojo类定义

  1. class必须是public的
  2. 必须包含一个无参的构造函数
  3. 所有的成员变量都必须是public或者包含getter和setter方法。
  4. 成员变量的类型必须可以被已注册的serializer(默认的或者自定义的或者flink生成的)序列化

flink类型信息系统使用PojoTypeInfo表示一个pojo类的类型信息,使用专用的PojoSerializer类进行序列化。但有一个例外情况,如果Pojo类是由avro生成,那么该pojo类会被表示为AvroTypeInfo并使用AvroSerializer序列化。

Flink会分析Pojo类型的结构,也就是说也会分析Pojo的成员变量的类型。相比于普通类型,Flink能更高效地处理Pojos。

Primitive types

flink支持所有java原生类型的classes,比如Integer,String, Double

General class types 一般类类型

flink支持绝大多数java类型,包含不可序列化字段的class不允许作为DataStream的类型参数。

所有不能被识别为pojo的类型一律按照 一般类类型处理。

Value classes

Value类型的类 所有序列化和反序列化都必须自己来实现。独立于flink序列化框架,通过重写write和read来实现序列化。只有当flink序列化框架无法高效地序列化时才需要完全自定义。

举个例子,假如有一个用array存储稀疏向量的data type。稀疏向量里面绝对部分元素都是重复的0,我们可以在序列化时对非0元素使用特定的编码,从而避免序列化array里面那些占大多数的无用的0元素。

flink内部提供了一些Vaule的实现类,ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue,相比如primitive type,这些value class可以反复重用,值是可变的。从而减轻垃圾收集器的压力。

但是这种需要使用对象池进行管理,处理回收和再分配,避免出现并发问题。

类型擦除和类型推断

java代码编译成字节码后,会丢掉大部分泛型类型信息(类型擦除)。也就是说在运行时,一个object是不知道它的泛型类型的,举个例子,DataStream和DataStream对jvm来说都是一样的,只有真正需要类型转换的时候才去做。

Flink必须在运行之前就知道类型信息,从而决定类型的对象如何被序列化。flink会尽力重构类型信息并将类型信息存储在operator里面,从而可以通过DataStream.getType直接获取类型信息,该方法返回TypeInformation的一个实例。TypeInformation是flink内部用来表示type的方式,比如说一个operator的返回值是一个Pojo type的类,那个这个operator内部会存储一个PojoTypeInfo(这是TypeInformation的一个实现),从下面的图可以看到PojoTypeInfo存储了类内部所有成员变量的具体信息。并且给出了自定义的PojoSerializer。

类型信息系统和序列化 - 图2

类型信息系统和序列化 - 图3

类型信息系统和序列化 - 图4

Flink虽然会尽力重构类型信息,比如根据输入推断。但是类型推断有时候有一些限制,需要我们的协助。比如env.fromCollection(),可以通过传入一个描述类型信息的TypeInfomation作为参数。泛型function例如MapFunction可能也需要额外的type information。

我们也可以通过实现ResultTypeQueryable接口来显式指定operator 的返回值类型。而operator 的输入类型通常可以由上游算子的返回值类型推断得到

most freqent issue

大多数情况下,用户不需要与flink类型信息系统打交道,flink会自己推断。

但是以下几种情况,需要用户的协助:

  • 实际类型是返回值类型的子类型。

    1. 比如用户自定义了一个算子MapFunction<In,OutType>,但是在实现map函数的时候实际返回类型是OutType的子类型。这时候就需要明确地告诉flink这个实际子类型的信息,比如通过register来告知flink这个类型的信息
  • 遇到无法被kryo序列化的通用类类型(general class type)

    例如一些google guava集合类kryon不能很好的处理,这个时候我们就要自己定义该类型的默认kryo序列化器,StreamExecutionEnvironment.getConfig().addDefaultKryoSerializer(clazz, serializer)