初识NIO

简介

  1. Non-blocking IO(非阻塞式IO)
  2. JDK1.4推出的新特性,为所有的原始类型(boolean除外)提供缓存支持的数据容器,
  3. 使用它可以提供非阻塞式高伸缩网络。NIOBIO(同步阻塞式IO)有着相同的目的,
  4. 但是使用方法完全不同,NIO是支持面向缓冲区,基于通道的IO操作。

NIO与BIO差异

BIO 面向流 阻塞IO 单向 无选择器
NIO 面向缓冲区 非阻塞IO 双向 Selector选择器

NIO核心组成

  1. 1.Buffer: 缓冲区
  2. 缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,
  3. 并提供一组方法,用来方便的访问该内存。与之间操作数组相比,Buffer API更易于操作和管理。
  4. 2.Channel: 通道
  5. 既可以从通道中读取数据,又可以写数据到通道中,读写双向非阻塞。
  6. 3.Selector: 选择器
  7. 可以检测一个或多个NIO通道,当通道注册到选择器上,确定这些通道是否已经准备好进行读取或写入。
  8. 这样就可以实现使用一个线程管理多个连接(通道),节省服务器线程资源开销,提高性能。

Buffer(缓冲区)

概述

  1. 缓冲区是一个用于特定基本数据类型的容器,所有的缓冲区都是Buffer抽象类的子类。
  2. NIO中的缓冲区主要用于与NIO通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道的。
  3. ByteBuffer 字节类型缓冲区
  4. 缓冲区概念
  5. 1.容量 capacity: 表示一个内存块,Buffer具有一定的固定大小,容量大小固定。
  6. 2.限制 表示缓冲区可以操作数据的大小(limit后的数据不能进行读写),limit不能
  7. 为负数且不大于容量。
  8. 写入模式:limit等于buffer的容量
  9. 读取模式:limit等于写入的数据量
  10. 3.position: 下一个要读取或写入数据的索引。0 <= position <= limit
  11. 4.markreset: 标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position
  12. 之后调用reset()恢复到position()
  13. 0 <= mark <= position <= limit <= capacity

API

  1. //大部分数据类型都提供了Buffer类,以最常用的ByteBuffer为例。
  2. ByteBuffer: 字节缓冲区。
  3. ByteBuffer不提供构造方法,一般通过静态方法开辟字节缓冲区。
  4. 1.allocate(int capacity)在堆内存分配一个新的字节缓冲区,指定缓冲区大小。
  5. 2.allocateDirect(int capacity)在直接内存分配新的直接字节缓冲区,指定缓冲区大小。
  6. 成员方法:
  7. 1.clear() 清空缓冲区并返回对缓冲区的引用,不清除数据,仅仅把position的位置恢复到第一个位置
  8. 2.flip() 将缓冲区的界限设置为当前位置,并将当前位置重置为0
  9. 3.capacity() 返回buffer的容量大小
  10. 4.hasRemaining() 判断缓冲区中是否含有元素
  11. 5.limit() 返回buffer的界限limit的位置
  12. 6.limit(int n) 将设置缓冲区的界限为n,并返回一个具有limit的缓冲区对象
  13. 7.mark() 对象缓冲区设置标记
  14. 8.position() 返回缓冲区的当前位置position
  15. 9.position(int n) 将设置缓冲区的当前位置为n,并返回修改后的缓冲区对象
  16. 10.remaining() 返回positionlimit之间的元素个数
  17. 11.reset() 将位置position转到以前设置的mark所在的位置
  18. 12.rewind() 将位置设置为0,取消设置的mark
  19. 13.array() 返回实现此缓冲区的byte数组(可选操作)。
  20. 14.compact() 压缩此缓冲区(可选操作)。
  21. //读取缓冲区中的数据
  22. 15.get() 读取单个字节
  23. 16.get(byte[] dst) 批量读取字节到dst
  24. 17.get(int index) 读取指定位置的字节
  25. //向缓冲区写入数据
  26. 18.put(byte b) 往缓冲区中加一个字节
  27. 19.put(byte[] src) 往缓冲区加一个字节数组
  28. 20.put(int index,byte b) 将指定字节写入缓冲区的索引位置
  1. public static void main(String[] args) {
  2. ByteBuffer buffer = ByteBuffer.allocate(10);
  3. System.out.println(buffer.position());
  4. System.out.println(buffer.limit());
  5. System.out.println(buffer.capacity());
  6. System.out.println("------------------------");
  7. String name = "hello";
  8. buffer.put(name.getBytes());
  9. System.out.println(buffer.position());
  10. System.out.println(buffer.limit());
  11. System.out.println(buffer.capacity());
  12. System.out.println("------------------------");
  13. buffer.flip();
  14. buffer.mark();
  15. System.out.println(buffer.position());
  16. System.out.println(buffer.limit());
  17. System.out.println(buffer.capacity());
  18. System.out.println("------------------------");
  19. byte[] bs = new byte[3];
  20. buffer.get(bs);
  21. System.out.println(new String(bs,0,3));
  22. System.out.println(buffer.position());
  23. System.out.println(buffer.limit());
  24. System.out.println(buffer.capacity());
  25. System.out.println("------------------------");
  26. buffer.reset();
  27. if (buffer.hasRemaining()) {
  28. System.err.println("position ~ limit元素个数: " + buffer.remaining());
  29. }
  30. System.err.println(buffer.position());
  31. System.err.println(buffer.limit());
  32. System.err.println(buffer.capacity());
  33. }

