思考:

  • Flink 是如何将多个 operators chain在一起的呢
  • chain在一起的operators是如何作为一个整体被执行的呢
  • Operator 之间的数据流又是如何避免了序列化/反序列化以及网络传输的呢
  • 多个tasks(或者说operators)是如何共享slot的呢

Chain 连接 共享 Slot

Operator chain的行为可以通过编程API中进行指定。

  • 可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。

  • 调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。在底层,这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的。

  • 也可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。 ```java

  • Start new chain
  • 从此运算符开始,开始新的链。这两个映射器将被链接,并且过滤器将不会链接到第一个映射器。 someStream.filter(…).map(…).startNewChain().map(…);
  • Disable chaining
  • 不要链 当前 operator someStream.map(…).disableChaining();

  • 设置广告位共享组

  • 设置操作的插槽共享组。 Flink会将具有相同插槽共享组的操作放入同一插槽,同时将没有插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一插槽共享组中,则插槽共享组将从输入操作继承。默认插槽共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。 someStream.filter(…).slotSharingGroup(“name”); ```

下图展示了operators chain的内部实现:
image.png
Flink内部是通过 OperatorChain 这个类来将多个 operator 链在一起形成一个新的operator
OperatorChain形成的框框就像一个黑盒,Flink 无需知道黑盒中有多少个 ChainOperator、数据在chain 内部是怎么流动的,只需要将 input 数据交给 HeadOperator 就可以了,这就使得OperatorChain在行为上与普通的 operator 无差别,上面的 OperaotrChain 就可以看做是一个入度为1,出度为2的 operator

所以在实现中,对外可见的只有 HeadOperator,以及与外部连通的实线输出,这些输出对应了JobGraph 中的 JobEdge,在底层通过RecordWriterOutput来实现。另外,框中的虚线是operator ``chain 内部的数据流,这个流内的数据不会经过序列化/反序列化、网络传输,而是直接将消息对象传递给下游的 ChainOperator 处理,这是性能提升的关键点,在底层是通过 ChainingOutput 实现的,源码如下方所示,

注:HeadOperator和ChainOperator并不是具体的数据结构,前者指代chain中的第一个operator,后者指代chain中其余的operator,它们实际上都是StreamOperator。

  1. private static class <T> implements Output<StreamRecord<T>> {
  2. protected final OneInputStreamOperator<T, ?> operator;
  3. public (OneInputStreamOperator<T, ?> operator) {
  4. this.operator = operator;
  5. }
  6. // 发送消息方法的实现,直接将消息对象传递给operator处理,不经过序列化/反序列化、网络传输
  7. public void collect(StreamRecord<T> record) {
  8. try {
  9. operator.setKeyContextElement1(record);
  10. // 下游operator直接处理消息对象
  11. operator.processElement(record);
  12. }
  13. catch (Exception e) {
  14. throw new ExceptionInChainedOperatorException(e);
  15. }
  16. }
  17. ...
  18. }

SlotSharingGroup 与 CoLocationGroup

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许slot共享有以下两点好处:

  1. Flink 集群所需的task slots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。

  2. 更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将keyAggregation/sink的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。

Coding - Chain - 图2

我们将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager

SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让 subtasks 共享一个 slot

CoLocationGroup 类用来强制将 subtasks 放到同一个 slot 中。

  • CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。

怎么判断 operator 属于哪个 slot 共享组呢?默认情况下,所有的 operator 都属于默认的共享组default,也就是说默认情况下所有的 operator都是可以共享一个 slot 的。而当所有input operators具有相同的 slot 共享组时,该 operator 会继承这个共享组。
最后,为了防止不合理的共享,用户也能通过API来强制指定 operator 的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了 filterslot 共享组为group1


共享Slot 原理与实现

先来看一下用来定义计算资源的slot的类图:
image.png

抽象类 Slot 定义了该槽位属于哪个 TaskManagerinstance)的第几个槽位(slotNumber),属于哪个Job(jobID)等信息。最简单的情况下,一个slot只持有一个task,也就是SimpleSlot的实现。复杂点的情况,一个slot能共享给多个task使用,也就是SharedSlot的实现。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一个SharedSlot能定义出一棵slots树。

接下来我们来看看 Flink 为subtask分配slot的过程。关于Flink调度,有两个非常重要的原则我们必须知道:

1)同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]FlatMap[2]是不能在同一个SharedSlot中的。

2)Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。假设现在有2个TaskManager,每个只有1个slot(为简化问题),那么分配slot的过程如图所示:
Coding - Chain - 图4

注:图中 SharedSlot 与 SimpleSlot 后带的括号中的数字代表槽位号(slotNumber)

  1. Source分配slot。首先,我们从TaskManager1中分配出一个SharedSlot。并从SharedSlot中为Source分配出一个SimpleSlot。如上图中的①和②。

  2. FlatMap[1]分配slot。目前已经有一个SharedSlot,则从该SharedSlot中分配出一个SimpleSlot用来部署FlatMap[1]。如上图中的③。

  3. FlatMap[2]分配slot。由于TaskManager1的SharedSlot中已经有同operator的FlatMap[1]了,我们只能分配到其他SharedSlot中去。从TaskManager2中分配出一个SharedSlot,并从该SharedSlot中为FlatMap[2]分配出一个SimpleSlot。如上图的④和⑤。

  4. Key->Sink[1]分配slot。目前两个SharedSlot都符合条件,从TaskManager1的SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[1]。如上图中的⑥。

  5. Key->Sink[2]分配slot。TaskManager1的SharedSlot中已经有同operator的Key->Sink[1]了,则只能选择另一个SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[2]。如上图中的⑦。

最后SourceFlatMap[1]Key->Sink[1]这些subtask都会部署到TaskManager1的唯一一个slot中,并启动对应的线程。FlatMap[2]Key->Sink[2]这些subtask都会被部署到TaskManager2的唯一一个slot中,并启动对应的线程。从而实现了slot共享。

参考: