Operator chain条件
一共4块包含7条原则,全部满足则可以chain到一起,具体如下。
1.上下游满足双射(单映射又是满映射)
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
两个节点间数据分区方式是ForwardPartitioner
2.SlotSharingGroup
-
3.ChainingStrategy
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
4.全局配置
-
Chain规则源码
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
Chain策略(ChainingStrategy)
一共就三种ALWAYS、NEVER、HEAD
- ALWAYS:可以与上下游链接,map、flatmap、filter等默认是ALWAYS。
- HEAD:只能与下游链接,不能与上游链接,Source默认是HEAD。
NEVER:上下游都不链接,算子自己独享一个Task。
public enum ChainingStrategy {
/**
* Operators will be eagerly chained whenever possible.
*
* <p>To optimize performance, it is generally a good practice to allow maximal
* chaining and increase operator parallelism.
*/
ALWAYS,
/**
* The operator will not be chained to the preceding or succeeding operators.
*/
NEVER,
/**
* The operator will not be chained to the predecessor, but successors may chain to this
* operator.
*/
HEAD
}
设置Chain
通过调用StreamExecutionEnvironment.disableOperatorChaining()
来全局禁用chaining。
map().startNewChain().map
// 独占Task
map().disableChaining()
//
map().slotSharingGroup("group-name")
方法设置Chain的基本原理是通过改变Chain策略,具体对应如下。
startNewChain() //对应的策略是后面算子 ChainingStrategy.HEAD
disableChaining() //对应的策略是 ChainingStrategy.NEVER
内部优化选项
Chained 的 Operators 之间数据默认序列化后拷贝传递。
通过下面代码可以开启对象重用,关闭深拷贝。
env.getConfig().enableObjectReuse();
注意:
慎用!必须要确保下游Function只有一种,或者下游的Function均不会改变对象内部的值。否则可能会有线程安全的问题。官方建议阅读源码 org.apache.flink.streaming.runtime.tasks.OperatorChain 与 org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。完全了解 reuse 内部机制后之后再使用。用不好会出现线程安全问题。