AIO 是彻底的异步通信,NIO 是同步非阻塞通信,AIO是真正的非阻塞模型,但相比NIO并没有明显的性能提高,因此NIO目前依旧是主流
image.png
AIO操作流程

解释:

假设有这么一个场景,有一排水壶(客户)在烧水。AIO的做法是,每个水壶上装一个开关,当水开了以后会提醒对应的线程去处理。NIO的做法是,叫一个线程不停的循环观察每一个水壶,根据每个水壶当前的状态去处理。BIO的做法是,叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶


案例演示:

做一个类似回音壁的AIO案例,服务端收到客户端消息后直接将消息再返回给客户端

流程分析:

1、服务器端创建 AsynchronousServerSocketChannel (异步服务器端通道)绑定端口和地址,默认属于AsynchronousChannelGroup 通道组(可以被多个异步通道进行资源共享的群组)
2、服务端创建 服务端CompletionHandler 和 客户端ClientHandler 并使用 accept 方法异步等待客户端连接,当有客户端加入时将会触发 服务端CompletionHandlercompleted 方法,在该方法内通知服务端接收连接请求
3、客户端创建 AsynchronousSocketChannel(异步客户端通道),绑定端口和地址,建立与服务端的连接后默认也属于AsynchronousChannelGroup
4、客户端输入信息后将数据写入 AsynchronousSocketChannel,服务端拿到客户端发送的数据后传给客户端ClientHandler ,根据额外参数判断并返回给客户端 AsynchronousSocketChannel
5、每一个新加入到服务端的 AsynchronousSocketChannel 实际都会创建一个 Handler 用来处理与其对应的读写IO事件
![C2)I@Z%5}Z(HMEP)@87T7T.png

