学习Netty一般都是使用其中的NIO,因此就必须要了解JDK NIO的一些知识,包括 SocketChannel,ServerSocketChannel,Buffer,

Selector, SelectionKey等等,只有对NIO有足够了解,学习Netty起来才会事半功倍。在本文中,会结合源码对JDK的NIO实现进行解析, 源码中加入了我个人的理解的注释,不足之处希望能在评论中不吝指出。

下面是一段JDK的NIO的服务端代码:

  1. /**
  2. * 代码片段1
  3. * @author zhangc
  4. * @version 1.0
  5. * @date 2019/12/2
  6. */
  7. public class NioServer {
  8. public static void main(String[] args) throws IOException {
  9. Selector selector1 = Selector.open();
  10. Selector selector2 = Selector.open();
  11. new Thread(() -> {
  12. try {
  13. // 启动服务端
  14. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  15. //绑定1214端口
  16. serverSocketChannel.socket().bind(new InetSocketAddress(1214));
  17. //设置为非阻塞模式
  18. serverSocketChannel.configureBlocking(false);
  19. //注册通道serverSocketChannel的OP_ACCEPT(服务端建立连接就绪)事件到serverSelector上
  20. serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT);
  21. while (true) {
  22. // 监测1毫秒内是否有新的连接
  23. if (selector1.select(1) > 0) {
  24. //获取感兴趣的事件
  25. Set<SelectionKey> set = selector1.selectedKeys();
  26. Iterator<SelectionKey> keyIterator = set.iterator();
  27. while (keyIterator.hasNext()) {
  28. SelectionKey key = keyIterator.next();
  29. if (key.isAcceptable()) {
  30. try {
  31. // 获取此次OP_ACCEPT事件建的通道
  32. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
  33. // 设置非阻塞模型
  34. clientChannel.configureBlocking(false);
  35. // 给这个通道绑定OP_READ(读)事件
  36. clientChannel.register(selector2, SelectionKey.OP_READ);
  37. } finally {
  38. keyIterator.remove();
  39. }
  40. }
  41. }
  42. }
  43. }
  44. } catch (IOException e) {
  45. }
  46. }).start();
  47. new Thread(() -> {
  48. try {
  49. while (true) {
  50. // 监测1毫秒内是否有新的可读数据
  51. if (selector2.select(1) > 0) {
  52. //获取感兴趣的事件
  53. Set<SelectionKey> set = selector2.selectedKeys();
  54. Iterator<SelectionKey> keyIterator = set.iterator();
  55. while (keyIterator.hasNext()) {
  56. SelectionKey key = keyIterator.next();
  57. if (key.isReadable()) {
  58. try {
  59. //获取此次读事件的通道
  60. SocketChannel clientChannel = (SocketChannel) key.channel();
  61. //批量读取数据
  62. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  63. clientChannel.read(byteBuffer);
  64. //byteBuffer转变为
  65. byteBuffer.flip();
  66. System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
  67. .toString());
  68. } finally {
  69. keyIterator.remove();
  70. }
  71. }
  72. }
  73. }
  74. }
  75. } catch (IOException e) {
  76. }
  77. }).start();
  78. }
  79. }

Selector是多路复用器(实现可以是select,poll,epoll),相当于一个listener(个人看法),每一个Selector,都需要一个独立的线程去处理。
一个Selector中可以绑定多个SocketChannel/ServerSocketChannel,当绑定的SocketChannel/ServerSocketChannel
中发生Selector感兴趣的事件(SelectionKey)时,Selector中的publicSelectedKeys(Set)中会增加一个
SelectionKey(这次的事件)对象。SelectionKey有四种类型,分别是OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE,
其中ServerSocketChannel的有效事件(validOps)为OP_ACCEPT,其余三种为SocketChannel的有效事件。
上面的代码一个线程中在selector1上给ServerSocketChannel注册了OP_ACCEPT事件,selector1只负责监听新的连接,
每当有新的连接建立时,selector1会新建一个此次连接的TCP通道——SocketChannel,然后在selector2上为这个
SocketChannel 注册OP_READ事件。在另一个线程中,selector2中绑定的SocketChannel每当有OP_READ事件时,
读取通道中的数据并打印在控制台上。这就是主从reactor多线程模式

