一、NIO编程模型概述
ACCEPT事件:
服务器端启动ServerSocketChannel,绑定端口后在Selector上注册ACCEPT事件;当客户端发起连接请求并被服务器端接受时触发ACCEPT事件,服务器通过handles处理该事件,并为新建立的SocketChannel注册READ事件。
在客户发送信息以后READ事件就可以被触发。
注意:
处理事件都是在同一线程中完成的。
READ事件:
接上文,当用户SocketChannel中拥有数据时,Selector会发现拥有可读事件,READ被触发,通过handles把数据转发出去,同样,处理事件都是在同一线程中完成的。
注意:
虽然NIO的读写都是非阻塞性的但是Selector是阻塞性的,当没有事件发生的时候Selector的select会一直阻塞,直到新的事件触发。
第二个连接建立:
过程同上,形成如图:
注册了监听Client2的READ事件。
二、代码实践
服务器端
ChatServer
package demo3.nio1.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
/**
 * Single-threaded NIO chat server.
 *
 * <p>One {@link Selector} multiplexes the listening channel (ACCEPT) and every
 * connected client (READ). Each message received from a client is forwarded to
 * all other connected clients. All event handling runs on the thread that
 * calls {@link #start()}.
 *
 * <p>Known limitation: forwarding spins until the whole message has been
 * written, so a recipient that stops reading can stall the event loop.
 */
public class ChatServer {
    private static final String QUIT = "quit";
    private static final int DEFAULT_PORT = 9999;
    private static final int BUFFER = 1024;

