Java Channel FileChannel
深入分析Channel和FileChannel - 图1
NIO 中的 Buffer可以认为是装载数据的容器,有了容器,还需要传输数据的通道才能完成数据的传输。
Channel 可以认为它是本地 I/O 设备、网络 I/O 的通信桥梁,只有搭建了这座桥梁,数据才能被写入 Buffer 。

Channel

在 NIO 中,Channel 和 Buffer 是相辅相成的,只能从 Channel 读取数据到 Buffer 中,或者从 Buffer 写入数据到 Channle,如下图:
深入分析Channel和FileChannel - 图2
Channel 类似于 OIO 中的流(Stream),但是又有所区别:

  • 流是单向的,但 Channel 是双向的,可读可写。
  • 流是阻塞的,但 Channle 可以异步读写。
  • 流中的数据可以选择性的先读到缓存中,而 Channel 的数据总是要先读到一个 Buffer 中,或从 Buffer 中写入,如上图。

NIO 中通过 Channel 封装了对数据源的操作,通过 Channel 可以操作数据源,但是又不必关注数据源的具体物理结构,这个数据源可以是文件,也可以是socket。
Channel 的接口定义如下:

  1. publicinterface Channel extends Closeable {
  2. public boolean isOpen();
  3. public void close() throws IOException;
  4. }

Channel 接口仅定义两个方法:

  • isOpen():Channel 是否打开
  • close():关闭 Channel

它的主要实现有:

  • FileChannel:文件通道,用于文件的数据读写。
  • SocketChannel:套接字通道,能通过 TCP 读写网络中的数据。
  • ServerSocketChannel:服务器套接字通道,监听新进来的 TCP 连接,像 web 服务器那样,对每一个新进来的连接都会创建一个 SocketChannel
  • DatagramChannel:数据报通道,能通过 UDP 读写网络中的数据。

基本类图如下:
深入分析Channel和FileChannel - 图3
下面就 FileChannel 做详细介绍。

FileChannel

