我们在《【NIO】认识Buffer》这一节中看到过下面的示意图:
点击查看【processon】
我们的Channel
是连接到资源的一端,如果发生写入就写入到资源那一侧;如果发生读出就从资源中读出数据到Buffer(内存)
中。
Channel#transferFrom() / Channel#transferTo()
A.transferFrom(B)
是指:从ChannelB
中读取数据到ChannelA
中A.transferTo(B)
是指:将ChannelA
中写入到ChannelB
中
ChannelA
和ChannelB
的交互并不需要借助中间Buffer
,可以很方便的实现转换。
transferFrom()
和transferTo()
在Linux
的底层是通过sendfile()
系统调用完成
感兴趣的小伙伴可以看《🎧关于IO 之 零拷贝》加深一下理解。
Scatter/Gather
Scatter
,分散,在Channel
里表示Scatter Read
,即 分散读**Channel**
,是一种功能特性。假设我们现在有两个Buffer
,通过Scatter Read
特性可以将**Channel**
中的数据按顺序分别读到**Buffer**
:
我们现在有这样两个文件:
- 固定大小的消息,前512字节必然是
header
,之后的字节必然是body
。msg-file-nio.txt - 大小不固定的消息,前511字节(举例)是
header
,之后的字节才是body
。msg-file-nio2.txt大小不固定的消息是指
header
或body
大小不确定的消息。
用以下方式读取这些文件:
Channel channel = file.getChannel(); // 获取 msg 文件
ByteBuffer header = ByteBuffer.allocate(512); // 分配512的Buffer用于存储Header头信息
ByteBuffer body = ByteBuffer.allocate(1024); // 分配1024的空间用于存储Body信息
ByteBuffer[] bufArrray = new ByteBuffer[]{header, body};
channel.read(bufArray); // 从Channel中按顺序读取数据分别写到header、
// body中,header满了才会写body
Channel
会按数组顺序先写满header
再去写body
,所以如果消息体的 header
大小不足 512
字节,则可能将body
的数据也读到ByteBuffer header
中,最终导致错乱。
Gather
,聚合,在Channel
里表示Gather Write
,即 合并写Channel
,是一种功能特性。假设我们现在有两个Buffer
需要写到Channel
中,那么通过Gather Write
技术,就能将Buffer
数组(包含两个Buffer
)中的数据按数组顺序写入到Channel
中。代码示意如下所示:
ByteBuffer header = ...; // header中包含512个字节的数据
ByteBuffer body = ...; // body中包含1024个字节的数据
Channel channel = file.getChannel(); // 现在想要将消息写入到 msg-receive.txt 中
ByteBuffer[] bufArrray = new ByteBuffer[]{header, body};
channel.write(bufArray); // 按顺序将 header、body 的数据写入到Channel
Channel类型
FileChannel
打开FileChannel
RandomAccessFile f = new RandomAccessFile("<文件路径>");
FileChannel fileChannel = f.getChannel();
向FileChannel写数据
String newData = "New String";
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()){
channel.write(buf);
}
因为无法保证buf
一次能够写入多少字节,需要通过buf.hasRemaining()
来判断是否还有字节没有写入。
获取/修改FileChannel的position
long pos = channel.position(); // 获取下一个position写/读的位置
channel.position(pos); // 设置channel下一个要写/读的位置
如果将position
设置到文件结束符之后,然后向Channel
中写数据,文件将会撑大到当前位置并写入数据。这样可能会导致“文件空洞”,磁盘上物理文件中写入的数据间有空隙。
获取FileChannel关联的文件大小
long fileSize = fileChannel.size();
截取文件的前N个字节
fileChannel.truncate(1024);
这个方法会保留文件的前1024
个字节,但是会将指定长度后面的部分删除。
FileChannel的force方法
在Linux
上写文件并非直接写入到磁盘的,而是将数据写到Page Cache
中,所以无法保证调用了write()
之后数据百分百落入到磁盘上。所以有了FileChannel#force()
方法,通过这个方法可以将数据强制写到磁盘上。
FileChannel#force(<是否同时将元数据也刷新到磁盘上>)
元数据指的是“权限信息”等,通过传递参数true
或false
可以决定是否同时将元数据同步到磁盘上。
SocketChannel
SocketChannel
是一个用于 连接到TCP网络套接字的通道。
打开SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("<网络地址>", 80));
关闭SocketChannel
socketChannel.close();
从SocketChannel读取数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);
将数据从SocketChannel
读到Buffer
中,read()
方法返回的int
值表示读取了多少个字节到Buffer
中。如果返回-1
,表示已经读到了流的末尾(连接关闭了)。
写数据到SocketChannel
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
// 切换为 读模式
buf.flip();
while(buf.hasRemaining()){
channel.write(buf);
}
非阻塞模式
在非阻塞模式下,connect()
、write()
、read()
方法都会立即返回:
- 所以在调用了
connect()
方法后,需要循环socketChannel.finishConnect()
来判断连接是否建立。连接成功后finishConnect()
会返回true
,还没有连接成功(或者没有结果)就会返回false
。 - 当连接成功后,因为
write()
方法在异步模式下会立刻返回,如果此时Socket
缓冲区已经满了,那么就只会写入0个字节。为了保证数据都能写进Channel
,所以得通过ByteBuffer#hasRemaining()
来判断是否还有数据剩余。 - 随后通过
read()
来读取数据,这里注意的是要通过flip()
来转换“模式”。public static void main(String[] args) throws IOException, InterruptedException { SocketChannel socketChannel = SocketChannel.open(); // 创建SocketChannel socketChannel.configureBlocking(false); // 设置 非阻塞 boolean connect = socketChannel.connect(new InetSocketAddress("www.baidu.com", 80)); // 连接 www.baidu.com ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 分配1024个字节 byteBuffer.put(getRequest().getBytes(StandardCharsets.UTF_8)); // 构造请求 while(!socketChannel.finishConnect()) { // 判断连接是否建立,如果尚未建立就进入循环执行一些事 // do something } byteBuffer.flip(); // 让position指向0 limit指向之前读的位置。如果没有这一步,就会发送失败 while(byteBuffer.hasRemaining()) { // 连接建立成功,写入 byteBuffer socketChannel.write(byteBuffer); // 写入 } byteBuffer.clear(); // 清空Buffer,position=0,limit=capacity int read = -1; boolean isRead = false; while((read = socketChannel.read(byteBuffer)) != -1) { // 从socketChannel中读取数据到byteBuffer if(read == 0){ // 读取了之后,position = N, limit=capacity if(isRead){ break; } Thread.sleep(1000); continue; } byteBuffer.flip(); // 如果不转换模式,就会导致读取的内容为空,那么position=N,limit=N,decode方法会返回一个空字符串 System.out.println(StandardCharsets.UTF_8.decode(byteBuffer)); // 输出数据 byteBuffer.clear(); // 清空 byteBuffer,position=0,limit=capacity isRead = true; } System.out.println("读取完毕"); socketChannel.close(); } private static String getRequest() { StringBuilder sb = new StringBuilder(); sb.append("GET http://www.baidu.com/ HTTP/1.1\r\n"); sb.append("Host: 183.134.208.122\r\n"); sb.append("\r\n"); return sb.toString(); }
ServerSocketChannel
打开ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
关闭ServerSocketChannel
serverSocketChannel.close();
监听新连接
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
// 通过 socketChannel 执行一些工作
}
非阻塞模式监听
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
// 通过 socketChannel 执行一些工作
}
}
这部分比较简单,监听,然后拿着SocketChannel
去做一些响应即可。