一、NIO编程模型概述
ACCEPT事件:
服务器端启动ServerSocketChannel,绑定端口后在Selector上注册ACCEPT事件。当客户端发起连接请求并被服务器接收时,触发ACCEPT事件;服务器通过handles处理该事件,并为新建立的客户端通道注册READ事件。
在客户发送信息以后READ事件就可以被触发。
注意:
处理事件都是在同一线程中完成的。
READ事件:
接上文,当用户SocketChannel中拥有数据时,Selector会发现拥有可读事件,READ被触发,通过handles把数据转发出去,同样,处理事件都是在同一线程中完成的。
注意:
虽然NIO的读写都是非阻塞的,但Selector的select()是阻塞的:当没有事件发生时,select()会一直阻塞,直到有新的事件被触发。
第二个连接建立:
过程同上,形成如图:
注册了监听Client2的READ事件。
二、代码实践
服务器端
ChatServer
package demo3.nio1.server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;

/**
 * Single-threaded NIO chat server.
 *
 * <p>One {@link Selector} multiplexes the listening channel (ACCEPT) and every
 * connected client channel (READ). Each message read from a client is printed
 * and forwarded to all other connected clients. All event handling happens on
 * the thread that calls {@link #start()}.
 */
public class ChatServer {
    private static final String QUIT = "quit";
    private static final int DEFAULT_PORT = 9999;
    private static final int BUFFER = 1024;

