我们在《【NIO】认识Buffer》这一节中看到过下面的示意图:
点击查看【processon】
我们的Channel是连接到资源的一端,如果发生写入就写入到资源那一侧;如果发生读出就从资源中读出数据到Buffer(内存)中。

Channel#transferFrom() / Channel#transferTo()

  • A.transferFrom(B) 是指:从ChannelB中读取数据到ChannelA
  • A.transferTo(B)是指:将ChannelA中写入到ChannelB

ChannelAChannelB的交互并不需要借助中间Buffer,可以很方便的实现转换。

transferFrom()transferTo()Linux的底层是通过sendfile()系统调用完成

感兴趣的小伙伴可以看《🎧关于IO 之 零拷贝》加深一下理解。

Scatter/Gather

Scatter,分散,在Channel里表示Scatter Read,即 分散读**Channel**,是一种功能特性。假设我们现在有两个Buffer,通过Scatter Read特性可以将**Channel**中的数据顺序分别读到**Buffer**
我们现在有这样两个文件:

  • 固定大小的消息,前512字节必然是header,之后的字节必然是bodymsg-file-nio.txt
  • 大小不固定的消息,前511字节(举例)是header,之后的字节才是bodymsg-file-nio2.txt

    大小不固定的消息是指 headerbody 大小不确定的消息。

用以下方式读取这些文件:

  1. Channel channel = file.getChannel(); // 获取 msg 文件
  2. ByteBuffer header = ByteBuffer.allocate(512); // 分配512的Buffer用于存储Header头信息
  3. ByteBuffer body = ByteBuffer.allocate(1024); // 分配1024的空间用于存储Body信息
  4. ByteBuffer[] bufArrray = new ByteBuffer[]{header, body};
  5. channel.read(bufArray); // 从Channel中按顺序读取数据分别写到header、
  6. // body中,header满了才会写body

Channel按数组顺序先写满header再去写body,所以如果消息体的 header 大小不足 512字节,则可能将body的数据也读到ByteBuffer header中,最终导致错乱。

Gather,聚合,在Channel里表示Gather Write,即 合并写Channel,是一种功能特性。假设我们现在有两个Buffer需要写到Channel中,那么通过Gather Write技术,就能将Buffer数组(包含两个Buffer)中的数据按数组顺序写入到Channel中。代码示意如下所示:

ByteBuffer header = ...;        // header中包含512个字节的数据
ByteBuffer body = ...;            // body中包含1024个字节的数据

Channel channel = file.getChannel();            // 现在想要将消息写入到 msg-receive.txt 中

ByteBuffer[] bufArrray = new ByteBuffer[]{header, body};
channel.write(bufArray);                        // 按顺序将 header、body 的数据写入到Channel

Channel类型

FileChannel

打开FileChannel

RandomAccessFile f = new RandomAccessFile("<文件路径>");
FileChannel fileChannel = f.getChannel();

向FileChannel写数据

String newData = "New String";
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();

while(buf.hasRemaining()){
    channel.write(buf);
}

因为无法保证buf一次能够写入多少字节,需要通过buf.hasRemaining()来判断是否还有字节没有写入。

获取/修改FileChannel的position

long pos = channel.position();    // 获取下一个position写/读的位置
channel.position(pos);            // 设置channel下一个要写/读的位置

如果将position设置到文件结束符之后,然后向Channel中写数据,文件将会撑大到当前位置并写入数据。这样可能会导致“文件空洞”磁盘上物理文件中写入的数据间有空隙

获取FileChannel关联的文件大小

long fileSize = fileChannel.size();

截取文件的前N个字节

fileChannel.truncate(1024);

这个方法会保留文件的前1024个字节,但是会将指定长度后面的部分删除。

FileChannel的force方法

Linux上写文件并非直接写入到磁盘的,而是将数据写到Page Cache中,所以无法保证调用了write()之后数据百分百落入到磁盘上。所以有了FileChannel#force()方法,通过这个方法可以将数据强制写到磁盘上。

