一、AIO的异步操作

AsynchronousSocketChannel和AsynchronousServerSocketChannel都支持如图所示的几个IO操作的异步调用。
image.png

二、如何实现异步调用

Future

简单来说,通过Channel调用这几个函数后会返回一个Future对象。Future在线程池中介绍过,它是一个描述未来任务结果的对象,可以通过get、isDone等方法查询该任务的状态和结果。
image.png

CompletionHandler

通过Channel调用IO操作时不等待其完成,而是把相关参数连同一个回调对象(Handler)一起传入,操作结束后由Handler进行后续处理。
image.png
Handler拥有两个方法:一个是completed,在操作成功完成后调用;另一个是failed,在操作失败后调用。

三、EchoDemo实现异步操作机制

Server:

主体函数:

  1. package demo4.aio.server;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.AsynchronousServerSocketChannel;
  7. import java.nio.channels.AsynchronousSocketChannel;
  8. import java.nio.channels.CompletionHandler;
  9. import java.util.HashMap;
  10. import java.util.Map;
  /**
   * Echo server built on the Java AIO (NIO.2) asynchronous channels.
   *
   * <p>Connections are accepted through {@link AcceptHandler}; every accepted client is
   * served by its own {@link ClientHander}, which alternates between an asynchronous read
   * and an asynchronous write of the same buffer. A client ends its session by sending
   * {@code quit} (case-insensitive) or by closing the connection.
   *
   * <p>Each completed read is treated as one message. TCP does not guarantee that, so a
   * production protocol would need explicit message framing.
   */
  public class Server {
      private static final String LOCALHOST = "localhost";
      private static final int DEFAULT_PORT = 9999;
      private static final String QUIT = "quit";
      private static final int BUFFER_SIZE = 1024;
      // Fully qualified so the listing needs no additional import line.
      private static final java.nio.charset.Charset CHARSET =
              java.nio.charset.StandardCharsets.UTF_8;

      private AsynchronousServerSocketChannel serverSocketChannel;

      /**
       * Tells whether a received message is the quit command.
       *
       * @param str decoded message, may be {@code null}
       * @return {@code true} if the message, ignoring case and surrounding whitespace,
       *         equals {@code quit}
       */
      private boolean readyToQuit(String str) {
          return str != null && QUIT.equalsIgnoreCase(str.trim());
      }

      /**
       * Closes every given resource. A failure to close one resource is reported and does
       * not prevent the remaining ones from being closed.
       *
       * @param closeables resources to close; {@code null} entries are skipped
       */
      private void shutDown(Closeable... closeables) {
          for (Closeable shut : closeables) {
              if (shut == null) {
                  continue;
              }
              try {
                  shut.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }

      /**
       * Binds the listening socket, starts accepting clients and blocks the calling thread
       * until a byte (for example the Enter key) or end-of-stream is read from standard
       * input; the listening channel is closed afterwards.
       */
      public void start() {
          try {
              // Bind the listening port.
              serverSocketChannel = AsynchronousServerSocketChannel.open();
              serverSocketChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
              System.out.println("服务器已启动,正在监听:[" + LOCALHOST + "," + DEFAULT_PORT + "]");

              // Issue exactly one accept here; AcceptHandler re-arms it after every
              // connection. Calling accept again while one is still pending would throw
              // AcceptPendingException.
              // No AsynchronousChannelGroup is specified, so the default group runs the
              // handlers on its own threads rather than on this one.
              serverSocketChannel.accept(null, new AcceptHandler());

              // Keep the calling thread (and therefore the JVM) alive while the handlers
              // work; the default group's threads are typically daemon threads.
              System.in.read();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              shutDown(serverSocketChannel);
          }
      }

      /** Handles completed accepts: re-arms the accept and starts reading from the new client. */
      private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
          @Override
          public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
              // Re-arm the accept for the next client. This is not unbounded recursion:
              // the call only registers the handler and returns.
              if (serverSocketChannel.isOpen()) {
                  serverSocketChannel.accept(null, this);
              }
              if (clientChannel != null && clientChannel.isOpen()) {
                  ClientHander handler = new ClientHander(clientChannel);
                  ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                  Map<String, Object> info = new HashMap<>();
                  info.put("type", "read");
                  info.put("buffer", buffer);
                  clientChannel.read(buffer, info, handler);
              }
          }

          @Override
          public void failed(Throwable exc, Object attachment) {
              // A failure after the listening channel was closed is the expected result
              // of shutting down; anything else is reported.
              if (serverSocketChannel.isOpen()) {
                  exc.printStackTrace();
              }
          }
      }

      /**
       * Per-client read/write state machine. The attachment map carries the kind of the
       * operation that just completed under {@code "type"} ({@code "read"} or
       * {@code "write"}) and the buffer it used under {@code "buffer"}. Only one operation
       * per client is outstanding at any time.
       */
      private class ClientHander implements CompletionHandler<Integer, Map<String, Object>> {
          private final AsynchronousSocketChannel clientChannel;
          // Captured once so it can still be printed after the channel is closed.
          private final String remoteAddress;

          public ClientHander(AsynchronousSocketChannel clientChannel) {
              this.clientChannel = clientChannel;
              String address;
              try {
                  // The peer's address identifies the client; the local address would be
                  // the server's own endpoint.
                  address = String.valueOf(clientChannel.getRemoteAddress());
              } catch (IOException e) {
                  e.printStackTrace();
                  address = "unknown";
              }
              this.remoteAddress = address;
              System.out.println("客户端[" + remoteAddress + "]已连接到服务器");
          }

          @Override
          public void completed(Integer result, Map<String, Object> info) {
              String type = (String) info.get("type");
              ByteBuffer buffer = (ByteBuffer) info.get("buffer");
              if ("read".equalsIgnoreCase(type)) {
                  onRead(result, buffer, info);
              } else if ("write".equalsIgnoreCase(type)) {
                  onWrite(buffer, info);
              }
          }

          /** Processes the bytes of a completed read and echoes them back. */
          private void onRead(int bytesRead, ByteBuffer buffer, Map<String, Object> info) {
              if (bytesRead < 0) {
                  // End of stream: the client closed the connection.
                  disconnect();
                  return;
              }
              buffer.flip();
              // Decode only the bytes actually received; decoding a duplicate leaves the
              // buffer's position untouched for the echo below.
              String str = CHARSET.decode(buffer.duplicate()).toString();
              if (readyToQuit(str)) {
                  disconnect();
                  return;
              }
              System.out.println("客户端[" + remoteAddress + "]:" + str);
              info.put("type", "write");
              // The buffer must not be touched again until the write completes.
              clientChannel.write(buffer, info, this);
          }

          /** Continues a partial write, or starts the next read once all bytes are sent. */
          private void onWrite(ByteBuffer buffer, Map<String, Object> info) {
              if (buffer.hasRemaining()) {
                  clientChannel.write(buffer, info, this);
                  return;
              }
              buffer.clear();
              info.put("type", "read");
              clientChannel.read(buffer, info, this);
          }

          /** Reports the disconnect and closes the client channel. */
          private void disconnect() {
              System.out.println("客户端[" + remoteAddress + "]已断开连接");
              shutDown(clientChannel);
          }

          @Override
          public void failed(Throwable exc, Map<String, Object> attachment) {
              // If the channel is already closed the failure is a consequence of that
              // close; otherwise report it and drop the client.
              if (clientChannel.isOpen()) {
                  exc.printStackTrace();
                  disconnect();
              }
          }
      }
  }

