概念
传统的 IO 流架构:
现在的 NIO 架构:
区别:
IO | NIO |
---|---|
面向流(Stream Oriented) | 面向缓冲区(Buffer Oriented) |
阻塞IO(Blocking IO) | 非阻塞IO(Non Blocking IO) |
无 | 选择器(Selectors) |
NIO 概念:Java NIO 系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件,套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。
简而言之:Channel 负责传输,Buffer 负责存储。
缓冲区Buffer
一、缓冲区(Buffer)
在 Java NIO 中负责数据的存取。缓冲区就是数组。用于存储不同数据类型的数据
根据数据类型不同(boolean 除外),提供了相应类型的缓冲区:
ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer
上述缓冲区的管理方式几乎一致,通过 allocate() 获取缓冲区
二、缓冲区存取数据的两个核心方法
put() : 存入数据到缓冲区中
get() : 获取缓冲区中的数据
三、缓冲区中的四个核心属性
capacity : 容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
limit : 界限,表示缓冲区中可以操作数据的大小。(limit 后数据不能进行读写)
position : 位置,表示缓冲区中正在操作数据的位置。
mark : 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置
0 <= mark <= position <= limit <= capacity
四、直接缓冲区与非直接缓冲区:(效率高的原因)
非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中
直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率
上演代码:
先展示如下代码的结果:
————————-allocate()————————
0
1024
1024
————————-put()————————
5
1024
1024
————————-flip()————————
0
5
1024
abcde
————————-get()————————
5
5
1024
————————-rewind()————————
0
5
1024
————————-clear()————————
0
1024
1024
a
@Test
public void test1(){
String str = "abcde";
//1. 分配一个指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println("-----------------allocate()----------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
//2. 利用 put() 存入数据到缓冲区中
buf.put(str.getBytes());
System.out.println("-----------------put()----------------");
System.out.println(buf.position()); // 5
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
//3. 切换读取数据模式
buf.flip();
System.out.println("-----------------flip()----------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
//4. 利用 get() 读取缓冲区中的数据
byte[] dst = new byte[buf.limit()];
buf.get(dst);
System.out.println(new String(dst, 0, dst.length)); // abcde
System.out.println("-----------------get()----------------");
System.out.println(buf.position()); // 5
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
//5. rewind() : 可重复读
buf.rewind();
System.out.println("-----------------rewind()----------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 1024
//6. clear() : 清空缓冲区. 但是缓冲区中的数据依然存在,但是处于“被遗忘”状态
buf.clear();
System.out.println("-----------------clear()----------------");
System.out.println(buf.position()); // 0
System.out.println(buf.limit()); // 1024
System.out.println(buf.capacity()); // 1024
System.out.println((char)buf.get()); // a
}
再来演示一波:
@Test
public void test2(){
String str = "abcde";
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(str.getBytes());
buf.flip();
byte[] dst = new byte[buf.limit()];
buf.get(dst, 0, 2);
System.out.println(new String(dst, 0, 2)); // ab
System.out.println(buf.position()); // 2
//mark() : 标记
buf.mark();
buf.get(dst, 2, 2);
System.out.println(new String(dst, 2, 2)); // cd
System.out.println(buf.position()); // 4
//reset() : 恢复到 mark 的位置
buf.reset();
System.out.println(buf.position()); // 2
//判断缓冲区中是否还有剩余数据
if(buf.hasRemaining()){
//获取缓冲区中可以操作的数量
System.out.println(buf.remaining()); // 3
}
}
@Test
public void test3(){
//分配直接缓冲区
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
System.out.println(buf.isDirect());
}
Channel通道
概念:由 java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”,只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。
通道的主要实现类
java.nio.channels.Channel 接口:
|—FileChannel
|—SocketChannel
|—ServerSocketChannel
|—DatagramChannel
获取通道
1. Java 针对支持通道的类提供了 getChannel() 方法
本地 IO:
FileInputStream/FileOutputStream
RandomAccessFile
网络IO:
Socket
ServerSocket
DatagramSocket
2. 在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open()
3. 在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()
上代码:
非直接缓冲区复制文件
// 利用通道完成文件的复制(非直接缓冲区)
@Test
public void testa() throws Exception {
long start = System.currentTimeMillis();
//①获取通道
FileInputStream fis = new FileInputStream("D:/workspace/demo/a.txt");
FileOutputStream fos = new FileOutputStream("D:/workspace/demo/b.txt");
FileChannel inChannel = fis.getChannel();
FileChannel outChannel = fos.getChannel();
//②分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//③将通道中的数据存入缓冲区中
while(inChannel.read(buf) != -1){
buf.flip(); //切换读取数据的模式
//④将缓冲区中的数据写入通道中
outChannel.write(buf);
buf.clear(); //清空缓冲区
}
outChannel.close();
inChannel.close();
fos.close();
fis.close();
long end = System.currentTimeMillis();
System.out.println("耗费时间为:" + (end - start)); // 5
}
直接缓冲区复制文件
//使用直接缓冲区完成文件的复制(内存映射文件)
@Test
public void testb() throws IOException{//2127-1902-1777
long start = System.currentTimeMillis();
FileChannel inChannel = FileChannel.open(Paths.get("D:/workspace/demo/a.txt"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("D:/workspace/demo/c.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
//内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区进行数据的读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
inChannel.close();
outChannel.close();
long end = System.currentTimeMillis();
System.out.println("耗费时间为:" + (end - start)); // 12
}
///////////如下还可以使用快捷方式操作/////////////
//通道之间的数据传输(直接缓冲区)
@Test
public void testc() throws IOException{
FileChannel inChannel = FileChannel.open(Paths.get("D:/workspace/demo/a.txt"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("D:/workspace/demo/d.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
// inChannel.transferTo(0, inChannel.size(), outChannel);
outChannel.transferFrom(inChannel, 0, inChannel.size());
inChannel.close();
outChannel.close();
}
直接/非直接缓存如何选择
查查资料。。。
分散(Scatter)与聚集(Gather)
分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
@Test
public void testd() throws IOException{
RandomAccessFile raf1 = new RandomAccessFile("D:/workspace/demo/a.txt", "rw");
//1. 获取通道
FileChannel channel1 = raf1.getChannel();
//2. 分配指定大小的缓冲区
ByteBuffer buf1 = ByteBuffer.allocate(100);
ByteBuffer buf2 = ByteBuffer.allocate(1024);
//3. 分散读取
ByteBuffer[] bufs = {buf1, buf2};
channel1.read(bufs);
for (ByteBuffer byteBuffer : bufs) {
byteBuffer.flip();
}
System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
System.out.println("-----------------");
System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));
//4. 聚集写入
RandomAccessFile raf2 = new RandomAccessFile("D:/workspace/demo/f.txt", "rw");
FileChannel channel2 = raf2.getChannel();
channel2.write(bufs);
}
Selector
阻塞式/非阻塞式IO
阻塞式IO:
客户端直接跟服务端交互,发出读写请求,假如此时并没有数据过来,但是客户端依然占用着 Server 端的资源不放,那么 Server 端的这个资源便什么也做不了,被阻塞着,直到有数据过来,才能进行处理;
NIO 非阻塞式IO:
中间有一个 Selector 选择器,客户端的通道 Channel 都注册到 Selector 上,然后由 Selector 来监控着这些通道,假如某个通道 Channel 上的缓冲区中有数据过来了,那么此时算是准备就绪,然后 Selector 就会将请求发给 Server 的一个线程或者多个线程进行处理;保证服务端资源每次处理都是有效的;从而提高效率;
举个例子:快递的例子,快递员相当于客户端,我们相当于服务端,早期的阻塞式就是说,快递员和我们约好时间在今天中午 10 点在大门口,他送过来给我们;但是假如到了10点我们到了大门口,快递员却没有到,而且拖了很长时间,那么这段时间我们也就什么都干不了,在这等着,浪费资源;那么此时就出来一个 Selector,即电话,快递员到了大门口之后给我们打电话,说已经确定快递员到了大门口了,此时我们在下去就刚好可以取了;这样也就不浪费资源了;
NIO完成网络通信
一、使用 NIO 完成网络通信的三个核心:
通道(Channel):负责连接
java.nio.channels.Channel 接口:
|—SelectableChannel
|—SocketChannel
|—ServerSocketChannel
|—DatagramChannel
|—Pipe.SinkChannel
|—Pipe.SourceChannel缓冲区(Buffer):负责数据的存取
选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况
阻塞式网络通信
阻塞式网络通信代码测试:
//客户端
@Test
public void client() throws IOException{
//1. 获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
//2. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//3. 读取本地文件,并发送到服务端
while(inChannel.read(buf) != -1){
buf.flip();
sChannel.write(buf);
buf.clear();
}
//4. 关闭通道
inChannel.close();
sChannel.close();
}
//服务端
@Test
public void server() throws IOException{
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//2. 绑定连接
ssChannel.bind(new InetSocketAddress(9898));
//3. 获取客户端连接的通道
SocketChannel sChannel = ssChannel.accept();
//4. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//5. 接收客户端的数据,并保存到本地
while(sChannel.read(buf) != -1){
buf.flip();
outChannel.write(buf);
buf.clear();
}
//6. 关闭通道
sChannel.close();
outChannel.close();
ssChannel.close();
}
继续测试阻塞式网络通信,客户端会接收服务端反馈:
//客户端
@Test
public void client() throws IOException{
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
ByteBuffer buf = ByteBuffer.allocate(1024);
while(inChannel.read(buf) != -1){
buf.flip();
sChannel.write(buf);
buf.clear();
}
sChannel.shutdownOutput();
//接收服务端的反馈
int len = 0;
while((len = sChannel.read(buf)) != -1){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
inChannel.close();
sChannel.close();
}
//服务端
@Test
public void server() throws IOException{
ServerSocketChannel ssChannel = ServerSocketChannel.open();
FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
ssChannel.bind(new InetSocketAddress(9898));
SocketChannel sChannel = ssChannel.accept();
ByteBuffer buf = ByteBuffer.allocate(1024);
while(sChannel.read(buf) != -1){
buf.flip();
outChannel.write(buf);
buf.clear();
}
//发送反馈给客户端
buf.put("服务端接收数据成功".getBytes());
buf.flip();
sChannel.write(buf);
sChannel.close();
outChannel.close();
ssChannel.close();
}
非阻塞式网络通信
//客户端
@Test
public void client() throws IOException{
//1. 获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
//2. 切换非阻塞模式
sChannel.configureBlocking(false);
//3. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
//4. 发送数据给服务端
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
//5. 关闭通道
sChannel.close();
}
//服务端
@Test
public void server() throws IOException{
//1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切换非阻塞模式
ssChannel.configureBlocking(false);
//3. 绑定连接
ssChannel.bind(new InetSocketAddress(9898));
//4. 获取选择器
Selector selector = Selector.open();
//5. 将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 轮询式的获取选择器上已经“准备就绪”的事件
while(selector.select() > 0){
//7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
//8. 获取准备“就绪”的是事件
SelectionKey sk = it.next();
//9. 判断具体是什么事件准备就绪
if(sk.isAcceptable()){
//10. 若“接收就绪”,获取客户端连接
SocketChannel sChannel = ssChannel.accept();
//11. 切换非阻塞模式
sChannel.configureBlocking(false);
//12. 将该通道注册到选择器上
sChannel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
//13. 获取当前选择器上“读就绪”状态的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
//14. 读取数据
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while((len = sChannel.read(buf)) > 0 ){
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
//15. 取消选择键 SelectionKey
it.remove();
}
}
}
DatagramChannel 演示:
@Test
public void send() throws IOException{
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
Scanner scan = new Scanner(System.in);
while(scan.hasNext()){
String str = scan.next();
buf.put((new Date().toString() + ":\n" + str).getBytes());
buf.flip();
dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
buf.clear();
}
dc.close();
}
@Test
public void receive() throws IOException{
DatagramChannel dc = DatagramChannel.open();
dc.configureBlocking(false);
dc.bind(new InetSocketAddress(9898));
Selector selector = Selector.open();
dc.register(selector, SelectionKey.OP_READ);
while(selector.select() > 0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
if(sk.isReadable()){
ByteBuffer buf = ByteBuffer.allocate(1024);
dc.receive(buf);
buf.flip();
System.out.println(new String(buf.array(), 0, buf.limit()));
buf.clear();
}
}
it.remove();
}
}
Pipe 代码演示:
@Test
public void test1() throws IOException{
//1. 获取管道
Pipe pipe = Pipe.open();
//2. 将缓冲区中的数据写入管道
ByteBuffer buf = ByteBuffer.allocate(1024);
Pipe.SinkChannel sinkChannel = pipe.sink();
buf.put("通过单向管道发送数据".getBytes());
buf.flip();
sinkChannel.write(buf);
//3. 读取缓冲区中的数据
Pipe.SourceChannel sourceChannel = pipe.source();
buf.flip();
int len = sourceChannel.read(buf);
System.out.println(new String(buf.array(), 0, len));
sourceChannel.close();
sinkChannel.close();
}
实战
1)获取FileChannel
public void test3() throws Exception {
FileInputStream in = new FileInputStream("");
ReadableByteChannel channel = Channels.newChannel(in);
InputStream inputStream = Channels.newInputStream(channel);
FileChannel fileChannel = in.getChannel();
}
2)读文件
public void test4() throws Exception {
FileInputStream in = new FileInputStream("f1.txt");
FileChannel channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear();
}
}
3)写文件
public void test5() throws Exception {
FileOutputStream out = new FileOutputStream("f3.txt");
FileChannel channel = out.getChannel();
String text = "jfdlksa";
ByteBuffer buffer = ByteBuffer.wrap(text.getBytes());
channel.write(buffer);
}
4)基于内存映射读取文件,速度最快
public void test6() throws Exception {
FileInputStream in = new FileInputStream("f2.txt");
FileChannel c = in.getChannel();
MappedByteBuffer mbb = c.map(FileChannel.MapMode.READ_ONLY, 0, c.size());
while (mbb.hasRemaining()) {
System.out.print((char) mbb.get());
}
}
5)文件拷贝
public void test7() throws Exception {
try (FileChannel src = new FileInputStream("f2.txt").getChannel();
FileChannel des = new FileOutputStream("f2_back.txt").getChannel()) {
src.transferTo(0, src.size(), des);
// 或者如下
// des.transferFrom(src, 0, src.size());
}
}