FileChannel 主要是用来读写和映射一个系统文件的 Channel,它是一个抽象类,具体由 FileChannelImpl 来实现。
定义如下:

  1. package java.nio.channels;
  2. publicabstractclass FileChannel
  3. extends AbstractInterruptibleChannel
  4. implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel{
  5. /**
  6. * 初始化一个无参构造器.
  7. */
  8. protected FileChannel() { }
  9. //打开或创建一个文件,返回一个文件通道来访问文件
  10. public static FileChannel open(Path path,
  11. Set<? extends OpenOption> options,
  12. FileAttribute<?>... attrs)
  13. throws IOException
  14. {
  15. FileSystemProvider provider = path.getFileSystem().provider();
  16. return provider.newFileChannel(path, options, attrs);
  17. }
  18. privatestaticfinal FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
  19. //打开或创建一个文件,返回一个文件通道来访问文件
  20. public static FileChannel open(Path path, OpenOption... options)
  21. throws IOException
  22. {
  23. Set<OpenOption> set = new HashSet<OpenOption>(options.length);
  24. Collections.addAll(set, options);
  25. return open(path, set, NO_ATTRIBUTES);
  26. }
  27. //从这个通道读入一个字节序列到给定的缓冲区
  28. public abstract int read(ByteBuffer dst) throws IOException;
  29. //从这个通道读入指定开始位置和长度的字节序列到给定的缓冲区
  30. public abstract long read(ByteBuffer[] dsts, int offset, int length)
  31. throws IOException;
  32. /**
  33. * 从这个通道读入一个字节序列到给定的缓冲区
  34. */
  35. public final long read(ByteBuffer[] dsts) throws IOException {
  36. return read(dsts, 0, dsts.length);
  37. }
  38. /**
  39. * 从给定的缓冲区写入字节序列到这个通道
  40. */
  41. public abstract int write(ByteBuffer src) throws IOException;
  42. /**
  43. * 从给定缓冲区的子序列向该信道写入字节序列
  44. */
  45. public abstract long write(ByteBuffer[] srcs, int offset, int length)
  46. throws IOException;
  47. /**
  48. * 从给定的缓冲区写入字节序列到这个通道
  49. */
  50. public final long write(ByteBuffer[] srcs) throws IOException {
  51. return write(srcs, 0, srcs.length);
  52. }
  53. /**
  54. * 返回通道读写缓冲区中的开始位置
  55. */
  56. public abstract long position() throws IOException;
  57. /**
  58. * 设置通道读写缓冲区中的开始位置
  59. */
  60. public abstract FileChannel position(long newPosition) throws IOException;
  61. /**
  62. * 返回此通道文件的当前大小
  63. */
  64. public abstract long size() throws IOException;
  65. /**
  66. * 通过指定的参数size来截取通道的大小
  67. */
  68. public abstract FileChannel truncate(long size) throws IOException;
  69. /**
  70. * 强制将通道中的更新文件写入到存储设备(磁盘等)中
  71. */
  72. public abstract void force(boolean metaData) throws IOException;
  73. /**
  74. * 将当前通道中的文件写入到可写字节通道中
  75. * position就是开始写的位置,long就是写的长度
  76. */
  77. public abstract long transferTo(long position, long count,
  78. WritableByteChannel target)
  79. throws IOException;
  80. /**
  81. * 将当前通道中的文件写入可读字节通道中
  82. * position就是开始写的位置,long就是写的长度
  83. */
  84. public abstract long transferFrom(ReadableByteChannel src,
  85. long position, long count)
  86. throws IOException;
  87. /**
  88. * 从通道中读取一系列字节到给定的缓冲区中
  89. * 从指定的读取开始位置position处读取
  90. */
  91. public abstract int read(ByteBuffer dst, long position) throws IOException;
  92. /**
  93. * 从给定的缓冲区写入字节序列到这个通道
  94. * 从指定的读取开始位置position处开始写
  95. */
  96. public abstract int write(ByteBuffer src, long position) throws IOException;
  97. // -- Memory-mapped buffers --
  98. /**
  99. * 一个文件映射模式类型安全枚举
  100. */
  101. publicstaticclass MapMode {
  102. //只读映射模型
  103. publicstaticfinal MapMode READ_ONLY
  104. = new MapMode("READ_ONLY");
  105. //读写映射模型
  106. publicstaticfinal MapMode READ_WRITE
  107. = new MapMode("READ_WRITE");
  108. /**
  109. * 私有模式(复制在写)映射
  110. */
  111. publicstaticfinal MapMode PRIVATE
  112. = new MapMode("PRIVATE");
  113. privatefinal String name;
  114. private MapMode(String name) {
  115. this.name = name;
  116. }
  117. }
  118. /**
  119. * 将该通道文件的一个区域直接映射到内存中
  120. */
  121. public abstract MappedByteBuffer map(MapMode mode,
  122. long position, long size)
  123. throws IOException;
  124. /**
  125. * 获取当前通道文件的给定区域上的锁
  126. * 区域就是从position处开始,size长度
  127. * shared为true代表获取共享锁,false代表获取独占锁
  128. */
  129. public abstract FileLock lock(long position, long size, boolean shared)
  130. throws IOException;
  131. /**
  132. * 获取当前通道文件上的独占锁
  133. */
  134. public final FileLock lock() throws IOException {
  135. return lock(0L, Long.MAX_VALUE, false);
  136. }
  137. /**
  138. * 尝试获取给定的通道文件区域上的锁
  139. * 区域就是从position处开始,size长度
  140. * shared为true代表获取共享锁,false代表获取独占锁
  141. */
  142. public abstract FileLock tryLock(long position, long size, boolean shared)
  143. throws IOException;
  144. /**
  145. * 尝试获取当前通道文件上的独占锁
  146. */
  147. public final FileLock tryLock() throws IOException {
  148. return tryLock(0L, Long.MAX_VALUE, false);
  149. }
  150. }