服务端代码:

  1. package study;
  2. import java.io.*;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousServerSocketChannel;
  6. import java.nio.channels.AsynchronousSocketChannel;
  7. import java.nio.channels.CompletionHandler;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * 服务端
  12. */
  13. public class Server {
  14. final String LOCALHOST = "localhost";
  15. final int DEFAULT_PORT = 8888;
  16. AsynchronousServerSocketChannel serverChannel;
  17. /* 关闭资源流 */
  18. private void close(Closeable closable) {
  19. if (closable != null) {
  20. try {
  21. closable.close();
  22. System.out.println("关闭" + closable);
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. public void start() {
  29. try {
  30. serverChannel = AsynchronousServerSocketChannel.open(); //创建AIO通道,默认使用AsynchronousChannelGroup
  31. serverChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定监听端口
  32. System.out.println("启动服务器,监听端口:" + DEFAULT_PORT);
  33. /* accept 判断后立马返回,直到客户端发送消息后 AcceptHandler 方法才会被系统调用返回数据
  34. * 通过accept函数返回的Feture对象 或者 有回调函数CompletionHandler作为参数的accept方法与客户端建立连接 ,这里使用自定义的 CompletionHandler 作为参数的方式进行异步
  35. * 参数一: 在回调的时候添加额外的数据,非必须,可以理解为邮件发送的附件
  36. * 参数二: 回调函数的 CompletionHandler
  37. */
  38. while (true) {
  39. serverChannel.accept(null, new AcceptHandler()); //异步等待新客户端连接
  40. System.in.read();
  41. }
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. } finally {
  45. close(serverChannel);
  46. }
  47. }
  48. /* 参数一: IO操作的返回结果,由于是服务端调用,返回的应该是客户端对象,所以是 AsynchronousSocketChannel,
  49. * 参数二:accept方法的参数一
  50. */
  51. private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
  52. /* 异步调用的函数有数据返回时调用该方法
  53. * AsynchronousSocketChannel : 用于服务器和客户端之间收发信息
  54. * */
  55. @Override
  56. public void completed(AsynchronousSocketChannel result, Object attachment) {
  57. /* 判断服务端channel是否处于正常开放状态 */
  58. if (serverChannel.isOpen()) {
  59. serverChannel.accept(null, this); //通知服务端接收客户端连接
  60. }
  61. AsynchronousSocketChannel clientChannel = result; //获取异步的客户端通道
  62. /* 判断客户端Channle是否处于开放状态 */
  63. if (clientChannel != null && clientChannel.isOpen()) {
  64. ClientHandler handler = new ClientHandler(clientChannel);
  65. ByteBuffer buffer = ByteBuffer.allocate(1024);
  66. /* 使用额外参数让 ClientHandler 判断是读操作还是写操作 */
  67. Map<String, Object> info = new HashMap<>();
  68. info.put("type", "read");
  69. info.put("buffer", buffer);
  70. clientChannel.read(buffer, info, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler
  71. }
  72. }
  73. /* 异步调用的操作有错误时调用该方法 */
  74. @Override
  75. public void failed(Throwable exc, Object attachment) {
  76. }
  77. }
  78. /* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果
  79. * 参数一: IO操作的函数返回的数据类型,因为是字节数所以是Integer
  80. * 参数二:传给handler的额外参数
  81. */
  82. private class ClientHandler implements CompletionHandler<Integer, Object>{
  83. private AsynchronousSocketChannel clientChannel;
  84. public ClientHandler(AsynchronousSocketChannel channel) {
  85. this.clientChannel = channel;
  86. }
  87. /* 异步调用的函数有数据返回时调用该方法 */
  88. @Override
  89. public void completed(Integer result, Object attachment) {
  90. Map<String, Object> info = (Map<String, Object>) attachment; //获取服务端的额外信息
  91. String type = (String) info.get("type");
  92. /* 接收客户端数据再返回给客户端 */
  93. if ("read".equals(type)) {
  94. ByteBuffer buffer = (ByteBuffer) info.get("buffer"); //从客户端Buffer读取数据
  95. buffer.flip();
  96. info.put("type", "write");
  97. clientChannel.write(buffer, info, this);
  98. buffer.clear();
  99. }
  100. /* 监听客户端发送的数据 */
  101. if ("write".equals(type)) {
  102. ByteBuffer buffer = ByteBuffer.allocate(1024); //从客户端Buffer读取数据
  103. info.put("type", "read");
  104. info.put("buffer", buffer);
  105. clientChannel.read(buffer, info, this);
  106. }
  107. }
  108. @Override
  109. public void failed(Throwable exc, Object attachment) {
  110. // 处理错误
  111. }
  112. }
  113. public static void main(String[] args) {
  114. Server server = new Server();
  115. server.start();
  116. }
  117. }

客户端代码:

  1. package study;
  2. import java.io.*;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.concurrent.Future;
  8. public class Client {
  9. final String LOCALHOST = "localhost";
  10. final int DEFAULT_PORT = 8888;
  11. AsynchronousSocketChannel clientChannel;
  12. private void close(Closeable closable) {
  13. if (closable != null) {
  14. try {
  15. closable.close();
  16. System.out.println("关闭" + closable);
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. public void start() {
  23. try {
  24. clientChannel = AsynchronousSocketChannel.open(); //创建channel
  25. Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定端口
  26. future.get(); //进行异步的服务端连接
  27. BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in)); //等待用户的输入
  28. while (true) {
  29. String input = consoleReader.readLine();
  30. byte[] inputBytes = input.getBytes();
  31. ByteBuffer buffer = ByteBuffer.wrap(inputBytes); //wrap写入数据后会自动flip
  32. Future<Integer> writeResult = clientChannel.write(buffer);
  33. writeResult.get(); //有返回值则认为用户数据成功写入到通道并发送给服务器
  34. buffer.flip();
  35. Future<Integer> readResult = clientChannel.read(buffer);
  36. readResult.get(); //有返回值则认为获取到服务端返回的数据并写入到Buffer
  37. String echo = new String(buffer.array());
  38. buffer.clear();
  39. System.out.println(echo);
  40. }
  41. } catch (IOException e) {
  42. e.printStackTrace();
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. } catch (ExecutionException e) {
  46. e.printStackTrace();
  47. } finally {
  48. close(clientChannel);
  49. }
  50. }
  51. public static void main(String[] args) {
  52. Client client = new Client();
  53. client.start();
  54. }
  55. }

测试:

启动客户端和服务端,客户端输入信息后服务端返回相同数据
image.png
测试结果


AIO聊天室:

服务端代码:

  1. package chatroom;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.CharBuffer;
  7. import java.nio.channels.AsynchronousChannelGroup;
  8. import java.nio.channels.AsynchronousServerSocketChannel;
  9. import java.nio.channels.AsynchronousSocketChannel;
  10. import java.nio.channels.CompletionHandler;
  11. import java.nio.charset.Charset;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ArrayBlockingQueue;
  15. import java.util.concurrent.ThreadPoolExecutor;
  16. import java.util.concurrent.TimeUnit;
  17. /* AIO聊天室服务端 */
  18. public class ChatServer {
  19. private static final String LOCALHOST = "localhost";
  20. private static final int DEFAULT_PORT = 8888;
  21. private static final String QUIT = "quit";
  22. private static final int BUFFER = 1024;
  23. private AsynchronousChannelGroup channelGroup; //线程组
  24. private AsynchronousServerSocketChannel serverChannel; //服务端通道
  25. private Charset charset = Charset. forName("UTF-8");
  26. private int port;
  27. private List<ClientHandler> connectedClients; //在线客户列表
  28. public ChatServer(){
  29. this(DEFAULT_PORT);
  30. }
  31. /* 用户自定义监听端口 */
  32. public ChatServer(int port){
  33. this.port = port;
  34. this.connectedClients = new ArrayList<>();
  35. }
  36. /* 退出操作 */
  37. private void close(Closeable closable) {
  38. if (closable != null) {
  39. try {
  40. closable.close();
  41. } catch (IOException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  46. private void start(){
  47. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,8,
  48. 200, TimeUnit.MILLISECONDS,
  49. new ArrayBlockingQueue<>(10));
  50. try {
  51. channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool); //创建一个使用自定义线程池的
  52. serverChannel = AsynchronousServerSocketChannel.open(channelGroup); //使用自定义的channelGroup
  53. serverChannel.bind(new InetSocketAddress(LOCALHOST,port)); //绑定监听地址和端口
  54. System.out.println("启动服务器,监听端口: "+port);
  55. while (true){
  56. serverChannel.accept(null, new AcceptHandler()); //等待客户端连接
  57. System.in.read();
  58. }
  59. } catch (IOException e) {
  60. e.printStackTrace();
  61. }finally{
  62. close(serverChannel);
  63. }
  64. }
  65. public static void main(String[] args) {
  66. ChatServer chatServer = new ChatServer(6767);
  67. chatServer.start();
  68. }
  69. /* 服务端 Handler */
  70. private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
  71. @Override
  72. public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
  73. /* 判断服务端channel是否处于正常开放状态 */
  74. if (serverChannel.isOpen()) {
  75. serverChannel.accept(null, this); //通知服务端接收客户端连接
  76. }
  77. /* 判断客户端Channle是否处于开放状态 */
  78. if (clientChannel != null && clientChannel.isOpen()) {
  79. ClientHandler handler = new ClientHandler(clientChannel);
  80. ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
  81. addClient(handler); //将新用户加入到在线用户列表
  82. clientChannel.read(buffer, buffer, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler
  83. }
  84. }
  85. @Override
  86. public void failed(Throwable exc, Object attachment) {
  87. System.out.println("连接失败: "+exc);
  88. }
  89. }
  90. /* 将新用户加入到在线用户列表 */
  91. private synchronized void addClient(ClientHandler handler) {
  92. connectedClients.add(handler);
  93. System.out.println(getClientName(handler.clientChannel)+"已连接到服务器");
  94. }
  95. /* 用户下线,从在线用户列表移除 */
  96. private synchronized void removeClient(ClientHandler handler) {
  97. connectedClients.remove(handler);
  98. System.out.println(getClientName(handler.clientChannel)+"已断开连接");
  99. close(handler.clientChannel);
  100. }
  101. /*转换为字符串*/
  102. private String receive(ByteBuffer buffer){
  103. CharBuffer decode = charset.decode(buffer);
  104. return String.valueOf(decode);
  105. }
  106. /* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果
  107. * 参数一: IO操作的数据类型 > 字节数
  108. * 参数二: 传给handler的额外参数
  109. */
  110. private class ClientHandler implements CompletionHandler<Integer,Object>{
  111. private AsynchronousSocketChannel clientChannel;
  112. public ClientHandler(AsynchronousSocketChannel channel){
  113. this.clientChannel = channel;
  114. }
  115. @Override
  116. public void completed(Integer result, Object attachment) {
  117. ByteBuffer buffer = (ByteBuffer) attachment; //read函数调用才有的对象,write是没有额外对象的
  118. if(null!=buffer){
  119. if(result<=0){
  120. removeClient(this);//客户端异常,将客户移除出在线客户列表
  121. }else{
  122. buffer.flip();
  123. String fwdMsg = receive(buffer);
  124. System.out.println(getClientName(clientChannel)+":"+fwdMsg);
  125. forwardMessage(clientChannel,fwdMsg); //转发信息
  126. buffer.clear();
  127. /** 检查用户是否退出 */
  128. if(QUIT.equals(fwdMsg)){
  129. removeClient(this);
  130. }else{
  131. clientChannel.read(buffer,buffer,this); //继续接收用户后续消息
  132. }
  133. }
  134. }
  135. }
  136. @Override
  137. public void failed(Throwable exc, Object attachment) {
  138. System.out.println("读写失败: "+exc);
  139. }
  140. }
  141. /* 将消息转发给在线用户列表中的其他用户 */
  142. private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
  143. for (ClientHandler handler : connectedClients) {
  144. if(!clientChannel.equals(handler.clientChannel)){
  145. try {
  146. ByteBuffer buffer = charset.encode(getClientName(handler.clientChannel) + ":" + fwdMsg);
  147. handler.clientChannel.write(buffer,null,handler);
  148. } catch (Exception e) {
  149. System.out.println(handler.clientChannel+"消息转发错误!");
  150. e.printStackTrace();
  151. }
  152. }
  153. }
  154. }
  155. /* 获取对应客户端对象,返回客户端ID */
  156. private String getClientName(AsynchronousSocketChannel clientChannel) {
  157. int clientPort = -1;
  158. try {
  159. InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress();
  160. clientPort = address.getPort(); //获取客户端端口
  161. } catch (IOException e) {
  162. e.printStackTrace();
  163. }
  164. return "客户端["+clientPort+"]";
  165. }
  166. }

客户端代码:

  1. package chatroom;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.AsynchronousSocketChannel;
  7. import java.nio.charset.Charset;
  8. import java.util.concurrent.ExecutionException;
  9. import java.util.concurrent.Future;
  10. /* AIO聊天室客户端 */
  11. public class ChatClient {
  12. private static final String LOCALHOST = "localhost";
  13. private static final int DEFAULT_PORT = 8888;
  14. private static final int BUFFER = 1024;
  15. private String host; //用户定义地址
  16. private int port; //用户定义端口
  17. private AsynchronousSocketChannel clientChannel;
  18. private Charset chaset = Charset.forName("UTF-8");
  19. public ChatClient(){
  20. this(LOCALHOST,DEFAULT_PORT);
  21. }
  22. public ChatClient(String host,int port){
  23. this.port = port;
  24. this.host = host;
  25. }
  26. /* 退出操作 */
  27. private void close(Closeable closable) {
  28. if (closable != null) {
  29. try {
  30. closable.close();
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. private void start(){
  37. try {
  38. clientChannel = AsynchronousSocketChannel.open(); //创建Channel
  39. Future<Void> future = clientChannel.connect(new InetSocketAddress(host,port)); //绑定端口
  40. future.get(); //进行异步的服务端连接
  41. /* 启用新线程处理用户输入 */
  42. new Thread(new UserInputHander(this)).start();
  43. ByteBuffer buffer = ByteBuffer.allocate(BUFFER); //创建缓存
  44. while(true){
  45. Future<Integer> readResult = clientChannel.read(buffer);
  46. int result = readResult.get(); //等待返回读取结果
  47. if(result<=0){
  48. System.out.println("已断开与服务器的连接");
  49. close(clientChannel);
  50. System.exit(1); //中止当前虚拟机的运行进行强制性的程序退出
  51. }else{
  52. buffer.flip();
  53. String msg = String.valueOf(chaset.decode(buffer));
  54. buffer.clear();
  55. System.out.println("服务器返回数据: "+msg);
  56. }
  57. }
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. public void send(String msg){
  63. /* 误输入空格 */
  64. if(msg.isEmpty()){
  65. return;
  66. }
  67. /* 有意义的数据 */
  68. ByteBuffer buffer = chaset.encode(msg);
  69. Future<Integer> writeResult = clientChannel.write(buffer);
  70. try {
  71. writeResult.get();
  72. } catch (Exception e) {
  73. System.out.println("发送消息失败!");
  74. e.printStackTrace();
  75. }
  76. }
  77. public static void main(String[] args) {
  78. ChatClient chatClient = new ChatClient(LOCALHOST,6767);
  79. chatClient.start();
  80. }
  81. }

客户端消息发送方法:

  1. package chatroom;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. /* 客户端消息发送 */
  6. public class UserInputHander implements Runnable {
  7. private ChatClient chatClient;
  8. public UserInputHander(ChatClient chatClient){
  9. this.chatClient = chatClient;
  10. }
  11. @Override
  12. public void run() {
  13. try {
  14. BufferedReader consoleRerader = new BufferedReader(new InputStreamReader(System.in));
  15. while (true){
  16. String input = consoleRerader.readLine();
  17. //向服务器发送消息
  18. chatClient.send(input);
  19. //检查用户是否准备退出
  20. if("quit".equals(input)){
  21. break;
  22. }
  23. }
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }

测试:

启动 ChatServer 和多个 ChatClient 实例进行测试,在客户端窗口中发送消息进行测试
image.png