Subplan reuse optimization finds duplicated nodes in a SQL execution plan and reuses them, so that the duplicated computation does not run twice. First, recall the main stages of SQL execution: parser -> validate -> logical optimize -> physical optimize -> translateToExecNode.
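For intuition, here is a minimal sketch of the kind of plan this optimization targets: a statement set whose two INSERTs share one filtered scan (the tables `src`, `sink1` and `sink2` are hypothetical and assumed to be declared elsewhere).

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object ReuseCandidateJob extends App {
      val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
      env.executeSql("CREATE TEMPORARY VIEW filtered AS SELECT * FROM src WHERE a > 0")

      // Both INSERTs reference the same `filtered` subtree; without subplan reuse
      // the scan plus filter would be planned and executed twice.
      val stmtSet = env.createStatementSet()
      stmtSet.addInsertSql("INSERT INTO sink1 SELECT a FROM filtered")
      stmtSet.addInsertSql("INSERT INTO sink2 SELECT b FROM filtered")
      stmtSet.execute()
    }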
The subplan reuse logic runs in the last stage, inside translateToExecNodeGraph:

    private[flink] def translateToExecNodeGraph(
        optimizedRelNodes: Seq[RelNode],
        isCompiled: Boolean): ExecNodeGraph = {
      val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
      if (nonPhysicalRel.nonEmpty) {
        throw new TableException(
          "The expected optimized plan is FlinkPhysicalRel plan, " +
            s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.")
      }

      require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

      // Rewrite same rel object to different rel objects
      // in order to get the correct dag (dag reuse is based on object not digest)
      val shuttle = new SameRelObjectShuttle()
      val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
      // reuse subplan
      val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
      // convert FlinkPhysicalRel DAG to ExecNodeGraph
      val generator = new ExecNodeGraphGenerator()
      val execGraph =
        generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)

      // process the graph
      val context = new ProcessorContext(this)
      val processors = getExecNodeGraphProcessors
      processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
    }

As you can see, the method first validates that all of the RelNodes are FlinkPhysicalRel nodes, i.e. physical-plan nodes:

    require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

SameRelObjectShuttle

    /**
     * Rewrite same rel object to different rel objects.
     *
     * <p>e.g.
     * {{{
     *       Join                      Join
     *      /    \                    /    \
     * Filter1  Filter2  =>    Filter1    Filter2
     *      \    /                 |         |
     *       Scan                Scan1     Scan2
     * }}}
     * After rewrote, Scan1 and Scan2 are different object but have same digest.
     */
    class SameRelObjectShuttle extends DefaultRelShuttle {
      private val visitedNodes = Sets.newIdentityHashSet[RelNode]()

      override def visit(node: RelNode): RelNode = {
        val visited = !visitedNodes.add(node)
        var change = false
        val newInputs = node.getInputs.map {
          input =>
            val newInput = input.accept(this)
            change = change || (input ne newInput)
            newInput
        }
        if (change || visited) {
          node.copy(node.getTraitSet, newInputs)
        } else {
          node
        }
      }
    }

Next comes the rel-node rewrite. A RelShuttle provides a visitor pattern for replacing certain nodes in the tree according to the implemented logic. In this implementation, when the same object is visited a second time (note that visitedNodes is an identity hash set), it is copied into a new object that keeps the same digest. What is the purpose of this step?
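Before answering, it helps to watch the mechanics on a toy tree. The following is only a sketch: a simplified `Node` case class stands in for Calcite's RelNode, and the JDK's identity set for the shaded Guava one.

    import java.util.{Collections, IdentityHashMap}

    object SameObjectDemo extends App {
      case class Node(name: String, inputs: List[Node])

      // Visited-by-identity set, like Sets.newIdentityHashSet in the shuttle.
      val visited = Collections.newSetFromMap(new IdentityHashMap[Node, java.lang.Boolean]())

      // Copy a node on its second visit so that no object keeps two parents.
      def rewrite(node: Node): Node = {
        val seen = !visited.add(node)
        val newInputs = node.inputs.map(rewrite)
        val changed = node.inputs.zip(newInputs).exists { case (a, b) => a ne b }
        if (seen || changed) Node(node.name, newInputs) else node
      }

      val scan = Node("Scan", Nil)
      val join = Node("Join", List(Node("Filter1", List(scan)), Node("Filter2", List(scan))))
      val rewritten = rewrite(join)

      // Distinct objects now, but still equal by value (the digest analogue):
      println(rewritten.inputs(0).inputs.head eq rewritten.inputs(1).inputs.head) // false
      println(rewritten.inputs(0).inputs.head == rewritten.inputs(1).inputs.head) // true
    }

The second visit to `scan` (reached via Filter2) triggers the copy, and the copy propagates upward because each parent then sees a changed input.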
Looking further down, when the ExecNodes are generated later, an IdentityHashMap is used to record the rels already visited, which means the generated ExecNodes correspond one-to-one to RelNode objects:

    private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels = new IdentityHashMap();

    private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
        ExecNode<?> execNode = visitedRels.get(rel);
        if (execNode != null) {
            return execNode;
        }

        if (rel instanceof CommonIntermediateTableScan) {
            throw new TableException("Intermediate RelNode can't be converted to ExecNode.");
        }

        List<ExecNode<?>> inputNodes = new ArrayList<>();
        for (RelNode input : rel.getInputs()) {
            inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
        }

        execNode = rel.translateToExecNode(isCompiled);

        // connects the input nodes
        List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
        for (ExecNode<?> inputNode : inputNodes) {
            inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
        }
        execNode.setInputEdges(inputEdges);

        visitedRels.put(rel, execNode);
        return execNode;
    }

Seen from here, splitting one object into two above looks even harder to understand: had the node remained a single object, it would have been reused here naturally, whereas after the split it no longer is.
The purpose is to first split apart nodes that are referenced multiple times as the same object, and then decide whether to reuse them based on equal digests plus internal rules. That way it is the Flink engine that controls which nodes may be merged.
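Putting the phases together on the javadoc example from above (the final merge only happens when the reuse rules described next allow it):

         Join                     Join                       Join
        /    \                   /    \                     /    \
    Filter1  Filter2  =>    Filter1  Filter2    =>     Filter1  Filter2
        \    /                  |       |                   \    /
         Scan                 Scan1   Scan2                  Scan
     (one object,         (two objects with           (merged back by
      two parents)         the same digest)            SubplanReuser)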

SubplanReuseContext

Inside the context, a ReusableSubplanVisitor builds two mappings:

    // mapping a relNode to its digest
    private val mapRelToDigest = Maps.newIdentityHashMap[RelNode, String]()
    // mapping the digest to RelNodes
    private val mapDigestToReusableNodes = new util.HashMap[String, util.List[RelNode]]()
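To make the two maps concrete, here is a sketch with simplified digests (real Calcite digests also encode tables, fields, predicates and so on); `scan1`/`scan2` are plain objects standing in for the two Scan rels produced by SameRelObjectShuttle.

    import java.util
    import com.google.common.collect.Maps

    object DigestMapsDemo extends App {
      val mapRelToDigest = Maps.newIdentityHashMap[AnyRef, String]()
      val mapDigestToReusableNodes = new util.HashMap[String, util.List[AnyRef]]()

      val scan1, scan2 = new AnyRef
      mapRelToDigest.put(scan1, "Scan(t)") // same digest,
      mapRelToDigest.put(scan2, "Scan(t)") // different objects

      val nodes = new util.ArrayList[AnyRef]()
      nodes.add(scan1)
      nodes.add(scan2)
      mapDigestToReusableNodes.put("Scan(t)", nodes)

      // A digest mapping to more than one node marks a group of reuse candidates.
      println(mapDigestToReusableNodes.get("Scan(t)").size()) // 2
    }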

The logic in between is fairly simple: traverse the whole tree and look for reusable nodes. How is reusability decided?

  • Multiple RelNodes hang under the same digest: such a group shares the same semantics and is a candidate for reuse
  • The node does not have reuse disabled, as checked by the code below

    /** Returns true if the given node is reusable disabled */
    private def isNodeReusableDisabled(node: RelNode): Boolean = {
      node match {
        // TableSourceScan node can not be reused if reuse TableSource disabled
        case _: FlinkLogicalLegacyTableSourceScan | _: CommonPhysicalLegacyTableSourceScan |
            _: FlinkLogicalTableSourceScan | _: CommonPhysicalTableSourceScan =>
          !tableSourceReuseEnabled
        // Exchange node can not be reused if its input is reusable disabled
        case e: Exchange => isNodeReusableDisabled(e.getInput)
        // TableFunctionScan and sink can not be reused
        case _: TableFunctionScan | _: LegacySink | _: Sink => true
        case _ => false
      }
    }

For example, a TableFunctionScan can never be reused (I have not figured out the reason yet), and an Exchange can be reused only when its input is itself reusable.
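The table-source case above is driven by configuration (tableSourceReuseEnabled). Assuming the current option keys from OptimizerConfigOptions, which both default to true, the two reuse switches can be toggled like this:

    // Assuming a TableEnvironment named `tableEnv`:
    tableEnv.getConfig.set("table.optimizer.reuse-sub-plan-enabled", "true")
    tableEnv.getConfig.set("table.optimizer.reuse-source-enabled", "false") // opt out of source reuse only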

SubplanReuseShuttle

Once the visit above has finished, we already know which nodes can be reused. Finally, a shuttle substitutes the reusable nodes:

    class SubplanReuseShuttle(context: SubplanReuseContext) extends DefaultRelShuttle {
      private val mapDigestToNewNode = new util.HashMap[String, RelNode]()

      override def visit(rel: RelNode): RelNode = {
        val canReuseOtherNode = context.reuseOtherNode(rel)
        val digest = context.getRelDigest(rel)
        if (canReuseOtherNode) {
          val newNode = mapDigestToNewNode.get(digest)
          if (newNode == null) {
            throw new TableException("This should not happen")
          }
          newNode
        } else {
          val newNode = visitInputs(rel)
          mapDigestToNewNode.put(digest, newNode)
          newNode
        }
      }
    }

The implementation records the new node produced for each digest; whenever a node can reuse another, the RelNode already recorded under that digest is returned directly (replacing the node that had the same digest but was a different object). With that, the reusable nodes of the tree are merged back together.
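To close the loop on the toy `Node` example from the SameRelObjectShuttle section, the merge step can be sketched the same way, with the case class's structural equality standing in for the Calcite digest:

    import scala.collection.mutable

    object SubplanReuseDemo extends App {
      case class Node(name: String, inputs: List[Node])

      // digest -> the single node kept for that digest,
      // like mapDigestToNewNode in SubplanReuseShuttle.
      val byDigest = mutable.HashMap[Node, Node]()

      def reuse(node: Node): Node = {
        val withNewInputs = Node(node.name, node.inputs.map(reuse))
        byDigest.getOrElseUpdate(withNewInputs, withNewInputs)
      }

      val split = Node("Join", List(
        Node("Filter1", List(Node("Scan", Nil))),
        Node("Filter2", List(Node("Scan", Nil))))) // the split tree from before

      val merged = reuse(split)
      // The two Scan leaves are the very same object again:
      println(merged.inputs(0).inputs.head eq merged.inputs(1).inputs.head) // true
    }

Unlike the real shuttle, this sketch merges every structurally equal subtree; Flink additionally consults the reusability rules above before merging.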