一、AIO编程模型梳理
回忆上一篇文章的AIO模型,这里做一些大概梳理。
首先在服务器端创建一个异步服务器通道(AsynchronousServerSocketChannel),并绑定监听端口。
这里使用的 AsynchronousServerSocketChannel 其实属于一个通道群(AsynchronousChannelGroup),这个通道群代表一组可以被多个异步通道共享的资源(主要是线程池)。handler 的回调就是由通道群中的线程来执行的。
当我们不做额外设定时候,系统会使用默认的通道群。
异步如何实现?
为异步操作(accept、read、write)创建一个 CompletionHandler,发起操作时把它作为参数传入;操作完成或失败后,系统会回调 handler 的 completed 或 failed 方法来处理结果。
具体物理逻辑见上一篇文章
二、服务器创建
ChatServer:
package demo5.aio.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Multi-user chat server built on Java AIO (NIO.2 asynchronous channels).
 *
 * <p>Connections are accepted on one {@link AsynchronousServerSocketChannel} that is bound to an
 * {@link AsynchronousChannelGroup} backed by a fixed thread pool. Every message received from a
 * client is printed on the server console and forwarded to all other connected clients, prefixed
 * with the sender's name. A client leaves by sending {@code quit} (case-insensitive).
 *
 * <p>{@link #start()} blocks the calling thread until something is typed on standard input (or
 * standard input reaches end-of-stream) and then shuts the server down.
 */
public class ChatServer {
    private static final String LOCALHOST = "localhost";
    private static final int DEFAULT_PORT = 9999;
    /** Message text (compared case-insensitively) that a client sends to leave the chat. */
    private static final String QUIT = "quit";
    /** Capacity, in bytes, of the read buffer allocated for each client. */
    private static final int BUFFER = 1024;
    /** Number of threads in the pool that backs the channel group. */
    private static final int THREADPOOL_SIZE = 8;

    private AsynchronousServerSocketChannel serverSocketChannel;
    private AsynchronousChannelGroup channelGroup;
    private final Charset charset = StandardCharsets.UTF_8;
    /** Handlers of the connected clients; only touched from synchronized methods of this class. */
    private final List<ClientHandler> connectedClients;
    private final int port;
    private final String host;

