• Batch
    • #">Adaptive Batch Scheduler #
      • #">用法 #
        • #">启用 Adaptive Batch Scheduler #
        • #">配置算子的并行度为 -1 #
      • #">性能调优 #
      • #">局限性 #

    Batch

    Adaptive Batch Scheduler #

    Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批作业调度器。如果算子未设置并行度,调度器将根据其消费的数据量的大小来推导其并行度。这可以带来诸多好处:

    • 批作业用户可以从并行度调优中解脱出来
    • 根据数据量自动推导并行度可以更好地适应每天变化的数据量
    • SQL作业中的算子也可以分配不同的并行度

      用法 #

      使用 Adaptive Batch Scheduler 自动推导算子的并行度,需要:

    • 启用 Adaptive Batch Scheduler

    • 配置算子的并行度为 -1

      启用 Adaptive Batch Scheduler #

      为了启用 Adaptive Batch Scheduler, 你需要:

    • 配置 jobmanager.scheduler: AdaptiveBatch

    • 由于 “只支持所有数据交换都为 BLOCKING 模式的作业”, 需要将 execution.batch-shuffle-mode 配置为 ALL-EXCHANGES-BLOCKING(默认值) 。

    除此之外,使用 Adaptive Batch Scheduler 时,以下相关配置也可以调整:

    • jobmanager.adaptive-batch-scheduler.min-parallelism: 允许自动设置的并行度最小值。需要配置为 2 的幂,否则也会被自动调整为最接近且大于其的 2 的幂。
    • jobmanager.adaptive-batch-scheduler.max-parallelism: 允许自动设置的并行度最大值。需要配置为 2 的幂,否则也会被自动调整为最接近且小于其的 2 的幂。
    • jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task: 期望每个任务平均处理的数据量大小。由于顶点的并行度会被调整为 2^N,因此实际每个任务平均处理的数据量大小将是该值的 0.75~1.5 倍。 另外需要注意的是,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
    • jobmanager.adaptive-batch-scheduler.default-source-parallelism: source 算子的默认并行度

      配置算子的并行度为 -1 #

      Adaptive Batch Scheduler 只会为用户未指定并行度的算子(并行度为 -1)推导并行度。 所以如果你想自动推导算子的并行度,需要进行以下配置:

    • 配置 parallelism.default: -1

    • 对于 SQL 作业,需要配置 table.exec.resource.default-parallelism: -1
    • 对于 DataStream/DataSet 作业,不要在作业中通过算子的 setParallelism() 方法来指定并行度
    • 对于 DataStream/DataSet 作业,不要在作业中通过 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法来指定并行度

      性能调优 #

    1. 建议使用 Sort Shuffle 并且设置 taskmanager.network.memory.buffers-per-channel 为 0。 这会解耦并行度与需要的网络内存,对于大规模作业,这样可以降低遇到 “Insufficient number of network buffers” 错误的可能性。
    2. 建议将 jobmanager.adaptive-batch-scheduler.max-parallelism 设置为最坏情况下预期需要的并行度。不建议配置太大的值,否则可能会影响性能。这个配置项会影响上游任务产出的 subpartition 的数量,过多的 subpartition 可能会影响 hash shuffle 的性能,或者由于小包影响网络传输的性能。

      局限性 #

    • 只支持批作业: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
    • 只支持所有数据交换都为 BLOCKING 模式的作业: 目前 Adaptive Batch Scheduler 只支持 shuffle mode 为 ALL-EXCHANGES-BLOCKING 的作业。
    • 推导出的并行度是 2 的幂: 为了使子分区可以均匀分配给下游任务,jobmanager.adaptive-batch-scheduler.max-parallelism 应该被配置为 2^N, 推导出的并行度会是 2^M, 且满足 M <= N。
    • 不支持 FileInputFormat 类型的 source: 不支持 FileInputFormat 类型的 source, 包括 StreamExecutionEnvironment#readFile(…) StreamExecutionEnvironment#readTextFile(…) 和 StreamExecutionEnvironment#createInput(FileInputFormat, …)。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API (FileSystem DataStream ConnectorFileSystem SQL Connector) 来读取文件.
    • Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致: 在使用 Adaptive Batch Scheduler 时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 FLIP-187