Java Heap and Off-Heap
Java Advanced Runtime Options
Memory of a Java process falls into the following categories (the sketch below shows how to observe this split at runtime):
- heap: on-heap memory, managed by the GC. Controlled by parameters such as Xmx, Xms and Xmn.
- Off-Heap: off-heap memory
  - off-heap memory needed by the JVM itself, e.g. JVM Metaspace and JVM Overhead (thread stacks, code cache, garbage collection space, etc.)
  - off-heap memory that application code can use
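To make the classification concrete, the JDK's own management beans can report both sides from inside a running process: MemoryMXBean for heap/non-heap usage and BufferPoolMXBean for the direct and mapped off-heap buffer pools. A minimal, JDK-only sketch (the class name is chosen here just for illustration):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class JvmMemoryInspection {
    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        // heap: the GC-managed area sized by -Xmx/-Xms
        System.out.println("heap used     : " + memory.getHeapMemoryUsage().getUsed());
        // non-heap: metaspace, code cache, etc.
        System.out.println("non-heap used : " + memory.getNonHeapMemoryUsage().getUsed());

        // "direct" covers DirectByteBuffer allocations (e.g. network buffers),
        // "mapped" covers memory-mapped files
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.println(pool.getName() + " pool used: " + pool.getMemoryUsed());
        }
    }
}
```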
| Component | Configuration options | Description |
| --- | --- | --- |
| Framework Heap Memory | taskmanager.memory.framework.heap.size | JVM Heap memory dedicated to the Flink framework (advanced option) |
| Task Heap Memory | taskmanager.memory.task.heap.size | JVM Heap memory dedicated to the Flink application, to run operators and user code |
| Managed Memory | taskmanager.memory.managed.size<br />taskmanager.memory.managed.fraction | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend |
| Framework Off-heap Memory | taskmanager.memory.framework.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink framework (advanced option) |
| Task Off-heap Memory | taskmanager.memory.task.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink application, to run operators |
| Network Memory | taskmanager.memory.network.min<br />taskmanager.memory.network.max<br />taskmanager.memory.network.fraction | Direct memory reserved for data record exchange between tasks (e.g. buffering for transfer over the network); a capped fractionated component of the total Flink memory, used to allocate network buffers |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Metaspace size of the Flink JVM process |
| JVM Overhead | taskmanager.memory.jvm-overhead.min<br />taskmanager.memory.jvm-overhead.max<br />taskmanager.memory.jvm-overhead.fraction | Native memory reserved for other JVM overhead, e.g. thread stacks, code cache, garbage collection space; a capped fractionated component of the total process memory |
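These options are normally set in flink-conf.yaml. The sketch below only demonstrates the keys from the table, using Flink's Configuration with plain string keys; all sizes and fractions are arbitrary example values, not recommendations.

```java
import org.apache.flink.configuration.Configuration;

public class MemoryConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // heap component
        conf.setString("taskmanager.memory.task.heap.size", "512m");
        // managed (off-heap, Flink-managed) memory
        conf.setString("taskmanager.memory.managed.fraction", "0.4");
        // off-heap components
        conf.setString("taskmanager.memory.task.off-heap.size", "128m");
        conf.setString("taskmanager.memory.network.fraction", "0.1");
        conf.setString("taskmanager.memory.jvm-metaspace.size", "256m");
        System.out.println(conf);
    }
}
```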
MemorySegment
The basic unit of Flink's memory management; its underlying storage is either heap or off-heap.
- heap: a byte[] array
- off-heap: direct memory (used for network buffers) and unsafe memory (used for managed memory)
MemorySegmentFactory
All MemorySegments are created through MemorySegmentFactory.
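A hedged sketch of obtaining the different segment flavors through MemorySegmentFactory. The method names and signatures below follow recent Flink versions and may differ in older releases; treat them as assumptions rather than a fixed API.

```java
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentKinds {
    public static void main(String[] args) {
        // heap segment: backed by a byte[]
        MemorySegment wrapped = MemorySegmentFactory.wrap(new byte[32 * 1024]);
        MemorySegment heap = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);

        // off-heap direct segment: backed by a direct ByteBuffer (used for network memory)
        MemorySegment direct = MemorySegmentFactory.allocateUnpooledOffHeapMemory(32 * 1024, null);

        // off-heap unsafe segment: allocated via Unsafe (used for managed memory)
        MemorySegment unsafe =
                MemorySegmentFactory.allocateOffHeapUnsafeMemory(32 * 1024, null, () -> {});

        System.out.println(wrapped.size() + " " + heap.size() + " " + direct.size() + " " + unsafe.size());
    }
}
```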
Network Off-Heap
The network buffers are backed by DirectByteBuffer, and they are guaranteed never to exceed the configured Network Memory limit.
NetworkBufferPool
- A TM-level, global, fixed-size pool that represents the TM's entire network buffer budget.
  Its size is networkMemSize = totalNumberOfMemorySegments * memorySegmentSize,
  and it is created when the TM initializes the shuffle environment (NettyShuffleEnvironment) at startup. It serves two purposes:
  - Write: ResultPartition-level pools. When a Task is created it creates ResultPartitionWriters; each ResultPartitionWriter holds a LocalBufferPool, created during setup.
  - Read: InputGate-level pools. When a Task is created it creates InputGates; each InputGate holds a LocalBufferPool, created during setup.
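For intuition, a back-of-the-envelope sizing example; the 128 MB network budget is an arbitrary example value, while 32 KB is the default taskmanager.memory.segment-size.

```java
public class NetworkBufferPoolSizing {
    public static void main(String[] args) {
        long networkMemSize = 128L * 1024 * 1024; // example: 128 MB of network memory
        int memorySegmentSize = 32 * 1024;        // default segment size: 32 KB
        // networkMemSize = totalNumberOfMemorySegments * memorySegmentSize
        long totalNumberOfMemorySegments = networkMemSize / memorySegmentSize;
        System.out.println("segments in the pool: " + totalNumberOfMemorySegments); // 4096
    }
}
```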
The role of Network Buffers on the sending side
BufferBuilder
- Two member fields:
  - memorySegment: requested through LocalBufferPool#toBufferBuilder and passed in.
  - buffer: this.buffer = new NetworkBuffer(memorySegment, recycler); it wraps the segment.
- Purpose:
  - Writes data into the segment via the append methods.
  - Creates BufferConsumers.
In the section on data flow in Flink, we noted that ResultPartitionWriter#emitRecord, seen from the data-flow angle, mainly appends the record to a BufferBuilder.
Seen from the memory angle, emitRecord requests a BufferBuilder from the LocalBufferPool; if no buffer is available, it blocks.
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
totalWrittenBytes += record.remaining();
//append record to subPartition
BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
while (record.hasRemaining()) {
// full buffer, partial record
finishUnicastBufferBuilder(targetSubpartition);
buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
}
if (buffer.isFull()) {
// full buffer, full record
finishUnicastBufferBuilder(targetSubpartition);
}
// partial buffer, full record
}
private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record, final int targetSubpartition) throws IOException {
if (targetSubpartition < 0 || targetSubpartition > unicastBufferBuilders.length) {
throw new ArrayIndexOutOfBoundsException(targetSubpartition);
}
BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
if (buffer == null) {
//if this subpartition has no current BufferBuilder, request a new one
buffer = requestNewUnicastBufferBuilder(targetSubpartition);
//add the newly requested BufferBuilder to the subpartition's BufferConsumer queue (buffers),
//so that the reading side can pick it up
addToSubpartition(buffer, targetSubpartition, 0, record.remaining());
}
//actually write the record data into the buffer
buffer.appendAndCommit(record);
return buffer;
}
private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition)
throws IOException {
checkInProduceState();
ensureUnicastMode();
final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
unicastBufferBuilders[targetSubpartition] = bufferBuilder;
return bufferBuilder;
}
private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
throws IOException {
//non-blocking buffer request to the LocalBufferPool
BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
if (bufferBuilder != null) {
return bufferBuilder;
}
hardBackPressuredTimeMsPerSecond.markStart();
try {
//if the non-blocking request returned null, block until a buffer becomes available
bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
hardBackPressuredTimeMsPerSecond.markEnd();
return bufferBuilder;
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for buffer");
}
}
NetworkBuffer
Purpose
NetworkBuffer extends AbstractReferenceCountedByteBuf.
It was introduced in FLINK-7315 to avoid unnecessary memory copies when passing buffers to Netty as a ByteBuf.
It wraps a MemorySegment, which is in turn backed by a ByteBuffer.
The NetworkBuffer is carried as a member of BufferResponse; when a BufferResponse is serialized, the NetworkBuffer is written directly, so there is no need to allocate a new Netty ByteBuf, which removes a memory copy.
On the whole send path, the only user-space copy happens in BufferBuilder#append, where the serialized record bytes are copied into the MemorySegment. On the sending side, Netty transmits the Flink buffer's data directly, without another copy into a Netty buffer.
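The zero-copy effect can be illustrated with plain Netty, independent of Flink's classes: wrapping the segment's ByteBuffer into a ByteBuf shares the underlying memory instead of copying it, which is essentially what NetworkBuffer achieves by extending AbstractReferenceCountedByteBuf itself. A minimal sketch (class name is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ZeroCopySketch {
    public static void main(String[] args) {
        // stands in for the direct memory backing a MemorySegment
        ByteBuffer segmentMemory = ByteBuffer.allocateDirect(1024);
        segmentMemory.put("some serialized record".getBytes(StandardCharsets.UTF_8));
        segmentMemory.flip();

        // wrappedBuffer shares the underlying memory; no copy happens here
        ByteBuf nettyView = Unpooled.wrappedBuffer(segmentMemory);
        System.out.println("readable bytes without copying: " + nettyView.readableBytes());
        nettyView.release();
    }
}
```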
ByteBufAllocator (NettyBufferPool)
The NettyBufferPool constructor (NettyBufferPool#NettyBufferPool) controls the underlying PooledByteBufAllocator.
BufferConsumer
Builds the NetworkBuffer containing the data that the BufferBuilder wrote.
In the data-flow section we noted that, in writeAndFlushNextMessageIfPossible, the NettyServer sends BufferResponse messages downstream: it polls a reader that has data, and the reader's read logic lives in next = reader.getNextBuffer();.
Ultimately this boils down to the BufferConsumer building the final NetworkBuffer.
public BufferAndAvailability getNextBuffer() throws IOException {
BufferAndBacklog next = subpartitionView.getNextBuffer();
···
return new BufferAndAvailability(
next.buffer(), nextDataType, next.buffersInBacklog(), next.getSequenceNumber());
···
}
public BufferAndBacklog getNextBuffer() {
return parent.pollBuffer();
}
BufferAndBacklog pollBuffer() {
···
while (!buffers.isEmpty()) {
BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength =
buffers.peek();
BufferConsumer bufferConsumer =
bufferConsumerWithPartialRecordLength.getBufferConsumer();
buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength);
···
return new BufferAndBacklog(
buffer,
getBuffersInBacklogUnsafe(),
isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
sequenceNumber++);
}
}
Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
return buffer.build();
}
Threading model
Two threads
- The task thread: after processing a record, it eventually writes it into a buffer through the BufferBuilder.
- The Netty event loop thread: handles PartitionRequest requests, polls availableReaders, and reads buffers through the ViewReader.
Thread synchronization
ViewReader level
Readers are handed over via Netty's fireUserEventTriggered user-event mechanism, so they are added to the PartitionRequestQueue on the event loop thread.
The event loop thread then polls the reader, reads the buffer and writes the data out.
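A minimal sketch of that hand-off pattern with plain Netty; the handler and field names here are illustrative, not Flink's actual classes. Any thread may trigger the user event, but the event itself is always handled on the event loop thread, so the reader queue never needs a lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ReaderQueueHandlerSketch extends ChannelInboundHandlerAdapter {

    // only ever touched on the event loop thread, so no locking is needed
    private final Queue<Object> availableReaders = new ArrayDeque<>();

    // may be called from any thread (e.g. the task thread): do not touch
    // availableReaders here, just schedule a user event onto the event loop
    public void notifyReaderNonEmpty(ChannelHandlerContext ctx, Object reader) {
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        // runs on the event loop thread: safe to enqueue here and poll/write later
        availableReaders.add(evt);
    }
}
```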
Buffer level
PipelinedSubpartition has a buffers field:
final PrioritizedDeque<BufferConsumerWithPartialRecordLength> buffers = new PrioritizedDeque<>();
- Whenever the task thread writes data and obtains a new buffer (in the form of a BufferConsumer), it must add it to the buffers queue; PipelinedSubpartition#add(org.apache.flink.runtime.io.network.buffer.BufferConsumer, int, boolean) locks buffers for this.
- The event loop thread executes PipelinedSubpartition#pollBuffer through the reader, which also locks buffers.
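A simplified model of that locking (my own class, not Flink's): both threads synchronize on the same deque, mirroring how PipelinedSubpartition#add and PipelinedSubpartition#pollBuffer guard buffers.

```java
import java.util.ArrayDeque;

public class BuffersQueueSketch<T> {
    private final ArrayDeque<T> buffers = new ArrayDeque<>();

    // called by the task (producer) thread
    public void add(T bufferConsumer) {
        synchronized (buffers) {
            buffers.add(bufferConsumer);
        }
    }

    // called by the Netty event loop (consumer) thread
    public T pollBuffer() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }
}
```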
Segment level
This layer uses a lock-free design. The key point is the volatile position field in the PositionMarker, which records the offset up to which data has been written into the buffer.
static class SettablePositionMarker implements PositionMarker {
private volatile int position = 0;
}
- The task thread calls BufferBuilder#appendAndCommit; the commit step updates position.
- When the event loop thread builds a buffer via BufferConsumer#build, it first reads position through BufferConsumer.CachedPositionMarker#update, so that the read-only buffer covers only the data before position.
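That single-writer/single-reader contract can be modeled in a few lines (again my own class, not Flink's): the writer publishes its committed offset through a volatile field, and the reader builds a read-only view limited to that offset, so no lock is needed at the segment level.

```java
import java.nio.ByteBuffer;

public class PositionMarkerSketch {
    private final ByteBuffer segment = ByteBuffer.allocate(32 * 1024);
    // corresponds to SettablePositionMarker#position
    private volatile int position = 0;

    // task thread: append bytes, then "commit" by publishing the new offset
    public void appendAndCommit(byte[] data) {
        segment.put(data);             // visible to the reader only after the commit below
        position = segment.position(); // volatile write publishes the committed offset
    }

    // event loop thread: build a read-only view covering only the committed part
    public ByteBuffer build() {
        int cachedPosition = position; // volatile read, like CachedPositionMarker#update
        ByteBuffer readOnly = segment.asReadOnlyBuffer();
        readOnly.position(0).limit(cachedPosition);
        return readOnly;
    }
}
```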
The role of Network Buffers on the receiving side
The receive path
Recapping the receive flow from the data-flow section:
- The downstream task first sends a PartitionRequest to the upstream task.
- CreditBasedPartitionRequestClientHandler keeps the RemoteInputChannel corresponding to each subpartition.
- When CreditBasedPartitionRequestClientHandler#channelRead reads a message, it looks up the RemoteInputChannel for that message, writes the data into the RemoteInputChannel#receivedBuffers queue via RemoteInputChannel#onBuffer, notifies the SingleInputGate that this channel has data via InputChannel#notifyPriorityEvent, and enqueues the channel into SingleInputGate#inputChannelsWithData.
Message format
Decoder implementation
One step was skipped in the three steps above: decoding the data from the Netty buffer into a Flink buffer. The implementation mainly comes from FLINK-10742.
The decoding logic, including requesting memory from the Flink buffer pool, lives in NettyMessageClientDecoderDelegate.
Note that the current implementation does not avoid the copy from the Netty buffer into the Flink buffer (see the corresponding place in that PR).
NettyMessageClientDecoderDelegate
Frame layout: | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
It parses the frame header and, based on the message ID, delegates to the concrete decoder.
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    ByteBuf data = (ByteBuf) msg;
    try {
        while (data.isReadable()) {
            if (currentDecoder != null) {
                // the header has already been parsed and selected currentDecoder;
                // hand the decoding over to that concrete decoder
                NettyMessageDecoder.DecodingResult result = currentDecoder.onChannelRead(data);
                if (!result.isFinished()) {
                    break;
                }
                ctx.fireChannelRead(result.getMessage());

                currentDecoder = null;
                frameHeaderBuffer.clear();
            }

            // parse the frame header and set currentDecoder
            decodeFrameHeader(data);
        }

        checkState(!data.isReadable(), "Not all data of the received buffer consumed.");
    } finally {
        data.release();
    }
}
```
#### BufferResponseDecoder
Message header layout: receiver ID (16), sequence number (4), backlog (4), dataType (1), isCompressed (1), buffer size (4); 30 bytes in total.
1. Based on the message header, a BufferResponse object is created. At this point a Flink buffer is requested through the NetworkBufferAllocator; the actual request happens in inputChannel.requestBuffer().
```java
//BufferResponseDecoder#onChannelRead
public DecodingResult onChannelRead(ByteBuf data) throws Exception {
if (bufferResponse == null) {
decodeMessageHeader(data);
}
···
}
static BufferResponse readFrom(
ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) {
InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader);
int sequenceNumber = messageHeader.readInt();
int backlog = messageHeader.readInt();
Buffer.DataType dataType = Buffer.DataType.values()[messageHeader.readByte()];
boolean isCompressed = messageHeader.readBoolean();
int size = messageHeader.readInt();
Buffer dataBuffer;
if (dataType.isBuffer()) {
dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId);
} else {
dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size, dataType);
}
if (size == 0 && dataBuffer != null) {
// recycle the empty buffer directly, we must allocate a buffer for
// the empty data to release the credit already allocated for it
dataBuffer.recycleBuffer();
dataBuffer = null;
}
if (dataBuffer != null) {
dataBuffer.setCompressed(isCompressed);
}
return new BufferResponse(
dataBuffer, dataType, isCompressed, sequenceNumber, receiverId, backlog, size);
}
Buffer allocatePooledNetworkBuffer(InputChannelID receiverId) {
Buffer buffer = null;
RemoteInputChannel inputChannel = networkClientHandler.getInputChannel(receiverId);
// If the input channel has been released, we cannot allocate buffer and the received
// message
// will be discarded.
if (inputChannel != null) {
buffer = inputChannel.requestBuffer();
}
return buffer;
}
```
2. Copy the data from the Netty buffer into the Flink buffer:
bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);
public DecodingResult onChannelRead(ByteBuf data) throws Exception {
if (bufferResponse == null) {
decodeMessageHeader(data);
}
if (bufferResponse != null) {
int remainingBufferSize = bufferResponse.bufferSize - decodedDataBufferSize;
int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);
// For the case of data buffer really exists in BufferResponse now.
if (actualBytesToDecode > 0) {
// For the case of released input channel, the respective data buffer part would be
// discarded from the received buffer.
if (bufferResponse.getBuffer() == null) {
data.readerIndex(data.readerIndex() + actualBytesToDecode);
} else {
bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);
}
decodedDataBufferSize += actualBytesToDecode;
}
if (decodedDataBufferSize == bufferResponse.bufferSize) {
BufferResponse result = bufferResponse;
clearState();
return DecodingResult.fullMessage(result);
}
}
return DecodingResult.NOT_FINISHED;
}
Managed Memory
TODO