Java Heap and Off-Heap

Reference: Java Advanced Runtime Options
Memory in a Java process falls into the following categories:

  1. heap: on-heap memory, managed by the GC. Configured via -Xmx, -Xms, -Xmn, etc.
  2. Off-Heap: memory outside the JVM heap (see the sketch after this list)
    1. off-heap memory needed by the JVM itself,
      e.g. JVM Metaspace and JVM overhead (thread stacks, code cache, garbage collection space, etc.)
    2. off-heap memory available to application code
      1. direct: NIO's DirectByteBuffer
        Allocated via ByteBuffer.allocateDirect(size).
        Capped by -XX:MaxDirectMemorySize=<size>.
      2. non-direct (unsafe)
        Allocated via sun.misc.Unsafe#allocateMemory and sun.misc.Unsafe#allocateInstance.
        Not limited by -XX:MaxDirectMemorySize.
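
A minimal sketch of the two application-facing off-heap paths above (the class name OffHeapDemo is made up; reflective access to sun.misc.Unsafe is shown JDK 8 style and may print warnings on newer JDKs):

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;

import sun.misc.Unsafe;

public class OffHeapDemo {
    public static void main(String[] args) throws Exception {
        // 1) Direct memory: counted against -XX:MaxDirectMemorySize.
        //    Running with e.g. -XX:MaxDirectMemorySize=64m and a larger allocation
        //    produces OutOfMemoryError("Direct buffer memory").
        ByteBuffer direct = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        System.out.println("direct capacity = " + direct.capacity());

        // 2) Unsafe memory: NOT counted against -XX:MaxDirectMemorySize.
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        long address = unsafe.allocateMemory(16 * 1024 * 1024);
        try {
            unsafe.putLong(address, 42L);
            System.out.println("unsafe value = " + unsafe.getLong(address));
        } finally {
            unsafe.freeMemory(address); // must be freed manually, the GC does not track it
        }
    }
}
```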

Flink Memory Layout

Overview

The Flink documentation describes the memory layout very clearly.
(Figure 1 and Figure 2: the TaskManager memory model diagrams from the Flink documentation.)

TaskManager Memory Configuration

| Component | Configuration options | Description |
| --- | --- | --- |
| Framework Heap Memory | taskmanager.memory.framework.heap.size | JVM Heap memory dedicated to the Flink framework (advanced option) |
| Task Heap Memory | taskmanager.memory.task.heap.size | JVM Heap memory dedicated to the Flink application to run operators and user code |
| Managed Memory | taskmanager.memory.managed.size, taskmanager.memory.managed.fraction | Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend |
| Framework Off-heap Memory | taskmanager.memory.framework.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink framework (advanced option) |
| Task Off-heap Memory | taskmanager.memory.task.off-heap.size | Off-heap direct (or native) memory dedicated to the Flink application to run operators |
| Network Memory | taskmanager.memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction | Direct memory reserved for data record exchange between tasks (e.g. buffering for transfer over the network); a capped fractionated component of the total Flink memory. This memory is used for the allocation of network buffers |
| JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Metaspace size of the Flink JVM process |
| JVM Overhead | taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max, taskmanager.memory.jvm-overhead.fraction | Native memory reserved for other JVM overhead, e.g. thread stacks, code cache, garbage collection space; a capped fractionated component of the total process memory |
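
As a hedged illustration of how these options fit together, the same keys can also be set programmatically on a Configuration object, e.g. for tests (the values are arbitrary examples; in a real deployment they normally go into flink-conf.yaml):

```java
import org.apache.flink.configuration.Configuration;

// Example values only; the keys are the configuration options listed in the table above.
Configuration conf = new Configuration();
conf.setString("taskmanager.memory.task.heap.size", "1024m");
conf.setString("taskmanager.memory.managed.fraction", "0.4");
conf.setString("taskmanager.memory.framework.off-heap.size", "128m");
conf.setString("taskmanager.memory.network.fraction", "0.1");
conf.setString("taskmanager.memory.network.min", "64m");
conf.setString("taskmanager.memory.network.max", "1g");
conf.setString("taskmanager.memory.jvm-metaspace.size", "256m");
```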

MemorySegment

The basic unit of Flink's memory management. Its underlying storage is either heap or off-heap:

  1. heap: a byte[] array
  2. off-heap: direct memory (used for network buffers) and unsafe memory (used for managed memory)

MemorySegmentFactory

All MemorySegments are created through MemorySegmentFactory.
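
A small sketch of the factory in use (the class name MemorySegmentSketch is made up for illustration; the factory methods shown are from org.apache.flink.core.memory.MemorySegmentFactory, though the exact set of overloads varies a little across Flink versions):

```java
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class MemorySegmentSketch {
    public static void main(String[] args) {
        // Heap segments: backed by a byte[] (either wrapped or freshly allocated).
        MemorySegment wrapped = MemorySegmentFactory.wrap(new byte[32 * 1024]);
        MemorySegment heapSeg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);

        // Off-heap segment backed by a DirectByteBuffer (the flavor used for network memory);
        // managed memory additionally uses unsafe-backed off-heap segments.
        MemorySegment offHeapSeg = MemorySegmentFactory.allocateUnpooledOffHeapMemory(32 * 1024);

        // Reads and writes go through the same MemorySegment API regardless of the backing storage.
        offHeapSeg.putInt(0, 42);
        heapSeg.putInt(0, wrapped.size());
        System.out.println(offHeapSeg.getInt(0));
    }
}
```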

Network Off-Heap

The network buffers are backed by DirectByteBuffer, and Flink guarantees that they never exceed the configured Network Memory limit.

NetworkBufferPool

  1. A TM-level, global, fixed-size pool that represents the TaskManager's entire network buffer memory.
    Its size is networkMemSize = totalNumberOfMemorySegments * memorySegmentSize (see the sketch after this list),
    and it is created when the TM initializes the shuffle environment (NettyShuffleEnvironment) at startup.
  2. Its two main responsibilities are:

    1. creating the LocalBufferPool of each task: the createBufferPool method;
    2. providing segments to each LocalBufferPool: requestPooledMemorySegment / requestPooledMemorySegmentsBlocking.
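
For intuition, a tiny arithmetic sketch of how the pool size falls out of the configuration (the 128 MiB figure is only an example; the segment size comes from taskmanager.memory.segment-size, 32 KiB by default):

```java
// Illustrative numbers only (not read from any actual configuration).
long networkMemSize = 128L * 1024 * 1024;      // assume 128 MiB of network memory
int memorySegmentSize = 32 * 1024;             // taskmanager.memory.segment-size, 32 KiB by default
long totalNumberOfMemorySegments = networkMemSize / memorySegmentSize;  // = 4096 segments in the TM-wide pool
```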

LocalBufferPool

A LocalBufferPool supplies segments to a task while it writes and reads data (see the sketch after this list).

  1. Write side: a ResultPartition-level pool.
    When a Task is constructed it creates its ResultPartitionWriters; each ResultPartitionWriter holds one LocalBufferPool, created during setup.

  2. Read side: an InputGate-level pool.
    When a Task is constructed it creates its InputGates; each InputGate holds one LocalBufferPool, created during setup.
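
A hedged sketch of how a task-level pool is carved out of the TM-wide pool (the helper class LocalPoolSketch and the numeric arguments are illustrative assumptions; createBufferPool, requestBuffer, recycleBuffer and lazyDestroy are the pool APIs, but the exact createBufferPool overload differs between Flink versions):

```java
import java.io.IOException;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

// Hypothetical helper class, for illustration only.
class LocalPoolSketch {
    static void demo(NetworkBufferPool networkBufferPool) throws IOException {
        // Carve a task-level pool out of the TM-wide pool
        // (the numRequiredBuffers / maxUsedBuffers values are illustrative).
        BufferPool localPool = networkBufferPool.createBufferPool(8, 64);

        // Request a buffer (backed by a pooled MemorySegment); may be null if the pool is exhausted.
        Buffer buffer = localPool.requestBuffer();
        if (buffer != null) {
            // ... write through buffer.getMemorySegment() ...
            buffer.recycleBuffer(); // hands the segment back to the pool
        }

        // When the task shuts down, destroying the local pool returns its segments
        // to the NetworkBufferPool.
        localPool.lazyDestroy();
    }
}
```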

The Role of Network Buffers on the Send Path


BufferBuilder

  1. Two member fields:
    1. memorySegment: obtained from LocalBufferPool#toBufferBuilder and passed in on construction.
    2. buffer: this.buffer = new NetworkBuffer(memorySegment, recycler); wraps the segment.
  2. Responsibilities:
    1. Writing data into the segment: the append methods.
    2. Creating a BufferConsumer.

In the Flink data flow notes we mentioned that, from the data-flow point of view, ResultPartitionWriter#emitRecord mainly appends the record to a BufferBuilder.
From the memory point of view, emitRecord requests a BufferBuilder from the LocalBufferPool and blocks if no buffer is available.

```java
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
    totalWrittenBytes += record.remaining();

    // append record to the subpartition
    BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

    while (record.hasRemaining()) {
        // full buffer, partial record
        finishUnicastBufferBuilder(targetSubpartition);
        buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
    }

    if (buffer.isFull()) {
        // full buffer, full record
        finishUnicastBufferBuilder(targetSubpartition);
    }
    // partial buffer, full record
}

private BufferBuilder appendUnicastDataForNewRecord(
        final ByteBuffer record, final int targetSubpartition) throws IOException {
    if (targetSubpartition < 0 || targetSubpartition >= unicastBufferBuilders.length) {
        throw new ArrayIndexOutOfBoundsException(targetSubpartition);
    }
    BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];

    if (buffer == null) {
        // no BufferBuilder exists for this subpartition yet: request a new one
        buffer = requestNewUnicastBufferBuilder(targetSubpartition);
        // register the new BufferBuilder (as a BufferConsumer) in the subpartition's
        // 'buffers' queue so that the reader side can see it
        addToSubpartition(buffer, targetSubpartition, 0, record.remaining());
    }

    // write the record data into the buffer
    buffer.appendAndCommit(record);

    return buffer;
}

private BufferBuilder requestNewUnicastBufferBuilder(int targetSubpartition)
        throws IOException {
    checkInProduceState();
    ensureUnicastMode();
    final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
    unicastBufferBuilders[targetSubpartition] = bufferBuilder;

    return bufferBuilder;
}

private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
        throws IOException {
    // ask the LocalBufferPool for a buffer (non-blocking)
    BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
    if (bufferBuilder != null) {
        return bufferBuilder;
    }

    hardBackPressuredTimeMsPerSecond.markStart();
    try {
        // no buffer available: block until one can be obtained
        bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
        hardBackPressuredTimeMsPerSecond.markEnd();
        return bufferBuilder;
    } catch (InterruptedException e) {
        throw new IOException("Interrupted while waiting for buffer");
    }
}
```

NetworkBuffer

Purpose

NetworkBuffer extends AbstractReferenceCountedByteBuf.
It was introduced in FLINK-7315 to avoid an unnecessary memory copy when handing data to Netty as a ByteBuf.
It wraps a MemorySegment, which in turn is backed by a ByteBuffer.
A NetworkBuffer is carried as a field of BufferResponse; when a BufferResponse is serialized, the NetworkBuffer is written out directly, so no new Netty ByteBuf has to be allocated and the extra copy is avoided.

Across the whole send path, the only user-space copy happens in BufferBuilder#append, where the serialized record bytes are copied into the MemorySegment. On the send side, Netty transmits the Flink buffer's data directly, without copying it into a Netty buffer again.
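
A rough sketch of the wrapping relationship described above (the class NetworkBufferSketch and the freeing recycler are made up for illustration; the constructor is the one quoted in the BufferBuilder section, and the off-heap factory method is an assumption about the current MemorySegmentFactory API; in the real network stack the segments come from the NetworkBufferPool and the recycler returns them there):

```java
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

class NetworkBufferSketch {
    static NetworkBuffer wrapSegment() {
        // Wrap an off-heap segment in a NetworkBuffer. The recycler here simply frees the
        // segment; in the real network stack the recycler returns it to its buffer pool.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(32 * 1024);
        BufferRecycler recycler = MemorySegment::free;
        NetworkBuffer buffer = new NetworkBuffer(segment, recycler);

        // NetworkBuffer extends (the shaded) AbstractReferenceCountedByteBuf, so the very
        // same object can later be handed to Netty for writing via buffer.asByteBuf(),
        // without copying the payload into a separate Netty buffer.
        return buffer;
    }
}
```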

ByteBufAllocator(NettyBufferPool)

The NettyBufferPool constructor (NettyBufferPool#NettyBufferPool) controls the PooledByteBufAllocator that Netty uses for its ByteBuf allocations.

BufferConsumer

Produces a NetworkBuffer containing the data written through the BufferBuilder.
In the Flink data flow notes we mentioned that, in writeAndFlushNextMessageIfPossible, the Netty server sends BufferResponse messages downstream: a reader with available data is polled, and the actual read happens in next = reader.getNextBuffer().
In the end this boils down to the BufferConsumer building the final NetworkBuffer.

```java
// ViewReader#getNextBuffer (next = reader.getNextBuffer())
public BufferAndAvailability getNextBuffer() throws IOException {
    BufferAndBacklog next = subpartitionView.getNextBuffer();
    // ···
    return new BufferAndAvailability(
            next.buffer(), nextDataType, next.buffersInBacklog(), next.getSequenceNumber());
    // ···
}

// subpartitionView.getNextBuffer() delegates to the subpartition
public BufferAndBacklog getNextBuffer() {
    return parent.pollBuffer();
}

// PipelinedSubpartition#pollBuffer
BufferAndBacklog pollBuffer() {
    // ···
    while (!buffers.isEmpty()) {
        BufferConsumerWithPartialRecordLength bufferConsumerWithPartialRecordLength =
                buffers.peek();
        BufferConsumer bufferConsumer =
                bufferConsumerWithPartialRecordLength.getBufferConsumer();

        buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength);
        // ···
        return new BufferAndBacklog(
                buffer,
                getBuffersInBacklogUnsafe(),
                isDataAvailableUnsafe() ? getNextBufferTypeUnsafe() : Buffer.DataType.NONE,
                sequenceNumber++);
    }
    // ···
}

// the BufferConsumer builds the NetworkBuffer that is eventually sent out
Buffer buildSliceBuffer(BufferConsumerWithPartialRecordLength buffer) {
    return buffer.build();
}
```

Threading Model

Two threads

  1. The task's main thread: after processing a record it eventually writes it into a buffer through the BufferBuilder.
  2. The Netty event-loop thread: it handles PartitionRequest messages, polls readers via availableReaders.poll(), and reads buffers through the ViewReader.

Thread synchronization

ViewReader layer

  1. Readers are added to the PartitionRequestQueue on the event-loop thread, using Netty's fireUserEventTriggered event-notification mechanism.
  2. The event-loop thread then polls the readers, reads the buffers and writes the data out.

Buffer layer

  1. PipelinedSubpartition holds a buffer queue: final PrioritizedDeque<BufferConsumerWithPartialRecordLength> buffers = new PrioritizedDeque<>();
  2. Every new buffer the task thread requests while writing data (in the form of a BufferConsumer) is added to the buffers queue; PipelinedSubpartition#add(org.apache.flink.runtime.io.network.buffer.BufferConsumer, int, boolean) locks buffers.
  3. The event-loop thread, via the reader, calls PipelinedSubpartition#pollBuffer, which also locks buffers.

Segment layer

This layer uses a lock-free design. The core idea is the volatile field position inside the PositionMarker, which records the offset up to which data has currently been written into the buffer.

```java
static class SettablePositionMarker implements PositionMarker {
    private volatile int position = 0;
}
```
  1. The task thread writes via BufferBuilder#appendAndCommit; the commit step updates position.
  2. When the event-loop thread builds a buffer via BufferConsumer#build, it first reads the current position through BufferConsumer.CachedPositionMarker#update, so that only the data written before position is read from the buffer (see the simplified sketch below).
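
A minimal, self-contained sketch of this single-writer/single-reader pattern (simplified; these are not Flink's actual classes): the writer copies bytes into the segment first and only then publishes the new offset through the volatile field, so the reader, which never reads past the last published offset, always sees fully written data.

```java
// Simplified illustration of the lock-free handoff between one writer and one reader.
class SingleWriterBuffer {
    private final byte[] segment = new byte[32 * 1024];
    private volatile int position = 0; // committed write offset, published by the writer

    // Called only by the task (writer) thread.
    void append(byte[] record) {
        int start = position;
        System.arraycopy(record, 0, segment, start, record.length);
        position = start + record.length; // "commit": publish the new offset
    }

    // Called only by the Netty event-loop (reader) thread.
    byte[] snapshotReadable() {
        int readable = position;          // read the committed offset once
        byte[] out = new byte[readable];
        System.arraycopy(segment, 0, out, 0, readable); // only data before 'position'
        return out;
    }
}
```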

The Role of Network Buffers on the Receive Path

Receive flow

Recalling the receive flow from the Flink data flow notes:

  1. The downstream task first sends a PartitionRequest to the upstream task.
  2. CreditBasedPartitionRequestClientHandler keeps the RemoteInputChannel corresponding to each subpartition.
  3. When CreditBasedPartitionRequestClientHandler#channelRead reads a message, it looks up the RemoteInputChannel for that message.
    RemoteInputChannel#onBuffer writes the data into the RemoteInputChannel#receivedBuffers queue, and InputChannel#notifyPriorityEvent notifies the SingleInputGate that this channel has data, enqueueing it into SingleInputGate#inputChannelsWithData (a simplified sketch follows).
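
A simplified sketch of the handoff in step 3 (the classes prefixed with Sketch are hypothetical; the real classes are RemoteInputChannel and SingleInputGate, and the real notification goes through the InputChannel callbacks named above):

```java
import java.util.ArrayDeque;

// The Netty client handler routes each decoded buffer to its input channel;
// the channel queues it and signals the gate that data is available.
class SketchRemoteInputChannel {
    private final ArrayDeque<Object> receivedBuffers = new ArrayDeque<>();
    private final SketchInputGate gate;

    SketchRemoteInputChannel(SketchInputGate gate) { this.gate = gate; }

    void onBuffer(Object buffer) {
        boolean wasEmpty;
        synchronized (receivedBuffers) {
            wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
        }
        if (wasEmpty) {
            gate.notifyChannelNonEmpty(this); // enqueue the channel into inputChannelsWithData
        }
    }
}

class SketchInputGate {
    private final ArrayDeque<SketchRemoteInputChannel> inputChannelsWithData = new ArrayDeque<>();

    synchronized void notifyChannelNonEmpty(SketchRemoteInputChannel channel) {
        inputChannelsWithData.add(channel);
        notifyAll(); // wake up the task thread polling the gate for data
    }
}
```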

Message format

(Figure: the wire format of the Netty messages.)

Decoder implementation

The three steps above leave out one step: decoding the data from a Netty buffer into a Flink buffer. This was introduced mainly in FLINK-10742.
The decoder, including the logic that allocates memory from the Flink buffer pool, is NettyMessageClientDecoderDelegate.
The current implementation does not eliminate the copy from the Netty buffer into the Flink buffer; see the relevant spot in that PR.

NettyMessageClientDecoderDelegate

Frame header layout: | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) |
It parses the frame header and delegates to the concrete decoder according to the message ID.

```java
// NettyMessageClientDecoderDelegate#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    ByteBuf data = (ByteBuf) msg;
    try {
        while (data.isReadable()) {
            if (currentDecoder != null) {
                // the frame header has already selected currentDecoder;
                // hand the remaining bytes to that concrete decoder
                NettyMessageDecoder.DecodingResult result = currentDecoder.onChannelRead(data);
                if (!result.isFinished()) {
                    break;
                }
                ctx.fireChannelRead(result.getMessage());

                currentDecoder = null;
                frameHeaderBuffer.clear();
            }

            // parse the frame header and set currentDecoder
            decodeFrameHeader(data);
        }
        checkState(!data.isReadable(), "Not all data of the received buffer consumed.");
    } finally {
        data.release();
    }
}
```

BufferResponseDecoder

Message header layout: receiver ID (16), sequence number (4), backlog (4), dataType (1), isCompressed (1), buffer size (4)

  1. Based on the message header, a BufferResponse object is created. At this point a buffer is requested from Flink
    through the NetworkBufferAllocator; the actual request happens in inputChannel.requestBuffer().

```java
// BufferResponseDecoder#onChannelRead
public DecodingResult onChannelRead(ByteBuf data) throws Exception {
    if (bufferResponse == null) {
        decodeMessageHeader(data);
    }
    // ···
}

// BufferResponse#readFrom
static BufferResponse readFrom(
        ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) {
    InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader);
    int sequenceNumber = messageHeader.readInt();
    int backlog = messageHeader.readInt();
    Buffer.DataType dataType = Buffer.DataType.values()[messageHeader.readByte()];
    boolean isCompressed = messageHeader.readBoolean();
    int size = messageHeader.readInt();

    Buffer dataBuffer;
    if (dataType.isBuffer()) {
        dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId);
    } else {
        dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size, dataType);
    }

    if (size == 0 && dataBuffer != null) {
        // recycle the empty buffer directly, we must allocate a buffer for
        // the empty data to release the credit already allocated for it
        dataBuffer.recycleBuffer();
        dataBuffer = null;
    }

    if (dataBuffer != null) {
        dataBuffer.setCompressed(isCompressed);
    }

    return new BufferResponse(
            dataBuffer, dataType, isCompressed, sequenceNumber, receiverId, backlog, size);
}

// NetworkBufferAllocator#allocatePooledNetworkBuffer
Buffer allocatePooledNetworkBuffer(InputChannelID receiverId) {
    Buffer buffer = null;

    RemoteInputChannel inputChannel = networkClientHandler.getInputChannel(receiverId);
    // If the input channel has been released, we cannot allocate a buffer and the
    // received message will be discarded.
    if (inputChannel != null) {
        buffer = inputChannel.requestBuffer();
    }
    return buffer;
}
```
  2. The data is then copied from the Netty buffer into the Flink buffer: bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);

```java
public DecodingResult onChannelRead(ByteBuf data) throws Exception {
    if (bufferResponse == null) {
        decodeMessageHeader(data);
    }

    if (bufferResponse != null) {
        int remainingBufferSize = bufferResponse.bufferSize - decodedDataBufferSize;
        int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);

        // For the case of data buffer really exists in BufferResponse now.
        if (actualBytesToDecode > 0) {
            // For the case of released input channel, the respective data buffer part
            // would be discarded from the received buffer.
            if (bufferResponse.getBuffer() == null) {
                data.readerIndex(data.readerIndex() + actualBytesToDecode);
            } else {
                bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode);
            }

            decodedDataBufferSize += actualBytesToDecode;
        }

        if (decodedDataBufferSize == bufferResponse.bufferSize) {
            BufferResponse result = bufferResponse;
            clearState();
            return DecodingResult.fullMessage(result);
        }
    }

    return DecodingResult.NOT_FINISHED;
}
```

Managed Memory

TODO