一、NIO编程模型概述

ACCEPT事件:

服务器端启动ServerSocketChannel,绑定端口后向Selector注册ACCEPT事件;当客户端发起连接请求时触发ACCEPT事件,服务器通过handles方法接受该连接,并为新建立的SocketChannel注册READ事件。
image.png
在客户发送信息以后READ事件就可以被触发。
注意:
处理事件都是在同一线程中完成的:
image.png

READ事件:

接上文,当客户端对应的SocketChannel中有数据可读时,Selector会检测到可读事件,READ被触发,服务器通过handles方法读取数据并转发给其他客户端。同样,处理事件都是在同一线程中完成的。

注意:

虽然NIO的读写都是非阻塞的,但Selector的select()是阻塞的:当没有事件发生时,select()会一直阻塞,直到有新的事件触发(或被wakeup()唤醒)。

第二个连接建立:

过程同上,形成如图:
image.png
注册了监听Client2的READ事件。

二、代码实践

服务器端

ChatServer

  1. package demo3.nio1.server;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.*;
  7. import java.nio.charset.Charset;
  8. import java.util.Set;
  /**
   * Single-threaded NIO chat server.
   *
   * <p>One {@link Selector} multiplexes the listening {@link ServerSocketChannel} (ACCEPT events)
   * and every connected {@link SocketChannel} (READ events). Each message received from a client
   * is forwarded to all other connected clients. All events are handled on the thread that calls
   * {@link #start()}.
   */
  public class ChatServer {
      private static final String QUIT = "quit";
      private static final int DEFAULT_PORT = 9999;
      private static final int BUFFER = 1024;

      private final int port;
      private ServerSocketChannel server;
      private Selector selector;
      private final ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);
      // Encoding/decoding of chat messages.
      private final Charset charset = java.nio.charset.StandardCharsets.UTF_8;

      /**
       * Creates a server that will listen on the given port.
       *
       * @param port TCP port to bind when {@link #start()} is called
       */
      public ChatServer(int port) {
          this.port = port;
      }

      /** Creates a server that will listen on the default port (9999). */
      public ChatServer() {
          this(DEFAULT_PORT);
      }

      /** Returns true when the message is the quit command (case-insensitive). */
      private boolean readyToQuit(String str) {
          return QUIT.equalsIgnoreCase(str);
      }

      /**
       * Closes every non-null resource. A failure while closing one resource is reported but does
       * not prevent the remaining resources from being closed.
       */
      private void close(Closeable... closeables) {
          for (Closeable shut : closeables) {
              if (shut == null) {
                  continue;
              }
              try {
                  shut.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }

      /**
       * Binds the listening socket and runs the select loop on the calling thread. The method
       * returns only after the selector has been closed or a server-level I/O error occurred; all
       * channels are closed before it returns.
       */
      public void start() {
          try {
              server = ServerSocketChannel.open();
              // Non-blocking mode is required before a channel can be registered with a selector.
              server.configureBlocking(false);
              // Bind the listening port.
              server.socket().bind(new InetSocketAddress(port));
              selector = Selector.open();
              // Register interest in ACCEPT events.
              server.register(selector, SelectionKey.OP_ACCEPT);
              System.out.println("服务器[" + port + "]已启动");
              while (true) {
                  // Blocks until at least one registered channel is ready.
                  selector.select();
                  Set<SelectionKey> selectionKeys = selector.selectedKeys();
                  for (SelectionKey key : selectionKeys) {
                      handles(key);
                  }
                  // The selector never removes keys from the selected set itself.
                  selectionKeys.clear();
              }
          } catch (ClosedSelectorException ignored) {
              // The selector was closed: this is the normal way out of the loop.
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              shutdown();
          }
      }

      /**
       * Releases every channel and the selector. Closing a selector only deregisters its channels,
       * it does not close them, so the channels are closed explicitly here.
       */
      private void shutdown() {
          if (selector != null && selector.isOpen()) {
              for (SelectionKey key : selector.keys()) {
                  close(key.channel());
              }
          }
          close(selector, server);
      }

      /**
       * Handles one ready key: accepts a new connection, or reads and dispatches a client message.
       * An I/O failure on a single client connection drops that client only.
       *
       * @param key a key taken from the selector's selected-key set
       * @throws IOException if accepting or configuring a new connection fails
       */
      private void handles(SelectionKey key) throws IOException {
          // A key may have been cancelled earlier in the same pass (e.g. a failed forward).
          if (!key.isValid()) {
              return;
          }
          // ACCEPT event: establish the connection with the client.
          if (key.isAcceptable()) {
              ServerSocketChannel channel = (ServerSocketChannel) key.channel();
              SocketChannel client = channel.accept();
              // In non-blocking mode accept() returns null when no connection is pending.
              if (client == null) {
                  return;
              }
              client.configureBlocking(false);
              client.register(selector, SelectionKey.OP_READ);
              System.out.println(getClientName(client) + "已连接到服务器");
          }
          // READ event: the client sent data or closed its side of the connection.
          else if (key.isReadable()) {
              SocketChannel client = (SocketChannel) key.channel();
              String msg;
              try {
                  msg = recieve(client);
              } catch (IOException e) {
                  // e.g. connection reset: drop this client, keep the server running.
                  e.printStackTrace();
                  disconnect(key, client);
                  return;
              }
              if (msg == null) {
                  // End of stream: the client went away without sending the quit command.
                  disconnect(key, client);
              } else if (msg.isEmpty()) {
                  // Nothing was actually readable; wait for the next event.
                  return;
              } else if (readyToQuit(msg)) {
                  // Resolve the name before the socket is closed.
                  String name = getClientName(client);
                  disconnect(key, client);
                  System.out.println(name + "断开连接");
              } else {
                  System.out.println(getClientName(client) + msg);
                  // Forward the message to the other clients.
                  forwardMessage(client, msg);
              }
          }
      }

      /**
       * Stops watching a client and closes its socket. No selector wakeup is needed because this
       * runs on the selector thread; the cancelled key is removed by the next select().
       */
      private void disconnect(SelectionKey key, SocketChannel client) {
          key.cancel();
          close(client);
      }

      /** Returns the display name of a client, built from its remote port. */
      private String getClientName(SocketChannel client) {
          return "客户端[" + client.socket().getPort() + "]";
      }

      /**
       * Sends {@code msg}, prefixed with the sender's name and terminated by a newline, to every
       * connected client except the sender. A recipient whose connection fails is dropped; the
       * remaining recipients still receive the message.
       *
       * @param client the channel the message was received from
       * @param msg the decoded message text
       * @throws IOException declared for compatibility; per-recipient failures are handled here
       */
      private void forwardMessage(SocketChannel client, String msg) throws IOException {
          // Encode once and write straight from the encoded buffer. Copying into a fixed-size
          // buffer overflows as soon as prefix + message exceed that buffer's capacity.
          ByteBuffer encoded = charset.encode(getClientName(client) + ":" + msg + "\n");
          for (SelectionKey key : selector.keys()) {
              Channel connectedClient = key.channel();
              if (connectedClient instanceof ServerSocketChannel) {
                  continue;
              }
              if (!key.isValid() || client.equals(connectedClient)) {
                  continue;
              }
              SocketChannel target = (SocketChannel) connectedClient;
              // Each recipient gets its own position/limit over the shared bytes.
              ByteBuffer out = encoded.duplicate();
              try {
                  // NOTE(review): this spins while a recipient's send buffer is full; a
                  // production server would queue the data and wait for OP_WRITE instead.
                  while (out.hasRemaining()) {
                      target.write(out);
                  }
              } catch (IOException e) {
                  e.printStackTrace();
                  disconnect(key, target);
              }
          }
      }

      /**
       * Reads the bytes currently available on the channel (at most {@code BUFFER} bytes per call;
       * any remainder triggers another READ event) and decodes them.
       *
       * @param client the channel to read from
       * @return the decoded text, an empty string if no bytes were available, or {@code null} if
       *     the peer closed the connection and no data was read
       * @throws IOException if the read fails
       */
      private String recieve(SocketChannel client) throws IOException {
          readBuffer.clear();
          int read;
          do {
              read = client.read(readBuffer);
          } while (read > 0);
          // read() returns -1 at end of stream and 0 when nothing more is available right now.
          if (read < 0 && readBuffer.position() == 0) {
              return null;
          }
          readBuffer.flip();
          return charset.decode(readBuffer).toString();
      }
  }

Main

  1. package demo3.nio1.server;
  2. public class ServerMain {
  3. public static void main(String[] args) {
  4. ChatServer chatServer=new ChatServer();
  5. chatServer.start();
  6. }
  7. }

客户端

ChatClient

  1. package demo3.nio1.client;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.*;
  7. import java.nio.charset.Charset;
  8. import java.util.Set;
  9. public class ChatClient {
  10. private static final String DEFAULT_IP = "localhost";
  11. private static final int DEFAULT_PORT = 9999;
  12. private static final String QUIT = "quit";
  13. private static final int BUFFER = 1024;
  14. private String host;
  15. private int post;
  16. private SocketChannel client;
  17. private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);
  18. private ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER);
  19. private Selector selector;
  20. private Charset charset = Charset.forName("UTF-8");
  21. public ChatClient(String host, int post) {
  22. this.host = host;
  23. this.post = post;
  24. }
  25. public ChatClient() {
  26. this(DEFAULT_IP, DEFAULT_PORT);
  27. }
  28. public boolean readyToQuit(String str) {
  29. return QUIT.equalsIgnoreCase(str);
  30. }
  31. private void close(Closeable... closeables) {
  32. try {
  33. for (Closeable shut : closeables) {
  34. if (shut != null) {
  35. shut.close();
  36. }
  37. }
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. public void start() {
  43. try {
  44. client = SocketChannel.open();
  45. client.configureBlocking(false);
  46. selector = Selector.open();
  47. client.register(selector, SelectionKey.OP_CONNECT);
  48. client.connect(new InetSocketAddress(host, post));
  49. System.out.println("客户端[" + host + "]已连接到服务器");
  50. while (true) {
  51. selector.select();
  52. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  53. for (SelectionKey key : selectionKeys) {
  54. handles(key);
  55. }
  56. selectionKeys.clear();
  57. }
  58. }catch (ClosedSelectorException e){
  59. } catch (IOException e) {
  60. e.printStackTrace();
  61. } finally {
  62. close(selector);
  63. }
  64. }
  65. private void handles(SelectionKey key) throws IOException {
  66. SocketChannel channel = null;
  67. //CONNECT事件--连接就绪
  68. if (key.isConnectable()) {
  69. channel = (SocketChannel) key.channel();
  70. //判断是否就绪建立连接
  71. if (channel.isConnectionPending()) {
  72. channel.finishConnect();
  73. }
  74. //处理用户的输入
  75. new Thread(new UserInputHander(this)).start();
  76. channel.register(selector, SelectionKey.OP_READ);
  77. }
  78. //READ事件--服务器转发消息
  79. else if (key.isReadable()) {
  80. channel = (SocketChannel) key.channel();
  81. String msg = recieve(channel);
  82. if (msg.isEmpty()) {
  83. //服务器异常
  84. System.out.println("客户端[" + host + "]已断开服务器");
  85. close(selector);
  86. } else {
  87. System.out.println(msg);
  88. }
  89. }
  90. }
  91. private String recieve(SocketChannel channel) throws IOException {
  92. readBuffer.clear();
  93. while (client.read(readBuffer) > 0) {
  94. }
  95. readBuffer.flip();
  96. return String.valueOf(charset.decode(readBuffer));
  97. }
  98. public void send(String msg) throws IOException {
  99. if (msg.isEmpty()) {
  100. return;
  101. }
  102. writeBuffer.clear();
  103. writeBuffer.put(charset.encode(msg));
  104. writeBuffer.flip();
  105. while (writeBuffer.hasRemaining()) {
  106. client.write(writeBuffer);
  107. }
  108. if (readyToQuit(msg)) {
  109. close(selector);
  110. System.out.println("客户端[" + host + "]已断开服务器");
  111. }
  112. }
  113. }

输入线程

  1. package demo3.nio1.client;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. public class UserInputHander implements Runnable {
  6. private ChatClient chatClient;
  7. private BufferedReader reader;
  8. public UserInputHander(ChatClient chatClient) {
  9. this.chatClient = chatClient;
  10. }
  11. @Override
  12. public void run() {
  13. reader = new BufferedReader(new InputStreamReader(System.in));
  14. try {
  15. while (true) {
  16. String msg = reader.readLine();
  17. chatClient.send(msg);
  18. if (chatClient.readyToQuit(msg)) {
  19. break;
  20. }
  21. }
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. } finally {
  25. if (reader != null) {
  26. try {
  27. reader.close();
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. }

Main

  1. package demo3.nio1.client;
  2. public class ClientMain {
  3. public static void main(String[] args) {
  4. ChatClient chatClient=new ChatClient();
  5. chatClient.start();
  6. }
  7. }

三、测试结果

image.png