在 BIO 模型中,由于 serverSocket.accept()会造成阻塞、输入流的读写操作是synchronized的、无法在同一个线程中处理多个 Stream I/O,因此性能低下是不可避免的。而NIO具有非阻塞性,目前已成为BIO的替代品<br /> <br /><br />**NIO操作逻辑**
NIO 基于Channel (通道)和 Buffer (缓存)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,Selector (选择器)用于监听多个通道的事件(比如连接请求、数据到达),因此使用单线程即可监听多个客户端通道
| NIO | BIO |
|---|---|
| 面向缓冲区 Buffer | 面向流 Stream |
| 非阻塞 | 阻塞IO |
| Selectors 选择器 |
包含三大核心:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

Buffer、Channel、Selector的关系
1、每个Channel都会对应一个Buffer2、一个Selector对应一个线程,一个Selector对应多个注册的Channel3、程序切换到哪个Channel是由事件类型决定的4、Selector会根据不同的事件在各个通道上切换5、数据的读取通过Buffer实现(和BIO的本质不同处)6、BIO要么是输入流要么是输出流,不能双向;NIO的Buffer可以进行读写,双向(channel也是双向)
源码下载:
https://gitee.com/dmbjzorg/Netty
Buffer:
本质是可以读写数据的内存,这一块内存被NIO包装为 NIO Buffer 对象进行访问管理
使用 flip() 将写模式切换为读模式,数据读取后使用 clear() 还原指针位置并进行模式切换;compact() 压缩未读取的数据
常用子类:CharBuffer、ByteBuffer、LongBuffer、IntBuffer,通常使用 xxxBuffer.allocate(int size) 获得一个容量为size的Buffer对象
参数:
position:目前所在的位置,默认指向Buffer启始位置
limit:表示缓冲区的极限位置,不能对缓冲区超过极限的位置进行读写操作,极限位置可以修改
capacity:Buffer最大容量
标记(mark)和重置(reset):标记是一个索引,通过Buffer的mark方法指定一个特定的 position,之后可以通过 reset 方法恢复到这个 position
常用方法:
Buffer clear() 还原position、limit的位置(不是清空数据,可以用来将读模式切换为写模式)Buffer flip() 从写模式切换为读模式int capacity() 返回Buffer的容量大小boolean hasRemaining() 判断缓冲取中是否还有元素int limit() 返回Buffer的界限的位置Buffer limit(int n) 将设置缓冲区界限为n,并返回一个具有新Limit的缓存区对象Buffer mark() 对缓冲区设置标记int position() 范湖缓冲区的当前位置Buffer position(int n) 将设置缓冲区界限为n,并返回修改后的BUffer对象int remaining() 返回position和limit之间的元素个数Buffer reset() 将position转到以前设置mark的位置Buffer rewind() 将位置设置为0,取消设置的mark,用于使用同一个Buffer进行多次读取boolean hasRemaining() position位置是否小于limit(是否还有数据未读取)wrap(byte[] array,int offset, int length) 把数组数据写入到缓冲区后反转为读模式(Buffer子类方法)ByteBuffer asReadOnlyBuffer() 转换为只读Buffer
缓冲区数据操作方法:
Buffer 所有子类都有 put() 和 get() 方法获取Buffer中的数据
get() 读取单个字节get(byte[] dst) 批量读取多个字节到dst中get(int index) 读取指定索引位置的字节(不移动position)put(byte b) 将给定单个字节希尔缓冲区的当前位置put(byte[] src) 将src中的字节写入缓冲区的当前位置put(int index,byte b) 将指定字节写入缓冲区的索引位置(不移动position)
模式讲解:
写模式:
将数据写入缓冲区后,position 将指向数据的末尾;limit 和 capacity 的位置相同,都指向当前Buffer容量最大值位置

读模式:
写模式使用 flip() 切换到读模式后,position 的位置被还原,limit 变为之前 position 的位置(代表当前数据在缓冲区的最大位置)
数据全部读取完毕后使用 clear() 还原指针位置并切换为写模式;如果需要切换为写模式并保留未读取数据到下一次读取,需要使用 compact() 拷贝未读取数据,compact() 会将拷贝数据移动到头部并使 position 位置下移![)R@{WRQ1NZ]P69D@MTGQLSH.png](/uploads/projects/yuxuandmbjz@netty/a80d9c9b74d7ee2754704fd301964f65.png)
compact() 拷贝数据并修改position位置
案例:
package buffer;import org.junit.Test;import java.nio.ByteBuffer;import java.nio.charset.StandardCharsets;/* 演示 Buffer 方法 */public class BufferTest {@Testpublic void test(){ByteBuffer buffer = ByteBuffer.allocate(1024); //分配一个1024大小的缓存System.out.println("Limit的位置是: "+buffer.limit());System.out.println("容量大小是: "+buffer.capacity());System.out.println("当前位置是: "+buffer.position());System.out.println("=======================");/* 存入数据 */String name = "helloBuffer";buffer.put(name.getBytes(StandardCharsets.UTF_8));System.out.println("添加helloBuffer后的当前位置是: "+buffer.position());System.out.println("添加helloBuffer后Limit的位置是: "+buffer.limit());System.out.println("添加helloBuffer后容量大小是: "+buffer.capacity());System.out.println("=======================");/* 翻转模式 */buffer.flip();System.out.println("切换模式后的当前位置是: "+buffer.position());System.out.println("切换模式后Limit的位置是: "+buffer.limit());System.out.println("切换模式后容量大小是: "+buffer.capacity());System.out.println("=======================");/* 读取数据 */byte[] info = new byte[3];buffer.get(info);System.out.println("读取3个字节后的当前位置是: "+buffer.position());System.out.println("读取3个字节后Limit的位置是: "+buffer.limit());System.out.println("读取3个字节后容量大小是: "+buffer.capacity());System.out.println("=======================");/* 暂存数据 */buffer.compact();System.out.println("暂存数据后的当前位置是: "+buffer.position());System.out.println("暂存数据后Limit的位置是: "+buffer.limit());System.out.println("暂存数据后容量大小是: "+buffer.capacity());System.out.println("=======================");/* 清空数据 */buffer.clear();System.out.println("清空数据后的当前位置是: "+buffer.position());System.out.println("清空数据后Limit的位置是: "+buffer.limit());System.out.println("清空数据后容量大小是: "+buffer.capacity());}}

测试结果
package buffer;import org.junit.Test;import java.nio.ByteBuffer;import java.nio.charset.StandardCharsets;/* 演示 Buffer 方法 */public class BufferTest {@Testpublic void test2(){ByteBuffer buffer = ByteBuffer.allocate(1024); //分配一个1024大小的缓存String name = "helloBuffer";buffer.put(name.getBytes(StandardCharsets.UTF_8));buffer.flip(); //切换模式byte[] info = new byte[3];buffer.get(info);System.out.println("读取3个字节后的当前位置是: "+buffer.position());System.out.println("读取3个字节后Limit的位置是: "+buffer.limit());System.out.println("读取3个字节后容量大小是: "+buffer.capacity());System.out.println("=======================");buffer.mark(); //标记位置byte[] info2 = new byte[3];buffer.get(info2);System.out.println("标记后的当前位置是: "+buffer.position());System.out.println("标记后Limit的位置是: "+buffer.limit());System.out.println("标记后容量大小是: "+buffer.capacity());buffer.reset(); //返回到标记位置if(buffer.hasRemaining()){System.out.println("还有剩余元素");}}}

测试结果
异常演示:
当对同一个Buffer存放多个不同类型的值时,取值的格式需要和存值的顺序对应,否则可能会抛出 BufferUnderflowException
@Testpublic void nioEx(){ByteBuffer buffer = ByteBuffer.allocate(64);buffer.putInt(100);buffer.putLong(20L);buffer.putChar('h');buffer.put("你好".getBytes(StandardCharsets.UTF_8));buffer.flip();System.out.println(buffer.getInt());System.out.println(buffer.getLong());System.out.println(buffer.getChar());System.out.println(buffer.getLong());}

运行结果
只读模式:
普通Buffer转化为只读Buffer后再添加数据将会报错
@Testpublic void readOnlyBuf(){ByteBuffer buffer = ByteBuffer.allocate(640);for (int i = 0; i < 10; i++) {buffer.put((byte) i);}buffer.flip();ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); //转换为只读Bufferwhile (readOnlyBuffer.hasRemaining()){System.out.println(readOnlyBuffer.get());}readOnlyBuffer.put("hello".getBytes(StandardCharsets.UTF_8)); //只读Buffer添加数据将会抛出异常}

对只读Buffer写入数将抛出异常
直接内存和非直接内存:
byte byffer 可以是两种类型,一种是基于直接内存(也就是非堆内存) ;另一种是非直接内存(也就是堆内
存),对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的I0操作。而非直接内存(堆内存中的数据),如果要作IO操作,会先从本进程内存复制到直接内存,再利用本地IO处理
从数据流的角度,非直接内存是下面这样的作用链:
本地IO --> 直接内存 --> 非直接内存 --> 直接内存 --> 本地IO
而直接内存是:
本地IO --> 直接内存 --> 本地IO
二者一对比就可发现在做IO处理时(比如网络发送大量数据),直接内存会具有更高的效率。直接内存使用XXXXBuffer.allocateDirect() 创建,但是它比申请普通的堆内存需要耗费更高的性能。不过这部分的数据是在JVM之外的,因此它不会占用应用的内存。如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。适用场景:数据量大且生命周期又很长、频繁的IO操作(网络并发场景)
MappedByteBuffer:
MappedByteBuffer可以让文件直接在内存修改,操作系统不需要再拷贝一次
在项目下创建内容为“HelloMapped”文本并使用 MappedByteBuffer 修改数据
@Testpublic void MappedBuffer() throws Exception {RandomAccessFile file = new RandomAccessFile("HelloMapped.txt","rw");FileChannel channel = file.getChannel();/** 参数一: FileChannel.MapMode 文件模式* 参数二: 可对文件修改的启始位置* 参数三: 可对文件修改的字节数,5意味着可以修改 0~4索引的数据**/MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);map.put(0,(byte)'C');map.put(1,(byte)'S');map.put(2,(byte)'G');file.close();}

数据被修改
Channel:
本身不能直接访问数据,Channel 只能与Buffer进行交互,表示IO源于目标打开的连接
主要包含:FileChannel(文件读写IO操作)、SocketChannel + ServerSocketChannel (TCP数据读写)、DatagramChannel (UDP数据读写)
ServerSocketChannel与SocketChannel:
ServerSocketChannel 方法:
ServerSocketChannel.open() 得到一个ServerSocketChannel通道ServerSocketChannel bind(SocketAddress local) 设置服务器端口SelectableChannel configureBlocking(boolean block) 设置是否为阻塞模式SocketChannel accept() 接受一个连接,返回代表这个连接的SocketChannel对象SelectionKey register(Selector sel, int ops) 注册到选择器并设置监听事件
SocketChannel方法:
SocketChannel.open() 得到一个SocketChannel通道SelectableChannel configureBlocking(boolean block) 设置是否为阻塞模式boolean connect(SocketAddress remote) 连接服务器(open时可以连接)boolean finishConnect() 如果connect连接服务器失败,需要使用该方法完成服务器连接int write(ByteBuffer src) 将缓存数据写入到通道int read(ByteBuffer dst) 将通道数据写入到缓存SelectionKey register(Selector sel,int ops,Object att) 注册到选择器并设置监听事件,最后一个参数为共享数据void close() 关闭通道
注意事项:
1、只有注册在 Selector 上的 Channel 才能被监控,可以指定Channel需要监听的可操作状态,注册后的Channel会得到SelectorKey,将SelectorKey作为参数带入到具体方法就可以得到对应的Channel信息:select():返回当前处于可操作状态的channel,如果没有可返回的channel将会阻塞,直到可以返回为止interestOps():得到Channel的注册状态readyOps(): 对于当前SelectorKey有哪些监听的状态channel():返回SelectorKey注册的Channel对象selector():返回在哪个selector上完成的注册attachment():对于每一个注册在Selector上的Channel对象,可以再加上一个Object对象2、SocketChannel 与服务器端连接后,SocketChannel 状态处于 connect;ServerSocketChannel接收了客户端的请求后状态将变为 accept3、ServerSocketChannel 和 SocketChannel 有数据可以读取时状态为 read4、ServerSocketChannel 和 SocketChannel 处于写入状态时状态为 write5、Selector不会重置状态,因此操作完channel后需要手动重置回等待状态(清空selectionKey集合)6、ServerSocketChannel 默认为阻塞状态,需要手动修改为非阻塞
FileChannel:
常见方法:
public int read(ByteBuffer buf) 将通道数据放入到缓冲区中public int write(ByteBuffer buf) 把缓冲区数据写入到通道中public long transferFrom(ReadableByteChannel src,long position,long count) 从目标通道将数据复制当前通道public long transferTo(long position,long count,WritableByteChannel target) 把数据从当前通道复制到目标通道
写入数据到本地文件:
1.txt 将会自动创建在当前项目路径下
package channel;import org.junit.Test;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.charset.StandardCharsets;/* 演示Channel基本方法 */public class ChannelTest {@Testpublic void write() throws IOException {try {//1、字节输出流通向目标文件FileOutputStream fos = new FileOutputStream("1.txt");//2、得到字节输出流对应的通道ChannelFileChannel channel = fos.getChannel();//3、分配缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put("HelloFile".getBytes(StandardCharsets.UTF_8));//4、切换模式buffer.flip();//5、将缓存数据写入到通道channel.write(buffer);channel.close();} catch (FileNotFoundException e) {e.printStackTrace();}}}
从本地文件读取数据:
package channel;import org.junit.Test;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.charset.StandardCharsets;/* 演示Channel基本方法 */public class ChannelTest {/* 从本地文件读取数据 */@Testpublic void read() throws IOException {//定义一个文件输入流与源文件接通FileInputStream inputStream = new FileInputStream("1.txt");//创建ChannelFileChannel channel = inputStream.getChannel();//创建缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);//将通道数据写入到缓存channel.read(buffer);//读取缓冲区中的数据bin输出buffer.flip();System.out.println( new String(buffer.array()) );}}
拷贝本地文件:
package channel;import org.junit.Test;import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.nio.charset.StandardCharsets;/* 演示Channel基本方法 */public class ChannelTest {/* 拷贝本地文件 */@Testpublic void copy() throws IOException {//源文件File png = new File("E:\\图片\\电脑壁纸\\4.png");//创建通道FileInputStream inputStream = new FileInputStream(png);FileChannel inputChannel = inputStream.getChannel();FileOutputStream outputStream = new FileOutputStream("4.png");FileChannel outChannel = outputStream.getChannel();/* 从文件中循环切出1024个字节判断是否已经到末尾,没有就写入到通道中 */ByteBuffer buffer = ByteBuffer.allocate(1024);while (inputChannel.read(buffer)!=-1){buffer.flip();while (buffer.hasRemaining()) {outChannel.write(buffer);}buffer.clear();}inputChannel.close();outputStream.close();}}
分散读取与聚集写入:
package channel;import org.junit.Test;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class ChannelTest2 {@Testpublic void read() throws IOException {//字节输入管道FileInputStream inputStream = new FileInputStream("1.txt");//字节输出管道FileOutputStream outputStream = new FileOutputStream("2.txt");//定义多个缓冲区做数据分散ByteBuffer buffer1 = ByteBuffer.allocate(4);ByteBuffer buffer2 = ByteBuffer.allocate(1024);ByteBuffer[] buffers = {buffer1,buffer2};//从通道中读取数据FileChannel ischannel = inputStream.getChannel();FileChannel outchannel = outputStream.getChannel();ischannel.read(buffers); //分散读取//转换为读取模式for (ByteBuffer buffer : buffers){buffer.flip();System.out.println(new String(buffer.array(),0,buffer.remaining()));}//聚集写入通道outchannel.write(buffers);//关闭资源ischannel.close();outchannel.close();}}
transferFrom 和 transferTo:
package channel;import org.junit.Test;import java.io.FileInputStream;import java.io.FileOutputStream;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;public class ChannelTest2 {/* 数据复制* transferFrom : 目标通道转移数据到原通道* transferTo: 原通道转移数据到目标通道* */@Testpublic void write() throws IOException {//字节输入管道FileInputStream inputStream = new FileInputStream("1.txt");//字节输出管道FileOutputStream outputStream = new FileOutputStream("3.txt");//从通道中读取数据FileChannel ischannel = inputStream.getChannel();FileChannel outchannel = outputStream.getChannel();//复制数据,transferFrom 和 transferTo实现结果一致//outchannel.transferFrom(ischannel,ischannel.position(),ischannel.size());ischannel.transferTo(ischannel.position(),ischannel.size(),outchannel);//关闭ischannel.close();outchannel.close();}}
Selector:
又称选择器,是SelectableChannle对象的多路复用器,利用Selector可使一个单独的线程管理多个Channel,Selector是非阻塞IO的核心
能够检测到多个注册的通道上是否有事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理,这样就可以只使用一个单线程去管理多个通道,也就是管理多个连接和请求
只有在连接/通道有读写事件发生时才会进行读写,大大地减少了系统开销,并且不必为每个连接都创建一个线程, 也不用去维护多个线程,避免了多线程之间的.上下文切换导致的开销
每个Channel都会对应一个Buffer,程序切换到哪个 Channel 是由事件决定的,Selector会根据不同的事件在各个通道上切换
通道注册在选择器上后会生成 SelectionKey,通过 SelectionKey 可以对目标通道进行操作
常用方法:
Selector.open() 创建选择器int select(long timeout) 返回对应注册事件的通道的个数,当其中有IO操作可以进行时将对应的SelectorKey加入到集合内部返回,参数代表阻塞时间(timeout毫秒后执行方法),没有参数时该方法为阻塞方法Set<SelectionKey> selectedKeys() 返回所有符合监听状态的已注册通道Keys集合Set<SelectionKey> keys() 返回所有已注册的通道keys集合Selector.wakeup() 唤醒Selectorint selectNow() 立刻返回对应注册事件的通道的个数Object attachment() 得到与之关联的共享数据int interestOps() 设置或改变监听事件
SelectorKey:
表示Selector与网络通道的注册关系,有四种状态
int OP_ACCEPT:有新的网络连接可以accept,值为16int OP_CONNECT:代表连接已经建立,值为8int OP_READ:代表读操作,值为1int OP_WRITE:代表写操作,值为4
案例:
客户端:
package tx;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Set;/* NIO非阻塞通信下的服务端 */public class Server {public static void main(String[] args) throws Exception {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(666)); //绑定端口Selector selector = Selector.open(); //创建选择器serverSocketChannel.configureBlocking(false); //设置非阻塞模式serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //注册到选择器,监听连接事件/* 等待客户端连接 */while (true){if(selector.select(1000)==0){System.out.println("服务端等待1秒,无任何客户端进行连接");continue;}Set<SelectionKey> selectionKeys = selector.selectedKeys(); //获取已将连接到客户端的SelectKey集合System.out.println("当前注册的通道数量: "+selector.keys().size()+" 当前符合监听状态的通道数量: "+selector.selectedKeys().size());for (SelectionKey selectionKey : selectionKeys) {/* 客户端处于连接状态 */if(selectionKey.isAcceptable()){SocketChannel sChannel = serverSocketChannel.accept(); //通过连接从serverSocketChannel获取客户端通道sChannel.configureBlocking(false); //设置为非阻塞模式System.out.println("客户端连接成功!");sChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024)); //注册客户端,监听读事件(客户端正在写数据到通道)}/* 客户端处于读取状态(正在写入数据到通道) */if(selectionKey.isReadable()){SocketChannel channel = (SocketChannel)selectionKey.channel(); //通过Key获取客户端ChannelByteBuffer buffer = (ByteBuffer)selectionKey.attachment(); //获取到register时关联的Bufferchannel.read(buffer); //将通道数据写入BufferSystem.out.println("客户端收到的消息:" + new String(buffer.array()));}}selectionKeys.clear();}}}
服务端:
package tx;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.Scanner;/* NIO非阻塞通信下客户端 */public class Client {public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",666)); //获取通道socketChannel.configureBlocking(false); //切换为非阻塞模式String str = "hello NIO";ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));socketChannel.write(buffer);System.in.read();}}
<br />** 服务端收到客户端消息**<br /> <br />** 多客户端实例下的服务端运行结果**
NIO聊天室:
客户端与服务端建立好连接后状态为 ACCEPT,选择器获取到该状态后需要设置客户端 SocketChannel 为非阻塞并监听其写入状态
客户端将聊天消息传递给服务端,服务端变为 Read状态,选择器获取到该状态后将消息取出并转发给其他客户端

聊天室流程
服务端代码:
package chatroom;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.StandardCharsets;import java.util.Set;/* NIO聊天室服务端 */public class ChatServer {private Selector selector; //服务端选择器private ServerSocketChannel ssChannel; //服务端通道private static final int PORT = 9999; //监听端口/* 初始化代码逻辑 */public ChatServer(){try {selector = Selector.open(); //创建选择器ssChannel = ServerSocketChannel.open(); //创建客户端通道ssChannel.bind(new InetSocketAddress(PORT)); //绑定客户端连接的端口ssChannel.configureBlocking(false); //设置非阻塞模式ssChannel.register(selector, SelectionKey.OP_ACCEPT); //通道注册到选择器,并设置监听事件} catch (IOException e) {e.printStackTrace();}}/* 监听事件方法 */private void listen(){try {while (selector.select()>0){Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey sk : selectionKeys) {/* 判断事件的类型 */if(sk.isAcceptable()){/** 客户端接入后注册到选择器并监听其写入事件 */SocketChannel sChannel = ssChannel.accept(); //获取当前客户端通道sChannel.configureBlocking(false); //修改为非阻塞模式sChannel.register(selector,SelectionKey.OP_READ); //监听客户端Channel读事件}if(sk.isReadable()){/* 处理客户端消息,接收消息并进行转发*/readClientData(sk);}}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();}}/* 接收当前客户端的信息并转发给其他客户端通道 */private void readClientData(SelectionKey sk) {SocketChannel sChannel = null;try {sChannel = (SocketChannel)sk.channel(); //获取当前客户端通道ByteBuffer buffer = ByteBuffer.allocate(1024); //创建缓冲区if(sChannel.read(buffer) >0){buffer.flip();String msg = new String(buffer.array(),0,buffer.remaining()); //提取读取到的用户通道信息System.out.println("接收到客户端消息"+msg);sendMsgToAllClient(msg,sChannel); //把消息转发给其他在线用户}} catch (Exception e) {//当前客户端离线了,数据无法读取将抛出异常try {System.out.println("用户"+sChannel.getRemoteAddress()+"下线");sk.cancel(); //取消选择器的注册sChannel.close(); //关闭客户端通道} catch (IOException ex) {ex.printStackTrace();}}}/* 消息推送给其他在线用户 */private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {System.out.println("服务端开始转发消息,当前转发线程是: "+Thread.currentThread().getName());for (SelectionKey key : selector.keys()) {Channel channel = key.channel();//排除自己if(channel instanceof SocketChannel && sChannel!=channel){ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));((SocketChannel) channel).write(buffer);}}}public static void main(String[] args) {ChatServer chatServer = new ChatServer();//开始监听客户端各种消息事件(连接消息、群聊消息、离线消息)chatServer.listen();}}
客户端代码:
package chatroom;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.Scanner;import java.util.Set;/* NIO群聊客户端 */public class ChatClient {private Selector selector; //选择器private static int PORT = 9999; //端口private SocketChannel socketChannel; //客户端Channel/* 初始化客户端 */public ChatClient(){try {selector = Selector.open(); //创建选择器socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",PORT)); //创建客户端通道并连接服务器socketChannel.configureBlocking(false); //设置为非阻塞模式socketChannel.register(selector, SelectionKey.OP_READ); //监听读事件System.out.println("当前客户端对象准备完成!");} catch (IOException e) {e.printStackTrace();}}private void readInfo() {try {/* 判断是否有触发事件 */while(selector.select()>0){Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);sc.read(buffer);System.out.println(new String(buffer.array()));}}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();}}private void sendToServer(String msg) {try {socketChannel.write(ByteBuffer.wrap(("用户 "+Thread.currentThread().getName()+" 说: "+msg).getBytes(StandardCharsets.UTF_8)));} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {ChatClient chatClient = new ChatClient();//定义一个线程专门监听从服务端发过来的消息事件new Thread(()->{while (true){chatClient.readInfo();}}).start();Scanner scanner = new Scanner(System.in);while(scanner.hasNextLine()){String s = scanner.nextLine();chatClient.sendToServer(s);}}}
测试:
开启多个客户端实例进行消息发送,不同用户之间消息可通过服务端进行转发