打开 FileChannel

在使用 FileChannle 之前必须要先打开它,但是无法直接打开一个 FileChannel,需要通过使用一个 InputStream、OutputStream、RandomAcessFile 来获取一个 FileChannel 实例,如下:

  1. RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
  2. FileChannel fileChannel = accessFile.getChannel();

调用 getChannel() 即可获取 FileChannel 实例,源码如下:

  1. public final FileChannel getChannel() {
  2. synchronized (this) {
  3. if (channel == null) {
  4. channel = FileChannelImpl.open(fd, path, true, rw, this);
  5. }
  6. return channel;
  7. }
  8. }

getChnnel() 方法很简单,直接调用 FileChannelImpl 的静态方法 open()

  1. public static FileChannel open(Path path,
  2. Set<? extends OpenOption> options,
  3. FileAttribute<?>... attrs) throws IOException{
  4. FileSystemProvider provider = path.getFileSystem().provider();
  5. return provider.newFileChannel(path, options, attrs);
  6. }

从 FileChannel 读数据

调用 FileChannel 的 read() 方法即可从 FileChannel 中获取数据,当然不是直接获取,而是需要先写入到 Buffer 中,所以调用 read() 之前,需要分配一个 Buffer,然后调用 read() ,该方法返回 int 表示有多少数据读取到了 Buffer 中了,如果返回 -1 表示已经到文件末尾了。

  1. ByteBuffer buffer = ByteBuffer.allocate(1024);
  2. int readCount = fileChannel.read(buffer);