    private int port;
    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);
    // Used for both encoding outgoing and decoding incoming text.
    private Charset charset = Charset.forName("UTF-8");

    /**
     * Creates a server that will listen on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /** Creates a server that will listen on the default port (9999). */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /** Returns true when the message is the quit command (case-insensitive). */
    private boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Closes every non-null resource. A failure to close one resource is
     * reported and does not prevent the remaining ones from being closed.
     */
    private void close(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the listening channel, registers it for ACCEPT and runs the
     * select loop on the calling thread. Returns only after the selector is
     * closed or an I/O error occurs on the listening side; all channels and
     * the selector are closed before returning.
     */
    public void start() {
        try {
            server = ServerSocketChannel.open();
            // Non-blocking mode is required before registering with a selector.
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));

            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器[" + port + "]已启动");

            while (true) {
                // Blocks until at least one registered channel is ready.
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    handles(key);
                }
                // Selected keys are never removed by the selector itself.
                selectionKeys.clear();
            }
        } catch (ClosedSelectorException e) {
            // The selector was closed: treat as a normal shutdown.
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Closing a selector only deregisters its channels; it does NOT
            // close them, so every channel has to be closed explicitly.
            closeRegisteredChannels();
            close(selector, server);
        }
    }

    /** Closes every channel still registered with the (open) selector. */
    private void closeRegisteredChannels() {
        if (selector == null || !selector.isOpen()) {
            return;
        }
        // Closing a channel only marks its key cancelled; the key set itself
        // is not modified until the next selection, so iterating is safe.
        for (SelectionKey key : selector.keys()) {
            close(key.channel());
        }
    }

    /**
     * Dispatches one ready key: accepts a new client on ACCEPT, or reads and
     * forwards a message on READ. A failure on a single client connection
     * disconnects that client only.
     *
     * @throws IOException if accepting or configuring a new connection fails
     */
    private void handles(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            // ACCEPT: establish the connection with the client.
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel client = channel.accept();
            if (client == null) {
                // Non-blocking accept() returns null when nothing is pending.
                return;
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println(getClientName(client) + "已连接到服务器");
        } else if (key.isReadable()) {
            // READ: the client sent a message (or closed the connection).
            SocketChannel client = (SocketChannel) key.channel();
            // Resolve the name first: the port is unavailable once closed.
            String name = getClientName(client);
            String msg;
            try {
                msg = recieve(client);
            } catch (IOException e) {
                // e.g. connection reset: drop this client, keep serving others.
                disconnect(key, client);
                return;
            }
            if (msg.isEmpty()) {
                // Nothing readable: the client went away.
                disconnect(key, client);
            } else if (readyToQuit(msg)) {
                disconnect(key, client);
                System.out.println(name + "断开连接");
            } else {
                System.out.println(name + msg);
                forwardMessage(client, msg);
            }
        }
    }

    /** Stops watching the client's key and closes its channel. */
    private void disconnect(SelectionKey key, SocketChannel client) {
        key.cancel();
        close(client);
    }

    /** Builds the display name of a client from its remote port. */
    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    /**
     * Sends {@code msg}, prefixed with the sender's name, to every connected
     * client except the sender. A recipient whose write fails is disconnected
     * and the remaining recipients are still served.
     *
     * @param client the channel the message was received from
     * @param msg    the message text
     */
    private void forwardMessage(SocketChannel client, String msg) throws IOException {
        // Encode once; the encoded buffer is sized to the message, so long
        // messages cannot overflow a fixed-size scratch buffer.
        ByteBuffer encoded = charset.encode(getClientName(client) + ":" + msg + "\n");
        for (SelectionKey key : selector.keys()) {
            Channel connectedClient = key.channel();
            if (connectedClient instanceof ServerSocketChannel) {
                continue;
            }
            if (key.isValid() && !client.equals(connectedClient)) {
                // Independent position/limit per recipient, shared content.
                ByteBuffer outgoing = encoded.duplicate();
                try {
                    // NOTE(review): this spins while the recipient's send
                    // buffer is full; queueing on OP_WRITE would avoid that.
                    while (outgoing.hasRemaining()) {
                        ((SocketChannel) connectedClient).write(outgoing);
                    }
                } catch (IOException e) {
                    key.cancel();
                    close(connectedClient);
                }
            }
        }
    }

    /**
     * Reads whatever is currently available from the client (at most the
     * buffer capacity) and decodes it.
     *
     * @return the decoded text, or an empty string if nothing could be read
     */
    private String recieve(SocketChannel client) throws IOException {
        readBuffer.clear();
        // read() returns 0 when no more data is ready and -1 at end of stream.
        while (client.read(readBuffer) > 0) {
        }
        readBuffer.flip();
        return String.valueOf(charset.decode(readBuffer));
    }
}
Main
package demo3.nio1.server;public class ServerMain {public static void main(String[] args) {ChatServer chatServer=new ChatServer();chatServer.start();}}
用户端
ChatClient
package demo3.nio1.client;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.util.Set;public class ChatClient {private static final String DEFAULT_IP = "localhost";private static final int DEFAULT_PORT = 9999;private static final String QUIT = "quit";private static final int BUFFER = 1024;private String host;private int post;private SocketChannel client;private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer writeBuffer = ByteBuffer.allocate(BUFFER);private Selector selector;private Charset charset = Charset.forName("UTF-8");public ChatClient(String host, int post) {this.host = host;this.post = post;}public ChatClient() {this(DEFAULT_IP, DEFAULT_PORT);}public boolean readyToQuit(String str) {return QUIT.equalsIgnoreCase(str);}private void close(Closeable... closeables) {try {for (Closeable shut : closeables) {if (shut != null) {shut.close();}}} catch (IOException e) {e.printStackTrace();}}public void start() {try {client = SocketChannel.open();client.configureBlocking(false);selector = Selector.open();client.register(selector, SelectionKey.OP_CONNECT);client.connect(new InetSocketAddress(host, post));System.out.println("客户端[" + host + "]已连接到服务器");while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {handles(key);}selectionKeys.clear();}}catch (ClosedSelectorException e){} catch (IOException e) {e.printStackTrace();} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {SocketChannel channel = null;//CONNECT事件--连接就绪if (key.isConnectable()) {channel = (SocketChannel) key.channel();//判断是否就绪建立连接if (channel.isConnectionPending()) {channel.finishConnect();}//处理用户的输入new Thread(new UserInputHander(this)).start();channel.register(selector, SelectionKey.OP_READ);}//READ事件--服务器转发消息else if (key.isReadable()) {channel 
= (SocketChannel) key.channel();String msg = recieve(channel);if (msg.isEmpty()) {//服务器异常System.out.println("客户端[" + host + "]已断开服务器");close(selector);} else {System.out.println(msg);}}}private String recieve(SocketChannel channel) throws IOException {readBuffer.clear();while (client.read(readBuffer) > 0) {}readBuffer.flip();return String.valueOf(charset.decode(readBuffer));}public void send(String msg) throws IOException {if (msg.isEmpty()) {return;}writeBuffer.clear();writeBuffer.put(charset.encode(msg));writeBuffer.flip();while (writeBuffer.hasRemaining()) {client.write(writeBuffer);}if (readyToQuit(msg)) {close(selector);System.out.println("客户端[" + host + "]已断开服务器");}}}
输入线程
package demo3.nio1.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Reads lines from standard input and sends each one through the given
 * {@code ChatClient}. Stops after the quit command has been sent or when
 * standard input reaches end of stream.
 */
public class UserInputHander implements Runnable {
    private ChatClient chatClient;

    /**
     * @param chatClient the client used to send the lines typed by the user
     */
    public UserInputHander(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (true) {
                String msg = reader.readLine();
                if (msg == null) {
                    // End of stdin: there is nothing more to send.
                    break;
                }
                chatClient.send(msg);
                if (chatClient.readyToQuit(msg)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Main
package demo3.nio1.client;public class ClientMain {public static void main(String[] args) {ChatClient chatClient=new ChatClient();chatClient.start();}}
三、测试结果

