1. BIO 模型中,由于 serverSocket.accept()会造成阻塞、输入流的读写操作是synchronized的、无法在同一个线程中处理多个 Stream I/O,因此性能低下是不可避免的。而NIO具有非阻塞性,目前已成为BIO的替代品<br /> ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1642943584558-f8e14003-f70d-4a3c-9ecf-7cf64f793047.png#clientId=u7e2ac08d-69ce-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=321&id=u004d9e9d&margin=%5Bobject%20Object%5D&name=image.png&originHeight=793&originWidth=1653&originalType=binary&ratio=1&rotation=0&showTitle=false&size=167026&status=done&style=stroke&taskId=uff3f4642-4e9a-4612-aa8b-deac93aad71&title=&width=670)<br />![FK~63}HR}95PY@`DXFG`Z@0.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1642000350465-29f02447-8f29-4c95-949a-3ef0130ac19f.png#clientId=u5a80d8d2-f05c-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=372&id=uf462b909&margin=%5Bobject%20Object%5D&name=FK~63%7DHR%7D95PY%40%60DXFG%60Z%400.png&originHeight=744&originWidth=1254&originalType=binary&ratio=1&rotation=0&showTitle=false&size=181884&status=done&style=stroke&taskId=uc97c2808-5909-4448-8006-f15808e5483&title=&width=627)<br />**NIO操作逻辑**

NIO 基于Channel (通道)和 Buffer (缓存)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,Selector (选择器)用于监听多个通道的事件(比如连接请求、数据到达),因此使用单线程即可监听多个客户端通道

NIO BIO
面向缓冲区 Buffer 面向流 Stream
非阻塞 阻塞IO
Selectors 选择器

包含三大核心:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

image.png
Buffer、Channel、Selector的关系

  1. 1、每个Channel都会对应一个Buffer
  2. 2、一个Selector对应一个线程,一个Selector对应多个注册的Channel
  3. 3、程序切换到哪个Channel是由事件类型决定的
  4. 4Selector会根据不同的事件在各个通道上切换
  5. 5、数据的读取通过Buffer实现(和BIO的本质不同处)
  6. 6BIO要么是输入流要么是输出流,不能双向;NIOBuffer可以进行读写,双向(channel也是双向)

源码下载:

  1. https://gitee.com/dmbjzorg/Netty

Buffer:

本质是可以读写数据的内存,这一块内存被NIO包装为 NIO Buffer 对象进行访问管理
使用 flip() 将写模式切换为读模式,数据读取后使用 clear() 还原指针位置并进行模式切换;compact() 压缩未读取的数据
常用子类:CharBuffer、ByteBuffer、LongBuffer、IntBuffer,通常使用 xxxBuffer.allocate(int size) 获得一个容量为size的Buffer对象

参数:

position:目前所在的位置,默认指向Buffer启始位置
limit:表示缓冲区的极限位置,不能对缓冲区超过极限的位置进行读写操作,极限位置可以修改
capacity:Buffer最大容量
标记(mark)和重置(reset):标记是一个索引,通过Buffer的mark方法指定一个特定的 position,之后可以通过 reset 方法恢复到这个 position

常用方法:

  1. Buffer clear() 还原positionlimit的位置(不是清空数据,可以用来将读模式切换为写模式)
  2. Buffer flip() 从写模式切换为读模式
  3. int capacity() 返回Buffer的容量大小
  4. boolean hasRemaining() 判断缓冲取中是否还有元素
  5. int limit() 返回Buffer的界限的位置
  6. Buffer limit(int n) 将设置缓冲区界限为n,并返回一个具有新Limit的缓存区对象
  7. Buffer mark() 对缓冲区设置标记
  8. int position() 范湖缓冲区的当前位置
  9. Buffer position(int n) 将设置缓冲区界限为n,并返回修改后的BUffer对象
  10. int remaining() 返回positionlimit之间的元素个数
  11. Buffer reset() position转到以前设置mark的位置
  12. Buffer rewind() 将位置设置为0,取消设置的mark,用于使用同一个Buffer进行多次读取
  13. boolean hasRemaining() position位置是否小于limit(是否还有数据未读取)
  14. wrap(byte[] array,int offset, int length) 把数组数据写入到缓冲区后反转为读模式(Buffer子类方法)
  15. ByteBuffer asReadOnlyBuffer() 转换为只读Buffer