关于SelectionKey

SelectionKey是一个抽象类,它有且仅有1个子类AbstractSelectionKey,AbstractSelectionKey有且仅有一个子类是
SelectionKeyImpl。SelectionKey定义了4个常量:

  1. /**
  2. * 代码片段2
  3. */
  4. public static final int OP_READ = 1 << 0;
  5. public static final int OP_WRITE = 1 << 2;
  6. public static final int OP_CONNECT = 1 << 3;
  7. public static final int OP_ACCEPT = 1 << 4;

只看2进制的后5位(前27位为0),OP_READ为00001,OP_WRITE为00100,OP_CONNECT为01000,OP_ACCEPT为10000,
也就是分别是右边第1,3,4,5位为1,其余位为0。基于此,SocketChannel和ServerSocketChannel中获取有效事件
的方法是这样的:

  1. /**
  2. * 代码片段3
  3. * NIO代码示例
  4. */
  5. /*
  6. * SocketChannel获取有效事件 返回的二进制前27位为0,后5位为01101
  7. */
  8. public final int validOps() {
  9. return (SelectionKey.OP_READ
  10. | SelectionKey.OP_WRITE
  11. | SelectionKey.OP_CONNECT);
  12. }
  13. /*
  14. * ServerSocketChannel获取有效事件 返回的二进制前27位为0,后5位为10000
  15. */
  16. public final int validOps() {
  17. return SelectionKey.OP_ACCEPT;
  18. }

下面我们看serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT)这个方法的实现:

  1. /**
  2. * 代码片段4
  3. * AbstractSelectableChannel代码(ServerSocketChanner和SocketChannel的父类)代码
  4. */
  5. public final SelectionKey register(Selector sel, int ops)
  6. throws ClosedChannelException
  7. {
  8. return register(sel, ops, null);
  9. }
  10. public final SelectionKey register(Selector sel, int ops, Object att)
  11. throws ClosedChannelException
  12. {
  13. synchronized (regLock) {
  14. if (!isOpen())
  15. throw new ClosedChannelException();
  16. //判断注册的事件是否有效 validOps()获取int类型表示的有效事件v,
  17. //v的32位2进制数为1的位,ops对应的位可以1,v为0的位ops对应的位只能为0,否则就无效
  18. if ((ops & ~validOps()) != 0)
  19. throw new IllegalArgumentException();
  20. if (blocking)
  21. throw new IllegalBlockingModeException();
  22. //看这个channel是否已经绑定了这个Selector,如果已经绑定了这个Selector就重新设置这个Selector的interestOps
  23. //如果没有绑定该Selector就会将其绑定
  24. SelectionKey k = findKey(sel);
  25. if (k != null) {
  26. k.interestOps(ops);
  27. k.attach(att);
  28. }
  29. if (k == null) {
  30. // New registration
  31. synchronized (keyLock) {
  32. if (!isOpen())
  33. throw new ClosedChannelException();
  34. k = ((AbstractSelector)sel).register(this, ops, att);
  35. addKey(k);
  36. }
  37. }
  38. return k;
  39. }
  40. }

SelectionKey中维护了对应的Selector,在Channel中维护了一个SelectionKey的数组,
在将Selector上为Channel注册事件时,如果Channel中没有绑定对应的Selector,
那么就会新建一个SelectionKey(即上面代码片段4中的k = ((AbstractSelector)sel).register(this, ops, att);),
具体代码如下:

  1. /**
  2. * 代码片段5
  3. * AbstractSelectableChannel
  4. */
  5. protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
  6. if (!(var1 instanceof SelChImpl)) {
  7. throw new IllegalSelectorException();
  8. } else {
  9. //这个this就是当前的Selector,将Selector作为构造参数构造一个SelectionKey对象
  10. SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
  11. var4.attach(var3);
  12. Set var5 = this.publicKeys;
  13. //将构造的SelectionKey对象也绑定到Selector中
  14. synchronized(this.publicKeys) {
  15. this.implRegister(var4);
  16. }
  17. //设置SelectionKey的感兴趣事件
  18. var4.interestOps(var2);
  19. return var4;
  20. }
  21. }

