一、AIO的异步操作
AsynchronousSocketChannel和AsynchronousServerSocketChannel都支持对connect/accept、read、write这几个IO操作进行异步调用(如图)。
二、如何实现异步调用
Future
简单来说,通过Channel调用上述IO函数后会立即返回一个Future对象。Future在线程池部分介绍过,它是一个描述未来结果的对象,可以通过get、isDone等方法查询任务是否完成并获取结果。
CompletionHandler
通过Channel调用IO操作时,同时传入一个附带参数(attachment)和一个回调对象(Handler);调用后不等待操作完成就立即返回,操作结束时再由回调函数做进一步处理。
Handler拥有两个回调函数:一个是completed,在操作成功完成后调用;另一个是failed,在操作失败后调用。
三、EchoDemo实现异步操作机制
Server:
主体函数:
package demo4.aio.server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Echo server built on NIO.2 asynchronous channels using completion handlers.
 *
 * <p>Every message received from a client is written back unchanged. A client
 * that sends {@code quit} (case-insensitive, surrounding whitespace ignored)
 * or closes its side of the connection is disconnected.
 */
public class Server {
    private static final String LOCALHOST = "localhost";
    private static final int DEFAULT_PORT = 9999;
    private static final String QUIT = "quit";
    private static final int BUFFER_SIZE = 1024;

    private AsynchronousServerSocketChannel serverSocketChannel;

    /** Returns {@code true} when {@code str} is the quit command. */
    private boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Closes every non-null resource. A failure on one resource is reported
     * and does not prevent the remaining resources from being closed.
     */
    private void shutDown(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the listening socket, starts accepting connections and then blocks
     * the calling thread until something is read from standard input, at which
     * point the server channel is closed.
     */
    public void start() {
        try {
            // Bind the listening port.
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            System.out.println("服务器已启动,正在监听:[" + LOCALHOST + "," + DEFAULT_PORT + "]");

            // Issue exactly one accept here: AcceptHandler re-arms itself after
            // each connection, and starting a second accept while one is still
            // pending throws AcceptPendingException. The handler runs on the
            // channel group's threads (the default group, since none was given),
            // not on this thread.
            serverSocketChannel.accept(null, new AcceptHandler());

            // Keep the calling thread alive; any input on stdin stops the server.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutDown(serverSocketChannel);
        }
    }

    /** Handles completed accepts: re-arms the accept and starts reading from the new client. */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            // Re-arm first so further clients can connect. Per the original
            // author's note this does not overflow the stack because the
            // underlying implementation limits handler nesting.
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept(null, this);
            }

            AsynchronousSocketChannel clientChannel = result;
            if (clientChannel != null && clientChannel.isOpen()) {
                ClientHandler handler = new ClientHandler(clientChannel);
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                // The attachment tells the shared handler which operation just finished.
                Map<String, Object> info = new HashMap<>();
                info.put("type", "read");
                info.put("buffer", buffer);
                clientChannel.read(buffer, info, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            // A failure after the server channel has been closed is the normal
            // shutdown path; anything else is worth reporting.
            if (serverSocketChannel.isOpen()) {
                exc.printStackTrace();
            }
        }
    }

    /**
     * Drives the read → echo → read cycle for one client. The attachment map
     * carries {@code "type"} ({@code "read"} or {@code "write"}) and
     * {@code "buffer"} (the {@link ByteBuffer} used by the operation).
     */
    private class ClientHandler implements CompletionHandler<Integer, Map<String, Object>> {
        private final AsynchronousSocketChannel clientChannel;
        // Captured once: the address can no longer be queried after the channel is closed.
        private final String clientAddress;

        ClientHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
            String address;
            try {
                address = String.valueOf(clientChannel.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
                address = "unknown";
            }
            this.clientAddress = address;
            System.out.println("客户端[" + clientAddress + "]已连接到服务器");
        }

        @Override
        public void completed(Integer result, Map<String, Object> info) {
            String type = (String) info.get("type");
            ByteBuffer buffer = (ByteBuffer) info.get("buffer");
            if ("read".equalsIgnoreCase(type)) {
                onRead(result, buffer, info);
            } else if ("write".equalsIgnoreCase(type)) {
                onWrite(buffer, info);
            }
        }