FileChannel 仅定义了方法,具体实现在 FileChannelImpl,如下:

  1. public int read(ByteBuffer dst) throws IOException {
  2. ensureOpen();
  3. if (!readable)
  4. thrownew NonReadableChannelException();
  5. // 加锁
  6. synchronized (positionLock) {
  7. int n = 0;
  8. int ti = -1;
  9. try {
  10. begin();
  11. ti = threads.add();
  12. if (!isOpen())
  13. return0;
  14. do {
  15. // 通过IOUtil.read实现
  16. n = IOUtil.read(fd, dst, -1, nd);
  17. } while ((n == IOStatus.INTERRUPTED) && isOpen());
  18. return IOStatus.normalize(n);
  19. } finally {
  20. threads.remove(ti);
  21. end(n > 0);
  22. assert IOStatus.check(n);
  23. }
  24. }
  25. }
  • 首先确保该 Channel 是打开的
  • 然后加锁,主要是因为写入缓冲区需要保证线程安全
  • 最后通过 IOUtils.read() 实现

    1. static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException{
    2. // 1 申请一块临时堆外DirectByteBuffer
    3. ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    4. try {
    5. // 2 先往DirectByteBuffer写入数据,提高效率
    6. int n = readIntoNativeBuffer(fd, bb, position, nd);
    7. bb.flip();
    8. if (n > 0)
    9. // 3 再拷贝到传入的buffer
    10. dst.put(bb);
    11. return n;
    12. } finally {
    13. Util.offerFirstTemporaryDirectBuffer(bb);
    14. }
    15. }
  • 首先申请一块临时的堆外 DirectByteBuffer

  • 然后先往 DirectByteBuffer 写入数据,因为这样能够提高效率,为什么会提高效率,后文分析。
  • 最后拷贝到 ByteBuffer

    写数据到 FileChannel

    read()方法是从 FileChannel 中读取数据,那 write()方法则是从 ByteBuffer中读取数据写入到 Channel 中。调用 write() 需要先申请一个 ByteBuffer ,如下:

    1. ByteBuffer buffer = ByteBuffer.allocate(1024);
    2. fileChannel.write(buffer);

    同样,实现是在 FileChannelImpl 中。

    1. public int write(ByteBuffer src) throws IOException {
    2. ensureOpen();
    3. if (!writable)
    4. thrownew NonWritableChannelException();
    5. synchronized (positionLock) {
    6. int n = 0;
    7. int ti = -1;
    8. try {
    9. begin();
    10. ti = threads.add();
    11. if (!isOpen())
    12. return0;
    13. do {
    14. n = IOUtil.write(fd, src, -1, nd);
    15. } while ((n == IOStatus.INTERRUPTED) && isOpen());
    16. return IOStatus.normalize(n);
    17. } finally {
    18. threads.remove(ti);
    19. end(n > 0);
    20. assert IOStatus.check(n);
    21. }
    22. }
    23. }

    read() 方法实现一模一样,先确定该 Channel 是打开的,然后加锁,最后调用 IOUtil 的 write()

    1. static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd)
    2. throws IOException{
    3. if (src instanceof DirectBuffer)
    4. return writeFromNativeBuffer(fd, src, position, nd);
    5. int pos = src.position();
    6. int lim = src.limit();
    7. assert (pos <= lim);
    8. int rem = (pos <= lim ? lim - pos : 0);
    9. // 2 否则构造一块跟传入缓冲区一样大小的DirectBuffer
    10. ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    11. try {
    12. bb.put(src);
    13. bb.flip();
    14. src.position(pos);
    15. // 3 调用writeFromNativeBuffer读取
    16. int n = writeFromNativeBuffer(fd, bb, position, nd);
    17. if (n > 0) {
    18. // now update src
    19. src.position(pos + n);
    20. }
    21. return n;
    22. } finally {
    23. Util.offerFirstTemporaryDirectBuffer(bb);
    24. }
    25. }
  • 首先判断传入的 Buffer 是否为 DirectBuffer,如果是的话,就直接写入

  • 否则则构造一块跟传入 Buffer 一样大小的 DirectBuffer
  • 最后调用 writeFromNativeBuffer()

    关闭 FileChannel

    保持好习惯,用完了一定要记得关闭:close()

    1. public final void close() throws IOException {
    2. synchronized (closeLock) {
    3. if (!open)
    4. return;
    5. open = false;
    6. implCloseChannel();
    7. }
    8. }

    调用 implCloseChannel() 释放 Channel。

    1. protected void implCloseChannel() throws IOException {
    2. // 释放文件锁
    3. if (fileLockTable != null) {
    4. for (FileLock fl: fileLockTable.removeAll()) {
    5. synchronized (fl) {
    6. if (fl.isValid()) {
    7. //释放锁
    8. nd.release(fd, fl.position(), fl.size());
    9. ((FileLockImpl)fl).invalidate();
    10. }
    11. }
    12. }
    13. }
    14. // 通知当前通道所有被阻塞线程
    15. threads.signalAndWait();
    16. if (parent != null) {
    17. ((java.io.Closeable)parent).close();
    18. } else {
    19. nd.close(fd);
    20. }
    21. }

    关闭 FileChannel 时,需要释放所有锁和文件流。

    示例

    读数据

    1. public static void main(String[] args) throws Exception {
    2. RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
    3. FileChannel fileChannel = accessFile.getChannel();
    4. ByteBuffer buffer = ByteBuffer.allocate(1024);
    5. fileChannel.read(buffer);
    6. System.out.println(new String(buffer.array()));
    7. fileChannel.close();
    8. }

    写数据

    1. public static void main(String[] args) throws Exception {
    2. String fileContent = "写入数据";
    3. RandomAccessFile accessFile = new RandomAccessFile("/Users/chenssy/Documents/FileChannel.txt","rw");
    4. FileChannel fileChannel = accessFile.getChannel();
    5. ByteBuffer buffer = ByteBuffer.allocate(1024);
    6. buffer.put(fileContent.getBytes("UTF-8"));
    7. buffer.flip();
    8. fileChannel.write(buffer);
    9. fileChannel.close();
    10. }