直接内存和堆内存

  1. 直接内存:
  2. 直接内存,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。
  3. 本地IO-->直接内存-->本地IO
  4. 非直接内存(堆内存):
  5. 非直接内存,也就是堆内存中的数据,如果要作IO操作,会先复制到直接内存,再利用本地IO处理。
  6. 本地IO-->直接内存-->非直接内存-->直接内存-->本地IO
  7. 直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,
  8. 这部分的数据是在JVM之外的,因此它不会占用应用的内存。如果不是能带来很明显的性能提升,
  9. 还是推荐直接使用堆内存。

Channel(通道)

概述

  1. Channel表示IO源与目标打开的连接,Channel本身能直接访问数据,需要借助Buffer完成数据的传输。
  2. 通道是双向的,流是单向的。
  3. 通道可以实现异步读写。
  4. 通道可以从缓冲区读取数据,也可以将数据写入缓冲区。
  5. 常用类
  6. FileChannel 用于读取,写入,映射和操作文件的通道。
  7. SocketChannel 通过TCP读写网络中的数据的通道。
  8. ServerSocketChannel 可以监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel
  9. DatagramChannel 通过UDP读写网络数据报的通道。

API

  1. FileChannel: 用于读取、写入、映射和操作文件的通道。
  2. 不提供构造方法,一般通过FileOutputStream或者FileInputStream实例的getCahnnel()获得通道。
  3. 成员方法:
  4. 1.read(ByteBuffer dst) 将字节序列从此通道读入给定的缓冲区。
  5. 2.read(ByteBuffer[] dsts) 将字节序列从此通道读入给定的缓冲区。
  6. 3.read(ByteBuffer[] dsts,int offset,int length) 将字节序列从此通道读入给定缓冲区的子序列中。
  7. 4.write(ByteBuffer src) 将字节序列从给定的缓冲区写入此通道。
  8. 5.write(ByteBuffer[] srcs) 将字节序列从给定的缓冲区写入此通道。
  9. 6.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。
  10. 7.write(ByteBuffer src, long position) 从给定的文件位置开始,将字节序列从给定缓冲区写入此通道。
  11. 8.transferFrom(ReadableByteChannel src, long position, long count) 将字节从给定的可读取字节通道传输到此通道的文件中。
  12. 9.transferTo(long position, long count, WritableByteChannel target) 将字节从此通道的文件传输到给定的可写入字节通道。
  13. 10.size() 返回此通道的文件的当前大小。
  14. SocketChannel: 针对面向流的连接套接字的可选择通道。
  15. 不提供构造方法,一般调用静态open(InetSocketAddress remote)获取实例对象。
  16. 成员方法:
  17. 1.read(ByteBuffer dst) 将字节序列从此通道中读入给定的缓冲区。
  18. 2.read(ByteBuffer[] dsts, int offset, int length) 将字节序列从此通道读入给定缓冲区的子序列中。
  19. 3.write(ByteBuffer src) 将字节序列从给定的缓冲区中写入此通道。
  20. 4.write(ByteBuffer[] srcs, int offset, int length) 将字节序列从给定缓冲区的子序列写入此通道。
  21. ServerSocketChannel: 针对面向流的侦听套接字的可选择通道。
  22. 不提供构造方法,一般通过静态方法open()获取对象。
  23. 成员方法:
  24. 1.accept() 接受到此通道套接字的连接。
  25. 2.socket() 获取与此通道关联的服务器套接字。
  26. 3.validOps() 返回一个操作集,标识此通道所支持的操作。

