什么是反压问题?
反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃
Flink中的反压
Flink在运行时主要由operators和streams构成,每个operator会消费中间态的流并在流上转换,形成新的流。
Flink的实现反压的机制类似于Java中的BlockingQueue队列,通过LocalBufferPool来实现。
网络传输中的内存管理
网络上传输的数据会首先写到该Task(任务链)的**InputGate**(IG)中有Task处理后再写到**ResultPartition**中(RS),输入和输出的数据存在**LocalBufferPool**中,该Buffer是**MemorySegment**的包装类
初始化步骤
初始化NetworkEnvironment
**TaskManager**启动时会先初始化**NetworkEnvironment**对象,TM中所有与网络相关的类由该类来管理,其中就包括**NetworkBufferPool**。
会在NetworkBufferPool中生成一定数量的内存块**MemorySegment**默认是2048 * 32 Kb,内存块的总数量代表了网络传输所有可以用的内存,NetworkEnvironment 和NetworkBufferPool 在Task之间共享,每个TM只会实例化一个。
Task申请网络缓冲资源
Task 线程启动时,会向 NetworkEnvironment 注册,NetworkEnvironment 会为 Task 的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 **LocalBufferPool**(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中 **InputChannel** 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 **ResultSubpartition** 数量一致。
InputGate为一个Task中的,InputChannel为一个SubTask中的;RP与Rsp的意义同理
不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。
**注意,这个过程只是指定了缓冲池所能使用的内存块数量,并没有真正分配内存块,只有当需要时才分配。**
为什么要动态地为缓冲池扩容呢?因为内存越多,意味着系统可以更轻松地应对瞬时压力(如GC),不会频繁地进入反压状态,所以我们要利用起那部分闲置的内存块。
Task线程接收到数据时
此时Netty端接收到数据会向本地池申请,如果本地池达到上限则会向NetworkBufferPool申请,如果均载满,则会停止读取
Buffer回收
当一个内存块被消费完之后,即输入端的字节被序列化成对象,输出端的字节写入到了netty channel中,则会调用buffer.recycle()方法还给LocalBufferPool。
如果当前申请的数量超过了池子的容量,则会还给NetworkBufferPool,如果没有则会继续留在池子中,减少反复开销。
反压步骤
- 记录“A”进入了 Flink 并且被 Task 1 处理。(这里省略了 Netty 接收、反序列化等过程)
- 记录被序列化到 buffer 中。
- 该 buffer 被发送到 Task 2,然后 Task 2 从这个 buffer 中读出记录。
总结
Task 1 在输出端有一个相关联的 LocalBufferPool(称缓冲池1),Task 2 在输入端也有一个相关联的 LocalBufferPool(称缓冲池2)。如果缓冲池1中有空闲可用的 buffer 来序列化记录 “A”,我们就序列化并发送该 buffer。
同节点反压和跨节点反压
Netty水位机制:跨节点反压
NettyServer初始化
// 默认高水位值为2个buffer大小, 当接收端消费速度跟不上,发送端会立即感知到
//低水位,一个buffer大小+1
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
//高水位,2个buffer大小
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
发送代码
当输出缓冲中的字节数超过了高水位值, 则 Channel.isWritable()
会返回false。当输出缓存中的字节数又掉到了低水位值以下, 则 Channel.isWritable()
会重新返回true。Flink 中发送数据的核心代码在 PartitionRequestQueue
中,该类是 server channel pipeline 的最后一层。发送数据关键代码如下所示
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError) {
return;
}
Buffer buffer = null;
try {
// channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK
// 和 WRITE_BUFFER_HIGH_WATER_MARK 实现发送端的流量控制
if (channel.isWritable()) {
// 注意: 一个while循环也就最多只发送一个BufferResponse, 连续发送BufferResponse是通过writeListener回调实现的
while (true) {
if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
return;
}
buffer = currentPartitionQueue.getNextBuffer();
if (buffer == null) {
// 跳过这部分代码
...
}
else {
// 构造一个response返回给客户端
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
// 跳过这部分代码。batch 模式中 subpartition 的数据准备就绪,通知下游消费者。
...
}
// 将该response发到netty channel, 当写成功后,
// 通过注册的writeListener又会回调进来, 从而不断地消费 queue 中的请求
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}
}
catch (Throwable t) {
if (buffer != null) {
buffer.recycle();
}
throw new IOException(t.getMessage(), t);
}
}
// 当水位值降下来后(channel 再次可写),会重新触发发送函数
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}