    /**
     * Closes every given resource, skipping {@code null} entries.
     *
     * <p>Each resource is closed independently, so a failure while closing one of them does not
     * prevent the remaining ones from being closed.
     *
     * @param closeables resources to close
     */
    private void shutdown(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Tells whether the given message is the quit command.
     *
     * @param str message text, may be {@code null}
     * @return {@code true} if {@code str} equals {@code quit} ignoring case
     */
    private boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Creates a server that will listen on the given address once {@link #start()} is called.
     *
     * @param port port to bind
     * @param host host name or address to bind
     */
    public ChatServer(int port, String host) {
        this.host = host;
        this.port = port;
        connectedClients = new ArrayList<>();
    }

    /** Creates a server for {@code localhost:9999}. */
    public ChatServer() {
        this(DEFAULT_PORT, LOCALHOST);
    }

    /**
     * Starts the server and blocks until input arrives on standard input.
     *
     * <p>Only one accept is issued here; {@link AcceptHandler} re-arms the accept after every
     * connection. Issuing another accept from this thread while one is still pending would throw
     * {@code AcceptPendingException}. On exit the server channel is closed and the channel group
     * is shut down, which closes the remaining client channels and lets the pool threads end.
     */
    public void start() {
        ExecutorService pool = Executors.newFixedThreadPool(THREADPOOL_SIZE);
        try {
            channelGroup = AsynchronousChannelGroup.withThreadPool(pool);
            serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
            serverSocketChannel.bind(new InetSocketAddress(host, port));
            System.out.println("服务器已启动,监听端口[" + serverSocketChannel.getLocalAddress() + "]");
            serverSocketChannel.accept(null, new AcceptHandler());
            // Keep the calling thread alive; any console input (or end-of-stream) stops the server.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown(serverSocketChannel);
            stopChannelGroup(pool);
        }
    }

    /**
     * Shuts down the channel group, or just the pool when the group was never created.
     *
     * @param pool executor that was created for the channel group
     */
    private void stopChannelGroup(ExecutorService pool) {
        if (channelGroup == null) {
            // Creating the group failed, so nothing else will ever shut the pool down.
            pool.shutdown();
            return;
        }
        try {
            channelGroup.shutdownNow();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Completion handler for accepted connections. */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        /**
         * Re-arms the accept for the next connection, registers the new client and starts its
         * first asynchronous read.
         */
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept(null, this);
            }
            if (clientChannel != null && clientChannel.isOpen()) {
                ClientHandler handler = new ClientHandler(clientChannel);
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                // Add the user to the online list before the first read is issued.
                addClient(handler);
                clientChannel.read(buffer, buffer, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("用户连接失败" + exc);
        }
    }

    /**
     * Adds a client to the online list and announces it on the server console.
     *
     * @param handler handler of the newly connected client
     */
    private synchronized void addClient(ClientHandler handler) {
        connectedClients.add(handler);
        System.out.println(handler.clientName + "已成功连接到服务器");
    }

    /**
     * Removes a client from the online list and closes its channel.
     *
     * <p>Calling this again for a client that was already removed does nothing, so the quit path
     * and the failure path may both call it for the same client.
     *
     * @param clientHandler handler of the client to remove
     */
    private synchronized void removeClient(ClientHandler clientHandler) {
        if (!connectedClients.remove(clientHandler)) {
            return;
        }
        System.err.println(clientHandler.clientName + "已断开服务器连接");
        shutdown(clientHandler.clientChannel);
    }

    /**
     * Builds the display name of a client from the port of its remote address.
     *
     * @param clientChannel channel of the client
     * @return the display name, or a fallback text when the remote address cannot be obtained
     */
    private String getClientName(AsynchronousSocketChannel clientChannel) {
        try {
            InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress();
            if (address != null) {
                return "客户端[" + address.getPort() + "]:";
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "获取客户端失败";
    }

    /**
     * Completion handler for one client.
     *
     * <p>Reads are issued with the read buffer as attachment. The same handler instance is also
     * passed to the writes issued by {@code forwardMessage}, which use a {@code null} attachment;
     * that is how {@link #completed} tells the two kinds of completion apart.
     */
    private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel clientChannel;
        /** Display name, captured while the channel is open so it is still usable after close. */
        private final String clientName;

        public ClientHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
            this.clientName = getClientName(clientChannel);
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment == null) {
                // Completion of a forwarded write: nothing to do.
                return;
            }
            if (result <= 0) {
                // The client closed the connection: take it off the online list.
                removeClient(this);
                return;
            }
            attachment.flip();
            String msg = receive(attachment);
            System.out.println(clientName + msg);
            forwardMessage(clientChannel, msg);
            attachment.clear();
            // Either the user decided to leave, or we wait for the next message.
            if (readyToQuit(msg)) {
                removeClient(this);
            } else {
                clientChannel.read(attachment, attachment, this);
            }
        }

        /**
         * Reports the failure and drops the client, so that a broken connection does not stay in
         * the online list and keep receiving forwarded messages.
         */
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.out.println("用户读写失败" + exc);
            removeClient(this);
        }
    }

    /**
     * Sends a message to every connected client except the sender.
     *
     * <p>The forwarded text is prefixed with the sender's name. When the message is the quit
     * command, a "disconnected" notice is sent in place of the message text.
     *
     * <p>NOTE(review): writes are not queued per client. If a write to the same recipient is still
     * pending, {@code write} throws {@code WritePendingException}; it is caught below and that
     * message is dropped for that recipient.
     *
     * @param clientChannel channel of the sending client
     * @param msg message text; nothing is sent when it is {@code null}
     */
    private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String msg) {
        if (msg == null) {
            return;
        }
        String sender = getClientName(clientChannel);
        String text = readyToQuit(msg) ? sender + "已断开连接" : sender + msg;
        // Iterate over a snapshot: a write that fails immediately may remove the recipient from
        // connectedClients on this very thread.
        for (ClientHandler handler : new ArrayList<>(connectedClients)) {
            if (clientChannel.equals(handler.clientChannel)) {
                continue;
            }
            try {
                // Encode per recipient; every write needs its own buffer position.
                handler.clientChannel.write(charset.encode(text), null, handler);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Decodes the readable content of the buffer as UTF-8 text.
     *
     * @param attachment buffer already flipped for reading
     * @return decoded text with leading and trailing whitespace removed
     */
    private String receive(ByteBuffer attachment) {
        return String.valueOf(charset.decode(attachment)).trim();
    }
}
ServerMain:
package demo5.aio.server;
/**
* @ClassName:
* @Description:
* @author: hszjj
* @date: 2019/11/23 19:43
*/
public class ServerMain {
public static void main(String[] args) {
ChatServer server=new ChatServer();
server.start();
}
}
三、客户端创建
ChatClient:
package demo5.aio.client;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Console chat client built on {@link AsynchronousSocketChannel}, used in its {@link Future}
 * style.
 *
 * <p>{@link #start()} connects to the server, starts a separate thread that reads the user's
 * console input, and then prints every message received from the server until the connection
 * ends.
 */
public class ChatClient {
    private static final String LOCALHOST = "localhost";
    private static final int DEFAULT_PORT = 9999;
    /** Message text (compared case-insensitively) that ends the session. */
    private static final String QUIT = "quit";
    /** Capacity, in bytes, of the buffer used to read server messages. */
    private static final int BUFFER = 1024;

    private final Charset charset = Charset.forName("UTF-8");
    private final int port;
    private final String host;
    private AsynchronousSocketChannel clientChannel;

    /**
     * Creates a client for the given server address; nothing is connected until
     * {@link #start()} is called.
     *
     * @param host server host name or address
     * @param port server port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Creates a client for {@code localhost:9999}. */
    public ChatClient() {
        this(LOCALHOST, DEFAULT_PORT);
    }

    /**
     * Closes every given resource, skipping {@code null} entries.
     *
     * <p>Each resource is closed independently, so a failure while closing one of them does not
     * prevent the remaining ones from being closed.
     *
     * @param closeables resources to close
     */
    private void shutdown(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Tells whether the given text is the quit command.
     *
     * @param str text typed by the user, may be {@code null}
     * @return {@code true} if {@code str} equals {@code quit} ignoring case
     */
    public boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Connects to the server and prints incoming messages until the connection ends.
     *
     * <p>The success message is printed only after the connect future has completed. When a read
     * reports that the server closed the connection, the channel is closed and the JVM exits with
     * status {@code -1}. On any other exit path the channel is closed in the {@code finally}
     * block.
     */
    public void start() {
        try {
            clientChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));
            // Wait for the connection to be established before reporting success.
            future.get();
            System.out.println("已成功连接到服务器");
            // Handle user input on its own thread so this thread can keep reading.
            new Thread(new UserInputHander(this)).start();
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
            while (true) {
                Future<Integer> readResult = clientChannel.read(buffer);
                int result = readResult.get();
                if (result <= 0) {
                    // The server closed the connection.
                    System.err.println("服务器断开");
                    shutdown(clientChannel);
                    System.exit(-1);
                } else {
                    buffer.flip();
                    String msg = String.valueOf(charset.decode(buffer));
                    buffer.clear();
                    System.out.println(msg);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            shutdown(clientChannel);
        }
    }

    /**
     * Sends a message to the server and waits until all of its bytes have been written.
     *
     * @param msg text to send; {@code null} and empty strings are ignored
     */
    public void send(String msg) {
        if (msg == null || msg.isEmpty()) {
            return;
        }
        try {
            ByteBuffer buffer = charset.encode(msg);
            // A single write may transfer only part of the buffer; keep writing until it is empty.
            while (buffer.hasRemaining()) {
                Future<Integer> future = clientChannel.write(buffer);
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            System.err.println("消息发送失败");
        } catch (ExecutionException e) {
            e.printStackTrace();
            System.err.println("消息发送失败");
        }
    }
}
UserInputHander:
package demo5.aio.client;
import demo3.nio1.client.Demo3ChatClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Reads lines typed on standard input and sends each one through the chat client.
 *
 * <p>The loop ends when the user types the quit command (after it has been sent) or when standard
 * input reaches end-of-stream.
 */
public class UserInputHander implements Runnable {
    private ChatClient chatClient;
    private BufferedReader reader;

    /**
     * @param chatClient client used to send the typed lines and to recognise the quit command
     */
    public UserInputHander(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    /**
     * Forwards console input to the client until quit or end-of-stream, then prints the
     * disconnect notice and closes the reader.
     */
    @Override
    public void run() {
        reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (true) {
                String msg = reader.readLine();
                if (msg == null) {
                    // readLine() returns null at end-of-stream; there is nothing more to send.
                    break;
                }
                chatClient.send(msg);
                if (chatClient.readyToQuit(msg)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                System.out.println("已断开服务器连接");
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
ClientMain:
package demo5.aio.client;
/**
* @ClassName:
* @Description:
* @author: hszjj
* @date: 2019/11/23 20:16
*/
public class ClientMain {
public static void main(String[] args) {
ChatClient client=new ChatClient();
client.start();
}
}