FileChannel#force(<是否同时将元数据也刷新到磁盘上>)

元数据指的是“权限信息”等,通过传递参数truefalse可以决定是否同时将元数据同步到磁盘上。

SocketChannel

SocketChannel是一个用于 连接到TCP网络套接字的通道。

打开SocketChannel

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("<网络地址>", 80));

关闭SocketChannel

socketChannel.close();

从SocketChannel读取数据

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

将数据从SocketChannel读到Buffer中,read()方法返回的int值表示读取了多少个字节到Buffer中。如果返回-1,表示已经读到了流的末尾(连接关闭了)。

写数据到SocketChannel

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
// 切换为 读模式
buf.flip();
while(buf.hasRemaining()){
    channel.write(buf);
}

这部分跟FileChannel写数据流程原理一样。

非阻塞模式

在非阻塞模式下,connect()write()read()方法都会立即返回:

  1. 所以在调用了connect()方法后,需要循环socketChannel.finishConnect()来判断连接是否建立。连接成功后finishConnect()会返回true,还没有连接成功(或者没有结果)就会返回false
  2. 当连接成功后,因为write()方法在异步模式下会立刻返回,如果此时Socket缓冲区已经满了,那么就只会写入0个字节。为了保证数据都能写进Channel,所以得通过ByteBuffer#hasRemaining()来判断是否还有数据剩余。
  3. 随后通过read()来读取数据,这里注意的是要通过flip()来转换“模式”。
    public static void main(String[] args) throws IOException, InterruptedException {
     SocketChannel socketChannel = SocketChannel.open();     // 创建SocketChannel
     socketChannel.configureBlocking(false);                 // 设置 非阻塞
     boolean connect = socketChannel.connect(new InetSocketAddress("www.baidu.com", 80));    // 连接 www.baidu.com
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);      // 分配1024个字节
     byteBuffer.put(getRequest().getBytes(StandardCharsets.UTF_8));      // 构造请求
     while(!socketChannel.finishConnect()) {                 // 判断连接是否建立,如果尚未建立就进入循环执行一些事
         // do something
     }
     byteBuffer.flip();                                      // 让position指向0 limit指向之前读的位置。如果没有这一步,就会发送失败
     while(byteBuffer.hasRemaining()) {                      // 连接建立成功,写入 byteBuffer
         socketChannel.write(byteBuffer);                        // 写入
     }
     byteBuffer.clear();                                     // 清空Buffer,position=0,limit=capacity
     int read = -1;
     boolean isRead = false;
     while((read = socketChannel.read(byteBuffer)) != -1) {  // 从socketChannel中读取数据到byteBuffer
         if(read == 0){                                      // 读取了之后,position = N, limit=capacity
             if(isRead){
                 break;
             }
             Thread.sleep(1000);
             continue;
         }
         byteBuffer.flip();                                  // 如果不转换模式,就会导致读取的内容为空,那么position=N,limit=N,decode方法会返回一个空字符串
         System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));        // 输出数据
         byteBuffer.clear();                                 // 清空 byteBuffer,position=0,limit=capacity
         isRead = true;
     }
     System.out.println("读取完毕");
     socketChannel.close();
    }
    private static String getRequest() {
     StringBuilder sb = new StringBuilder();
     sb.append("GET http://www.baidu.com/ HTTP/1.1\r\n");
     sb.append("Host: 183.134.208.122\r\n");
     sb.append("\r\n");
     return sb.toString();
    }
    

ServerSocketChannel

打开ServerSocketChannel

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

关闭ServerSocketChannel

serverSocketChannel.close();

监听新连接

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    // 通过 socketChannel 执行一些工作
}

非阻塞模式监听

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel != null){
        // 通过 socketChannel 执行一些工作
    }
}

这部分比较简单,监听,然后拿着SocketChannel去做一些响应即可。