如果您在Flink程序中使用自定义类型无法通过Flink类型序列化程序进行序列化,则Flink将回退到使用通用Kryo序列化程序。您可以注册自己的序列化程序或序列化系统,如Google Protobuf或Apache Thrift with Kryo。为此,只需在ExecutionConfigFlink程序中注册类型类和序列化程序即可。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// register the class of the serializer as serializer for a typeenv.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);// register an instance as serializer for a typeMySerializer mySerializer = new MySerializer();env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
请注意,您的自定义序列化程序必须扩展Kryo的Serializer类。对于Google Protobuf或Apache Thrift,已经为您完成了这项工作:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// register the Google Protobuf serializer with Kryoenv.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);// register the serializer included with Apache Thrift as the standard serializer// TBaseSerializer states it should be initialized as a default Kryo serializerenv.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
要使上述示例起作用,您需要在Maven项目文件(pom.xml)中包含必要的依赖项。在依赖项部分中,为Apache Thrift添加以下内容:
<dependency><groupId>com.twitter</groupId><artifactId>chill-thrift</artifactId><version>0.5.2</version></dependency><!-- libthrift is required by chill-thrift --><dependency><groupId>org.apache.thrift</groupId><artifactId>libthrift</artifactId><version>0.6.1</version><exclusions><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion><exclusion><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId></exclusion></exclusions></dependency>
对于Google Protobuf,您需要以下Maven依赖项:
<dependency><groupId>com.twitter</groupId><artifactId>chill-protobuf</artifactId><version>0.5.2</version></dependency><!-- We need protobuf for chill-protobuf --><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version></dependency>
请根据需要调整两个库的版本。
使用Kryo JavaSerializer的问题
如果您JavaSerializer为自定义类型注册Kryo ,即使您的自定义类型类包含在提交的用户代码jar中,您也可能遇到ClassNotFoundException。这是由于Kryo的已知问题JavaSerializer,可能会错误地使用错误的类加载器。
在这种情况下,您应该使用它org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 来解决问题。这是JavaSerializer在Flink中重新实现的,确保使用用户代码类加载器。
有关详细信息,请参阅FLINK-6025。
