简介

  • NIO,是jdk4提供的一种新的IO标准,是一种同步非阻塞的IO,NIO是以块为单位进行数据处理的,当然块的大小是程序员自己指定的。其相对于BIO的以字节为单位所进行的阻塞式处理方式,大大提高了读写效率和并发度
    • BIO,每个客户端连接到来,都会有一个channel,同时都会创建一个专门负责处理此业务的线程,但是计算机的线程数有限,所以,高并发场景下,会将计算机的线程数全部打满。不可用
    • NIO,每个客户端连接到来,都会有一个channel,但是只有一个线程处理所有的请求,此线程会根据选择器选择准备就绪的channel,然后执行相应逻辑

Selector

  • 多路复用器,包括三个set
    • keys:所有注册连接的channel,都会有对应的SelectKey加入到该set中
    • select-keys:所有已经准备就绪的channel的key,会加到此set中,keys中的是不会变的,是keys的子集
    • cancel-keys:如果channel被取消了,会先将此channel的key从select-key中移除到cancel-key中,在下次执行select方法的时候,会将cancel-keys已经keys中已经取消的key全部清除。

SelectionKey

  • 含义:一个令牌,表示一个Channel注册到selector的令牌
  • 时机:每次channel注册到selector都会创建一个selectionKey,一个key保持他的有效性,直到channel被cancel取消掉。有三种方式关闭,channel.cancel、channel.close、selector.close。取消一个selectionKey,并不会直接删除,而是添加到cancel-keys中,为的是,在下次select操作时,会将cancel-keys清空,同时删除selector的keys中已经取消的selectionKey
  • 每个selectionKey包含了两个整数,这两个操作集合都是整型数值。
    • 一个是表示拥有此key的channel所关注的操作事件,是一个二进制位的整数,即interest set:表示与此selectionKey相关的channel,关注的客户端请求是什么状态,好比,当channel注册到selector的时候,会告知selector,此channel关注的,感兴趣的是什么事件,好比客户端连接事件,此channel关注哪种操作类型。存在四种操作类型,四种状态。通过selectionKey.interestOps():int 获取到感兴趣的集合
    • 一个是就绪的集合,ready set,也是一个整数,是操作系统底层去查询判断就绪的信息。通过selectionKey.readyOps():int,可以获取到准备就绪的集合。
  • 通过SelectionKey可以获取到
    • Selector
    • channel
    • 还可以获取到状态,是可连接的,还是可写的,四种状态。

NIO简单通信案例

服务端

  1. public class NioServer {
  2. public static void main(String[] args) throws IOException {
  3. //创建一个服务端channel
  4. ServerSocketChannel serverChanel = ServerSocketChannel.open();
  5. //指定channel采用的为非阻塞模式
  6. serverChanel.configureBlocking(false);
  7. //指定要监听的端口
  8. serverChanel.bind(new InetSocketAddress(8888));
  9. //创建一个多路复用器selector
  10. Selector selector = Selector.open();
  11. //将channel注册到selector,并告诉selector让其监听"接受client连接事件"
  12. serverChanel.register(selector, SelectionKey.OP_ACCEPT);
  13. while (true){
  14. //select()是一个阻塞方法,若阻塞1秒的时间到了,或在阻塞期间有channel就绪,都会打破阻塞
  15. if (selector.select(1000)==0) {
  16. System.out.println("当前没有找到就绪的channel");
  17. continue;
  18. }
  19. //获取所有就绪的channel的key
  20. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  21. for (SelectionKey key : selectionKeys) {
  22. //若当前key为OPT_ACCEPT,则说明当前channel是可以接收客户端连接的
  23. //那么,这里的代码就是用于接收客户端连接的
  24. if(key.isAcceptable()){
  25. System.out.println("接收到client的连接");
  26. //获取连接到Server的客户端channel,其是客户端channel在Server端的代表(驻京办)
  27. SocketChannel clientChannel = serverChanel.accept();
  28. clientChannel.configureBlocking(false);
  29. //将客户端channel注册到selector,并告诉selector让其监听这个channel中是否发生了读事件
  30. clientChannel.register(selector,SelectionKey.OP_READ);
  31. }
  32. //若当前key为OPT_READ,则说明当前channel中有客户端发送来的数据。
  33. //那么,这里的代码就是用于读取channel中的数据的
  34. if (key.isReadable()) {
  35. try {
  36. //创建buffer
  37. ByteBuffer buffer = ByteBuffer.allocate(1024);
  38. //根据key获取其对应的channel
  39. SocketChannel clientChannel = (SocketChannel)key.channel();
  40. //将channel中的数据读取到buffer
  41. clientChannel.read(buffer);
  42. }catch (Exception e){
  43. //若在读取过程中发生异常,则直接取消该Key,即放弃该channel
  44. key.cancel();
  45. }
  46. }
  47. //删除当前处理过得key,以免重复处理
  48. selectionKeys.remove(key);
  49. }//end-for
  50. }
  51. }
  52. }