        /** A read finished: detect disconnect/quit, otherwise echo the bytes back. */
        private void onRead(int bytesRead, ByteBuffer buffer, Map<String, Object> info) {
            if (bytesRead < 0) {
                // End of stream: the client closed its side.
                disconnect();
                return;
            }

            buffer.flip();
            // Decode only the bytes actually received; decoding a duplicate
            // leaves the buffer's position untouched for the echo write.
            String str = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
            if (readyToQuit(str.trim())) {
                disconnect();
                return;
            }

            System.out.println("客户端[" + clientAddress + "]:" + str);
            // Set the type before starting the write: completion may run on
            // another thread immediately.
            info.put("type", "write");
            clientChannel.write(buffer, info, this);
        }

        /** A write finished: finish any partial write, then wait for the next message. */
        private void onWrite(ByteBuffer buffer, Map<String, Object> info) {
            if (buffer.hasRemaining()) {
                clientChannel.write(buffer, info, this);
                return;
            }
            // Only now is it safe to reuse the buffer.
            buffer.clear();
            info.put("type", "read");
            clientChannel.read(buffer, info, this);
        }

        private void disconnect() {
            System.out.println("客户端[" + clientAddress + "]已断开连接");
            shutDown(clientChannel);
        }

        @Override
        public void failed(Throwable exc, Map<String, Object> info) {
            exc.printStackTrace();
            disconnect();
        }
    }
}
Main:
package demo4.aio.server;public class ServerMain {public static void main(String[] args) {Server server=new Server();server.start();}}
Client
主体函数:
package demo4.aio.client;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Console echo client built on {@link AsynchronousSocketChannel} using the
 * {@link Future}-based API: each line typed by the user is sent to the server
 * and the echoed reply is printed. Typing {@code quit} (case-insensitive) or
 * reaching end of input terminates the client.
 */
public class Client {
    private static final String LOCALHOST = "localhost";
    private static final int DEFAULT_PORT = 9999;
    private static final String QUIT = "quit";

    private AsynchronousSocketChannel clientChannel;

    /** Returns {@code true} when {@code str} is the quit command. */
    private boolean readyToQuit(String str) {
        return QUIT.equalsIgnoreCase(str);
    }

    /**
     * Closes every non-null resource. A failure on one resource is reported
     * and does not prevent the remaining resources from being closed.
     */
    private void shutDown(Closeable... closeables) {
        for (Closeable shut : closeables) {
            if (shut == null) {
                continue;
            }
            try {
                shut.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server and runs the read-line / send / receive loop
     * until the user quits, standard input ends, or the server disconnects.
     */
    public void start() {
        try {
            // Create the channel and wait for the connection to be established.
            clientChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            future.get();
            System.out.println("客户端[" + LOCALHOST + "," + DEFAULT_PORT + "]已连接到服务器");

            // Wait for user input.
            BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String input = consoleReader.readLine();
                if (input == null) {
                    // End of standard input: nothing more to send.
                    break;
                }

                ByteBuffer buffer = ByteBuffer.wrap(input.getBytes(StandardCharsets.UTF_8));
                sendFully(buffer);
                if (readyToQuit(input)) {
                    break;
                }
                System.out.println("消息发送成功");

                // The server echoes exactly the bytes it received, so reuse the
                // same buffer and read until it is full again.
                buffer.clear();
                if (!receiveFully(buffer)) {
                    System.out.println("服务器已关闭连接");
                    break;
                }
                buffer.flip();
                System.out.print("收到服务器消息:");
                String res = "[ECHO]:" + StandardCharsets.UTF_8.decode(buffer).toString();
                System.out.println(res);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            shutDown(clientChannel);
        }
    }

    /** Writes the whole buffer, repeating the write while bytes remain (a single write may be partial). */
    private void sendFully(ByteBuffer buffer) throws InterruptedException, ExecutionException {
        while (buffer.hasRemaining()) {
            clientChannel.write(buffer).get();
        }
    }

    /**
     * Reads until the buffer is full.
     *
     * @return {@code false} if the server closed the connection before the
     *         buffer was filled, {@code true} otherwise
     */
    private boolean receiveFully(ByteBuffer buffer) throws InterruptedException, ExecutionException {
        while (buffer.hasRemaining()) {
            if (clientChannel.read(buffer).get() < 0) {
                return false;
            }
        }
        return true;
    }
}
Main:
package demo4.aio.client;public class ClientMain {public static void main(String[] args) {Client client=new Client();client.start();}}
四、结果展示

