初识NIO
简介
Non-blocking IO(非阻塞式IO)
是JDK1.4推出的新特性,为所有的原始类型(boolean除外)提供缓存支持的数据容器,
使用它可以提供非阻塞式高伸缩网络。NIO与BIO(同步阻塞式IO)有着相同的目的,
但是使用方法完全不同,NIO是支持面向缓冲区,基于通道的IO操作。
NIO与BIO差异
BIO |
面向流 |
阻塞IO 单向 |
无选择器 |
NIO |
面向缓冲区 |
非阻塞IO 双向 |
Selector选择器 |
NIO核心组成
1.Buffer: 缓冲区
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,
并提供一组方法,用来方便的访问该内存。与之间操作数组相比,Buffer API更易于操作和管理。
2.Channel: 通道
既可以从通道中读取数据,又可以写数据到通道中,读写双向非阻塞。
3.Selector: 选择器
可以检测一个或多个NIO通道,当通道注册到选择器上,确定这些通道是否已经准备好进行读取或写入。
这样就可以实现使用一个线程管理多个连接(通道),节省服务器线程资源开销,提高性能。
Buffer(缓冲区)
概述
缓冲区是一个用于特定基本数据类型的容器,所有的缓冲区都是Buffer抽象类的子类。
NIO中的缓冲区主要用于与NIO通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道的。
ByteBuffer 字节类型缓冲区
缓冲区概念
1.容量 capacity: 表示一个内存块,Buffer具有一定的固定大小,容量大小固定。
2.限制 表示缓冲区可以操作数据的大小(limit后的数据不能进行读写),limit不能
为负数且不大于容量。
写入模式:limit等于buffer的容量
读取模式:limit等于写入的数据量
3.position: 下一个要读取或写入数据的索引。0 <= position <= limit
4.mark和reset: 标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position,
之后调用reset()恢复到position()
0 <= mark <= position <= limit <= capacity
API
//大部分数据类型都提供了Buffer类,以最常用的ByteBuffer为例。
ByteBuffer: 字节缓冲区。
ByteBuffer不提供构造方法,一般通过静态方法开辟字节缓冲区。
1.allocate(int capacity)在堆内存分配一个新的字节缓冲区,指定缓冲区大小。
2.allocateDirect(int capacity)在直接内存分配新的直接字节缓冲区,指定缓冲区大小。
成员方法:
1.clear() 清空缓冲区并返回对缓冲区的引用,不清除数据,仅仅把position的位置恢复到第一个位置
2.flip() 将缓冲区的界限设置为当前位置,并将当前位置重置为0
3.capacity() 返回buffer的容量大小
4.hasRemaining() 判断缓冲区中是否含有元素
5.limit() 返回buffer的界限limit的位置
6.limit(int n) 将设置缓冲区的界限为n,并返回一个具有limit的缓冲区对象
7.mark() 对象缓冲区设置标记
8.position() 返回缓冲区的当前位置position
9.position(int n) 将设置缓冲区的当前位置为n,并返回修改后的缓冲区对象
10.remaining() 返回position和limit之间的元素个数
11.reset() 将位置position转到以前设置的mark所在的位置
12.rewind() 将位置设置为0,取消设置的mark
13.array() 返回实现此缓冲区的byte数组(可选操作)。
14.compact() 压缩此缓冲区(可选操作)。
//读取缓冲区中的数据
15.get() 读取单个字节
16.get(byte[] dst) 批量读取字节到dst中
17.get(int index) 读取指定位置的字节
//向缓冲区写入数据
18.put(byte b) 往缓冲区中加一个字节
19.put(byte[] src) 往缓冲区加一个字节数组
20.put(int index,byte b) 将指定字节写入缓冲区的索引位置
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("------------------------");
String name = "hello";
buffer.put(name.getBytes());
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("------------------------");
buffer.flip();
buffer.mark();
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("------------------------");
byte[] bs = new byte[3];
buffer.get(bs);
System.out.println(new String(bs,0,3));
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println("------------------------");
buffer.reset();
if (buffer.hasRemaining()) {
System.err.println("position ~ limit元素个数: " + buffer.remaining());
}
System.err.println(buffer.position());
System.err.println(buffer.limit());
System.err.println(buffer.capacity());
}
直接内存和堆内存
直接内存:
直接内存,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。
本地IO-->直接内存-->本地IO
非直接内存(堆内存):
非直接内存,也就是堆内存中的数据,如果要作IO操作,会先复制到直接内存,再利用本地IO处理。
本地IO-->直接内存-->非直接内存-->直接内存-->本地IO
直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,
这部分的数据是在JVM之外的,因此它不会占用应用的内存。如果不是能带来很明显的性能提升,
还是推荐直接使用堆内存。
Channel(通道)
概述
Channel表示IO源与目标打开的连接,Channel本身能直接访问数据,需要借助Buffer完成数据的传输。
通道是双向的,流是单向的。
通道可以实现异步读写。
通道可以从缓冲区读取数据,也可以将数据写入缓冲区。
常用类
FileChannel 用于读取,写入,映射和操作文件的通道。
SocketChannel 通过TCP读写网络中的数据的通道。
ServerSocketChannel 可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel。
DatagramChannel 通过UDP读写网络数据报的通道。
API
FileChannel: 用于读取、写入、映射和操作文件的通道。
不提供构造方法,一般通过FileOutputStream或者FileInputStream实例的getCahnnel()获得通道。
成员方法:
1.read(ByteBuffer dst) 将字节序列从此通道读入给定的缓冲区。
2.read(ByteBuffer[] dsts) 将字节序列从此通道读入给定的缓冲区。
3.read(ByteBuffer[] dsts,int offset,int length) 将字节序列从此通道读入给定缓冲区的子序列中。
4.write(ByteBuffer src) 将字节序列从给定的缓冲区写入此通道。
5.write(ByteBuffer[] srcs) 将字节序列从给定的缓冲区写入此通道。
6.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。
7.write(ByteBuffer src, long position) 从给定的文件位置开始,将字节序列从给定缓冲区写入此通道。
8.transferFrom(ReadableByteChannel src, long position, long count) 将字节从给定的可读取字节通道传输到此通道的文件中。
9.transferTo(long position, long count, WritableByteChannel target) 将字节从此通道的文件传输到给定的可写入字节通道。
10.size() 返回此通道的文件的当前大小。
SocketChannel: 针对面向流的连接套接字的可选择通道。
不提供构造方法,一般调用静态open(InetSocketAddress remote)获取实例对象。
成员方法:
1.read(ByteBuffer dst) 将字节序列从此通道中读入给定的缓冲区。
2.read(ByteBuffer[] dsts, int offset, int length) 将字节序列从此通道读入给定缓冲区的子序列中。
3.write(ByteBuffer src) 将字节序列从给定的缓冲区中写入此通道。
4.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。
ServerSocketChannel: 针对面向流的侦听套接字的可选择通道。
不提供构造方法,一般通过静态方法open()获取对象。
成员方法:
1.accept() 接受到此通道套接字的连接。
2.socket() 获取与此通道关联的服务器套接字。
3.validOps() 返回一个操作集,标识此通道所支持的操作。
将缓冲区流写入管道
public static void main(String[] args) {
FileOutputStream fos = null;
try {
String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
fos = new FileOutputStream(new File(filePath));
//得到字节输出流对应的通道
FileChannel channel = fos.getChannel();
//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
buffer.put("Hello Java".getBytes());
buffer.flip();//把缓冲区切换为读模式
channel.write(buffer);//将缓冲区的数据流写到管道中
channel.close();//关闭管道
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
将管道中流读入缓冲区
public static void main(String[] args) {
FileInputStream fis = null;
try {
String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
fis = new FileInputStream(new File(filePath));
FileChannel channel = fis.getChannel();
int size = fis.available();
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(),0,buffer.remaining()));
channel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
文件拷贝案例
public static void main(String[] args) {
copy();
}
//文件拷贝
public static void copy() {
FileInputStream fis = null;
FileOutputStream fos = null;
try {
String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico";
String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\jmeter.ico";
fis = new FileInputStream(new File(src));
fos = new FileOutputStream(new File(copy));
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
//清空缓冲区再写入数据
buffer.clear();
int flag = fisChannel.read(buffer);
if (flag == -1) {
break;//读取为空返回-1
}
//已经读取到数据,切断可读模式
buffer.flip();
//将数据写入输出管道
fosChannel.write(buffer);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
分散读取和聚合写入
public static void main(String[] args) {
FileInputStream fis = null;
FileOutputStream fos = null;
try {
String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico";
String copy = "C:\\Users\\ms674\\Desktop\\File\\DDD\\data-copy.ico";
fis = new FileInputStream(new File(src));
fos = new FileOutputStream(new File(copy));
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//定义多个缓冲区
ByteBuffer buffer1 = ByteBuffer.allocate((fis.available()/3)+1);
ByteBuffer buffer2 = ByteBuffer.allocate((fis.available()/3)+1);
ByteBuffer buffer3 = ByteBuffer.allocate((fis.available())/3+1);
ByteBuffer[] buffers = {buffer1, buffer2, buffer3};
//从通道中读取数据分散到各个缓冲区
fisChannel.read(buffers);
//将各个缓冲区的数据聚集写入通道
for (ByteBuffer buffer : buffers) {
buffer.flip();//将各个缓冲区切换为可读模式
}
fosChannel.write(buffers);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
通道数据流拷贝
public static void main(String[] args) {
FileInputStream fis = null;
FileOutputStream fos = null;
try {
String src = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data-transferFrom.txt";
fis = new FileInputStream(new File(src));
fos = new FileOutputStream(new File(copy));
//得到文件通道
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
//通道数据转移拷贝 - 不需要自定义缓冲区
fosChannel.transferFrom(fisChannel,fisChannel.position(),fisChannel.size());
//关闭通道
fosChannel.close();
fisChannel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Selector(多路复用器)
概述
Selector是SelectableChannel对象的多路复用器,Selector可以同时监控
多个SelectableChannel的IO状况,通过Selector可以实现用一个线程去管理多个
线程,Selector是非阻塞IO的核心。
监听状态
SelectionKey.OP_CONNECT 连接就绪
SelectionKey.OP_ACCEPT 接收就绪
SelectionKey.OP_READ 读就绪
SelectionKey.OP_WRITE 写就绪
如果你对不止一种事件感兴趣,使用或运算符即可
SelectionKey.OP_READ | SelectionKey.OP_WRITE
API
Selector: SelectableChannel对象的多路复用器。
不提供构造器,使用静态方法open()获取Selector对象。
静态方法:
1.open() 打开一个选择器。
2.keys() 返回此选择器的键集。
3.isOpen() 告知此选择器是否已打开。
4.select() 选择一组键,其相应的通道已为 I/O 操作准备就绪。
5.selectedKeys() 返回此选择器的已选择键集。
6.close() 关闭此选择器。
服务端
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
try {
//获取服务端通道
serverSocketChannel = ServerSocketChannel.open();
//切换为非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定端口,让客户端连接服务端
serverSocketChannel.bind(new InetSocketAddress(8192));
//获取选择器Selector
Selector selector = Selector.open();
//将通道注册到选择器上,指定监听事件(接收事件)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//使用Selector选择器轮询已经就绪好的事件,如果没事件就一直等待
while (selector.select() > 0) {
//获取选择器中的所有注册的通道中已经就绪好的事件
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
//开始遍历准备好的事件
while (it.hasNext()) {
//提取当前事件
SelectionKey sk = it.next();
//判断这个事件具体是什么
if (sk.isAcceptable()) {
//直接获取当前接入的客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
//客户端通道切换非阻塞模式
socketChannel.configureBlocking(false);
//将客户端通道注册到选择器上,读监听
socketChannel.register(selector,SelectionKey.OP_READ);
} else if (sk.isReadable()) {
//获取当前选择器上的都就绪事件
SocketChannel socketChannel = (SocketChannel) sk.channel();
//开始读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();//切换为读模式
System.out.println(new String(buffer.array(),0,len));
buffer.clear();//重置position到第一个位置
}
}
it.remove();//处理完事件后移除
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocketChannel != null) {
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
客户端
public static void main(String[] args) {
SocketChannel socketChannel = null;
try {
//获取通道
socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8192));
//切换成非阻塞模式
socketChannel.configureBlocking(false);
//分配指定的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//发送数据给服务端
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入信息:");
String line = scanner.nextLine();
if ("exit".equals(line)) {
break;
}
String msg = "消息内容:"+line+"\t发送时间:"+ LocalDateTime.now();
buffer.put(msg.getBytes());
buffer.flip();//切换为可读模式
socketChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (socketChannel != null) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}