一、AIO编程模型梳理
回忆上一篇文章的AIO模型,这里做一些大概梳理。
首先在服务器端创建一部服务器通道,绑定监听端口
这里使用的AsynchronousServerSocketChannel 他其实属于一个通道群(AsynchronousChannelGroups),而这个通道群代表着一组可以被多个一部通道共享的资源群组。通过这个通道群来调用handler 。
当我们不做额外设定时候,系统会使用默认的通道群。
异步如何实现?
创建一个handler ,然后通过行为调用handler 来处理行为。
具体物理逻辑见上一篇文章
二、服务器创建
chatServer:
package demo5.aio.server;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ChatServer {private static final String LOCALHOST = "localhost";private static final int DEFAULT_PORT = 9999;private static final String QUIT = "quit";private static final int BUFFER = 1024;private static final int THREADPOOL_SIZE = 8;private AsynchronousServerSocketChannel serverSocketChannel;private AsynchronousChannelGroup channelGroup;private Charset charset = StandardCharsets.UTF_8;private List<ClientHandler> connectedClients;private int port;private String host;private void shutdown(Closeable... closeables) {try {for (Closeable shut : closeables) {if (shut != null) {shut.close();}}} catch (IOException e) {e.printStackTrace();}}private boolean readyToQuit(String str) {return QUIT.equalsIgnoreCase(str);}public ChatServer(int port, String host) {this.host = host;this.port = port;connectedClients = new ArrayList<>();}public ChatServer() {this(DEFAULT_PORT, LOCALHOST);}public void start() {try {ExecutorService pool = Executors.newFixedThreadPool(THREADPOOL_SIZE);channelGroup = AsynchronousChannelGroup.withThreadPool(pool);serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);serverSocketChannel.bind(new InetSocketAddress(host, port));System.out.println("服务器已启动,监听端口[" + serverSocketChannel.getLocalAddress() + "]");while (true) {serverSocketChannel.accept(null, new AcceptHandler());System.in.read();}} catch (IOException e) {e.printStackTrace();} finally {shutdown(serverSocketChannel);}}private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {@Overridepublic void completed(AsynchronousSocketChannel clientChannel, Object attachment) {if (serverSocketChannel.isOpen()) {serverSocketChannel.accept(null, this);}if (clientChannel != null && clientChannel.isOpen()) {ClientHandler handler = new ClientHandler(clientChannel);ByteBuffer buffer = ByteBuffer.allocate(BUFFER);//将用户添加到在线列表addClient(handler);clientChannel.read(buffer, buffer, handler);}}@Overridepublic void failed(Throwable exc, Object attachment) {System.out.println("用户连接失败" + exc);}}private synchronized void addClient(ClientHandler handler) {connectedClients.add(handler);System.out.println(getClientName(handler.clientChannel) + "已成功连接到服务器");}private synchronized void removeClient(ClientHandler clientHandler) {connectedClients.remove(clientHandler);System.err.println(getClientName(clientHandler.clientChannel) + "已断开服务器连接");shutdown(clientHandler.clientChannel);}private String getClientName(AsynchronousSocketChannel clientChannel) {try {InetSocketAddress address= (InetSocketAddress) clientChannel.getRemoteAddress();String str="客户端["+address.getPort()+"]:";return str;} catch (IOException e) {e.printStackTrace();}return "获取客户端失败";}private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel clientChannel;public ClientHandler(AsynchronousSocketChannel clientChannel) {this.clientChannel = clientChannel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {if (attachment != null) {if (result <= 0) {//客户端异常//移除在线列表removeClient(this);} else {attachment.flip();String msg = receive(attachment);System.out.println(getClientName(clientChannel) + msg);forwardMessage(clientChannel, msg);attachment.clear();//检查用户是否决定退出if (readyToQuit(msg)) {removeClient(this);} else {clientChannel.read(attachment, attachment, this);}}}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {System.out.println("用户读写失败" + exc);}}private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String msg) {for (ClientHandler handler : connectedClients) {try {if (!clientChannel.equals(handler.clientChannel)) {String str = getClientName(handler.clientChannel);ByteBuffer buffer = null;if (msg == null) {return;}if (readyToQuit(msg)) {buffer = charset.encode(str + "已断开连接");handler.clientChannel.write(buffer, null, handler);} else {buffer = charset.encode(str + msg);handler.clientChannel.write(buffer, null, handler);}}} catch (Exception e) {e.printStackTrace();}}}private String receive(ByteBuffer attachment) {return String.valueOf(charset.decode(attachment)).trim();}}
ServerMain:
package demo5.aio.server;/*** @ClassName:* @Description:* @author: hszjj* @date: 2019/11/23 19:43*/public class ServerMain {public static void main(String[] args) {ChatServer server=new ChatServer();server.start();}}
三、客户端创建
chatClient:
package demo5.aio.client;import java.io.BufferedReader;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class ChatClient {private static final String LOCALHOST = "localhost";private static final int DEFAULT_PORT = 9999;private static final String QUIT = "quit";private static final int BUFFER = 1024;private Charset charset = Charset.forName("UTF-8");private int port;private String host;private AsynchronousSocketChannel clientChannel;public ChatClient(String host, int port) {this.host = host;this.port = port;}public ChatClient() {this(LOCALHOST, DEFAULT_PORT);}private void shutdown(Closeable... closeables) {try {for (Closeable shut : closeables) {if (shut != null) {shut.close();}}} catch (IOException e) {e.printStackTrace();}}public boolean readyToQuit(String str) {return QUIT.equalsIgnoreCase(str);}public void start() {try {clientChannel = AsynchronousSocketChannel.open();Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));System.out.println("已成功连接到服务器");future.get();//处理用户输入new Thread(new UserInputHander(this)).start();ByteBuffer buffer = ByteBuffer.allocate(BUFFER);while (true) {Future<Integer> readResult = clientChannel.read(buffer);int result = readResult.get();if (result <= 0) {//服务器异常System.err.println("服务器断开");shutdown(clientChannel);System.exit(-1);} else {buffer.flip();String msg = String.valueOf(charset.decode(buffer));buffer.clear();System.out.println(msg);}}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}public void send(String msg) {try {if (msg.isEmpty()) {return;} else {ByteBuffer buffer = charset.encode(msg);Future<Integer> future = clientChannel.write(buffer);future.get();}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();System.err.println("消息发送失败");}}}
UserInputHander:
package demo5.aio.client;import demo3.nio1.client.Demo3ChatClient;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class UserInputHander implements Runnable {private ChatClient chatClient;private BufferedReader reader;public UserInputHander(ChatClient chatClient) {this.chatClient = chatClient;}@Overridepublic void run() {reader = new BufferedReader(new InputStreamReader(System.in));try {while (true) {String msg = reader.readLine();chatClient.send(msg);if (chatClient.readyToQuit(msg)) {break;}}} catch (IOException e) {e.printStackTrace();} finally {if (reader != null) {try {System.out.println("已断开服务器连接");reader.close();} catch (IOException e) {e.printStackTrace();}}}}}
ClientMain:
package demo5.aio.client;/*** @ClassName:* @Description:* @author: hszjj* @date: 2019/11/23 20:16*/public class ClientMain {public static void main(String[] args) {ChatClient client=new ChatClient();client.start();}}
四、效果演示