缓冲区数据操作方法:

Buffer 所有子类都有 put() 和 get() 方法获取Buffer中的数据

  1. get() 读取单个字节
  2. get(byte[] dst) 批量读取多个字节到dst
  3. get(int index) 读取指定索引位置的字节(不移动position
  4. put(byte b) 将给定单个字节希尔缓冲区的当前位置
  5. put(byte[] src) src中的字节写入缓冲区的当前位置
  6. put(int index,byte b) 将指定字节写入缓冲区的索引位置(不移动position

模式讲解:

写模式:

将数据写入缓冲区后,position 将指向数据的末尾;limit 和 capacity 的位置相同,都指向当前Buffer容量最大值位置
image.png
image.png

读模式:

写模式使用 flip() 切换到读模式后,position 的位置被还原,limit 变为之前 position 的位置(代表当前数据在缓冲区的最大位置)
I{HP2SA_KHOZH0FB_N6W4N0.png

数据全部读取完毕后使用 clear() 还原指针位置并切换为写模式;如果需要切换为写模式并保留未读取数据到下一次读取,需要使用 compact() 拷贝未读取数据,compact() 会将拷贝数据移动到头部并使 position 位置下移
)R@{WRQ1NZ]P69D@MTGQLSH.png
compact() 拷贝数据并修改position位置

案例:

  1. package buffer;
  2. import org.junit.Test;
  3. import java.nio.ByteBuffer;
  4. import java.nio.charset.StandardCharsets;
  5. /* 演示 Buffer 方法 */
  6. public class BufferTest {
  7. @Test
  8. public void test(){
  9. ByteBuffer buffer = ByteBuffer.allocate(1024); //分配一个1024大小的缓存
  10. System.out.println("Limit的位置是: "+buffer.limit());
  11. System.out.println("容量大小是: "+buffer.capacity());
  12. System.out.println("当前位置是: "+buffer.position());
  13. System.out.println("=======================");
  14. /* 存入数据 */
  15. String name = "helloBuffer";
  16. buffer.put(name.getBytes(StandardCharsets.UTF_8));
  17. System.out.println("添加helloBuffer后的当前位置是: "+buffer.position());
  18. System.out.println("添加helloBuffer后Limit的位置是: "+buffer.limit());
  19. System.out.println("添加helloBuffer后容量大小是: "+buffer.capacity());
  20. System.out.println("=======================");
  21. /* 翻转模式 */
  22. buffer.flip();
  23. System.out.println("切换模式后的当前位置是: "+buffer.position());
  24. System.out.println("切换模式后Limit的位置是: "+buffer.limit());
  25. System.out.println("切换模式后容量大小是: "+buffer.capacity());
  26. System.out.println("=======================");
  27. /* 读取数据 */
  28. byte[] info = new byte[3];
  29. buffer.get(info);
  30. System.out.println("读取3个字节后的当前位置是: "+buffer.position());
  31. System.out.println("读取3个字节后Limit的位置是: "+buffer.limit());
  32. System.out.println("读取3个字节后容量大小是: "+buffer.capacity());
  33. System.out.println("=======================");
  34. /* 暂存数据 */
  35. buffer.compact();
  36. System.out.println("暂存数据后的当前位置是: "+buffer.position());
  37. System.out.println("暂存数据后Limit的位置是: "+buffer.limit());
  38. System.out.println("暂存数据后容量大小是: "+buffer.capacity());
  39. System.out.println("=======================");
  40. /* 清空数据 */
  41. buffer.clear();
  42. System.out.println("清空数据后的当前位置是: "+buffer.position());
  43. System.out.println("清空数据后Limit的位置是: "+buffer.limit());
  44. System.out.println("清空数据后容量大小是: "+buffer.capacity());
  45. }
  46. }

image.png
测试结果

  1. package buffer;
  2. import org.junit.Test;
  3. import java.nio.ByteBuffer;
  4. import java.nio.charset.StandardCharsets;
  5. /* 演示 Buffer 方法 */
  6. public class BufferTest {
  7. @Test
  8. public void test2(){
  9. ByteBuffer buffer = ByteBuffer.allocate(1024); //分配一个1024大小的缓存
  10. String name = "helloBuffer";
  11. buffer.put(name.getBytes(StandardCharsets.UTF_8));
  12. buffer.flip(); //切换模式
  13. byte[] info = new byte[3];
  14. buffer.get(info);
  15. System.out.println("读取3个字节后的当前位置是: "+buffer.position());
  16. System.out.println("读取3个字节后Limit的位置是: "+buffer.limit());
  17. System.out.println("读取3个字节后容量大小是: "+buffer.capacity());
  18. System.out.println("=======================");
  19. buffer.mark(); //标记位置
  20. byte[] info2 = new byte[3];
  21. buffer.get(info2);
  22. System.out.println("标记后的当前位置是: "+buffer.position());
  23. System.out.println("标记后Limit的位置是: "+buffer.limit());
  24. System.out.println("标记后容量大小是: "+buffer.capacity());
  25. buffer.reset(); //返回到标记位置
  26. if(buffer.hasRemaining()){
  27. System.out.println("还有剩余元素");
  28. }
  29. }
  30. }

image.png
测试结果

异常演示:

当对同一个Buffer存放多个不同类型的值时,取值的格式需要和存值的顺序对应,否则可能会抛出 BufferUnderflowException

  1. @Test
  2. public void nioEx(){
  3. ByteBuffer buffer = ByteBuffer.allocate(64);
  4. buffer.putInt(100);
  5. buffer.putLong(20L);
  6. buffer.putChar('h');
  7. buffer.put("你好".getBytes(StandardCharsets.UTF_8));
  8. buffer.flip();
  9. System.out.println(buffer.getInt());
  10. System.out.println(buffer.getLong());
  11. System.out.println(buffer.getChar());
  12. System.out.println(buffer.getLong());
  13. }

image.png
运行结果

只读模式:

普通Buffer转化为只读Buffer后再添加数据将会报错

  1. @Test
  2. public void readOnlyBuf(){
  3. ByteBuffer buffer = ByteBuffer.allocate(640);
  4. for (int i = 0; i < 10; i++) {
  5. buffer.put((byte) i);
  6. }
  7. buffer.flip();
  8. ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); //转换为只读Buffer
  9. while (readOnlyBuffer.hasRemaining()){
  10. System.out.println(readOnlyBuffer.get());
  11. }
  12. readOnlyBuffer.put("hello".getBytes(StandardCharsets.UTF_8)); //只读Buffer添加数据将会抛出异常
  13. }

image.png
对只读Buffer写入数将抛出异常

直接内存和非直接内存:

byte byffer 可以是两种类型,一种是基于直接内存(也就是非堆内存) ;另一种是非直接内存(也就是堆内
存),对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的I0操作。而非直接内存(堆内存中的数据),如果要作IO操作,会先从本进程内存复制到直接内存,再利用本地IO处理
从数据流的角度,非直接内存是下面这样的作用链:

  1. 本地IO --> 直接内存 --> 非直接内存 --> 直接内存 --> 本地IO

而直接内存是:

  1. 本地IO --> 直接内存 --> 本地IO

二者一对比就可发现在做IO处理时(比如网络发送大量数据),直接内存会具有更高的效率。直接内存使用XXXXBuffer.allocateDirect() 创建,但是它比申请普通的堆内存需要耗费更高的性能。不过这部分的数据是在JVM之外的,因此它不会占用应用的内存。如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。适用场景:数据量大且生命周期又很长、频繁的IO操作(网络并发场景)

MappedByteBuffer:

MappedByteBuffer可以让文件直接在内存修改,操作系统不需要再拷贝一次
在项目下创建内容为“HelloMapped”文本并使用 MappedByteBuffer 修改数据

  1. @Test
  2. public void MappedBuffer() throws Exception {
  3. RandomAccessFile file = new RandomAccessFile("HelloMapped.txt","rw");
  4. FileChannel channel = file.getChannel();
  5. /*
  6. * 参数一: FileChannel.MapMode 文件模式
  7. * 参数二: 可对文件修改的启始位置
  8. * 参数三: 可对文件修改的字节数,5意味着可以修改 0~4索引的数据
  9. *
  10. */
  11. MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
  12. map.put(0,(byte)'C');
  13. map.put(1,(byte)'S');
  14. map.put(2,(byte)'G');
  15. file.close();
  16. }

image.png
数据被修改


Channel:

本身不能直接访问数据,Channel 只能与Buffer进行交互,表示IO源于目标打开的连接
主要包含:FileChannel(文件读写IO操作)、SocketChannel + ServerSocketChannel (TCP数据读写)、DatagramChannel (UDP数据读写)

ServerSocketChannel与SocketChannel:

ServerSocketChannel 方法:

  1. ServerSocketChannel.open() 得到一个ServerSocketChannel通道
  2. ServerSocketChannel bind(SocketAddress local) 设置服务器端口
  3. SelectableChannel configureBlocking(boolean block) 设置是否为阻塞模式
  4. SocketChannel accept() 接受一个连接,返回代表这个连接的SocketChannel对象
  5. SelectionKey register(Selector sel, int ops) 注册到选择器并设置监听事件

SocketChannel方法:

  1. SocketChannel.open() 得到一个SocketChannel通道
  2. SelectableChannel configureBlocking(boolean block) 设置是否为阻塞模式
  3. boolean connect(SocketAddress remote) 连接服务器(open时可以连接)
  4. boolean finishConnect() 如果connect连接服务器失败,需要使用该方法完成服务器连接
  5. int write(ByteBuffer src) 将缓存数据写入到通道
  6. int read(ByteBuffer dst) 将通道数据写入到缓存
  7. SelectionKey register(Selector sel,int ops,Object att) 注册到选择器并设置监听事件,最后一个参数为共享数据
  8. void close() 关闭通道

注意事项:

  1. 1、只有注册在 Selector 上的 Channel 才能被监控,可以指定Channel需要监听的可操作状态,注册后的Channel会得到SelectorKey,将SelectorKey作为参数带入到具体方法就可以得到对应的Channel信息:
  2. select():返回当前处于可操作状态的channel,如果没有可返回的channel将会阻塞,直到可以返回为止
  3. interestOps():得到Channel的注册状态
  4. readyOps(): 对于当前SelectorKey有哪些监听的状态
  5. channel():返回SelectorKey注册的Channel对象
  6. selector():返回在哪个selector上完成的注册
  7. attachment():对于每一个注册在Selector上的Channel对象,可以再加上一个Object对象
  8. 2SocketChannel 与服务器端连接后,SocketChannel 状态处于 connectServerSocketChannel接收了客户端的请求后状态将变为 accept
  9. 3ServerSocketChannel SocketChannel 有数据可以读取时状态为 read
  10. 4ServerSocketChannel SocketChannel 处于写入状态时状态为 write
  11. 5Selector不会重置状态,因此操作完channel后需要手动重置回等待状态(清空selectionKey集合)
  12. 6ServerSocketChannel 默认为阻塞状态,需要手动修改为非阻塞

FileChannel:

image.png
FileChannel 读写文件的流程

常见方法:

  1. public int read(ByteBuffer buf) 将通道数据放入到缓冲区中
  2. public int write(ByteBuffer buf) 把缓冲区数据写入到通道中
  3. public long transferFrom(ReadableByteChannel src,long position,long count) 从目标通道将数据复制当前通道
  4. public long transferTo(long position,long count,WritableByteChannel target) 把数据从当前通道复制到目标通道

写入数据到本地文件:

1.txt 将会自动创建在当前项目路径下

  1. package channel;
  2. import org.junit.Test;
  3. import java.io.FileNotFoundException;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.io.OutputStream;
  7. import java.nio.ByteBuffer;
  8. import java.nio.channels.FileChannel;
  9. import java.nio.charset.StandardCharsets;
  10. /* 演示Channel基本方法 */
  11. public class ChannelTest {
  12. @Test
  13. public void write() throws IOException {
  14. try {
  15. //1、字节输出流通向目标文件
  16. FileOutputStream fos = new FileOutputStream("1.txt");
  17. //2、得到字节输出流对应的通道Channel
  18. FileChannel channel = fos.getChannel();
  19. //3、分配缓冲区
  20. ByteBuffer buffer = ByteBuffer.allocate(1024);
  21. buffer.put("HelloFile".getBytes(StandardCharsets.UTF_8));
  22. //4、切换模式
  23. buffer.flip();
  24. //5、将缓存数据写入到通道
  25. channel.write(buffer);
  26. channel.close();
  27. } catch (FileNotFoundException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

从本地文件读取数据:

  1. package channel;
  2. import org.junit.Test;
  3. import java.io.FileInputStream;
  4. import java.io.FileNotFoundException;
  5. import java.io.FileOutputStream;
  6. import java.io.IOException;
  7. import java.nio.ByteBuffer;
  8. import java.nio.channels.FileChannel;
  9. import java.nio.charset.StandardCharsets;
  10. /* 演示Channel基本方法 */
  11. public class ChannelTest {
  12. /* 从本地文件读取数据 */
  13. @Test
  14. public void read() throws IOException {
  15. //定义一个文件输入流与源文件接通
  16. FileInputStream inputStream = new FileInputStream("1.txt");
  17. //创建Channel
  18. FileChannel channel = inputStream.getChannel();
  19. //创建缓冲区
  20. ByteBuffer buffer = ByteBuffer.allocate(1024);
  21. //将通道数据写入到缓存
  22. channel.read(buffer);
  23. //读取缓冲区中的数据bin输出
  24. buffer.flip();
  25. System.out.println( new String(buffer.array()) );
  26. }
  27. }

拷贝本地文件:

  1. package channel;
  2. import org.junit.Test;
  3. import java.io.*;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.FileChannel;
  6. import java.nio.charset.StandardCharsets;
  7. /* 演示Channel基本方法 */
  8. public class ChannelTest {
  9. /* 拷贝本地文件 */
  10. @Test
  11. public void copy() throws IOException {
  12. //源文件
  13. File png = new File("E:\\图片\\电脑壁纸\\4.png");
  14. //创建通道
  15. FileInputStream inputStream = new FileInputStream(png);
  16. FileChannel inputChannel = inputStream.getChannel();
  17. FileOutputStream outputStream = new FileOutputStream("4.png");
  18. FileChannel outChannel = outputStream.getChannel();
  19. /* 从文件中循环切出1024个字节判断是否已经到末尾,没有就写入到通道中 */
  20. ByteBuffer buffer = ByteBuffer.allocate(1024);
  21. while (inputChannel.read(buffer)!=-1){
  22. buffer.flip();
  23. while (buffer.hasRemaining()) {
  24. outChannel.write(buffer);
  25. }
  26. buffer.clear();
  27. }
  28. inputChannel.close();
  29. outputStream.close();
  30. }
  31. }

分散读取与聚集写入:

  1. package channel;
  2. import org.junit.Test;
  3. import java.io.FileInputStream;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.FileChannel;
  8. public class ChannelTest2 {
  9. @Test
  10. public void read() throws IOException {
  11. //字节输入管道
  12. FileInputStream inputStream = new FileInputStream("1.txt");
  13. //字节输出管道
  14. FileOutputStream outputStream = new FileOutputStream("2.txt");
  15. //定义多个缓冲区做数据分散
  16. ByteBuffer buffer1 = ByteBuffer.allocate(4);
  17. ByteBuffer buffer2 = ByteBuffer.allocate(1024);
  18. ByteBuffer[] buffers = {buffer1,buffer2};
  19. //从通道中读取数据
  20. FileChannel ischannel = inputStream.getChannel();
  21. FileChannel outchannel = outputStream.getChannel();
  22. ischannel.read(buffers); //分散读取
  23. //转换为读取模式
  24. for (ByteBuffer buffer : buffers){
  25. buffer.flip();
  26. System.out.println(new String(buffer.array(),0,buffer.remaining()));
  27. }
  28. //聚集写入通道
  29. outchannel.write(buffers);
  30. //关闭资源
  31. ischannel.close();
  32. outchannel.close();
  33. }
  34. }

transferFrom 和 transferTo:

  1. package channel;
  2. import org.junit.Test;
  3. import java.io.FileInputStream;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.FileChannel;
  8. public class ChannelTest2 {
  9. /* 数据复制
  10. * transferFrom : 目标通道转移数据到原通道
  11. * transferTo: 原通道转移数据到目标通道
  12. * */
  13. @Test
  14. public void write() throws IOException {
  15. //字节输入管道
  16. FileInputStream inputStream = new FileInputStream("1.txt");
  17. //字节输出管道
  18. FileOutputStream outputStream = new FileOutputStream("3.txt");
  19. //从通道中读取数据
  20. FileChannel ischannel = inputStream.getChannel();
  21. FileChannel outchannel = outputStream.getChannel();
  22. //复制数据,transferFrom 和 transferTo实现结果一致
  23. //outchannel.transferFrom(ischannel,ischannel.position(),ischannel.size());
  24. ischannel.transferTo(ischannel.position(),ischannel.size(),outchannel);
  25. //关闭
  26. ischannel.close();
  27. outchannel.close();
  28. }
  29. }

Selector:

又称选择器,是SelectableChannle对象的多路复用器,利用Selector可使一个单独的线程管理多个Channel,Selector是非阻塞IO的核心
image.png
能够检测到多个注册的通道上是否有事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理,这样就可以只使用一个单线程去管理多个通道,也就是管理多个连接和请求
只有在连接/通道有读写事件发生时才会进行读写,大大地减少了系统开销,并且不必为每个连接都创建一个线程, 也不用去维护多个线程,避免了多线程之间的.上下文切换导致的开销
每个Channel都会对应一个Buffer,程序切换到哪个 Channel 是由事件决定的,Selector会根据不同的事件在各个通道上切换
通道注册在选择器上后会生成 SelectionKey,通过 SelectionKey 可以对目标通道进行操作

常用方法:

  1. Selector.open() 创建选择器
  2. int select(long timeout) 返回对应注册事件的通道的个数,当其中有IO操作可以进行时将对应的SelectorKey加入到集合内部返回,参数代表阻塞时间(timeout毫秒后执行方法),没有参数时该方法为阻塞方法
  3. Set<SelectionKey> selectedKeys() 返回所有符合监听状态的已注册通道Keys集合
  4. Set<SelectionKey> keys() 返回所有已注册的通道keys集合
  5. Selector.wakeup() 唤醒Selector
  6. int selectNow() 立刻返回对应注册事件的通道的个数
  7. Object attachment() 得到与之关联的共享数据
  8. int interestOps() 设置或改变监听事件

SelectorKey:

表示Selector与网络通道的注册关系,有四种状态

  1. int OP_ACCEPT:有新的网络连接可以accept,值为16
  2. int OP_CONNECT:代表连接已经建立,值为8
  3. int OP_READ:代表读操作,值为1
  4. int OP_WRITE:代表写操作,值为4

案例:

客户端:

  1. package tx;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.util.Set;
  6. /* NIO非阻塞通信下的服务端 */
  7. public class Server {
  8. public static void main(String[] args) throws Exception {
  9. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  10. serverSocketChannel.bind(new InetSocketAddress(666)); //绑定端口
  11. Selector selector = Selector.open(); //创建选择器
  12. serverSocketChannel.configureBlocking(false); //设置非阻塞模式
  13. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //注册到选择器,监听连接事件
  14. /* 等待客户端连接 */
  15. while (true){
  16. if(selector.select(1000)==0){
  17. System.out.println("服务端等待1秒,无任何客户端进行连接");
  18. continue;
  19. }
  20. Set<SelectionKey> selectionKeys = selector.selectedKeys(); //获取已将连接到客户端的SelectKey集合
  21. System.out.println("当前注册的通道数量: "+selector.keys().size()+" 当前符合监听状态的通道数量: "+selector.selectedKeys().size());
  22. for (SelectionKey selectionKey : selectionKeys) {
  23. /* 客户端处于连接状态 */
  24. if(selectionKey.isAcceptable()){
  25. SocketChannel sChannel = serverSocketChannel.accept(); //通过连接从serverSocketChannel获取客户端通道
  26. sChannel.configureBlocking(false); //设置为非阻塞模式
  27. System.out.println("客户端连接成功!");
  28. sChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024)); //注册客户端,监听读事件(客户端正在写数据到通道)
  29. }
  30. /* 客户端处于读取状态(正在写入数据到通道) */
  31. if(selectionKey.isReadable()){
  32. SocketChannel channel = (SocketChannel)selectionKey.channel(); //通过Key获取客户端Channel
  33. ByteBuffer buffer = (ByteBuffer)selectionKey.attachment(); //获取到register时关联的Buffer
  34. channel.read(buffer); //将通道数据写入Buffer
  35. System.out.println("客户端收到的消息:" + new String(buffer.array()));
  36. }
  37. }
  38. selectionKeys.clear();
  39. }
  40. }
  41. }

服务端:

  1. package tx;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Scanner;
  8. /* NIO非阻塞通信下客户端 */
  9. public class Client {
  10. public static void main(String[] args) throws IOException {
  11. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",666)); //获取通道
  12. socketChannel.configureBlocking(false); //切换为非阻塞模式
  13. String str = "hello NIO";
  14. ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
  15. socketChannel.write(buffer);
  16. System.in.read();
  17. }
  18. }
  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1643461288926-6584d7bb-90f5-40f0-a0e0-cc1f059fa072.png#clientId=u7c21a453-d45d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=224&id=u3704f6ef&margin=%5Bobject%20Object%5D&name=image.png&originHeight=336&originWidth=825&originalType=binary&ratio=1&rotation=0&showTitle=false&size=51704&status=done&style=stroke&taskId=uf69606a7-78fa-4404-a92b-ea57c5fe543&title=&width=550)<br />** 服务端收到客户端消息**<br /> ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1643463014163-3b80ce48-fb94-4cd4-88a3-a946ef82dbca.png#clientId=u9bf58aff-1b78-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=279&id=ub192c62b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=483&originWidth=805&originalType=binary&ratio=1&rotation=0&showTitle=false&size=71621&status=done&style=stroke&taskId=u162fcf0b-31d1-4cbb-aa55-26376ba2a53&title=&width=464.66668701171875)<br />** 多客户端实例下的服务端运行结果**

NIO聊天室:

客户端与服务端建立好连接后状态为 ACCEPT,选择器获取到该状态后需要设置客户端 SocketChannel 为非阻塞并监听其写入状态
客户端将聊天消息传递给服务端,服务端变为 Read状态,选择器获取到该状态后将消息取出并转发给其他客户端
image.png
聊天室流程

服务端代码:

  1. package chatroom;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Set;
  8. /* NIO聊天室服务端 */
  9. public class ChatServer {
  10. private Selector selector; //服务端选择器
  11. private ServerSocketChannel ssChannel; //服务端通道
  12. private static final int PORT = 9999; //监听端口
  13. /* 初始化代码逻辑 */
  14. public ChatServer(){
  15. try {
  16. selector = Selector.open(); //创建选择器
  17. ssChannel = ServerSocketChannel.open(); //创建客户端通道
  18. ssChannel.bind(new InetSocketAddress(PORT)); //绑定客户端连接的端口
  19. ssChannel.configureBlocking(false); //设置非阻塞模式
  20. ssChannel.register(selector, SelectionKey.OP_ACCEPT); //通道注册到选择器,并设置监听事件
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. /* 监听事件方法 */
  26. private void listen(){
  27. try {
  28. while (selector.select()>0){
  29. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  30. for (SelectionKey sk : selectionKeys) {
  31. /* 判断事件的类型 */
  32. if(sk.isAcceptable()){
  33. /** 客户端接入后注册到选择器并监听其写入事件 */
  34. SocketChannel sChannel = ssChannel.accept(); //获取当前客户端通道
  35. sChannel.configureBlocking(false); //修改为非阻塞模式
  36. sChannel.register(selector,SelectionKey.OP_READ); //监听客户端Channel读事件
  37. }
  38. if(sk.isReadable()){
  39. /* 处理客户端消息,接收消息并进行转发*/
  40. readClientData(sk);
  41. }
  42. }
  43. selectionKeys.clear();
  44. }
  45. } catch (IOException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. /* 接收当前客户端的信息并转发给其他客户端通道 */
  50. private void readClientData(SelectionKey sk) {
  51. SocketChannel sChannel = null;
  52. try {
  53. sChannel = (SocketChannel)sk.channel(); //获取当前客户端通道
  54. ByteBuffer buffer = ByteBuffer.allocate(1024); //创建缓冲区
  55. if(sChannel.read(buffer) >0){
  56. buffer.flip();
  57. String msg = new String(buffer.array(),0,buffer.remaining()); //提取读取到的用户通道信息
  58. System.out.println("接收到客户端消息"+msg);
  59. sendMsgToAllClient(msg,sChannel); //把消息转发给其他在线用户
  60. }
  61. } catch (Exception e) {
  62. //当前客户端离线了,数据无法读取将抛出异常
  63. try {
  64. System.out.println("用户"+sChannel.getRemoteAddress()+"下线");
  65. sk.cancel(); //取消选择器的注册
  66. sChannel.close(); //关闭客户端通道
  67. } catch (IOException ex) {
  68. ex.printStackTrace();
  69. }
  70. }
  71. }
  72. /* 消息推送给其他在线用户 */
  73. private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {
  74. System.out.println("服务端开始转发消息,当前转发线程是: "+Thread.currentThread().getName());
  75. for (SelectionKey key : selector.keys()) {
  76. Channel channel = key.channel();
  77. //排除自己
  78. if(channel instanceof SocketChannel && sChannel!=channel){
  79. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
  80. ((SocketChannel) channel).write(buffer);
  81. }
  82. }
  83. }
  84. public static void main(String[] args) {
  85. ChatServer chatServer = new ChatServer();
  86. //开始监听客户端各种消息事件(连接消息、群聊消息、离线消息)
  87. chatServer.listen();
  88. }
  89. }

客户端代码:

  1. package chatroom;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.nio.charset.StandardCharsets;
  9. import java.util.Iterator;
  10. import java.util.Scanner;
  11. import java.util.Set;
  12. /* NIO群聊客户端 */
  13. public class ChatClient {
  14. private Selector selector; //选择器
  15. private static int PORT = 9999; //端口
  16. private SocketChannel socketChannel; //客户端Channel
  17. /* 初始化客户端 */
  18. public ChatClient(){
  19. try {
  20. selector = Selector.open(); //创建选择器
  21. socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",PORT)); //创建客户端通道并连接服务器
  22. socketChannel.configureBlocking(false); //设置为非阻塞模式
  23. socketChannel.register(selector, SelectionKey.OP_READ); //监听读事件
  24. System.out.println("当前客户端对象准备完成!");
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. private void readInfo() {
  30. try {
  31. /* 判断是否有触发事件 */
  32. while(selector.select()>0){
  33. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  34. for (SelectionKey key : selectionKeys) {
  35. if(key.isReadable()){
  36. SocketChannel sc = (SocketChannel) key.channel();
  37. ByteBuffer buffer = ByteBuffer.allocate(1024);
  38. sc.read(buffer);
  39. System.out.println(new String(buffer.array()));
  40. }
  41. }
  42. selectionKeys.clear();
  43. }
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. private void sendToServer(String msg) {
  49. try {
  50. socketChannel.write(ByteBuffer.wrap(("用户 "+Thread.currentThread().getName()+" 说: "+msg).getBytes(StandardCharsets.UTF_8)));
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. public static void main(String[] args) {
  56. ChatClient chatClient = new ChatClient();
  57. //定义一个线程专门监听从服务端发过来的消息事件
  58. new Thread(()->{
  59. while (true){
  60. chatClient.readInfo();
  61. }
  62. }).start();
  63. Scanner scanner = new Scanner(System.in);
  64. while(scanner.hasNextLine()){
  65. String s = scanner.nextLine();
  66. chatClient.sendToServer(s);
  67. }
  68. }
  69. }

测试:

开启多个客户端实例进行消息发送,不同用户之间消息可通过服务端进行转发
image.pngimage.pngimage.png


零拷贝: