- All batch optimization rules are defined in `FlinkBatchRuleSets`.
- In Flink 1.12.1, the join-related optimization rules are:
  - `BatchExecHashJoinRule.INSTANCE`
  - `BatchExecSortMergeJoinRule.INSTANCE`
  - `BatchExecNestedLoopJoinRule.INSTANCE`
  - `BatchExecSingleRowJoinRule.INSTANCE`
  - `BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN`
  - `BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN`
BatchExecHashJoinRule
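This rule mainly handles the broadcast optimization. When broadcasting is not possible, it falls back to a hash-partitioned (shuffle) hash join. Either way, it produces a `BatchExecHashJoin`. Implementation: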
- Matching: neither hash-join variant is disabled by default, so this check passes unless one of them is turned off (via `table.exec.disabled-operators`):
  ```scala
  val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
  val isShuffleHashJoinEnabled = !isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)
  val isBroadcastHashJoinEnabled = !isOperatorDisabled(
    tableConfig, OperatorType.BroadcastHashJoin)

  // the estimated sizes of both inputs decide whether one side can be broadcast
  val leftSize = binaryRowRelNodeSize(join.getLeft)
  val rightSize = binaryRowRelNodeSize(join.getRight)
  val (isBroadcast, _) = canBroadcast(join.getJoinType, leftSize, rightSize, tableConfig)

  // TODO use shuffle hash join if isBroadcast is true and isBroadcastHashJoinEnabled is false ?
  if (isBroadcast) isBroadcastHashJoinEnabled else isShuffleHashJoinEnabled
  ```
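- Transformation:
  ```scala
  if (isBroadcast) {
    // broadcast conversion: require a BROADCAST_DISTRIBUTED physical trait
    join.getTraitSet
      .replace(FlinkConventions.BATCH_PHYSICAL)
      .replace(FlinkRelDistribution.BROADCAST_DISTRIBUTED)
  } else {
    // hash distribution on the join columns
    // (partial keys get a separate optimization, omitted here)
    join.getCluster.getPlanner.emptyTraitSet
      .replace(FlinkConventions.BATCH_PHYSICAL)
      .replace(FlinkRelDistribution.hash(columns))
  }
  ```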
- Generation:
  ```scala
  val newJoin = new BatchExecHashJoin(...)
  ```
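The size-based choice in `matches` can be illustrated with a minimal Java sketch of the kind of decision `canBroadcast` makes. Everything here is hypothetical: `BroadcastDecision`, its `JoinType` enum, the byte-size arguments, and the explicit `threshold` parameter. The per-join-type cases follow my reading of Flink's code and should be treated as an assumption. In Flink, the threshold comes from the table config (`table.optimizer.join.broadcast-threshold`).

```java
import java.util.*;

// Hypothetical, simplified model of the broadcast decision; not Flink's API.
final class BroadcastDecision {
    enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

    final boolean isBroadcast; // can one input be broadcast at all?
    final boolean leftIsBuild; // if so, is the left input the broadcast (build) side?

    BroadcastDecision(boolean isBroadcast, boolean leftIsBuild) {
        this.isBroadcast = isBroadcast;
        this.leftIsBuild = leftIsBuild;
    }

    // Sizes are estimated byte sizes; null means "unknown" (assumption of this sketch).
    static BroadcastDecision canBroadcast(JoinType type, Long leftSize, Long rightSize, long threshold) {
        if (leftSize == null || rightSize == null) {
            return new BroadcastDecision(false, false); // unknown sizes: never broadcast
        }
        switch (type) {
            case LEFT:  // only the right (non-preserved) side may be broadcast
                return new BroadcastDecision(rightSize <= threshold, false);
            case RIGHT: // only the left (non-preserved) side may be broadcast
                return new BroadcastDecision(leftSize <= threshold, true);
            case FULL:  // both sides are preserved, so neither can be broadcast
                return new BroadcastDecision(false, false);
            case INNER: // either side may be broadcast; the smaller one becomes the build side
                return new BroadcastDecision(
                        leftSize <= threshold || rightSize <= threshold, leftSize < rightSize);
            case SEMI:
            case ANTI:  // the left side cannot be the build side
                return new BroadcastDecision(rightSize <= threshold, false);
            default:
                throw new IllegalArgumentException("unsupported join type: " + type);
        }
    }

    // Mirrors the last expression of matches(): only the chosen variant's switch is consulted.
    static boolean matches(JoinType type, Long leftSize, Long rightSize, long threshold,
                           boolean broadcastHashJoinEnabled, boolean shuffleHashJoinEnabled) {
        boolean isBroadcast = canBroadcast(type, leftSize, rightSize, threshold).isBroadcast;
        return isBroadcast ? broadcastHashJoinEnabled : shuffleHashJoinEnabled;
    }
}
```

The last method also shows what the TODO is about. If broadcasting is possible but the broadcast hash join is disabled, `matches` returns `false`, even when the shuffle hash join is still enabled.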
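In the non-broadcast branch, both inputs are required to be hash-distributed on the join columns. This is sufficient because rows with equal keys always land in the same partition, so each partition can be joined independently with a build/probe hash join. The following is a minimal, in-memory Java sketch under these assumptions. All names are hypothetical (`ShuffleHashJoinSketch`), rows are modeled as `int[]{key, payload}`, and it omits Flink's spilling and partial-key handling.

```java
import java.util.*;

// Hypothetical illustration of a shuffle (hash-partitioned) hash join; not Flink code.
final class ShuffleHashJoinSketch {
    // The same partitioning function must be applied to both inputs.
    static int partitionOf(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    static List<List<int[]>> partition(List<int[]> rows, int numPartitions) {
        List<List<int[]>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int[] row : rows) {
            parts.get(partitionOf(row[0], numPartitions)).add(row);
        }
        return parts;
    }

    // Inner equi-join: build a hash table on one side, probe it with the other.
    static List<String> hashJoin(List<int[]> build, List<int[]> probe) {
        Map<Integer, List<int[]>> table = new HashMap<>();
        for (int[] b : build) {
            table.computeIfAbsent(b[0], k -> new ArrayList<>()).add(b);
        }
        List<String> out = new ArrayList<>();
        for (int[] p : probe) {
            for (int[] b : table.getOrDefault(p[0], Collections.<int[]>emptyList())) {
                out.add(p[0] + ":" + p[1] + "," + b[1]);
            }
        }
        return out;
    }

    // Equal keys meet in the same partition, so per-partition joins cover every match.
    static List<String> shuffleHashJoin(List<int[]> build, List<int[]> probe, int numPartitions) {
        List<List<int[]>> buildParts = partition(build, numPartitions);
        List<List<int[]>> probeParts = partition(probe, numPartitions);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.addAll(hashJoin(buildParts.get(i), probeParts.get(i)));
        }
        return out;
    }
}
```

Up to ordering, joining partition by partition yields the same rows as a single hash join over the whole inputs.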
BatchExecSortMergeJoinRule
**Joins that have equality (equi-join) conditions go through this rule, which turns them into a SortMergeJoin alternative.**
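- Matching: the rule matches as soon as the join has at least one equality condition and SortMergeJoin is not disabled:
  ```scala
  val joinInfo = join.analyzeCondition
  val isSortMergeJoinEnabled = !isOperatorDisabled(tableConfig, OperatorType.SortMergeJoin)
  // at least one equi-join key pair, and the operator has not been disabled
  !joinInfo.pairs().isEmpty && isSortMergeJoinEnabled
  ```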
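- Transformation: for each candidate `(requireLeftSorted, requireRightSorted)` pair, the rule calls `transformToEquiv` with the join keys to register an equivalent sort-merge join. The two flags indicate whether the left/right input is required to arrive already sorted.
  ```scala
  candidates.foreach {
    case (requireLeftSorted, requireRightSorted) =>
      transformToEquiv(
        joinInfo.leftKeys,
        joinInfo.rightKeys,
        requireLeftSorted,
        requireRightSorted)
  }
  ```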
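`join.analyzeCondition` (Calcite's `JoinInfo`) splits the join condition into equi-join key pairs and the remaining non-equi conditions. The matching step only checks that `pairs()` is non-empty. Below is a simplified Java model of that check. The `Predicate` representation and all names are hypothetical and do not reflect how Calcite actually analyzes `RexNode` conditions.

```java
import java.util.*;

// Hypothetical model of a join condition as a conjunction of column comparisons.
final class EquiKeySketch {
    // "left.leftCol <op> right.rightCol"; only "=" yields an equi-join key pair.
    static final class Predicate {
        final int leftCol;
        final String op;
        final int rightCol;

        Predicate(int leftCol, String op, int rightCol) {
            this.leftCol = leftCol;
            this.op = op;
            this.rightCol = rightCol;
        }
    }

    // Collects {leftKey, rightKey} pairs, loosely analogous to JoinInfo.pairs().
    static List<int[]> equiPairs(List<Predicate> conjuncts) {
        List<int[]> pairs = new ArrayList<>();
        for (Predicate p : conjuncts) {
            if ("=".equals(p.op)) {
                pairs.add(new int[] {p.leftCol, p.rightCol});
            }
        }
        return pairs;
    }

    // Mirrors: !joinInfo.pairs().isEmpty && isSortMergeJoinEnabled
    static boolean sortMergeJoinMatches(List<Predicate> conjuncts, boolean sortMergeJoinEnabled) {
        return !equiPairs(conjuncts).isEmpty() && sortMergeJoinEnabled;
    }
}
```

A condition such as `l.a = r.b AND l.c < r.a` therefore matches (one key pair), while a purely non-equi condition such as `l.c < r.a` does not.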
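The result is a `BatchExecSortMergeJoin`. The operator it generates is `SortMergeJoinOperator`, which holds two sorters, one per input. When processing data, it first sorts each input on the join keys and then merges the two sorted sides to produce the joined rows.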
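```java
public class SortMergeJoinOperator extends TableStreamOperator<RowData>
        implements TwoInputStreamOperator<RowData, RowData, RowData>, BoundedMultiInput {

    // external sorters for the first (left) and second (right) input
    private transient BinaryExternalSorter sorter1;
    private transient BinaryExternalSorter sorter2;

    // ... remaining fields and methods omitted
}
```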
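The sort-then-merge behaviour described above can be illustrated with a small in-memory Java sketch. This is not `SortMergeJoinOperator`. The hypothetical `SortMergeJoinSketch.join` sorts both inputs with `List.sort`, whereas the real operator uses two `BinaryExternalSorter`s that can spill to disk. The sketch also handles only an inner join on an `int` key. Runs of equal keys on both sides produce their cross product.

```java
import java.util.*;

// Hypothetical in-memory illustration of a sort-merge inner equi-join.
final class SortMergeJoinSketch {
    // Rows are int[]{key, payload}; the join key is column 0.
    static List<String> join(List<int[]> left, List<int[]> right) {
        Comparator<int[]> byKey = Comparator.comparingInt((int[] row) -> row[0]);
        List<int[]> l = new ArrayList<>(left);
        List<int[]> r = new ArrayList<>(right);
        l.sort(byKey); // plays the role of sorter1
        r.sort(byKey); // plays the role of sorter2

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int key = l.get(i)[0];
            int rightKey = r.get(j)[0];
            if (key < rightKey) {
                i++; // advance the side with the smaller key
            } else if (key > rightKey) {
                j++;
            } else {
                int jEnd = j; // find the run of equal keys on the right
                while (jEnd < r.size() && r.get(jEnd)[0] == key) {
                    jEnd++;
                }
                // every left row in the run joins with every right row in the run
                while (i < l.size() && l.get(i)[0] == key) {
                    for (int k = j; k < jEnd; k++) {
                        out.add(key + ":" + l.get(i)[1] + "," + r.get(k)[1]);
                    }
                    i++;
                }
                j = jEnd;
            }
        }
        return out;
    }
}
```

Each input is read once after sorting. This is why the rule's `requireLeftSorted`/`requireRightSorted` flags matter: an input that already arrives sorted on the join keys does not need its own sort.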
BatchExecNestedLoopJoinRule
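A nested-loop join simply loops over one input for every row of the other, so the rule itself is simple. Flink does add one small optimization, though. For SEMI and ANTI joins, it decides whether to deduplicate the inner (build, right) side first. The benefit is clear: when the build side contains many duplicates, the distinct removes a large number of unnecessary inner-loop iterations.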
```scala
case JoinRelType.SEMI | JoinRelType.ANTI =>
  // We can do a distinct to buildSide(right) when semi join.
  val distinctKeys = 0 until join.getRight.getRowType.getFieldCount
  val useBuildDistinct = chooseSemiBuildDistinct(join.getRight, distinctKeys)
  if (useBuildDistinct) {
    addLocalDistinctAgg(join.getRight, distinctKeys, call.builder())
  } else {
    join.getRight
  }
```
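A minimal Java sketch of a nested-loop SEMI/ANTI join shows why the build-side distinct is both safe and helpful. All names are hypothetical. The sketch stops scanning the inner side at the first match and deduplicates unconditionally when asked. Flink, by contrast, decides via `chooseSemiBuildDistinct` (whose cost logic is not modeled here) and deduplicates with a local distinct aggregate over all right-side fields.

```java
import java.util.*;

// Hypothetical nested-loop SEMI/ANTI join on a single int key; not Flink code.
final class NestedLoopSemiJoinSketch {
    static final class Result {
        final List<Integer> rows;   // probe rows that survive the join
        final long innerIterations; // number of inner-loop comparisons performed

        Result(List<Integer> rows, long innerIterations) {
            this.rows = rows;
            this.innerIterations = innerIterations;
        }
    }

    static Result join(List<Integer> probe, List<Integer> build, boolean anti, boolean distinctBuild) {
        // Deduplicating the build side cannot change a SEMI/ANTI result:
        // only whether at least one match exists matters.
        List<Integer> inner = distinctBuild
                ? new ArrayList<Integer>(new LinkedHashSet<Integer>(build))
                : build;
        List<Integer> rows = new ArrayList<>();
        long iterations = 0;
        for (Integer p : probe) {
            boolean matched = false;
            for (Integer b : inner) {
                iterations++;
                if (p.equals(b)) {
                    matched = true;
                    break; // one match is enough for SEMI/ANTI
                }
            }
            if (matched != anti) { // SEMI keeps matched rows, ANTI keeps unmatched ones
                rows.add(p);
            }
        }
        return new Result(rows, iterations);
    }
}
```

With a probe side `[1, 2, 3]` and a build side `[5, 5, 5, 5, 1]`, both variants return `[1]` for SEMI. However, the deduplicated inner side needs 6 inner iterations instead of 15.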