    private int port;
    private ServerSocketChannel server;
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);
    // Encoder/decoder for the text protocol.
    private Charset charset = Charset.forName("UTF-8");

    /**
     * Creates a server that will listen on the given port.
     *
     * @param port TCP port to bind when {@link #start()} is called
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /** Creates a server that will listen on the default port (9999). */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /** Returns true when the message is the quit command (case-insensitive). */
    private boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Closes every non-null resource. A failure on one resource is reported
     * and does not prevent the remaining ones from being closed.
     */
    private void close(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the listening socket and runs the event loop on the calling thread.
     * The method returns only when the selector is closed or the listening
     * channel fails; all channels are released before it returns.
     */
    public void start() {
        try {
            server = ServerSocketChannel.open();
            // Non-blocking mode is required for registration with a selector.
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器[" + port + "]已启动");
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    // A key may have been cancelled earlier in this same pass
                    // (for example when forwarding to that client failed).
                    if (!key.isValid()) {
                        continue;
                    }
                    try {
                        handles(key);
                    } catch (IOException e) {
                        if (key.channel() instanceof ServerSocketChannel) {
                            // The listening socket itself is broken: stop the server.
                            throw e;
                        }
                        // One broken client connection must not take the server down.
                        e.printStackTrace();
                        disconnect(key);
                    }
                }
                selectionKeys.clear();
            }
        } catch (ClosedSelectorException ignored) {
            // The selector was closed: treated as a normal shutdown request.
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Releases every channel still registered with the selector, then the
     * selector and the listening channel. Closing a selector only deregisters
     * its channels; it does not close them, so they are closed explicitly.
     */
    private void shutdown() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                close(key.channel());
            }
        }
        close(selector, server);
    }

    /** Stops watching a client and closes its connection. */
    private void disconnect(SelectionKey key) {
        key.cancel();
        close(key.channel());
    }

    /**
     * Dispatches one ready key: accepts a new connection, or reads and
     * forwards a message from an existing client.
     *
     * @throws IOException if accepting or reading fails
     */
    private void handles(SelectionKey key) throws IOException {
        // ACCEPT event -- a client is connecting.
        if (key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel client = channel.accept();
            // In non-blocking mode accept() returns null when nothing is pending.
            if (client == null) {
                return;
            }
            try {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
                close(client);
                return;
            }
            System.out.println(getClientName(client) + "已连接到服务器");
        }
        // READ event -- a client sent data or closed its side.
        else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            String msg = recieve(client);
            if (msg == null) {
                // End of stream: the client went away.
                disconnect(key);
            } else if (msg.isEmpty()) {
                // Nothing was actually readable; keep the connection.
                return;
            } else if (readyToQuit(msg)) {
                // Resolve the name before closing so the port is still available.
                String name = getClientName(client);
                disconnect(key);
                System.out.println(name + "断开连接");
            } else {
                System.out.println(getClientName(client) + msg);
                forwardMessage(client, msg);
            }
        }
    }

    /** Builds the display name of a client from its remote port. */
    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    /**
     * Sends the message, prefixed with the sender's name, to every connected
     * client except the sender. A recipient whose connection fails is dropped;
     * delivery to the remaining recipients continues.
     */
    private void forwardMessage(SocketChannel client, String msg) {
        // Encode once and write from the encoder's own buffer, so the message
        // length is not limited by a fixed-size scratch buffer.
        ByteBuffer encoded = charset.encode(getClientName(client) + ":" + msg + "\n");
        for (SelectionKey key : selector.keys()) {
            Channel connectedClient = key.channel();
            if (!(connectedClient instanceof SocketChannel)) {
                continue;
            }
            if (!key.isValid() || client.equals(connectedClient)) {
                continue;
            }
            // Each recipient gets an independent position over the same bytes.
            ByteBuffer out = encoded.duplicate();
            try {
                while (out.hasRemaining()) {
                    ((SocketChannel) connectedClient).write(out);
                }
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key);
            }
        }
    }

    /**
     * Reads whatever is currently available from the client, up to the size of
     * the read buffer; any remainder is picked up on the next READ event.
     *
     * @return the decoded text (possibly empty), or {@code null} if the client
     *         closed the connection and no data was read
     * @throws IOException if the read fails
     */
    private String recieve(SocketChannel client) throws IOException {
        readBuffer.clear();
        int count;
        do {
            count = client.read(readBuffer);
        } while (count > 0);
        readBuffer.flip();
        if (count < 0 && !readBuffer.hasRemaining()) {
            return null;
        }
        return String.valueOf(charset.decode(readBuffer));
    }
}
Main
package demo3.nio1.server;
public class ServerMain {
public static void main(String[] args) {
ChatServer chatServer=new ChatServer();
chatServer.start();
}
}
用户端
ChatClient
package demo3.nio1.client;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
/**
 * NIO chat client.
 *
 * <p>The thread that calls {@link #start()} runs a selector loop that completes
 * the connection and prints messages forwarded by the server. Console input is
 * read on a separate thread, which calls {@link #send(String)}.
 */