构造完成后,会将这个SelectionKey放在Channel的SelectionKey类型数组中,而在Channel中判断是否绑定事件,
就是判断这个数组中每一个SelectionKey中维护的Selector是否是这个Selector。这样我们大致的了解了为Channel在Selector
上注册事件的流程:

  • 首先判断是否是有效事件,不同的Channnel中维护的有效事件不同,ServerSocketChannel中的有效事件为OP_ACCEPT,SocketChannel中的有效事件为OP_CONNECT、OP_READ、OP_WRITE
  • 然后在Channel中的SelectionKey数组中判断这些SelectionKey中维护的Selector是否包含当前的Selector,如果不包含就用这个Selector构造一个SelectionKey并放入数组
  • 如果包含这个Selector就将对应的SelectionKey的感兴趣事件修改

注册多个感兴趣的事件只需要将多个事件对应的int进行或操作即可:

  1. /**
  2. * 代码片段6
  3. * 注册多个事件
  4. */
  5. channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);

对SelectionKey注册多个感兴趣的时间也一样,如果要增加SelectionKey的感兴趣的事件,也只需进行或操作:

  1. /**
  2. * 代码片段7
  3. * 增加SelectionKey的感兴趣的事件
  4. */
  5. key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

关于Buffer

在Buffer类的文档注释中是这样描述的:一个特定的基本类型的数据容器,一个buffer(缓冲区,即创建的Buffer对象)是一个线性的、
有限的特定基本类型的元素序列,除了它的内容,它本质的属性是它的capacity,limit和position。capacity:buffer包含的元素数量,
不为负数且不可改变;limit:第一个不能被读或写的元素的索引,不为负数且不能大于capacity;position:下一个可以被读或写的
元素的索引,不为负数且不大于limit。对于每一个非boolean类型的基本类型,jdk都提供了一个Buffer的子类。

数据传输

Buffer的每个子类都定义了2种get和put操作:

  • 相对操作——从当前position开始读或者写一个或多个元素,并根据传输的元素增加position的值,如果需要传输的数据超过了limit,
    相对get操作会抛出BufferUnderflowException异常,相对put操作会抛出BufferOverflowException,在这两种情况下,数据不会被传输
  • 绝对操作——使用一个明确的元素索引,并不需要改变position,如果这个索引超出了limit,绝对操作会抛出IndexOutOfBoundsException

关于Buffer类为什么不直接定义get和put方法,我认为原因应该是:java的泛型不支持基本数据类型,如果要定义泛型的get和put方法就要转化成
包装类,而在IO操作中频繁地进行拆箱装箱会影响性能。因此,jdk为除了boolean之外的每个基本数据类型定义了Buffer的子类,在这些
子抽象类中定义了每种数据类型的get和put方法。

mark和reset

Buffer中的mark是当reset方法被执行时position被重置的索引,mark值并不总是被定义(没有定义的话,初始值为-1),但当mark被定义了,
它不为负数且不大于position。如果定义了mark,当position被调整到小于mark时,mark被丢弃(置为-1)。如果mark没有被定义时执行
reset方法,会抛出InvalidMarkException。

不变性

mark, position, limit和capacity的值存在以下不变性:

mark <= position <= limit <= capacity

新创建的buffer的position总是0,且mark是未定义的。初始化的limit值可能为0,也可能为其他值,取决于buffer的类型和它构造的方式,
新分配的buffer的每个元素都初始化为零。

clear,flip和rewind