Main:

  1. package demo4.aio.server;
  2. public class ServerMain {
  3. public static void main(String[] args) {
  4. Server server=new Server();
  5. server.start();
  6. }
  7. }

Client

主体函数:

  1. package demo4.aio.client;
  2. import java.io.BufferedReader;
  3. import java.io.Closeable;
  4. import java.io.IOException;
  5. import java.io.InputStreamReader;
  6. import java.net.InetSocketAddress;
  7. import java.nio.ByteBuffer;
  8. import java.nio.channels.AsynchronousSocketChannel;
  9. import java.util.concurrent.ExecutionException;
  10. import java.util.concurrent.Future;
  /**
   * Console client for the echo server, using the {@link Future}-based style of the
   * asynchronous channel API: each operation is started and then awaited with
   * {@code get()}.
   *
   * <p>Every non-empty line typed on standard input is sent to the server and the echoed
   * bytes are printed. Typing {@code quit} (case-insensitive), or reaching end-of-input,
   * ends the session.
   */
  public class Client {
      private static final String LOCALHOST = "localhost";
      private static final int DEFAULT_PORT = 9999;
      private static final String QUIT = "quit";
      // Fully qualified so the listing needs no additional import line.
      private static final java.nio.charset.Charset CHARSET =
              java.nio.charset.StandardCharsets.UTF_8;

      private AsynchronousSocketChannel clientChannel;

      /**
       * Tells whether the user's input is the quit command.
       *
       * @param str line typed by the user, may be {@code null}
       * @return {@code true} if the line, ignoring case and surrounding whitespace,
       *         equals {@code quit}
       */
      private boolean readyToQuit(String str) {
          return str != null && QUIT.equalsIgnoreCase(str.trim());
      }

      /**
       * Closes every given resource. A failure to close one resource is reported and does
       * not prevent the remaining ones from being closed.
       *
       * @param closeables resources to close; {@code null} entries are skipped
       */
      private void shutDown(Closeable... closeables) {
          for (Closeable shut : closeables) {
              if (shut == null) {
                  continue;
              }
              try {
                  shut.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }

      /**
       * Connects to the server and runs the send/receive loop until the user quits, input
       * ends, or the server closes the connection. The channel is always closed on exit.
       */
      public void start() {
          try {
              // Create the channel and wait for the connection to be established.
              clientChannel = AsynchronousSocketChannel.open();
              Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
              future.get();
              System.out.println("客户端[" + LOCALHOST + "," + DEFAULT_PORT + "]已连接到服务器");

              // Wait for user input; readLine() returns null at end-of-input.
              BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
              String input;
              while ((input = consoleReader.readLine()) != null) {
                  if (input.isEmpty()) {
                      // Nothing would be sent, so there would be no echo to wait for.
                      continue;
                  }
                  byte[] inputBytes = input.getBytes(CHARSET);
                  sendFully(ByteBuffer.wrap(inputBytes));
                  if (readyToQuit(input)) {
                      break;
                  }
                  System.out.println("消息发送成功");

                  String echo = receiveEcho(inputBytes.length);
                  if (echo == null) {
                      System.out.println("服务器已断开连接");
                      break;
                  }
                  System.out.print("收到服务器消息:");
                  System.out.println("[ECHO]:" + echo);
              }
          } catch (IOException | ExecutionException e) {
              e.printStackTrace();
          } catch (InterruptedException e) {
              // Restore the interrupt flag so callers can observe the interruption.
              Thread.currentThread().interrupt();
              e.printStackTrace();
          } finally {
              shutDown(clientChannel);
          }
      }

      /** Writes the whole buffer; a single write may transfer only part of it. */
      private void sendFully(ByteBuffer buffer) throws InterruptedException, ExecutionException {
          while (buffer.hasRemaining()) {
              clientChannel.write(buffer).get();
          }
      }

      /**
       * Reads the echo of a message of {@code expectedBytes} bytes into a dedicated buffer.
       *
       * @return the decoded echo, or {@code null} if the server closed the connection
       *         before the whole echo arrived
       */
      private String receiveEcho(int expectedBytes) throws InterruptedException, ExecutionException {
          ByteBuffer buffer = ByteBuffer.allocate(expectedBytes);
          while (buffer.hasRemaining()) {
              int bytesRead = clientChannel.read(buffer).get();
              if (bytesRead < 0) {
                  return null;
              }
          }
          buffer.flip();
          return CHARSET.decode(buffer).toString();
      }
  }

Main:

  1. package demo4.aio.client;
  2. public class ClientMain {
  3. public static void main(String[] args) {
  4. Client client=new Client();
  5. client.start();
  6. }
  7. }

四、结果展示

image.png