客户端

  1. public class NioClient {
  2. public static void main(String[] args) throws Exception {
  3. SocketChannel clientChannel = SocketChannel.open();
  4. clientChannel.configureBlocking(false);
  5. InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
  6. //连接Server
  7. if (!clientChannel.connect(serverAddr)) { //首次连接
  8. while (!clientChannel.finishConnect()) { //完成重连
  9. continue;
  10. }
  11. }
  12. //将消息写入到channel
  13. clientChannel.write(ByteBuffer.wrap("hello".getBytes("UTF-8")));
  14. System.out.println("Client消息已发送");
  15. System.in.read();
  16. }
  17. }

NIO群聊案例

服务端

  1. public class NioChatServerStarter {
  2. public static void main(String[] args) throws Exception{
  3. //创建一个服务端channel
  4. ServerSocketChannel serverChanel = ServerSocketChannel.open();
  5. //指定channel采用的为非阻塞模式
  6. serverChanel.configureBlocking(false);
  7. //指定要监听的端口
  8. serverChanel.bind(new InetSocketAddress(8888));
  9. //创建一个多路复用器selector
  10. Selector selector = Selector.open();
  11. serverChanel.register(selector, SelectionKey.OP_ACCEPT);
  12. //创建支持群聊的NIO Server
  13. NioChatServer chatServer = new NioChatServer();
  14. chatServer.enableChat(serverChanel,selector);
  15. }
  16. }
  1. public class NioChatServer {
  2. //开启Server的支持群聊功能
  3. public void enableChat(ServerSocketChannel serverChanel, Selector selector) throws Exception{
  4. System.out.println("chatServer启动....");
  5. while (true){
  6. if (selector.select(1)==0) {
  7. continue;
  8. }
  9. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  10. for (SelectionKey key : selectionKeys) {
  11. //处理客户端上线
  12. if (key.isAcceptable()) {
  13. SocketChannel clientChannel = serverChanel.accept();
  14. clientChannel.configureBlocking(false);
  15. clientChannel.register(selector,SelectionKey.OP_READ);
  16. //获取到client地址
  17. String msg = clientChannel.getRemoteAddress()+ "-上线了-";
  18. //将上线通知,广播给所有在线的其他client
  19. sendMsgToOtherClientOnline(selector,clientChannel,msg);
  20. }
  21. //处理客户端发送消息
  22. if (key.isReadable()) {
  23. SocketChannel clientChannel = (SocketChannel) key.channel();
  24. ByteBuffer buffer = ByteBuffer.allocate(1024);
  25. clientChannel.read(buffer);
  26. //获取来自于client的消息,trim()将buffer中没有数据的内容转为了空格去掉
  27. String msgFromClient = new String(buffer.array()).trim();
  28. if (msgFromClient.length()>0) {
  29. //构造要发送给其他client的消息
  30. String msgToSend = clientChannel.getRemoteAddress() + "say:"+msgFromClient;
  31. //若client发送的是字符串88,则表示其要下线
  32. if ("88".equals(msgFromClient)) {
  33. msgToSend = clientChannel.getRemoteAddress() +"下线了";
  34. //取消当前key,即放弃其所对应的channel
  35. //将其对应的channel从selector中去掉
  36. key.cancel();
  37. }
  38. //将client消息广播给所有在线的其他client
  39. sendMsgToOtherClientOnline(selector,clientChannel,msgToSend);
  40. }
  41. }//end-if
  42. selectionKeys.remove(key);
  43. }//end-for
  44. }
  45. }
  46. private void sendMsgToOtherClientOnline(Selector selector, SocketChannel self, String msg) throws IOException {
  47. //遍历所有注册到selector的channel,即所有在线的client
  48. Set<SelectionKey> keys = selector.keys();
  49. for (SelectionKey key : keys) {
  50. SelectableChannel channel = key.channel();
  51. //将消息发送给所有其他client
  52. if (channel instanceof SocketChannel && channel != self) {
  53. ((SocketChannel)channel).write(ByteBuffer.wrap(msg.trim().getBytes()));
  54. }
  55. }
  56. }
  57. }