除了获取position,limit和capacity的方法、获取mark的方法和reset方法,Buffer类还定义了以下基于缓冲区的操作:

  • clear——让buffer为一个新的通道读取序列或相对put操作准备就绪,设置limit为capacity、position为0;
  • flip——让buffer一个新的通道写入序列或者相对get操作准备就绪,设置limit为当前的position、position为0;
  • rewind——让buffer为重新读取它已经包含的数据准备就绪,设置limit不变,position为0。

只读的buffer

所有buffer都是可读的,但不是所有buffer都是可写的。修改buffer的方法对于每个Buffer的子类来说是可选的,当在只读的buffer上调用这些
修改buffer的方法时,会抛出ReadOnlyBufferException。只读的buffer不允许它的数据内容被改变,但它的mark,position和limit是可变的。
一个buffer是不是只读buffer取决于它的isReadOnly方法。

线程安全性

buffer不是多线程安全的。如果一个buffer用来被超过一个线程获取,那么它应该被合适的同步操作进行控制。

HeapByteBuffer和DirectByteBuffer

HeapByteBuffer和DirectByteBuffer都是MappedByteBuffer的子类,MappedByteBuffer继承了ByteBuffer类,顾名思义,HeapByteBuffer
和DirectByteBuffer都是字节缓存区,其他基本类型也都对应有HeapXXXBuffer和DirectXXXBuffer类。以HeapByteBuffer和DirectByteBuffer
为例,它们的区别如下:

  • HeapByteBuffer是分配在JVM堆(Heap)内存上的,遵循JVM的内存管理机制(GC也由JVM负责)。HeapByteBuffer中维护了一个final修饰的
    byte数组hb,来存储缓冲区的内容。
  • DirectByteBuffer是从堆外申请的内存(直接内存),这个内存大小不受-Xmx最大堆内存参数的限制。相比HeapByteBuffer,DirectByteBuffer
    减少了数据拷贝的次数,但创建和释放的代价更高。DirectByteBuffer中维护了一个名为cleaner的虚引用跟踪堆外内存以及一个long类型的address
    字段存储内存地址,当DirectByteBuffer将要被GC时,cleaner这个指向直接内存的虚引用会被放入引用队列,在cleaner指向的内存被回首之前,
    该DirectByteBuffer不会被彻底销毁。(此处建议了解DMA、java的虚引用、操作系统的用户态和内核态、Zero-Copy零拷贝的知识)

抽象类ByteBuffer提供了静态方法allocate(int capacity)和allocateDirect(int capacity)分别用来创建HeapByteBuffer和DirectByteBuffer
对象。在这个两个方法中分别调用HeapByteBuffer和DirectByteBuffer的构造方法并返回。HeapByteBuffer的构造方法比较简单,只是对对象的
mark,hb,position,limit,capacity等属性进行初始化;而DirectByteBuffer则不太一样,如下所示:

  1. /**
  2. * 代码片段8
  3. * DirectByteBuffer构造函数
  4. */
  5. // Primary constructor
  6. //
  7. DirectByteBuffer(int cap) { // package-private
  8. super(-1, 0, cap, cap);
  9. //内存是否按页分配对齐
  10. boolean pa = VM.isDirectMemoryPageAligned();
  11. //每页的大小
  12. int ps = Bits.pageSize();
  13. //分配内存的大小
  14. long size = Math.max(1L, (long)cap + (pa ? ps : 0));
  15. //将分配大小和容量大小加到Bits中的totalCapacity(总容量)和reservedMemory(总大小)两个属性(AtomicLong类型)中
  16. Bits.reserveMemory(size, cap);
  17. long base = 0;
  18. try {
  19. //在堆外分配内存
  20. base = unsafe.allocateMemory(size);
  21. } catch (OutOfMemoryError x) {
  22. //内存不够再减去
  23. Bits.unreserveMemory(size, cap);
  24. throw x;
  25. }
  26. unsafe.setMemory(base, size, (byte) 0);
  27. //计算堆外内存的首地址
  28. if (pa && (base % ps != 0)) {
  29. // Round up to page boundary
  30. address = base + ps - (base & (ps - 1));
  31. } else {
  32. address = base;
  33. }
  34. //创建cleaner虚引用绑定这个buffer,cleaner回收时会执行clean方法,clean方法会调用Deallocator的run方法
  35. cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
  36. att = null;
  37. }

