• All batch optimization rules are defined in FlinkBatchRuleSets
  • In Flink 1.12.1, the optimization rules for joins are:
    1. BatchExecHashJoinRule.INSTANCE,
    2. BatchExecSortMergeJoinRule.INSTANCE,
    3. BatchExecNestedLoopJoinRule.INSTANCE,
    4. BatchExecSingleRowJoinRule.INSTANCE,
    5. BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
    6. BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,

BatchExecHashJoinRule

This rule mainly handles the broadcast optimization and ultimately produces a BatchExecHashJoin. Its implementation:

  • Matching (matches by default):

```scala
val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val isShuffleHashJoinEnabled = !isOperatorDisabled(tableConfig, OperatorType.ShuffleHashJoin)
val isBroadcastHashJoinEnabled = !isOperatorDisabled(
  tableConfig, OperatorType.BroadcastHashJoin)

val leftSize = binaryRowRelNodeSize(join.getLeft)
val rightSize = binaryRowRelNodeSize(join.getRight)
val (isBroadcast, _) = canBroadcast(join.getJoinType, leftSize, rightSize, tableConfig)

// TODO use shuffle hash join if isBroadcast is true and isBroadcastHashJoinEnabled is false ?
if (isBroadcast) isBroadcastHashJoinEnabled else isShuffleHashJoinEnabled
```

  • Transformation:

```scala
// Broadcast conversion
if (isBroadcast) {
  join.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
    .replace(FlinkRelDistribution.BROADCAST_DISTRIBUTED)
} else {
  // Hash distribution; the separate optimization for partial keys is omitted here
  join.getCluster.getPlanner.emptyTraitSet.
    replace(FlinkConventions.BATCH_PHYSICAL).
    replace(FlinkRelDistribution.hash(columns))
}
```

  • Result:

```scala
val newJoin = new BatchExecHashJoin(...)
```
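To make the matching step more concrete, here is a minimal, self-contained sketch of how a broadcast decision of this shape could work. It is a simplified model, not Flink's code: the `JoinKind` type, the explicit `threshold` parameter, and the per-join-type rules are all assumptions made for illustration (the excerpt above only shows that `canBroadcast` takes the join type, both input sizes, and the table config).

```scala
// Simplified model of a broadcast-vs-shuffle decision. All names here are
// hypothetical; this is not Flink's actual implementation.
object BroadcastChoiceSketch {
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object FullOuter extends JoinKind
  case object Semi extends JoinKind
  case object Anti extends JoinKind

  /** Returns (canBroadcast, broadcastLeftSide). Sizes and threshold are estimated bytes. */
  def canBroadcast(
      kind: JoinKind,
      leftSize: Long,
      rightSize: Long,
      threshold: Long): (Boolean, Boolean) =
    kind match {
      // Assumption: the preserved (probe) side cannot be broadcast,
      // so only the right side is a candidate here.
      case LeftOuter | Semi | Anti => (rightSize <= threshold, false)
      // Symmetric case: only the left side is a candidate.
      case RightOuter => (leftSize <= threshold, true)
      // Assumption: a full outer join preserves both sides, so neither is broadcast.
      case FullOuter => (false, false)
      // Either side may be broadcast; prefer the smaller one.
      case Inner => (leftSize <= threshold || rightSize <= threshold, leftSize < rightSize)
    }

  /** Mirrors the last line of the matching excerpt. */
  def ruleMatches(
      isBroadcast: Boolean,
      broadcastEnabled: Boolean,
      shuffleEnabled: Boolean): Boolean =
    if (isBroadcast) broadcastEnabled else shuffleEnabled
}
```

Under this model, an inner join with a small right input would be planned as a broadcast join, and the rule would then match only if the broadcast hash join operator has not been disabled. That is the case the TODO comment in the excerpt refers to.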

BatchExecSortMergeJoinRule

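**Joins with an equality condition enter this rule and are optimized into a SortMergeJoin.**
  • Matching: the rule matches whenever the join has an equality condition (and the SortMergeJoin operator is not disabled):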

    val joinInfo = join.analyzeCondition
    val isSortMergeJoinEnabled = !isOperatorDisabled(tableConfig, OperatorType.SortMergeJoin)
    !joinInfo.pairs().isEmpty && isSortMergeJoinEnabled
  • Transformation:

    candidates.foreach {
      case (requireLeftSorted, requireRightSorted) =>
        transformToEquiv(
          joinInfo.leftKeys,
          joinInfo.rightKeys,
          requireLeftSorted,
          requireRightSorted)
    }
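  • Result: the rule ultimately produces a BatchExecSortMergeJoin. A quick look at the operator it generates, SortMergeJoinOperator: it holds two sorters internally. When processing data, it first sorts both inputs and then merges the left and right sides according to the join.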

    public class SortMergeJoinOperator extends TableStreamOperator<RowData>
            implements TwoInputStreamOperator<RowData, RowData, RowData>, BoundedMultiInput {
        private transient BinaryExternalSorter sorter1;
        private transient BinaryExternalSorter sorter2;
    }
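The sort-then-merge idea can be illustrated with a small in-memory sketch. This is not the SortMergeJoinOperator implementation (which sorts externally on binary rows). The `SortMergeJoinSketch` object, its `Int` key, and the inner-join-only semantics are assumptions chosen to keep the example short.

```scala
// Minimal in-memory sort-merge inner join; names and types are hypothetical.
object SortMergeJoinSketch {
  /** Inner equi-join of two unsorted inputs on an Int key. */
  def sortMergeJoin[L, R](left: Seq[(Int, L)], right: Seq[(Int, R)]): Vector[(Int, L, R)] = {
    // Phase 1: sort both inputs by key (this stands in for the two sorters).
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(Int, L, R)]
    var i = 0
    var j = 0
    // Phase 2: advance two cursors over the sorted inputs.
    while (i < l.length && j < r.length) {
      val lk = l(i)._1
      val rk = r(j)._1
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Find the end of the run of equal keys on each side.
        val iEnd = l.indexWhere(_._1 != lk, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(_._1 != rk, j) match { case -1 => r.length; case n => n }
        // Emit the cross product of the two runs.
        for (a <- i until iEnd; b <- j until jEnd) out += ((lk, l(a)._2, r(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Because both inputs are sorted first, the merge phase reads each side once from front to back, apart from the cross product emitted for runs of duplicate keys.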

BatchExecNestedLoopJoinRule

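A nested-loop join iterates over the left and right sides, so the rule itself is simple. Flink does add one small optimization: for a semi or anti join, it decides whether the inner (build) side should be deduplicated. The benefit is easy to see: when the build side contains many duplicates, deduplication avoids a lot of unnecessary inner-loop iterations.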

    case JoinRelType.SEMI | JoinRelType.ANTI =>
      // We can do a distinct to buildSide(right) when semi join.
      val distinctKeys = 0 until join.getRight.getRowType.getFieldCount
      val useBuildDistinct = chooseSemiBuildDistinct(join.getRight, distinctKeys)
      if (useBuildDistinct) {
        addLocalDistinctAgg(join.getRight, distinctKeys, call.builder())
      } else {
        join.getRight
      }
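The effect of deduplicating the build side can be seen in a small sketch. This is an illustrative model only: `NestedLoopSemiJoinSketch`, its whole-row equality condition, and the comparison counter are assumptions, not part of Flink.

```scala
// Nested-loop semi join with optional build-side deduplication.
// Hypothetical helper for illustration; not Flink code.
object NestedLoopSemiJoinSketch {
  /** Returns (probe rows that have a match, number of inner-loop comparisons). */
  def semiJoin[A](probe: Seq[A], build: Seq[A], distinctBuild: Boolean): (Seq[A], Int) = {
    // Optionally remove duplicate rows from the build (inner) side first.
    val inner = if (distinctBuild) build.distinct else build
    var comparisons = 0
    val kept = probe.filter { p =>
      // Inner loop: scan the build side until the first match.
      inner.exists { b => comparisons += 1; p == b }
    }
    (kept, comparisons)
  }
}
```

With probe rows `Seq(1, 2, 9)` and build rows `Seq(1, 1, 1, 2, 2, 2)`, both variants keep the same rows, but the deduplicated build side needs fewer comparisons, most visibly for probe rows that find no match and must scan the whole build side.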