思考:
- Flink 是如何将多个 operators chain在一起的呢
- chain在一起的operators是如何作为一个整体被执行的呢
- Operator 之间的数据流又是如何避免了序列化/反序列化以及网络传输的呢
- 多个tasks(或者说operators)是如何共享slot的呢
Chain 连接 共享 Slot
Operator chain的行为可以通过编程API中进行指定。
可以通过在DataStream的operator后面(如
someStream.map(..)
)调用startNewChain()
来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。调用
disableChaining()
来指示该operator不参与chaining(不会与前后的operator chain一起)。在底层,这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的。也可以通过调用
StreamExecutionEnvironment.disableOperatorChaining()
来全局禁用chaining。 ```java- Start new chain
- 从此运算符开始,开始新的链。这两个映射器将被链接,并且过滤器将不会链接到第一个映射器。 someStream.filter(…).map(…).startNewChain().map(…);
- Disable chaining
不要链 当前 operator someStream.map(…).disableChaining();
设置广告位共享组
- 设置操作的插槽共享组。 Flink会将具有相同插槽共享组的操作放入同一插槽,同时将没有插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一插槽共享组中,则插槽共享组将从输入操作继承。默认插槽共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。 someStream.filter(…).slotSharingGroup(“name”); ```
下图展示了operators chain的内部实现:
Flink内部是通过 OperatorChain
这个类来将多个 operator
链在一起形成一个新的operator
。OperatorChain
形成的框框就像一个黑盒,Flink 无需知道黑盒中有多少个 ChainOperator
、数据在chain
内部是怎么流动的,只需要将 input
数据交给 HeadOperator
就可以了,这就使得OperatorChain
在行为上与普通的 operator
无差别,上面的 OperaotrChain
就可以看做是一个入度为1,出度为2的 operator
。
所以在实现中,对外可见的只有 HeadOperator
,以及与外部连通的实线输出,这些输出对应了JobGraph
中的 JobEdge
,在底层通过RecordWriterOutput
来实现。另外,框中的虚线是operator ``chain
内部的数据流,这个流内的数据不会经过序列化/反序列化、网络传输,而是直接将消息对象传递给下游的 ChainOperator
处理,这是性能提升的关键点,在底层是通过 ChainingOutput
实现的,源码如下方所示,
注:HeadOperator和ChainOperator并不是具体的数据结构,前者指代chain中的第一个operator,后者指代chain中其余的operator,它们实际上都是StreamOperator。
private static class <T> implements Output<StreamRecord<T>> {
protected final OneInputStreamOperator<T, ?> operator;
public (OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}
// 发送消息方法的实现,直接将消息对象传递给operator处理,不经过序列化/反序列化、网络传输
public void collect(StreamRecord<T> record) {
try {
operator.setKeyContextElement1(record);
// 下游operator直接处理消息对象
operator.processElement(record);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
...
}
SlotSharingGroup 与 CoLocationGroup
默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许slot共享有以下两点好处:
Flink 集群所需的task slots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。
更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将keyAggregation/sink的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。
我们将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink
被平均地分配到各个 TaskManager
。
SlotSharingGroup
是Flink中用来实现slot共享的类,它尽可能地让 subtasks
共享一个 slot
。
CoLocationGroup
类用来强制将 subtasks
放到同一个 slot
中。
CoLocationGroup
主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。
怎么判断 operator
属于哪个 slot
共享组呢?默认情况下,所有的 operator
都属于默认的共享组default
,也就是说默认情况下所有的 operator
都是可以共享一个 slot
的。而当所有input operators
具有相同的 slot
共享组时,该 operator
会继承这个共享组。
最后,为了防止不合理的共享,用户也能通过API来强制指定 operator
的共享组,比如:someStream.filter(...).slotSharingGroup("group1");
就强制指定了 filter
的 slot
共享组为group1
。
共享Slot 原理与实现
先来看一下用来定义计算资源的slot的类图:
抽象类 Slot
定义了该槽位属于哪个 TaskManager
(instance
)的第几个槽位(slotNumber
),属于哪个Job(jobID
)等信息。最简单的情况下,一个slot只持有一个task,也就是SimpleSlot
的实现。复杂点的情况,一个slot能共享给多个task使用,也就是SharedSlot
的实现。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一个SharedSlot能定义出一棵slots树。
接下来我们来看看 Flink 为subtask分配slot的过程。关于Flink调度,有两个非常重要的原则我们必须知道:
1)同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]
和FlatMap[2]
是不能在同一个SharedSlot中的。
2)Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source
-> FlatMap[1]
-> FlatMap[2]
-> KeyAgg->Sink[1]
-> KeyAgg->Sink[2]
。假设现在有2个TaskManager,每个只有1个slot(为简化问题),那么分配slot的过程如图所示:
注:图中 SharedSlot 与 SimpleSlot 后带的括号中的数字代表槽位号(slotNumber)
为
Source
分配slot。首先,我们从TaskManager1中分配出一个SharedSlot。并从SharedSlot中为Source
分配出一个SimpleSlot。如上图中的①和②。为
FlatMap[1]
分配slot。目前已经有一个SharedSlot,则从该SharedSlot中分配出一个SimpleSlot用来部署FlatMap[1]
。如上图中的③。为
FlatMap[2]
分配slot。由于TaskManager1的SharedSlot中已经有同operator的FlatMap[1]
了,我们只能分配到其他SharedSlot中去。从TaskManager2中分配出一个SharedSlot,并从该SharedSlot中为FlatMap[2]
分配出一个SimpleSlot。如上图的④和⑤。为
Key->Sink[1]
分配slot。目前两个SharedSlot都符合条件,从TaskManager1的SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[1]
。如上图中的⑥。为
Key->Sink[2]
分配slot。TaskManager1的SharedSlot中已经有同operator的Key->Sink[1]
了,则只能选择另一个SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[2]
。如上图中的⑦。
最后Source
、FlatMap[1]
、Key->Sink[1]
这些subtask都会部署到TaskManager1的唯一一个slot中,并启动对应的线程。FlatMap[2]
、Key->Sink[2]
这些subtask都会被部署到TaskManager2的唯一一个slot中,并启动对应的线程。从而实现了slot共享。
参考: