初识NIO
简介
Non-blocking IO(非阻塞式IO)是JDK1.4推出的新特性,为所有的原始类型(boolean除外)提供缓存支持的数据容器,使用它可以提供非阻塞式高伸缩网络。NIO与BIO(同步阻塞式IO)有着相同的目的,但是使用方法完全不同,NIO是支持面向缓冲区,基于通道的IO操作。
NIO与BIO差异
| BIO |
面向流 |
阻塞IO 单向 |
无选择器 |
| NIO |
面向缓冲区 |
非阻塞IO 双向 |
Selector选择器 |
NIO核心组成
1.Buffer: 缓冲区缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供一组方法,用来方便的访问该内存。与之间操作数组相比,Buffer API更易于操作和管理。2.Channel: 通道既可以从通道中读取数据,又可以写数据到通道中,读写双向非阻塞。3.Selector: 选择器可以检测一个或多个NIO通道,当通道注册到选择器上,确定这些通道是否已经准备好进行读取或写入。这样就可以实现使用一个线程管理多个连接(通道),节省服务器线程资源开销,提高性能。
Buffer(缓冲区)
概述
缓冲区是一个用于特定基本数据类型的容器,所有的缓冲区都是Buffer抽象类的子类。NIO中的缓冲区主要用于与NIO通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道的。ByteBuffer 字节类型缓冲区缓冲区概念1.容量 capacity: 表示一个内存块,Buffer具有一定的固定大小,容量大小固定。2.限制 表示缓冲区可以操作数据的大小(limit后的数据不能进行读写),limit不能为负数且不大于容量。写入模式:limit等于buffer的容量读取模式:limit等于写入的数据量3.position: 下一个要读取或写入数据的索引。0 <= position <= limit4.mark和reset: 标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position, 之后调用reset()恢复到position()0 <= mark <= position <= limit <= capacity
API
//大部分数据类型都提供了Buffer类,以最常用的ByteBuffer为例。ByteBuffer: 字节缓冲区。ByteBuffer不提供构造方法,一般通过静态方法开辟字节缓冲区。1.allocate(int capacity)在堆内存分配一个新的字节缓冲区,指定缓冲区大小。2.allocateDirect(int capacity)在直接内存分配新的直接字节缓冲区,指定缓冲区大小。成员方法:1.clear() 清空缓冲区并返回对缓冲区的引用,不清除数据,仅仅把position的位置恢复到第一个位置2.flip() 将缓冲区的界限设置为当前位置,并将当前位置重置为03.capacity() 返回buffer的容量大小4.hasRemaining() 判断缓冲区中是否含有元素5.limit() 返回buffer的界限limit的位置6.limit(int n) 将设置缓冲区的界限为n,并返回一个具有limit的缓冲区对象7.mark() 对象缓冲区设置标记8.position() 返回缓冲区的当前位置position9.position(int n) 将设置缓冲区的当前位置为n,并返回修改后的缓冲区对象10.remaining() 返回position和limit之间的元素个数11.reset() 将位置position转到以前设置的mark所在的位置12.rewind() 将位置设置为0,取消设置的mark 13.array() 返回实现此缓冲区的byte数组(可选操作)。14.compact() 压缩此缓冲区(可选操作)。//读取缓冲区中的数据15.get() 读取单个字节16.get(byte[] dst) 批量读取字节到dst中17.get(int index) 读取指定位置的字节//向缓冲区写入数据18.put(byte b) 往缓冲区中加一个字节19.put(byte[] src) 往缓冲区加一个字节数组20.put(int index,byte b) 将指定字节写入缓冲区的索引位置
public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); System.out.println(buffer.position()); System.out.println(buffer.limit()); System.out.println(buffer.capacity()); System.out.println("------------------------"); String name = "hello"; buffer.put(name.getBytes()); System.out.println(buffer.position()); System.out.println(buffer.limit()); System.out.println(buffer.capacity()); System.out.println("------------------------"); buffer.flip(); buffer.mark(); System.out.println(buffer.position()); System.out.println(buffer.limit()); System.out.println(buffer.capacity()); System.out.println("------------------------"); byte[] bs = new byte[3]; buffer.get(bs); System.out.println(new String(bs,0,3)); System.out.println(buffer.position()); System.out.println(buffer.limit()); System.out.println(buffer.capacity()); System.out.println("------------------------"); buffer.reset(); if (buffer.hasRemaining()) { System.err.println("position ~ limit元素个数: " + buffer.remaining()); } System.err.println(buffer.position()); System.err.println(buffer.limit()); System.err.println(buffer.capacity());}
直接内存和堆内存
直接内存: 直接内存,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。 本地IO-->直接内存-->本地IO非直接内存(堆内存): 非直接内存,也就是堆内存中的数据,如果要作IO操作,会先复制到直接内存,再利用本地IO处理。 本地IO-->直接内存-->非直接内存-->直接内存-->本地IO直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,这部分的数据是在JVM之外的,因此它不会占用应用的内存。如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。
Channel(通道)
概述
Channel表示IO源与目标打开的连接,Channel本身能直接访问数据,需要借助Buffer完成数据的传输。通道是双向的,流是单向的。通道可以实现异步读写。通道可以从缓冲区读取数据,也可以将数据写入缓冲区。常用类FileChannel 用于读取,写入,映射和操作文件的通道。SocketChannel 通过TCP读写网络中的数据的通道。ServerSocketChannel 可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。DatagramChannel 通过UDP读写网络数据报的通道。
API
FileChannel: 用于读取、写入、映射和操作文件的通道。不提供构造方法,一般通过FileOutputStream或者FileInputStream实例的getCahnnel()获得通道。成员方法:1.read(ByteBuffer dst) 将字节序列从此通道读入给定的缓冲区。2.read(ByteBuffer[] dsts) 将字节序列从此通道读入给定的缓冲区。3.read(ByteBuffer[] dsts,int offset,int length) 将字节序列从此通道读入给定缓冲区的子序列中。4.write(ByteBuffer src) 将字节序列从给定的缓冲区写入此通道。5.write(ByteBuffer[] srcs) 将字节序列从给定的缓冲区写入此通道。6.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。7.write(ByteBuffer src, long position) 从给定的文件位置开始,将字节序列从给定缓冲区写入此通道。8.transferFrom(ReadableByteChannel src, long position, long count) 将字节从给定的可读取字节通道传输到此通道的文件中。9.transferTo(long position, long count, WritableByteChannel target) 将字节从此通道的文件传输到给定的可写入字节通道。10.size() 返回此通道的文件的当前大小。SocketChannel: 针对面向流的连接套接字的可选择通道。不提供构造方法,一般调用静态open(InetSocketAddress remote)获取实例对象。成员方法:1.read(ByteBuffer dst) 将字节序列从此通道中读入给定的缓冲区。2.read(ByteBuffer[] dsts, int offset, int length) 将字节序列从此通道读入给定缓冲区的子序列中。3.write(ByteBuffer src) 将字节序列从给定的缓冲区中写入此通道。4.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。ServerSocketChannel: 针对面向流的侦听套接字的可选择通道。不提供构造方法,一般通过静态方法open()获取对象。成员方法:1.accept() 接受到此通道套接字的连接。2.socket() 获取与此通道关联的服务器套接字。3.validOps() 返回一个操作集,标识此通道所支持的操作。
将缓冲区流写入管道
public static void main(String[] args) { FileOutputStream fos = null; try { String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt"; fos = new FileOutputStream(new File(filePath)); //得到字节输出流对应的通道 FileChannel channel = fos.getChannel(); //分配缓冲区 ByteBuffer buffer = ByteBuffer.allocateDirect(1024); buffer.put("Hello Java".getBytes()); buffer.flip();//把缓冲区切换为读模式 channel.write(buffer);//将缓冲区的数据流写到管道中 channel.close();//关闭管道 } catch (Exception e) { e.printStackTrace(); } finally { if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } }}
将管道中流读入缓冲区
public static void main(String[] args) { FileInputStream fis = null; try { String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt"; fis = new FileInputStream(new File(filePath)); FileChannel channel = fis.getChannel(); int size = fis.available(); ByteBuffer buffer = ByteBuffer.allocate(size); channel.read(buffer); buffer.flip(); System.out.println(new String(buffer.array(),0,buffer.remaining())); channel.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } }}
文件拷贝案例
public static void main(String[] args) { copy();}//文件拷贝public static void copy() { FileInputStream fis = null; FileOutputStream fos = null; try { String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico"; String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\jmeter.ico"; fis = new FileInputStream(new File(src)); fos = new FileOutputStream(new File(copy)); //得到文件通道 FileChannel fisChannel = fis.getChannel(); FileChannel fosChannel = fos.getChannel(); //分配缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { //清空缓冲区再写入数据 buffer.clear(); int flag = fisChannel.read(buffer); if (flag == -1) { break;//读取为空返回-1 } //已经读取到数据,切断可读模式 buffer.flip(); //将数据写入输出管道 fosChannel.write(buffer); } } catch (Exception e) { e.printStackTrace(); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } }}
分散读取和聚合写入
public static void main(String[] args) { FileInputStream fis = null; FileOutputStream fos = null; try { String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico"; String copy = "C:\\Users\\ms674\\Desktop\\File\\DDD\\data-copy.ico"; fis = new FileInputStream(new File(src)); fos = new FileOutputStream(new File(copy)); //得到文件通道 FileChannel fisChannel = fis.getChannel(); FileChannel fosChannel = fos.getChannel(); //定义多个缓冲区 ByteBuffer buffer1 = ByteBuffer.allocate((fis.available()/3)+1); ByteBuffer buffer2 = ByteBuffer.allocate((fis.available()/3)+1); ByteBuffer buffer3 = ByteBuffer.allocate((fis.available())/3+1); ByteBuffer[] buffers = {buffer1, buffer2, buffer3}; //从通道中读取数据分散到各个缓冲区 fisChannel.read(buffers); //将各个缓冲区的数据聚集写入通道 for (ByteBuffer buffer : buffers) { buffer.flip();//将各个缓冲区切换为可读模式 } fosChannel.write(buffers); } catch (Exception e) { e.printStackTrace(); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } }}
通道数据流拷贝
public static void main(String[] args) { FileInputStream fis = null; FileOutputStream fos = null; try { String src = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt"; String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data-transferFrom.txt"; fis = new FileInputStream(new File(src)); fos = new FileOutputStream(new File(copy)); //得到文件通道 FileChannel fisChannel = fis.getChannel(); FileChannel fosChannel = fos.getChannel(); //通道数据转移拷贝 - 不需要自定义缓冲区 fosChannel.transferFrom(fisChannel,fisChannel.position(),fisChannel.size()); //关闭通道 fosChannel.close(); fisChannel.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } }}
Selector(多路复用器)
概述
Selector是SelectableChannel对象的多路复用器,Selector可以同时监控多个SelectableChannel的IO状况,通过Selector可以实现用一个线程去管理多个线程,Selector是非阻塞IO的核心。监听状态SelectionKey.OP_CONNECT 连接就绪SelectionKey.OP_ACCEPT 接收就绪SelectionKey.OP_READ 读就绪SelectionKey.OP_WRITE 写就绪如果你对不止一种事件感兴趣,使用或运算符即可SelectionKey.OP_READ | SelectionKey.OP_WRITE
API
Selector: SelectableChannel对象的多路复用器。不提供构造器,使用静态方法open()获取Selector对象。静态方法:1.open() 打开一个选择器。2.keys() 返回此选择器的键集。3.isOpen() 告知此选择器是否已打开。4.select() 选择一组键,其相应的通道已为 I/O 操作准备就绪。5.selectedKeys() 返回此选择器的已选择键集。6.close() 关闭此选择器。
服务端
public static void main(String[] args) { ServerSocketChannel serverSocketChannel = null; try { //获取服务端通道 serverSocketChannel = ServerSocketChannel.open(); //切换为非阻塞模式 serverSocketChannel.configureBlocking(false); //绑定端口,让客户端连接服务端 serverSocketChannel.bind(new InetSocketAddress(8192)); //获取选择器Selector Selector selector = Selector.open(); //将通道注册到选择器上,指定监听事件(接收事件) serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //使用Selector选择器轮询已经就绪好的事件,如果没事件就一直等待 while (selector.select() > 0) { //获取选择器中的所有注册的通道中已经就绪好的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //开始遍历准备好的事件 while (it.hasNext()) { //提取当前事件 SelectionKey sk = it.next(); //判断这个事件具体是什么 if (sk.isAcceptable()) { //直接获取当前接入的客户端通道 SocketChannel socketChannel = serverSocketChannel.accept(); //客户端通道切换非阻塞模式 socketChannel.configureBlocking(false); //将客户端通道注册到选择器上,读监听 socketChannel.register(selector,SelectionKey.OP_READ); } else if (sk.isReadable()) { //获取当前选择器上的都就绪事件 SocketChannel socketChannel = (SocketChannel) sk.channel(); //开始读取数据 ByteBuffer buffer = ByteBuffer.allocate(1024); int len; while ((len = socketChannel.read(buffer)) > 0) { buffer.flip();//切换为读模式 System.out.println(new String(buffer.array(),0,len)); buffer.clear();//重置position到第一个位置 } } it.remove();//处理完事件后移除 } } } catch (IOException e) { e.printStackTrace(); } finally { if (serverSocketChannel != null) { try { serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }}
客户端
public static void main(String[] args) { SocketChannel socketChannel = null; try { //获取通道 socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8192)); //切换成非阻塞模式 socketChannel.configureBlocking(false); //分配指定的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //发送数据给服务端 Scanner scanner = new Scanner(System.in); while (true) { System.out.print("请输入信息:"); String line = scanner.nextLine(); if ("exit".equals(line)) { break; } String msg = "消息内容:"+line+"\t发送时间:"+ LocalDateTime.now(); buffer.put(msg.getBytes()); buffer.flip();//切换为可读模式 socketChannel.write(buffer); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (socketChannel != null) { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }}