Java Netty

前言

数据序列化存储,或者数据通过网络传输时,会遇到不可避免将数据转成字节数组的场景。字节数组的读写不会太难,但又有点繁琐,为了避免重复造轮子,jdk推出了ByteBuffer来帮助操作字节数组;而Netty是一款当前流行的java网络IO框架,它内部定义了一个ByteBuf来管理字节数组,和ByteBuffer大同小异

  • ByteBuffer
  • 零拷贝之MappedByteBuffer
  • DirectByteBuffer堆外内存回收机制
  • netty之ByteBuf

    Buffer结构

    1. public abstract class Buffer {
    2. //关系: mark <= position <= limit <= capacity
    3. private int mark = -1;
    4. private int position = 0;
    5. private int limit;
    6. private int capacity;
    7. long address; // Used only by direct buffers,直接内存的地址
    8. }
  • mark:调用mark()方法的话,mark值将存储当前position的值,等下次调用reset()方法时,会设定position的值为之前的标记值

  • position:是下一个要被读写的byte元素的下标索引
  • limit:是缓冲区中第一个不能读写的元素的数组下标索引,也可以认为是缓冲区中实际元素的数量
  • capacity:是缓冲区能够容纳元素的最大数量,这个值在缓冲区创建时被设定,而且不能够改变

    Buffer.API

    1. Buffer(int mark, int pos, int lim, int cap)
    2. //Buffer创建时设置的最大数组容量值
    3. public final int capacity()
    4. //当前指针的位置
    5. public final int position()
    6. //限制可读写大小
    7. public final Buffer limit(int newLimit)
    8. //标记当前position的位置
    9. public final Buffer mark()
    10. //配合mark使用,position成之前mark()标志的位置。先前没调用mark则报错
    11. public final Buffer reset()
    12. //写->读模式翻转,单向的
    13. //position变成了初值位置0,而limit变成了写模式下position位置
    14. public final Buffer flip()
    15. //重置position指针位置为0,mark为-1;相对flip方法是limit不变
    16. public final Buffer rewind() //复位
    17. //和rewind一样,多出一步是limit会被设置成capacity
    18. public final Buffer clear()
    19. //返回剩余未读字节数
    20. public final int remaining()

    ByteBuffer结构

    1. public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>{
    2. final byte[] hb; //仅限堆内内存使用
    3. final int offset;
    4. boolean isReadOnly;
    5. }

    ByteBuffer.API

    1. //申请堆外内存
    2. public static ByteBuffer allocateDirect(int capacity)
    3. //申请堆内内存
    4. public static ByteBuffer allocate(int capacity)
    5. //原始字节包装成ByteBuffer
    6. public static ByteBuffer wrap(byte[] array, int offset, int length)
    7. //原始字节包装成ByteBuffer
    8. public static ByteBuffer wrap(byte[] array)
    9. //创建共享此缓冲区内容的新字节缓冲区
    10. public abstract ByteBuffer duplicate()
    11. //分片,创建一个新的字节缓冲区
    12. //新ByteBuffer的开始位置是此缓冲区的当前位置position
    13. public abstract ByteBuffer slice()
    14. //获取字节内容
    15. public abstract byte get()
    16. //从ByteBuffer偏移offset的位置,获取length长的字节数组,然后返回当前ByteBuffer对象
    17. public ByteBuffer get(byte[] dst, int offset, int length)
    18. //设置byte内存
    19. public abstract ByteBuffer put(byte b);
    20. //以offset为起始位置设置length长src的内容,并返回当前ByteBuffer对象
    21. public ByteBuffer put(byte[] src, int offset, int length长)
    22. //将没有读完的数据移到到缓冲区的初始位置,position设置为最后一没读字节数据的下个索引,limit重置为为capacity
    23. //读->写模式,相当于flip的反向操作
    24. public abstract ByteBuffer compact()
    25. //是否是直接内存
    26. public abstract boolean isDirect()
  • ByteBuffer bf = ByteBuffer.allocate(10);,创建大小为10的ByteBuffer对象

2021-05-27-17-25-35-180286.png

  • 写入数据

    1. ByteBuffer buf ByteBuffer.allocate(10);
    2. buf.put("csc".getBytes());

    2021-05-27-17-25-35-312934.png

  • 调用flip转换缓冲区为读模式; buf.flip();

2021-05-27-17-25-35-445577.png

  • 读取缓冲区中到内容:get(); System.out.println((char) buf.get());

2021-05-27-17-25-35-611133.png2021-05-27-17-25-35-754749.png

