执行配置

译者:flink.sojb.cn

StreamExecutionEnvironment包含ExecutionConfig允许为运行时设置工作的具体配置值。要更改影响所有作业的默认值,请参阅配置

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. ExecutionConfig executionConfig = env.getConfig();
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. var executionConfig = env.getConfig

可以使用以下配置选项:(默认为粗体)

  • enableClosureCleaner()/ disableClosureCleaner()。默认情况下启用闭包清理器。闭包清理器删除Flink程序中对周围类匿名函数的不需要的引用。禁用闭包清除程序后,可能会发生匿名用户函数引用周围的类(通常不是Serializable)。这将导致序列化程序出现异常。

  • getParallelism()/ setParallelism(int parallelism)设置作业的默认并行度。

  • getMaxParallelism()/ setMaxParallelism(int parallelism)设置作业的默认最大并行度。此设置确定最大并行度并指定动态缩放的上限。

  • getNumberOfExecutionRetries()/ setNumberOfExecutionRetries(int numberOfExecutionRetries)设置重新执行失败任务的次数。值为零可有效禁用容错。值为-1表示应使用系统默认值(在配置中定义)。这已弃用,请改用重启策略

  • getExecutionRetryDelay()/ setExecutionRetryDelay(long executionRetryDelay)设置在重新执行作业之前系统在作业失败后等待的延迟(以毫秒为单位)。在TaskManagers上成功停止所有任务后,延迟开始,一旦延迟过去,任务就会重新启动。此参数对于延迟重新执行非常有用,以便在尝试重新执行之前让某些超时相关故障完全浮出水面(例如尚未完全超时的断开连接),并且由于同样的问题而再次立即失败。仅当执行重试次数为一次或多次时,此参数才有效。这已弃用,请改用重启策略

  • getExecutionMode()/ setExecutionMode()。默认执行模式为PIPELINED。设置执行模式以执行程序。执行模式定义数据交换是以批处理还是以流水线方式执行。

  • enableForceKryo()/ disableForceKryo。Kryo默认不会被迫。强制GenericTypeInformation将Pryo序列化程序用于POJO,即使我们可以将它们分析为POJO。在某些情况下,这可能更可取。例如,当Flink的内部序列化程序无法正确处理POJO时。

  • enableForceAvro()/ disableForceAvro()。默认情况下不会强制使用Avro。强制Flink AvroTypeInformation使用Avro序列化程序而不是Kryo来序列化Avro POJO。

  • enableObjectReuse()/ disableObjectReuse()默认情况下,对象不会在Flink中重复使用。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当 算子操作的用户代码函数不知道此行为时,这可能会导致错误。

  • enableSysoutLogging()/ disableSysoutLogging()JobManager状态更新System.out默认打印到。此设置允许禁用此行为。

  • getGlobalJobParameters()/ setGlobalJobParameters()此方法允许用户将自定义对象设置为作业的全局配置。由于ExecutionConfig可以在所有用户定义的函数中访问,因此这是一种在作业中全局可用的配置的简单方法。

  • addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)为给定的注册Kryo序列化程序实例type

  • addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)为给定的注册Kryo序列化程序类type

  • registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)使用Kryo注册给定类型并为其指定序列化程序。通过使用Kryo注册类型,类型的序列化将更加高效。

  • registerKryoType(Class<?> type)如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签(整数ID)。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。

  • registerPojoType(Class<?> type)使用序列化堆栈注册给定类型。如果类型最终被序列化为POJO,则该类型将在POJO序列化程序中注册。如果类型最终被Kryo序列化,那么它将在Kryo注册以确保只写入标签。如果某个类型未在Kryo中注册,则其整个类名将与每个实例序列化,从而导致更高的I / O成本。

请注意,注册的类型registerKryoType()不适用于Flink的Kryo序列化程序实例。

  • disableAutoTypeRegistration()默认情况下启用自动类型注册。自动类型注册是使用Kryo和POJO序列化器注册用户代码使用的所有类型(包括子类型)。

  • setTaskCancellationInterval(long interval)设置在连续尝试取消正在运行的任务之间等待的间隔(以毫秒为单位)。取消interrupt()任务时,如果任务线程未在特定时间内终止,则创建新线程,该线程定期调用任务线程。此参数是指连续呼叫之间的时间interrupt(),默认设置为30000毫秒或30秒

RuntimeContext是在可访问的Rich*通过的函数的getRuntimeContext()方法还允许访问ExecutionConfig中的所有用户定义的函数。