MemorySegment概述

MemorySegment是Flink管理的内存片段。该类是一个抽象类。它的实现既可以是堆内存,也可以是堆外内存,甚至是两者同时使用。使用MemorySegment这个类型管理内存,无需知道内存片段是堆内、堆外还是混合,一视同仁。
MemorySegment有如下两个实现类:

  • HeapMemorySegment:负责维护堆内内存。使用一个字节数组来存放内存数据。
  • HybridMemorySegment:是一种混合类型内存片段。可以使用堆内内存,可以使用堆外内存,或者是同时使用。

关于MemorySegment优化的一些讨论

Java虚拟机内有一个JIT(Just In Time)编译器。JIT是一种即时编译器能够将Java字节码编译为机器码对于提高程序执行速度有很大的帮助

但是JIT不会把所有Java字节码都编译为机器码。因为,大多数的代码只会运行一次,对这些代码即时编译为机器码再执行效率上不如直接在JVM运行字节码。运行编译过后的机器码比JVM解释执行Java字节码要快,但是把字节码编译为机器码在执行的过程就不见得比JVM解释执行字节码快。还有一个需要考虑的问题就是代码膨胀Java字节码编译为机器码之后占用存储会扩大10倍,甚至20倍以上,非常浪费存储空间。总的来说只运行一次或频率很低的代码,解释执行要比JIT编译执行更快。

对于一些经常执行的代码,例如循环中调用某个方法,或者是频繁调用某个处理逻辑,JIT编译这些代码就显得非常有必要。这种执行频率很高的代码称为热代码。在Java程序运行过程中,JIT会把这种热代码以method为单位,编译为机器码并保存起来。这样以后每次不必在解释执行字节码,直接调用保存的机器码就可以了。

将字节码编译为机器码有很多中优化方式。其中有一种是内联,即调用方法时,将方法内容部分的代码直接搬到方法调用的位置,可以避免依照指针查找方法代码的位置。这里会有一个问题,Java支持方法的重写,使用多态方式调用的时候,需要查找具体实现类对应逻辑。也就是说,如果一个父类具有多个子类,多个子类被加载到JVM的时候,JIT无法确定方法的具体实现,这样很不利于JIT的优化。

这时候大家会想到,如果只用其中一个子类,那么在程序运行的时候,调用哪个方法是确定的。JIT便可以采用内联和去虚化的方式,优化这些方法的调用。

HybridMemorySegment这个类的出现正是为了解决上述问题。HybridMemorySegment一种实现类既能够处理堆内内存又能够处理堆外内存。这样就不存在多种MemorySegment的子类,从而使JIT的性能优化效果最大化。

MemorySegment中的UNSAFE对象

MemorySegment里面有一个UNSAFE对象。Unsafe是sun.misc包内的一个类提供了native方式直接操作内存(分配内存,释放内存,复制内存,CAS操作,读写各种Java原生数据类型的数据)的方法。这些方法的执行效率非常高。但由于这些方法使用了类似C语言指针的方式操作内存,如果使用不当,很可能会造成不可预知的问题。因此这个类叫做Unsafe,意为用户去使用这个类是不安全的。

正常情况Unsafe是不允许用户使用的。查看一下它的构造函数:

  1. @CallerSensitive
  2. public static Unsafe getUnsafe() {
  3. Class<?> caller = Reflection.getCallerClass();
  4. if (!VM.isSystemDomainLoader(caller.getClassLoader()))
  5. throw new SecurityException("Unsafe");
  6. return theUnsafe;
  7. }

Unsafe使用的是单例模式。创建这个类实例的方法位于static代码块中。getUnsafe方法仅仅是取回这个实例。
但是这段代码中有一个条件判断,用来检查调用者的classloader是不是SystemDomainLoader,不清楚是什么意思。所以我们查看下VM.isSystemDomainLoader方法。代码如下所示:

  1. /**
  2. * Returns true if the given class loader is in the system domain
  3. * in which all permissions are granted.
  4. */
  5. public static boolean isSystemDomainLoader(ClassLoader loader) {
  6. return loader == null;
  7. }

看到这里就明白了,对于Bootstrap classloader加载的类而言,他们的classloader值为null。而Java的rt.jar是Bootstrap classloader加载的。因此我们可以确定,Unsafe这个类无法在rt.jar之外的class中使用。