可以看到,DirectByteBuffer的创建需要使用unsafe去堆外申请内存,然后将地址保存下来,并通过一个Cleanner的虚引用保存该内存地址的首部
和长度。

下面来看HeapByteBuffer和DirectByteBuffer的get、put方法,首先是HeapByteBuffer的get方法:

  1. /**
  2. * 代码片段9
  3. * HeapByteBuffer的get方法和get方法中调用的部分方法
  4. */
  5. //获取当前position实际地址的方法
  6. //返回i加上偏移量offset 这个offset在创建HeapByteBuffer时指定,是一个final修饰的int类型
  7. protected int ix(int i) {
  8. return i + offset;
  9. }
  10. //相对操作get position加1 然后返回保存数据的数组中下标为position+offset的字节
  11. public byte get() {
  12. return hb[ix(nextGetIndex())];
  13. }
  14. //绝对操作get 返回保存数据的数组中下标为i+offset的字节
  15. public byte get(int i) {
  16. return hb[ix(checkIndex(i))];
  17. }
  18. //相对操作get,将保存数据的数组中下标从position+this.offset开始length长度的部分复制到dst数组中下标从offset开始length长度的部分
  19. //并设置position += length,返回当前的buffer
  20. public ByteBuffer get(byte[] dst, int offset, int length) {
  21. checkBounds(offset, length, dst.length);
  22. if (length > remaining())
  23. throw new BufferUnderflowException();
  24. System.arraycopy(hb, ix(position()), dst, offset, length);
  25. position(position() + length);
  26. return this;
  27. }

然后是HeapByteBuffer中的put方法:

  1. /**
  2. * 代码片段10
  3. * HeapByteBuffer的put方法和put方法中调用的部分方法
  4. */
  5. //相对操作put position加1 将x保存在保存书据的数组的position+offset下标位置
  6. public ByteBuffer put(byte x) {
  7. hb[ix(nextPutIndex())] = x;
  8. return this;
  9. }
  10. //绝对操作put position不变 将x保存在保存书据的数组的i+offset下标位置
  11. public ByteBuffer put(int i, byte x) {
  12. hb[ix(checkIndex(i))] = x;
  13. return this;
  14. }
  15. //相对操作put 将src中从offset开始的length长度的数据复制到buffer中数组的position+offset开始的length长度的部分
  16. //并设置position += length,返回当前的buffer
  17. public ByteBuffer put(byte[] src, int offset, int length) {
  18. //检查是否数组越界
  19. checkBounds(offset, length, src.length);
  20. if (length > remaining())
  21. throw new BufferOverflowException();
  22. System.arraycopy(src, offset, hb, ix(position()), length);
  23. position(position() + length);
  24. return this;
  25. }
  26. //相对操作put 将另一个ByteBuffer src中从当前position+offset位置开始limit - position长度的数据
  27. //复制到this的position+offset的位置 并将src和this的position都加n(n为src的剩余可读/写长度,limit - position)
  28. public ByteBuffer put(ByteBuffer src) {
  29. if (src instanceof HeapByteBuffer) {
  30. //如果是HeapByteBuffer且不是本身就进行数组复制,并将this和src的position加n
  31. if (src == this)
  32. throw new IllegalArgumentException();
  33. HeapByteBuffer sb = (HeapByteBuffer)src;
  34. //判断this的剩余容量够不够
  35. int n = sb.remaining();
  36. if (n > remaining())
  37. throw new BufferOverflowException();
  38. System.arraycopy(sb.hb, sb.ix(sb.position()),
  39. hb, ix(position()), n);
  40. sb.position(sb.position() + n);
  41. position(position() + n);
  42. } else if (src.isDirect()) {
  43. //如果是直接字节缓冲,就用src的get操作将数据写到this的保存数据的数组,并将position加n
  44. int n = src.remaining();
  45. if (n > remaining())
  46. throw new BufferOverflowException();
  47. src.get(hb, ix(position()), n);
  48. position(position() + n);
  49. } else {
  50. //如果既不是HeapByteBuffer,也不是直接字节缓冲,就调用父类的put方法
  51. super.put(src);
  52. }
  53. return this;
  54. }