将缓冲区流写入管道

  1. public static void main(String[] args) {
  2. FileOutputStream fos = null;
  3. try {
  4. String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
  5. fos = new FileOutputStream(new File(filePath));
  6. //得到字节输出流对应的通道
  7. FileChannel channel = fos.getChannel();
  8. //分配缓冲区
  9. ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
  10. buffer.put("Hello Java".getBytes());
  11. buffer.flip();//把缓冲区切换为读模式
  12. channel.write(buffer);//将缓冲区的数据流写到管道中
  13. channel.close();//关闭管道
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. } finally {
  17. if (fos != null) {
  18. try {
  19. fos.close();
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. }

将管道中流读入缓冲区

  1. public static void main(String[] args) {
  2. FileInputStream fis = null;
  3. try {
  4. String filePath = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
  5. fis = new FileInputStream(new File(filePath));
  6. FileChannel channel = fis.getChannel();
  7. int size = fis.available();
  8. ByteBuffer buffer = ByteBuffer.allocate(size);
  9. channel.read(buffer);
  10. buffer.flip();
  11. System.out.println(new String(buffer.array(),0,buffer.remaining()));
  12. channel.close();
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. } finally {
  16. if (fis != null) {
  17. try {
  18. fis.close();
  19. } catch (IOException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }

文件拷贝案例

  1. public static void main(String[] args) {
  2. copy();
  3. }
  4. //文件拷贝
  5. public static void copy() {
  6. FileInputStream fis = null;
  7. FileOutputStream fos = null;
  8. try {
  9. String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico";
  10. String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\jmeter.ico";
  11. fis = new FileInputStream(new File(src));
  12. fos = new FileOutputStream(new File(copy));
  13. //得到文件通道
  14. FileChannel fisChannel = fis.getChannel();
  15. FileChannel fosChannel = fos.getChannel();
  16. //分配缓冲区
  17. ByteBuffer buffer = ByteBuffer.allocate(1024);
  18. while (true) {
  19. //清空缓冲区再写入数据
  20. buffer.clear();
  21. int flag = fisChannel.read(buffer);
  22. if (flag == -1) {
  23. break;//读取为空返回-1
  24. }
  25. //已经读取到数据,切断可读模式
  26. buffer.flip();
  27. //将数据写入输出管道
  28. fosChannel.write(buffer);
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. if (fis != null) {
  34. try {
  35. fis.close();
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. if (fos != null) {
  41. try {
  42. fos.close();
  43. } catch (IOException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }

分散读取和聚合写入

  1. public static void main(String[] args) {
  2. FileInputStream fis = null;
  3. FileOutputStream fos = null;
  4. try {
  5. String src = "C:\\Users\\ms674\\Desktop\\icon\\jmeter.ico";
  6. String copy = "C:\\Users\\ms674\\Desktop\\File\\DDD\\data-copy.ico";
  7. fis = new FileInputStream(new File(src));
  8. fos = new FileOutputStream(new File(copy));
  9. //得到文件通道
  10. FileChannel fisChannel = fis.getChannel();
  11. FileChannel fosChannel = fos.getChannel();
  12. //定义多个缓冲区
  13. ByteBuffer buffer1 = ByteBuffer.allocate((fis.available()/3)+1);
  14. ByteBuffer buffer2 = ByteBuffer.allocate((fis.available()/3)+1);
  15. ByteBuffer buffer3 = ByteBuffer.allocate((fis.available())/3+1);
  16. ByteBuffer[] buffers = {buffer1, buffer2, buffer3};
  17. //从通道中读取数据分散到各个缓冲区
  18. fisChannel.read(buffers);
  19. //将各个缓冲区的数据聚集写入通道
  20. for (ByteBuffer buffer : buffers) {
  21. buffer.flip();//将各个缓冲区切换为可读模式
  22. }
  23. fosChannel.write(buffers);
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. } finally {
  27. if (fis != null) {
  28. try {
  29. fis.close();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. if (fos != null) {
  35. try {
  36. fos.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }
  42. }

通道数据流拷贝

  1. public static void main(String[] args) {
  2. FileInputStream fis = null;
  3. FileOutputStream fos = null;
  4. try {
  5. String src = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data1.txt";
  6. String copy = "C:\\Users\\ms674\\Desktop\\File\\NIO\\data-transferFrom.txt";
  7. fis = new FileInputStream(new File(src));
  8. fos = new FileOutputStream(new File(copy));
  9. //得到文件通道
  10. FileChannel fisChannel = fis.getChannel();
  11. FileChannel fosChannel = fos.getChannel();
  12. //通道数据转移拷贝 - 不需要自定义缓冲区
  13. fosChannel.transferFrom(fisChannel,fisChannel.position(),fisChannel.size());
  14. //关闭通道
  15. fosChannel.close();
  16. fisChannel.close();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. if (fis != null) {
  21. try {
  22. fis.close();
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. if (fos != null) {
  28. try {
  29. fos.close();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }

Selector(多路复用器)

概述

  1. SelectorSelectableChannel对象的多路复用器,Selector可以同时监控
  2. 多个SelectableChannelIO状况,通过Selector可以实现用一个线程去管理多个
  3. 线程,Selector是非阻塞IO的核心。
  4. 监听状态
  5. SelectionKey.OP_CONNECT 连接就绪
  6. SelectionKey.OP_ACCEPT 接收就绪
  7. SelectionKey.OP_READ 读就绪
  8. SelectionKey.OP_WRITE 写就绪
  9. 如果你对不止一种事件感兴趣,使用或运算符即可
  10. SelectionKey.OP_READ | SelectionKey.OP_WRITE

API

  1. Selector: SelectableChannel对象的多路复用器。
  2. 不提供构造器,使用静态方法open()获取Selector对象。
  3. 静态方法:
  4. 1.open() 打开一个选择器。
  5. 2.keys() 返回此选择器的键集。
  6. 3.isOpen() 告知此选择器是否已打开。
  7. 4.select() 选择一组键,其相应的通道已为 I/O 操作准备就绪。
  8. 5.selectedKeys() 返回此选择器的已选择键集。
  9. 6.close() 关闭此选择器。

服务端

  1. public static void main(String[] args) {
  2. ServerSocketChannel serverSocketChannel = null;
  3. try {
  4. //获取服务端通道
  5. serverSocketChannel = ServerSocketChannel.open();
  6. //切换为非阻塞模式
  7. serverSocketChannel.configureBlocking(false);
  8. //绑定端口,让客户端连接服务端
  9. serverSocketChannel.bind(new InetSocketAddress(8192));
  10. //获取选择器Selector
  11. Selector selector = Selector.open();
  12. //将通道注册到选择器上,指定监听事件(接收事件)
  13. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  14. //使用Selector选择器轮询已经就绪好的事件,如果没事件就一直等待
  15. while (selector.select() > 0) {
  16. //获取选择器中的所有注册的通道中已经就绪好的事件
  17. Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  18. //开始遍历准备好的事件
  19. while (it.hasNext()) {
  20. //提取当前事件
  21. SelectionKey sk = it.next();
  22. //判断这个事件具体是什么
  23. if (sk.isAcceptable()) {
  24. //直接获取当前接入的客户端通道
  25. SocketChannel socketChannel = serverSocketChannel.accept();
  26. //客户端通道切换非阻塞模式
  27. socketChannel.configureBlocking(false);
  28. //将客户端通道注册到选择器上,读监听
  29. socketChannel.register(selector,SelectionKey.OP_READ);
  30. } else if (sk.isReadable()) {
  31. //获取当前选择器上的都就绪事件
  32. SocketChannel socketChannel = (SocketChannel) sk.channel();
  33. //开始读取数据
  34. ByteBuffer buffer = ByteBuffer.allocate(1024);
  35. int len;
  36. while ((len = socketChannel.read(buffer)) > 0) {
  37. buffer.flip();//切换为读模式
  38. System.out.println(new String(buffer.array(),0,len));
  39. buffer.clear();//重置position到第一个位置
  40. }
  41. }
  42. it.remove();//处理完事件后移除
  43. }
  44. }
  45. } catch (IOException e) {
  46. e.printStackTrace();
  47. } finally {
  48. if (serverSocketChannel != null) {
  49. try {
  50. serverSocketChannel.close();
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. }

客户端

  1. public static void main(String[] args) {
  2. SocketChannel socketChannel = null;
  3. try {
  4. //获取通道
  5. socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8192));
  6. //切换成非阻塞模式
  7. socketChannel.configureBlocking(false);
  8. //分配指定的缓冲区
  9. ByteBuffer buffer = ByteBuffer.allocate(1024);
  10. //发送数据给服务端
  11. Scanner scanner = new Scanner(System.in);
  12. while (true) {
  13. System.out.print("请输入信息:");
  14. String line = scanner.nextLine();
  15. if ("exit".equals(line)) {
  16. break;
  17. }
  18. String msg = "消息内容:"+line+"\t发送时间:"+ LocalDateTime.now();
  19. buffer.put(msg.getBytes());
  20. buffer.flip();//切换为可读模式
  21. socketChannel.write(buffer);
  22. buffer.clear();
  23. }
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. } finally {
  27. if (socketChannel != null) {
  28. try {
  29. socketChannel.close();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }