简介
- NIO,是jdk4提供的一种新的IO标准,是一种同步非阻塞的IO,NIO是以块为单位进行数据处理的,当然块的大小是程序员自己指定的。其相对于BIO的以字节为单位所进行的阻塞式处理方式,大大提高了读写效率和并发度
- BIO,每个客户端连接到来,都会有一个channel,同时都会创建一个专门负责处理此业务的线程,但是计算机的线程数有限,所以,高并发场景下,会将计算机的线程数全部打满。不可用
- NIO,每个客户端连接到来,都会有一个channel,但是只有一个线程处理所有的请求,此线程会根据选择器选择准备就绪的channel,然后执行相应逻辑
Selector
- 多路复用器,包括三个set
- keys:所有注册连接的channel,都会有对应的SelectKey加入到该set中
- select-keys:所有已经准备就绪的channel的key,会加到此set中,keys中的是不会变的,是keys的子集
- cancel-keys:如果channel被取消了,会先将此channel的key从select-key中移除到cancel-key中,在下次执行select方法的时候,会将cancel-keys已经keys中已经取消的key全部清除。
SelectionKey
- 含义:一个令牌,表示一个Channel注册到selector的令牌
- 时机:每次channel注册到selector都会创建一个selectionKey,一个key保持他的有效性,直到channel被cancel取消掉。有三种方式关闭,channel.cancel、channel.close、selector.close。取消一个selectionKey,并不会直接删除,而是添加到cancel-keys中,为的是,在下次select操作时,会将cancel-keys清空,同时删除selector的keys中已经取消的selectionKey
- 每个selectionKey包含了两个整数,这两个操作集合都是整型数值。
- 一个是表示拥有此key的channel所关注的操作事件,是一个二进制位的整数,即interest set:表示与此selectionKey相关的channel,关注的客户端请求是什么状态,好比,当channel注册到selector的时候,会告知selector,此channel关注的,感兴趣的是什么事件,好比客户端连接事件,此channel关注哪种操作类型。存在四种操作类型,四种状态。通过selectionKey.interestOps():int 获取到感兴趣的集合
- 一个是就绪的集合,ready set,也是一个整数,是操作系统底层去查询判断就绪的信息。通过selectionKey.readyOps():int,可以获取到准备就绪的集合。
- 通过SelectionKey可以获取到
- Selector
- channel
- 还可以获取到状态,是可连接的,还是可写的,四种状态。
NIO简单通信案例
服务端
public class NioServer { public static void main(String[] args) throws IOException { //创建一个服务端channel ServerSocketChannel serverChanel = ServerSocketChannel.open(); //指定channel采用的为非阻塞模式 serverChanel.configureBlocking(false); //指定要监听的端口 serverChanel.bind(new InetSocketAddress(8888)); //创建一个多路复用器selector Selector selector = Selector.open(); //将channel注册到selector,并告诉selector让其监听"接受client连接事件" serverChanel.register(selector, SelectionKey.OP_ACCEPT); while (true){ //select()是一个阻塞方法,若阻塞1秒的时间到了,或在阻塞期间有channel就绪,都会打破阻塞 if (selector.select(1000)==0) { System.out.println("当前没有找到就绪的channel"); continue; } //获取所有就绪的channel的key Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { //若当前key为OPT_ACCEPT,则说明当前channel是可以接收客户端连接的 //那么,这里的代码就是用于接收客户端连接的 if(key.isAcceptable()){ System.out.println("接收到client的连接"); //获取连接到Server的客户端channel,其是客户端channel在Server端的代表(驻京办) SocketChannel clientChannel = serverChanel.accept(); clientChannel.configureBlocking(false); //将客户端channel注册到selector,并告诉selector让其监听这个channel中是否发生了读事件 clientChannel.register(selector,SelectionKey.OP_READ); } //若当前key为OPT_READ,则说明当前channel中有客户端发送来的数据。 //那么,这里的代码就是用于读取channel中的数据的 if (key.isReadable()) { try { //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //根据key获取其对应的channel SocketChannel clientChannel = (SocketChannel)key.channel(); //将channel中的数据读取到buffer clientChannel.read(buffer); }catch (Exception e){ //若在读取过程中发生异常,则直接取消该Key,即放弃该channel key.cancel(); } } //删除当前处理过得key,以免重复处理 selectionKeys.remove(key); }//end-for } }}
客户端
public class NioClient { public static void main(String[] args) throws Exception { SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888); //连接Server if (!clientChannel.connect(serverAddr)) { //首次连接 while (!clientChannel.finishConnect()) { //完成重连 continue; } } //将消息写入到channel clientChannel.write(ByteBuffer.wrap("hello".getBytes("UTF-8"))); System.out.println("Client消息已发送"); System.in.read(); }}
NIO群聊案例
服务端
public class NioChatServerStarter { public static void main(String[] args) throws Exception{ //创建一个服务端channel ServerSocketChannel serverChanel = ServerSocketChannel.open(); //指定channel采用的为非阻塞模式 serverChanel.configureBlocking(false); //指定要监听的端口 serverChanel.bind(new InetSocketAddress(8888)); //创建一个多路复用器selector Selector selector = Selector.open(); serverChanel.register(selector, SelectionKey.OP_ACCEPT); //创建支持群聊的NIO Server NioChatServer chatServer = new NioChatServer(); chatServer.enableChat(serverChanel,selector); }}
public class NioChatServer { //开启Server的支持群聊功能 public void enableChat(ServerSocketChannel serverChanel, Selector selector) throws Exception{ System.out.println("chatServer启动...."); while (true){ if (selector.select(1)==0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { //处理客户端上线 if (key.isAcceptable()) { SocketChannel clientChannel = serverChanel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector,SelectionKey.OP_READ); //获取到client地址 String msg = clientChannel.getRemoteAddress()+ "-上线了-"; //将上线通知,广播给所有在线的其他client sendMsgToOtherClientOnline(selector,clientChannel,msg); } //处理客户端发送消息 if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); clientChannel.read(buffer); //获取来自于client的消息,trim()将buffer中没有数据的内容转为了空格去掉 String msgFromClient = new String(buffer.array()).trim(); if (msgFromClient.length()>0) { //构造要发送给其他client的消息 String msgToSend = clientChannel.getRemoteAddress() + "say:"+msgFromClient; //若client发送的是字符串88,则表示其要下线 if ("88".equals(msgFromClient)) { msgToSend = clientChannel.getRemoteAddress() +"下线了"; //取消当前key,即放弃其所对应的channel //将其对应的channel从selector中去掉 key.cancel(); } //将client消息广播给所有在线的其他client sendMsgToOtherClientOnline(selector,clientChannel,msgToSend); } }//end-if selectionKeys.remove(key); }//end-for } } private void sendMsgToOtherClientOnline(Selector selector, SocketChannel self, String msg) throws IOException { //遍历所有注册到selector的channel,即所有在线的client Set<SelectionKey> keys = selector.keys(); for (SelectionKey key : keys) { SelectableChannel channel = key.channel(); //将消息发送给所有其他client if (channel instanceof SocketChannel && channel != self) { ((SocketChannel)channel).write(ByteBuffer.wrap(msg.trim().getBytes())); } } }}
客户端
public class NioChatClientStarter { public static void main(String[] args) throws Exception { SocketChannel clientChannel = SocketChannel.open(); clientChannel.configureBlocking(false); InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888); //连接Server if (!clientChannel.connect(serverAddr)) { //首次连接 while (!clientChannel.finishConnect()) { //完成重连 continue; } } NioChatClient client = new NioChatClient(); client.enableChat(clientChannel); }}
public class NioChatClient { //启动聊天功能 public void enableChat(SocketChannel clientChannel) throws Exception { //获取client自己的地址 SocketAddress selfAddr = clientChannel.getLocalAddress(); System.out.println(selfAddr +",你已经成功上线了"); //创建一个线程用于不间断的接收来自于Server的消息 new Thread(){ @Override public void run() { //实现不间断 while (true){ try { //若当前client已经关闭,则结束循环 //否则正常接收来自Server的消息 if (!clientChannel.isConnected()) { return; } receiveMsgFromServer(clientChannel); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } } } }.start(); //注意,该方法不能写在前面的创建线程之前,这样会导致无法接收到来自于Server的消息, //因为该方法中的Scanner是阻塞的 //向Server发送消息 sendMsgToServer(clientChannel); } //向Server发送消息 private void sendMsgToServer(SocketChannel clientChannel) throws Exception { //接收来自键盘的输入 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.nextLine(); //将消息写入到channel,其中有可能是下线请求消息88 clientChannel.write(ByteBuffer.wrap(msg.trim().getBytes())); //若消息为88,则表示当前client要下线,则将该channel关闭 if ("88".equalsIgnoreCase(msg.trim())) { //关闭客户端 clientChannel.close(); return; } } } private void receiveMsgFromServer(SocketChannel clientChannel) throws Exception{ ByteBuffer buffer = ByteBuffer.allocate(1024); clientChannel.read(buffer); String msg = new String(buffer.array()).trim(); if (msg.length()>0) { System.out.println(msg); } }}
测试
- 开启多个客户端的结果
- server

- client1