下面是DirectByteBuffer的get方法:

  1. /**
  2. * 代码片段11
  3. * DirectByteBuffer的get方法和get方法中调用的部分方法
  4. */
  5. //获取当前position实际地址的方法
  6. //返回i加上address
  7. private long ix(int i) {
  8. return address + ((long)i << 0);
  9. }
  10. //相对操作get 当前position++ 然后获取从直接内存中获取position+addess对应位置的值
  11. public byte get() {
  12. return ((unsafe.getByte(ix(nextGetIndex()))));
  13. }
  14. //绝对操作get position不变 从直接内存中获取i+addess对应位置的值
  15. public byte get(int i) {
  16. return ((unsafe.getByte(ix(checkIndex(i)))));
  17. }
  18. //相对操作的get 从内存中将position+address开始、长度为length的数据取出放入数组dst的下标offset开始、length长度的部分
  19. //并设置position += length,返回当前的buffer
  20. public ByteBuffer get(byte[] dst, int offset, int length) {
  21. if (((long)length << 0) > Bits.JNI_COPY_TO_ARRAY_THRESHOLD) {
  22. //如果length大于Bits.JNI_COPY_TO_ARRAY_THRESHOLD(这个值为6)就直接复制到数组中
  23. //JNI_COPY_TO_ARRAY_THRESHOLD表示根据经验确定JNI调用的平均成本逐个元素复制超过整个数组复制的点,这个数字可能会随着时间改变
  24. //检查数组是否越界
  25. checkBounds(offset, length, dst.length);
  26. int pos = position();
  27. int lim = limit();
  28. assert (pos <= lim);
  29. int rem = (pos <= lim ? lim - pos : 0);
  30. if (length > rem)
  31. throw new BufferUnderflowException();
  32. Bits.copyToArray(ix(pos), dst, arrayBaseOffset,
  33. (long)offset << 0,
  34. (long)length << 0);
  35. position(pos + length);
  36. } else {
  37. //如果不大于6,就调用父类的get方法进行逐个元素复制
  38. super.get(dst, offset, length);
  39. }
  40. return this;
  41. }
  42. //父类ByteBuffer中的get方法 循环去执行无参的相对get方法
  43. public ByteBuffer get(byte[] dst, int offset, int length) {
  44. checkBounds(offset, length, dst.length);
  45. if (length > remaining())
  46. throw new BufferUnderflowException();
  47. int end = offset + length;
  48. for (int i = offset; i < end; i++)
  49. dst[i] = get();
  50. return this;
  51. }