public class ChatClient {
    private static final String DEFAULT_IP = "localhost";
    private static final int DEFAULT_PORT = 9999;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private String host;
    private int port;
    private SocketChannel client;
    private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER);
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");

    /**
     * Creates a client for the given server address.
     *
     * @param host server host name or IP address
     * @param port server TCP port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Creates a client for the default address (localhost:9999). */
    public ChatClient() {
        this(DEFAULT_IP, DEFAULT_PORT);
    }

    /** Returns true when the text is the quit command (case-insensitive). */
    public boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Closes every non-null resource. A failure on one resource is reported
     * and does not prevent the remaining ones from being closed.
     */
    private void close(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server and runs the event loop on the calling thread
     * until the selector is closed (user quit or server disconnect) or an I/O
     * error occurs. The selector and the socket are released before returning.
     */
    public void start() {
        try {
            client = SocketChannel.open();
            client.configureBlocking(false);
            selector = Selector.open();
            client.register(selector, SelectionKey.OP_CONNECT);
            // A non-blocking connect may complete immediately; in that case no
            // CONNECT event is delivered, so finish the setup here.
            if (client.connect(new InetSocketAddress(host, port))) {
                onConnected(client);
            }
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    handles(key);
                }
                selectionKeys.clear();
            }
        } catch (ClosedSelectorException ignored) {
            // The selector was closed by send() or by the READ handler: normal exit.
        } catch (CancelledKeyException ignored) {
            // The input thread closed the selector while a key was being handled.
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Closing the selector does not close the socket, so close both.
            close(selector, client);
        }
    }

    /**
     * Completes the client setup once the connection is established: announces
     * the connection, switches the channel to READ interest and starts the
     * console input thread.
     */
    private void onConnected(SocketChannel channel) throws IOException {
        System.out.println("客户端[" + host + "]已连接到服务器");
        channel.register(selector, SelectionKey.OP_READ);
        Thread input = new Thread(new UserInputHander(this));
        // Daemon, so a thread blocked on console input cannot keep the process
        // alive after the event loop has ended.
        input.setDaemon(true);
        input.start();
    }

    /**
     * Dispatches one ready key: finishes the connection, or prints a message
     * forwarded by the server.
     *
     * @throws IOException if connecting or reading fails
     */
    private void handles(SelectionKey key) throws IOException {
        // CONNECT event -- the connection attempt can be completed.
        if (key.isConnectable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending() && !channel.finishConnect()) {
                // Not established yet; wait for the next CONNECT event.
                return;
            }
            onConnected(channel);
        }
        // READ event -- the server forwarded a message or closed the connection.
        else if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            String msg = recieve(channel);
            if (msg == null) {
                // End of stream: the server went away.
                System.out.println("客户端[" + host + "]已断开服务器");
                close(selector);
            } else if (!msg.isEmpty()) {
                System.out.println(msg);
            }
        }
    }

    /**
     * Reads whatever is currently available from the channel, up to the size
     * of the read buffer; any remainder is picked up on the next READ event.
     *
     * @return the decoded text (possibly empty), or {@code null} if the peer
     *         closed the connection and no data was read
     * @throws IOException if the read fails
     */
    private String recieve(SocketChannel channel) throws IOException {
        readBuffer.clear();
        int count;
        do {
            count = channel.read(readBuffer);
        } while (count > 0);
        readBuffer.flip();
        if (count < 0 && !readBuffer.hasRemaining()) {
            return null;
        }
        return String.valueOf(charset.decode(readBuffer));
    }

    /**
     * Sends one message to the server. Empty messages are ignored. If the
     * message is the quit command, the selector is closed afterwards, which
     * ends the event loop in {@link #start()}.
     *
     * @param msg text to send; must not be null
     * @throws IOException if writing to the server fails
     */
    public void send(String msg) throws IOException {
        if (msg.isEmpty()) {
            return;
        }
        // Write straight from the encoder's buffer, so the message length is
        // not limited by a fixed-size scratch buffer.
        ByteBuffer out = charset.encode(msg);
        while (out.hasRemaining()) {
            client.write(out);
        }
        if (readyToQuit(msg)) {
            // Print before closing: closing the selector lets start() return,
            // after which the process may exit.
            System.out.println("客户端[" + host + "]已断开服务器");
            close(selector);
        }
    }
}
输入线程
package demo3.nio1.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Reads lines from standard input and sends each one through the chat client.
 * The loop ends when the user enters the quit command, when standard input
 * reaches end of stream, or when sending fails.
 */
public class UserInputHander implements Runnable {
    private ChatClient chatClient;
    private BufferedReader reader;

    /**
     * @param chatClient client used to send each line that is read
     */
    public UserInputHander(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (true) {
                String msg = reader.readLine();
                // readLine() returns null at end of stream (for example when
                // input is piped or the console is closed); there is nothing
                // more to send.
                if (msg == null) {
                    break;
                }
                chatClient.send(msg);
                if (chatClient.readyToQuit(msg)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Main
package demo3.nio1.client;
public class ClientMain {
public static void main(String[] args) {
ChatClient chatClient=new ChatClient();
chatClient.start();
}
}