AIO 是彻底的异步通信,NIO 是同步非阻塞通信,AIO是真正的非阻塞模型,但相比NIO并没有明显的性能提高,因此NIO目前依旧是主流
AIO操作流程
解释:
假设有这么一个场景,有一排水壶(客户)在烧水。AIO的做法是,每个水壶上装一个开关,当水开了以后会提醒对应的线程去处理。NIO的做法是,叫一个线程不停的循环观察每一个水壶,根据每个水壶当前的状态去处理。BIO的做法是,叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶
案例演示:
做一个类似回音壁的AIO案例,服务端收到客户端消息后直接将消息再返回给客户端
流程分析:
1、服务器端创建 AsynchronousServerSocketChannel (异步服务器端通道)绑定端口和地址,默认属于AsynchronousChannelGroup 通道组(可以被多个异步通道进行资源共享的群组)
2、服务端创建 服务端CompletionHandler 和 客户端ClientHandler 并使用 accept 方法异步等待客户端连接,当有客户端加入时将会触发 服务端CompletionHandler 的 completed 方法,在该方法内通知服务端接收连接请求
3、客户端创建 AsynchronousSocketChannel(异步客户端通道),绑定端口和地址,建立与服务端的连接后默认也属于AsynchronousChannelGroup
4、客户端输入信息后将数据写入 AsynchronousSocketChannel,服务端拿到客户端发送的数据后传给客户端ClientHandler ,根据额外参数判断并返回给客户端 AsynchronousSocketChannel
5、每一个新加入到服务端的 AsynchronousSocketChannel 实际都会创建一个 Handler 用来处理与其对应的读写IO事件
![C2)I@Z%5}Z(HMEP)@87T7T.png
服务端代码:
package study;import java.io.*;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.HashMap;import java.util.Map;/*** 服务端*/public class Server {final String LOCALHOST = "localhost";final int DEFAULT_PORT = 8888;AsynchronousServerSocketChannel serverChannel;/* 关闭资源流 */private void close(Closeable closable) {if (closable != null) {try {closable.close();System.out.println("关闭" + closable);} catch (IOException e) {e.printStackTrace();}}}public void start() {try {serverChannel = AsynchronousServerSocketChannel.open(); //创建AIO通道,默认使用AsynchronousChannelGroupserverChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定监听端口System.out.println("启动服务器,监听端口:" + DEFAULT_PORT);/* accept 判断后立马返回,直到客户端发送消息后 AcceptHandler 方法才会被系统调用返回数据* 通过accept函数返回的Feture对象 或者 有回调函数CompletionHandler作为参数的accept方法与客户端建立连接 ,这里使用自定义的 CompletionHandler 作为参数的方式进行异步* 参数一: 在回调的时候添加额外的数据,非必须,可以理解为邮件发送的附件* 参数二: 回调函数的 CompletionHandler*/while (true) {serverChannel.accept(null, new AcceptHandler()); //异步等待新客户端连接System.in.read();}} catch (IOException e) {e.printStackTrace();} finally {close(serverChannel);}}/* 参数一: IO操作的返回结果,由于是服务端调用,返回的应该是客户端对象,所以是 AsynchronousSocketChannel,* 参数二:accept方法的参数一*/private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {/* 异步调用的函数有数据返回时调用该方法* AsynchronousSocketChannel : 用于服务器和客户端之间收发信息* */@Overridepublic void completed(AsynchronousSocketChannel result, Object attachment) {/* 判断服务端channel是否处于正常开放状态 */if (serverChannel.isOpen()) {serverChannel.accept(null, this); //通知服务端接收客户端连接}AsynchronousSocketChannel clientChannel = result; //获取异步的客户端通道/* 判断客户端Channle是否处于开放状态 */if (clientChannel != null && clientChannel.isOpen()) {ClientHandler handler = new ClientHandler(clientChannel);ByteBuffer buffer = ByteBuffer.allocate(1024);/* 使用额外参数让 ClientHandler 判断是读操作还是写操作 */Map<String, Object> info = new HashMap<>();info.put("type", "read");info.put("buffer", buffer);clientChannel.read(buffer, info, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler}}/* 异步调用的操作有错误时调用该方法 */@Overridepublic void failed(Throwable exc, Object attachment) {}}/* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果* 参数一: IO操作的函数返回的数据类型,因为是字节数所以是Integer* 参数二:传给handler的额外参数*/private class ClientHandler implements CompletionHandler<Integer, Object>{private AsynchronousSocketChannel clientChannel;public ClientHandler(AsynchronousSocketChannel channel) {this.clientChannel = channel;}/* 异步调用的函数有数据返回时调用该方法 */@Overridepublic void completed(Integer result, Object attachment) {Map<String, Object> info = (Map<String, Object>) attachment; //获取服务端的额外信息String type = (String) info.get("type");/* 接收客户端数据再返回给客户端 */if ("read".equals(type)) {ByteBuffer buffer = (ByteBuffer) info.get("buffer"); //从客户端Buffer读取数据buffer.flip();info.put("type", "write");clientChannel.write(buffer, info, this);buffer.clear();}/* 监听客户端发送的数据 */if ("write".equals(type)) {ByteBuffer buffer = ByteBuffer.allocate(1024); //从客户端Buffer读取数据info.put("type", "read");info.put("buffer", buffer);clientChannel.read(buffer, info, this);}}@Overridepublic void failed(Throwable exc, Object attachment) {// 处理错误}}public static void main(String[] args) {Server server = new Server();server.start();}}
客户端代码:
package study;import java.io.*;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class Client {final String LOCALHOST = "localhost";final int DEFAULT_PORT = 8888;AsynchronousSocketChannel clientChannel;private void close(Closeable closable) {if (closable != null) {try {closable.close();System.out.println("关闭" + closable);} catch (IOException e) {e.printStackTrace();}}}public void start() {try {clientChannel = AsynchronousSocketChannel.open(); //创建channelFuture<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定端口future.get(); //进行异步的服务端连接BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in)); //等待用户的输入while (true) {String input = consoleReader.readLine();byte[] inputBytes = input.getBytes();ByteBuffer buffer = ByteBuffer.wrap(inputBytes); //wrap写入数据后会自动flipFuture<Integer> writeResult = clientChannel.write(buffer);writeResult.get(); //有返回值则认为用户数据成功写入到通道并发送给服务器buffer.flip();Future<Integer> readResult = clientChannel.read(buffer);readResult.get(); //有返回值则认为获取到服务端返回的数据并写入到BufferString echo = new String(buffer.array());buffer.clear();System.out.println(echo);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} finally {close(clientChannel);}}public static void main(String[] args) {Client client = new Client();client.start();}}
测试:
启动客户端和服务端,客户端输入信息后服务端返回相同数据
测试结果
AIO聊天室:
服务端代码:
package chatroom;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/* AIO聊天室服务端 */public class ChatServer {private static final String LOCALHOST = "localhost";private static final int DEFAULT_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private AsynchronousChannelGroup channelGroup; //线程组private AsynchronousServerSocketChannel serverChannel; //服务端通道private Charset charset = Charset. forName("UTF-8");private int port;private List<ClientHandler> connectedClients; //在线客户列表public ChatServer(){this(DEFAULT_PORT);}/* 用户自定义监听端口 */public ChatServer(int port){this.port = port;this.connectedClients = new ArrayList<>();}/* 退出操作 */private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start(){ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,8,200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10));try {channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool); //创建一个使用自定义线程池的serverChannel = AsynchronousServerSocketChannel.open(channelGroup); //使用自定义的channelGroupserverChannel.bind(new InetSocketAddress(LOCALHOST,port)); //绑定监听地址和端口System.out.println("启动服务器,监听端口: "+port);while (true){serverChannel.accept(null, new AcceptHandler()); //等待客户端连接System.in.read();}} catch (IOException e) {e.printStackTrace();}finally{close(serverChannel);}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(6767);chatServer.start();}/* 服务端 Handler */private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{@Overridepublic void completed(AsynchronousSocketChannel clientChannel, Object attachment) {/* 判断服务端channel是否处于正常开放状态 */if (serverChannel.isOpen()) {serverChannel.accept(null, this); //通知服务端接收客户端连接}/* 判断客户端Channle是否处于开放状态 */if (clientChannel != null && clientChannel.isOpen()) {ClientHandler handler = new ClientHandler(clientChannel);ByteBuffer buffer = ByteBuffer.allocate(BUFFER);addClient(handler); //将新用户加入到在线用户列表clientChannel.read(buffer, buffer, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler}}@Overridepublic void failed(Throwable exc, Object attachment) {System.out.println("连接失败: "+exc);}}/* 将新用户加入到在线用户列表 */private synchronized void addClient(ClientHandler handler) {connectedClients.add(handler);System.out.println(getClientName(handler.clientChannel)+"已连接到服务器");}/* 用户下线,从在线用户列表移除 */private synchronized void removeClient(ClientHandler handler) {connectedClients.remove(handler);System.out.println(getClientName(handler.clientChannel)+"已断开连接");close(handler.clientChannel);}/*转换为字符串*/private String receive(ByteBuffer buffer){CharBuffer decode = charset.decode(buffer);return String.valueOf(decode);}/* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果* 参数一: IO操作的数据类型 > 字节数* 参数二: 传给handler的额外参数*/private class ClientHandler implements CompletionHandler<Integer,Object>{private AsynchronousSocketChannel clientChannel;public ClientHandler(AsynchronousSocketChannel channel){this.clientChannel = channel;}@Overridepublic void completed(Integer result, Object attachment) {ByteBuffer buffer = (ByteBuffer) attachment; //read函数调用才有的对象,write是没有额外对象的if(null!=buffer){if(result<=0){removeClient(this);//客户端异常,将客户移除出在线客户列表}else{buffer.flip();String fwdMsg = receive(buffer);System.out.println(getClientName(clientChannel)+":"+fwdMsg);forwardMessage(clientChannel,fwdMsg); //转发信息buffer.clear();/** 检查用户是否退出 */if(QUIT.equals(fwdMsg)){removeClient(this);}else{clientChannel.read(buffer,buffer,this); //继续接收用户后续消息}}}}@Overridepublic void failed(Throwable exc, Object attachment) {System.out.println("读写失败: "+exc);}}/* 将消息转发给在线用户列表中的其他用户 */private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {for (ClientHandler handler : connectedClients) {if(!clientChannel.equals(handler.clientChannel)){try {ByteBuffer buffer = charset.encode(getClientName(handler.clientChannel) + ":" + fwdMsg);handler.clientChannel.write(buffer,null,handler);} catch (Exception e) {System.out.println(handler.clientChannel+"消息转发错误!");e.printStackTrace();}}}}/* 获取对应客户端对象,返回客户端ID */private String getClientName(AsynchronousSocketChannel clientChannel) {int clientPort = -1;try {InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress();clientPort = address.getPort(); //获取客户端端口} catch (IOException e) {e.printStackTrace();}return "客户端["+clientPort+"]";}}
客户端代码:
package chatroom;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;/* AIO聊天室客户端 */public class ChatClient {private static final String LOCALHOST = "localhost";private static final int DEFAULT_PORT = 8888;private static final int BUFFER = 1024;private String host; //用户定义地址private int port; //用户定义端口private AsynchronousSocketChannel clientChannel;private Charset chaset = Charset.forName("UTF-8");public ChatClient(){this(LOCALHOST,DEFAULT_PORT);}public ChatClient(String host,int port){this.port = port;this.host = host;}/* 退出操作 */private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start(){try {clientChannel = AsynchronousSocketChannel.open(); //创建ChannelFuture<Void> future = clientChannel.connect(new InetSocketAddress(host,port)); //绑定端口future.get(); //进行异步的服务端连接/* 启用新线程处理用户输入 */new Thread(new UserInputHander(this)).start();ByteBuffer buffer = ByteBuffer.allocate(BUFFER); //创建缓存while(true){Future<Integer> readResult = clientChannel.read(buffer);int result = readResult.get(); //等待返回读取结果if(result<=0){System.out.println("已断开与服务器的连接");close(clientChannel);System.exit(1); //中止当前虚拟机的运行进行强制性的程序退出}else{buffer.flip();String msg = String.valueOf(chaset.decode(buffer));buffer.clear();System.out.println("服务器返回数据: "+msg);}}} catch (Exception e) {e.printStackTrace();}}public void send(String msg){/* 误输入空格 */if(msg.isEmpty()){return;}/* 有意义的数据 */ByteBuffer buffer = chaset.encode(msg);Future<Integer> writeResult = clientChannel.write(buffer);try {writeResult.get();} catch (Exception e) {System.out.println("发送消息失败!");e.printStackTrace();}}public static void main(String[] args) {ChatClient chatClient = new ChatClient(LOCALHOST,6767);chatClient.start();}}
客户端消息发送方法:
package chatroom;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;/* 客户端消息发送 */public class UserInputHander implements Runnable {private ChatClient chatClient;public UserInputHander(ChatClient chatClient){this.chatClient = chatClient;}@Overridepublic void run() {try {BufferedReader consoleRerader = new BufferedReader(new InputStreamReader(System.in));while (true){String input = consoleRerader.readLine();//向服务器发送消息chatClient.send(input);//检查用户是否准备退出if("quit".equals(input)){break;}}} catch (IOException e) {e.printStackTrace();}}}
测试:
启动 ChatServer 和多个 ChatClient 实例进行测试,在客户端窗口中发送消息进行测试