客户端

  1. public class NioChatClientStarter {
  2. public static void main(String[] args) throws Exception {
  3. SocketChannel clientChannel = SocketChannel.open();
  4. clientChannel.configureBlocking(false);
  5. InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
  6. //连接Server
  7. if (!clientChannel.connect(serverAddr)) { //首次连接
  8. while (!clientChannel.finishConnect()) { //完成重连
  9. continue;
  10. }
  11. }
  12. NioChatClient client = new NioChatClient();
  13. client.enableChat(clientChannel);
  14. }
  15. }
  1. public class NioChatClient {
  2. //启动聊天功能
  3. public void enableChat(SocketChannel clientChannel) throws Exception {
  4. //获取client自己的地址
  5. SocketAddress selfAddr = clientChannel.getLocalAddress();
  6. System.out.println(selfAddr +",你已经成功上线了");
  7. //创建一个线程用于不间断的接收来自于Server的消息
  8. new Thread(){
  9. @Override
  10. public void run() {
  11. //实现不间断
  12. while (true){
  13. try {
  14. //若当前client已经关闭,则结束循环
  15. //否则正常接收来自Server的消息
  16. if (!clientChannel.isConnected()) {
  17. return;
  18. }
  19. receiveMsgFromServer(clientChannel);
  20. TimeUnit.SECONDS.sleep(1);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }.start();
  27. //注意,该方法不能写在前面的创建线程之前,这样会导致无法接收到来自于Server的消息,
  28. //因为该方法中的Scanner是阻塞的
  29. //向Server发送消息
  30. sendMsgToServer(clientChannel);
  31. }
  32. //向Server发送消息
  33. private void sendMsgToServer(SocketChannel clientChannel) throws Exception {
  34. //接收来自键盘的输入
  35. Scanner scanner = new Scanner(System.in);
  36. while (scanner.hasNext()){
  37. String msg = scanner.nextLine();
  38. //将消息写入到channel,其中有可能是下线请求消息88
  39. clientChannel.write(ByteBuffer.wrap(msg.trim().getBytes()));
  40. //若消息为88,则表示当前client要下线,则将该channel关闭
  41. if ("88".equalsIgnoreCase(msg.trim())) {
  42. //关闭客户端
  43. clientChannel.close();
  44. return;
  45. }
  46. }
  47. }
  48. private void receiveMsgFromServer(SocketChannel clientChannel) throws Exception{
  49. ByteBuffer buffer = ByteBuffer.allocate(1024);
  50. clientChannel.read(buffer);
  51. String msg = new String(buffer.array()).trim();
  52. if (msg.length()>0) {
  53. System.out.println(msg);
  54. }
  55. }
  56. }

测试

  • 开启多个客户端的结果
    • serverimage.png
    • client1

image.png

  • client2

image.png