零拷贝之MappedByteBuffer

  • 共享内存映射文件,对应的ByteBuffer子操作类,MappedByteBuffer是基于mmap实现的。MappedByteBuffer需要FileChannel调用本地map函数映射。C++代码可以查阅下FileChannelImpl.c-Java_sun_nio_ch_FileChannelImpl_map0方法
  • 使用MappedByteBuffer和文件映射,其读写可以减少内存拷贝次数
    1. FileChannel readChannel = FileChannel.open(Paths.get("./cscw.txt"), StandardOpenOption.READ);
    2. MappedByteBuffer data = readChannel.map(FileChannel.MapMode.READ_ONLY, 0, 1024 * 1024 * 40);

    DirectByteBuffer堆外内存回收机制Cleaner

    下面看看直接内存的回收机制(java8);DirectByteBuffer内部存在一个Cleaner对象,并且委托内部类Deallocator对象进行内存回收
    1. class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
    2. {
    3. //构造函数
    4. DirectByteBuffer(int cap) {
    5. .... //内存分配
    6. cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    7. ...
    8. }
    9. private static class Deallocator implements Runnable{
    10. ...
    11. public void run() {
    12. if (address == 0) {
    13. // Paranoia
    14. return;
    15. }
    16. unsafe.freeMemory(address); //回收内存
    17. address = 0;
    18. Bits.unreserveMemory(size, capacity);
    19. }
    20. }
    细看下Cleaner,继承于PhantomReference,并且在public void clean()方法会调用Deallocator进行清除操作
    1. public class Cleaner extends PhantomReference<Object> {
    2. //如果DirectByteBuffer对象被回收,相应的Cleaner会被放入dummyQueue队列
    3. private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue();
    4. //构造函数
    5. public static Cleaner create(Object var0, Runnable var1) {
    6. return var1 == null ? null : add(new Cleaner(var0, var1));
    7. }
    8. private Cleaner(Object var1, Runnable var2) {
    9. super(var1, dummyQueue);
    10. this.thunk = var2;
    11. }
    12. private final Runnable thunk;
    13. public void clean() {
    14. if (remove(this)) {
    15. try {
    16. this.thunk.run();
    17. } catch (final Throwable var2) {
    18. ....
    Reference内部存在一个守护线程,循环获取Reference,并判断是否Cleaner对象,如果是则调用其clean方法 ```java public abstract class Reference static {
    1. ThreadGroup tg = Thread.currentThread().getThreadGroup();
    2. for (ThreadGroup tgn = tg; tgn != null; g = tgn, tgn = tg.getParent());
    3. Thread handler = new ReferenceHandler(tg, "Reference Handler");
    4. ...
    5. handler.setDaemon(true);
    6. handler.start();
    7. ...
    } … //内部类调用 tryHandlePending private static class ReferenceHandler extends Thread {
    1. public void run() {
    2. while (true) {
    3. tryHandlePending(true);
    4. }
    5. }
    … static boolean tryHandlePending(boolean waitForNotify) {
    1. Cleaner c;
    2. .... //从链表获取对象被回收的引用
    3. // 判断Reference是否Cleaner,如果是则调用其clean方法
    4. if (c != null) {
    5. c.clean(); //调用Cleaner的clean方法
    6. return true;
    7. }
    8. ReferenceQueue<? super Object> q = r.queue;
    9. if (q != ReferenceQueue.NULL) q.enqueue(r);
    10. return true;
  1. <a name="ifzL3"></a>
  2. ## Netty之`ByteBuf`
  3. <a name="ky6Ls"></a>
  4. ### `ByteBuf`原理
  5. - `Bytebuf`通过两个位置指针来协助缓冲区的读写操作,分别是`readIndex`和`writerIndex`
  6. ```java
  7. * +-------------------+------------------+------------------+
  8. * | discardable bytes | readable bytes | writable bytes |
  9. * | | (CONTENT) | |
  10. * +-------------------+------------------+------------------+
  11. * | | | |
  12. * 0 <= readerIndex <= writerIndex <= capacity

ByteBuf.API

  1. //获取ByteBuf分配器
  2. public abstract ByteBufAllocator alloc()
  3. //丢弃可读字节
  4. public abstract ByteBuf discardReadBytes()
  5. //返回读指针
  6. public abstract int readerIndex()
  7. //设置读指针
  8. public abstract ByteBuf readerIndex(int readerIndex);
  9. //标志当前读指针位置,配合resetReaderIndex使用
  10. public abstract ByteBuf markReaderIndex()
  11. public abstract ByteBuf resetReaderIndex()
  12. //返回可读字节数
  13. public abstract int readableBytes()
  14. //返回写指针
  15. public abstract int writerIndex()
  16. //设置写指针
  17. public abstract ByteBuf writerIndex(int writerIndex);
  18. //标志当前写指针位置,配合resetWriterIndex使用
  19. public abstract ByteBuf markWriterIndex()
  20. public abstract ByteBuf resetWriterIndex()
  21. //返回可写字节数
  22. public abstract int writableBytes()
  23. public abstract ByteBuf clear();
  24. //设置读写指针
  25. public abstract ByteBuf setIndex(int readerIndex, int writerIndex)
  26. //指针跳过length
  27. public abstract ByteBuf skipBytes(int length)
  28. //以当前位置切分ByteBuf todo
  29. public abstract ByteBuf slice();
  30. //切割起始位置为index,长度为length的ByteBuf todo
  31. public abstract ByteBuf slice(int index, int length);
  32. //Returns a copy of this buffer's readable bytes. //复制ByteBuf todo
  33. public abstract ByteBuf copy()
  34. //是否可读
  35. public abstract boolean isReadable()
  36. //是否可写
  37. public abstract boolean isWritable()
  38. //字节编码顺序
  39. public abstract ByteOrder order()
  40. //是否在直接内存申请的ByteBuf
  41. public abstract boolean isDirect()
  42. //转为jdk.NIO的ByteBuffer类
  43. public abstract ByteBuffer nioBuffer()

使用示例

  1. public static void main(String[] args) {
  2. //分配大小为10的内存
  3. ByteBuf buf = Unpooled.buffer(10);
  4. //写入
  5. buf.writeBytes("csc".getBytes());
  6. //读取
  7. byte[] b = new byte[3];
  8. buf.readBytes(b);
  9. System.out.println(new String(b));
  10. System.out.println(buf.writerIndex());
  11. System.out.println(buf.readerIndex());
  12. }

----result----

  1. csc
  2. 3
  3. 3
  • ByteBuf初始化时,readIndexwriterIndex等于0,调用writeXXX()方法写入数据,writerIndex会增加(setXXX方法无作用);调用readXXX()方法读取数据,则会使readIndex增加(getXXX方法无作用),但不会超过writerIndex
  • 在读取数据之后,0-readIndex之间的byte数据被视为discard,调用discardReadBytes(),释放这部分空间,作用类似于ByteBuffercompact方法