那么问题来了,在Flink分配内存中Unsafe是怎么获取到的?我们看一下MemoryUtils的UNSAFE变量。该变量通过getUnsafe方法获取,代码如下所示:

  1. private static sun.misc.Unsafe getUnsafe() {
  2. try {
  3. // 利用反射
  4. Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
  5. unsafeField.setAccessible(true);
  6. return (sun.misc.Unsafe) unsafeField.get(null);
  7. } catch (SecurityException e) {
  8. throw new Error("Could not access the sun.misc.Unsafe handle, permission denied by security manager.", e);
  9. } catch (NoSuchFieldException e) {
  10. throw new Error("The static handle field in sun.misc.Unsafe was not found.", e);
  11. } catch (IllegalArgumentException e) {
  12. throw new Error("Bug: Illegal argument reflection access for static field.", e);
  13. } catch (IllegalAccessException e) {
  14. throw new Error("Access to sun.misc.Unsafe is forbidden by the runtime.", e);
  15. } catch (Throwable t) {
  16. throw new Error("Unclassified error while trying to access the sun.misc.Unsafe handle.", t);
  17. }
  18. }

这里通过一个取巧的方式:使用反射。Unsafe这个class内的单例对象theUnsafe可以通过反射getDeclaredField直接拿到然后设置这个field为可以访问(setAccessible(true))。这样可以绕过sun的限制,把Unsafe对象拿到rt.jar之外使用。

HybridMemorySegment


HybridMemorySegment既可以是堆内内存也可以是堆外内存
内部有几个重要的成员变量:

  • heapMemory:字节数组类型,代表堆内内存。该变量从MemorySegment继承而来。
  • offHeapBuffer:ByteBuffer类型,代表堆外内存。
  • address存放内存的起始地址。如果是堆内内存,值为BYTE_ARRAY_BASE_OFFSET,即UNSAFE.arrayBaseOffset(byte[].class)。如果是堆外内存,值为ByteBuffer的起始地址。
  • addressLimit:堆外内存的结束地址(address + size)

除了操作堆内内存的方法外,还有两个方法:

  • get(int offset, ByteBuffer target, int numBytes)
  • put(int offset, ByteBuffer source, int numBytes)

分别用于读取和写入堆外内存。

MemorySegmentFactory

负责创建出符合要求的MemorySegment。
注意:该工厂类创建出来的MemorySegment都是HybridMemorySegment类型,便于JVM使用JIT优化,从而提高性能。
MemorySegmentFactory提供了如下方法:

  • wrap:包装字节数组为HybridMemorySegment。内存位于堆内。由于此Factory返回的都是HybridMemorySegment类型,后面统一使用MemorySegment称呼也不会引起混淆。
  • allocateUnpooledSegment:分配指定字节数的内存,位于堆内
  • allocateUnpooledOffHeapMemory:分配指定字节数的内存,位于堆外。返回MemorySegment包装ByteBuffer.allocateDirect()方式分配的堆外内存。
  • allocateOffHeapUnsafeMemory:使用sun.misc.Unsafe的allocateMemory方法分配堆外内存。注意,使用这种方式分配的内存不受-XX:MaxDirectMemorySize这个JVM参数的限制。
  • wrapOffHeapMemory:包装ByteBuffer类型堆外内存为MemorySegment类型。

SpanningRecordSerializer

SpanningRecordSerializer负责将数据流中的元素序列化调用copyToBufferBuilder,将已经序列化的数据复制到buffer中。

SpanningRecordSerializer有两个重要的方法:

  • serializeRecord:将一个元素序列化,存入到内部serializationBuffer中
  • copyToBufferBuilder:将serializationBuffer中的数据存入BufferBuilder中。BufferBuilder是MemorySegment的一个封装形式,放在后面分析。

SpanningRecordSerializer内部持有一个DataOutputSerializer类型的对象serializationBuffer。
DataOutputSerializer内部有一个字节数组缓存空间,用来存放当前正在进行序列化操作的数据。
SpanningRecordSerializer构造方法中创建了一个DataOutputSerializer,初始缓存大小是128字节。

DataOutputSerializer提供了大量的write重载方法,用于序列化各种类型的数据。在序列化数据的时候,初始的缓存容量有可能不够使用,此时自动调用resize方法扩大缓冲区。DataOutputSerializer还提供了一个pruneBuffer方法,将扩容过的当前缓冲区还原回初始大小。