然后是DirectByteBuffer的put方法:

  1. /**
  2. * 代码片段12
  3. * DirectByteBuffer的put方法和put方法中调用的部分方法
  4. */
  5. //相对操作put position++ 然后将直接内存中address+position对应位置的设置为x
  6. public ByteBuffer put(byte x) {
  7. unsafe.putByte(ix(nextPutIndex()), ((x)));
  8. return this;
  9. }
  10. //绝对操作put position不变 将直接内存中address+i对应位置的设置为x
  11. public ByteBuffer put(int i, byte x) {
  12. unsafe.putByte(ix(checkIndex(i)), ((x)));
  13. return this;
  14. }
  15. //相对操作put 然后将直接内存中address+position对应位置开始、src剩余可读/写容量长度的内存区域设置为
  16. //src的position开始、src剩余可读/写容量长度的内容
  17. //并且src和this的position都加n(n为src剩余可读/写容量长度,limit - position )
  18. public ByteBuffer put(ByteBuffer src) {
  19. if (src instanceof DirectByteBuffer且不为本身,) {
  20. //如果src是DirectByteBuffer且不为本身,就在堆外进行内存复制
  21. if (src == this)
  22. throw new IllegalArgumentException();
  23. DirectByteBuffer sb = (DirectByteBuffer)src;
  24. int spos = sb.position();
  25. int slim = sb.limit();
  26. assert (spos <= slim);
  27. int srem = (spos <= slim ? slim - spos : 0);
  28. int pos = position();
  29. int lim = limit();
  30. assert (pos <= lim);
  31. int rem = (pos <= lim ? lim - pos : 0);
  32. //判断是否有足够的容量
  33. if (srem > rem)
  34. throw new BufferOverflowException();
  35. //内存拷贝
  36. unsafe.copyMemory(sb.ix(spos), ix(pos), (long)srem << 0);
  37. sb.position(spos + srem);
  38. position(pos + srem);
  39. } else if (src.hb != null) {
  40. //如果src不是DirectByteBuffer,且保存数据的数组hb不为空,就讲hb作为参数调用下面的另一个重载的put方法
  41. int spos = src.position();
  42. int slim = src.limit();
  43. assert (spos <= slim);
  44. int srem = (spos <= slim ? slim - spos : 0);
  45. put(src.hb, src.offset + spos, srem);
  46. src.position(spos + srem);
  47. } else {
  48. super.put(src);
  49. }
  50. return this;
  51. }
  52. //相对操作的put 从内存中将position+address开始、长度为length的数据存入数组src的下标offset开始、length长度的部分的内容
  53. //并设置position += length,返回当前的buffer
  54. public ByteBuffer put(byte[] src, int offset, int length) {
  55. if (((long)length << 0) > Bits.JNI_COPY_FROM_ARRAY_THRESHOLD) {
  56. //如果length大于JNI_COPY_FROM_ARRAY_THRESHOLD(值为6) 就将整个数组直接复制到内存
  57. //表示根据经验确定JNI调用的平均成本逐个元素复制超过整个数组复制的点,这个数字可能会随着时间改变
  58. checkBounds(offset, length, src.length);
  59. int pos = position();
  60. int lim = limit();
  61. assert (pos <= lim);
  62. int rem = (pos <= lim ? lim - pos : 0);
  63. if (length > rem)
  64. throw new BufferOverflowException();
  65. //直接将数组拷贝到内存
  66. Bits.copyFromArray(src, arrayBaseOffset,
  67. (long)offset << 0,
  68. ix(pos),
  69. (long)length << 0);
  70. position(pos + length);
  71. } else {
  72. //如果小于6 调用父类的put方法 循环将数组从offset到offset+length的数据逐个复制
  73. super.put(src, offset, length);
  74. }
  75. return this;
  76. }

以上的源码中都清楚地通过注释阐明了HeapByteBuffer和DirectByteBuffer的put和get操作的工作流程,根据源码,可以总结出:

  • position就是buffer中当前读到/写到的地方的索引,当相对读完或者相对写完数据后,position会发生改变。因此在相对写之后、
    相对读之前,要调用buffer.flip()——设置limit为当前的position(因为position往后就没有数据了)、position为0(为相对读做好准备);
    在相对读之后、相对写之前,要调用buffer.clear()——设置limit为capacity(想写多少写多少,这里limit限制的是读)、position为0
    (为相对写做好准备)。flip和clear并不会改变数组hb或者堆外内存中数据的内容,只是改变几个标记值。

总结

JDK的NIO虽然与Netty相比不尽完善,但还是有很多可取之处的。而且,学习JDK的NIO之后也能更方便地阅读Netty的源码。如果你想学习Netty,
并正在了解JDK NIO的相关知识,希望这篇文章能给你带来收获。