Operator chain条件

一共4块包含7条原则,全部满足则可以chain到一起,具体如下。

1.上下游满足双射(单映射又是满映射)

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 两个节点间数据分区方式是ForwardPartitioner

    2.SlotSharingGroup

  4. 上下游节点都在同一个SlotSharingGroup中

    3.ChainingStrategy

  5. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)

  6. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

    4.全局配置

  7. 用户没有设置禁用 chain

    Chain规则源码

    1. public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    2. StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    3. StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    4. StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    5. StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    6. return downStreamVertex.getInEdges().size() == 1
    7. && outOperator != null
    8. && headOperator != null
    9. && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
    10. && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
    11. && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
    12. headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
    13. && (edge.getPartitioner() instanceof ForwardPartitioner)
    14. && edge.getShuffleMode() != ShuffleMode.BATCH
    15. && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
    16. && streamGraph.isChainingEnabled();
    17. }

    Chain策略(ChainingStrategy)

    一共就三种ALWAYS、NEVER、HEAD

  • ALWAYS:可以与上下游链接,map、flatmap、filter等默认是ALWAYS。
  • HEAD:只能与下游链接,不能与上游链接,Source默认是HEAD。
  • NEVER:上下游都不链接,算子自己独享一个Task。

    1. public enum ChainingStrategy {
    2. /**
    3. * Operators will be eagerly chained whenever possible.
    4. *
    5. * <p>To optimize performance, it is generally a good practice to allow maximal
    6. * chaining and increase operator parallelism.
    7. */
    8. ALWAYS,
    9. /**
    10. * The operator will not be chained to the preceding or succeeding operators.
    11. */
    12. NEVER,
    13. /**
    14. * The operator will not be chained to the predecessor, but successors may chain to this
    15. * operator.
    16. */
    17. HEAD
    18. }

设置Chain

通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。

  1. map().startNewChain().map
  2. // 独占Task
  3. map().disableChaining()
  4. //
  5. map().slotSharingGroup("group-name")

方法设置Chain的基本原理是通过改变Chain策略,具体对应如下。

  1. startNewChain() //对应的策略是后面算子 ChainingStrategy.HEAD
  2. disableChaining() //对应的策略是 ChainingStrategy.NEVER

内部优化选项

Chained 的 Operators 之间数据默认序列化后拷贝传递。
通过下面代码可以开启对象重用,关闭深拷贝。

  1. env.getConfig().enableObjectReuse();

注意:

慎用!必须要确保下游Function只有一种,或者下游的Function均不会改变对象内部的值。否则可能会有线程安全的问题。官方建议阅读源码 org.apache.flink.streaming.runtime.tasks.OperatorChain 与 org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。完全了解 reuse 内部机制后之后再使用。用不好会出现线程安全问题。