一、AIO编程模型梳理

回忆上一篇文章的AIO模型,这里做一些大概梳理。
首先在服务器端创建一部服务器通道,绑定监听端口
这里使用的AsynchronousServerSocketChannel 他其实属于一个通道群(AsynchronousChannelGroups),而这个通道群代表着一组可以被多个一部通道共享的资源群组。通过这个通道群来调用handler 。
当我们不做额外设定时候,系统会使用默认的通道群。
异步如何实现?
创建一个handler ,然后通过行为调用handler 来处理行为。
具体物理逻辑见上一篇文章

二、服务器创建

chatServer:

  1. package demo5.aio.server;
  2. import java.io.Closeable;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.AsynchronousChannelGroup;
  7. import java.nio.channels.AsynchronousServerSocketChannel;
  8. import java.nio.channels.AsynchronousSocketChannel;
  9. import java.nio.channels.CompletionHandler;
  10. import java.nio.charset.Charset;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ExecutorService;
  15. import java.util.concurrent.Executors;
  16. public class ChatServer {
  17. private static final String LOCALHOST = "localhost";
  18. private static final int DEFAULT_PORT = 9999;
  19. private static final String QUIT = "quit";
  20. private static final int BUFFER = 1024;
  21. private static final int THREADPOOL_SIZE = 8;
  22. private AsynchronousServerSocketChannel serverSocketChannel;
  23. private AsynchronousChannelGroup channelGroup;
  24. private Charset charset = StandardCharsets.UTF_8;
  25. private List<ClientHandler> connectedClients;
  26. private int port;
  27. private String host;
  28. private void shutdown(Closeable... closeables) {
  29. try {
  30. for (Closeable shut : closeables) {
  31. if (shut != null) {
  32. shut.close();
  33. }
  34. }
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. private boolean readyToQuit(String str) {
  40. return QUIT.equalsIgnoreCase(str);
  41. }
  42. public ChatServer(int port, String host) {
  43. this.host = host;
  44. this.port = port;
  45. connectedClients = new ArrayList<>();
  46. }
  47. public ChatServer() {
  48. this(DEFAULT_PORT, LOCALHOST);
  49. }
  50. public void start() {
  51. try {
  52. ExecutorService pool = Executors.newFixedThreadPool(THREADPOOL_SIZE);
  53. channelGroup = AsynchronousChannelGroup.withThreadPool(pool);
  54. serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
  55. serverSocketChannel.bind(new InetSocketAddress(host, port));
  56. System.out.println("服务器已启动,监听端口[" + serverSocketChannel.getLocalAddress() + "]");
  57. while (true) {
  58. serverSocketChannel.accept(null, new AcceptHandler());
  59. System.in.read();
  60. }
  61. } catch (IOException e) {
  62. e.printStackTrace();
  63. } finally {
  64. shutdown(serverSocketChannel);
  65. }
  66. }
  67. private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
  68. @Override
  69. public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
  70. if (serverSocketChannel.isOpen()) {
  71. serverSocketChannel.accept(null, this);
  72. }
  73. if (clientChannel != null && clientChannel.isOpen()) {
  74. ClientHandler handler = new ClientHandler(clientChannel);
  75. ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
  76. //将用户添加到在线列表
  77. addClient(handler);
  78. clientChannel.read(buffer, buffer, handler);
  79. }
  80. }
  81. @Override
  82. public void failed(Throwable exc, Object attachment) {
  83. System.out.println("用户连接失败" + exc);
  84. }
  85. }
  86. private synchronized void addClient(ClientHandler handler) {
  87. connectedClients.add(handler);
  88. System.out.println(getClientName(handler.clientChannel) + "已成功连接到服务器");
  89. }
  90. private synchronized void removeClient(ClientHandler clientHandler) {
  91. connectedClients.remove(clientHandler);
  92. System.err.println(getClientName(clientHandler.clientChannel) + "已断开服务器连接");
  93. shutdown(clientHandler.clientChannel);
  94. }
  95. private String getClientName(AsynchronousSocketChannel clientChannel) {
  96. try {
  97. InetSocketAddress address= (InetSocketAddress) clientChannel.getRemoteAddress();
  98. String str="客户端["+address.getPort()+"]:";
  99. return str;
  100. } catch (IOException e) {
  101. e.printStackTrace();
  102. }
  103. return "获取客户端失败";
  104. }
  105. private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
  106. private AsynchronousSocketChannel clientChannel;
  107. public ClientHandler(AsynchronousSocketChannel clientChannel) {
  108. this.clientChannel = clientChannel;
  109. }
  110. @Override
  111. public void completed(Integer result, ByteBuffer attachment) {
  112. if (attachment != null) {
  113. if (result <= 0) {
  114. //客户端异常
  115. //移除在线列表
  116. removeClient(this);
  117. } else {
  118. attachment.flip();
  119. String msg = receive(attachment);
  120. System.out.println(getClientName(clientChannel) + msg);
  121. forwardMessage(clientChannel, msg);
  122. attachment.clear();
  123. //检查用户是否决定退出
  124. if (readyToQuit(msg)) {
  125. removeClient(this);
  126. } else {
  127. clientChannel.read(attachment, attachment, this);
  128. }
  129. }
  130. }
  131. }
  132. @Override
  133. public void failed(Throwable exc, ByteBuffer attachment) {
  134. System.out.println("用户读写失败" + exc);
  135. }
  136. }
  137. private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String msg) {
  138. for (ClientHandler handler : connectedClients) {
  139. try {
  140. if (!clientChannel.equals(handler.clientChannel)) {
  141. String str = getClientName(handler.clientChannel);
  142. ByteBuffer buffer = null;
  143. if (msg == null) {
  144. return;
  145. }
  146. if (readyToQuit(msg)) {
  147. buffer = charset.encode(str + "已断开连接");
  148. handler.clientChannel.write(buffer, null, handler);
  149. } else {
  150. buffer = charset.encode(str + msg);
  151. handler.clientChannel.write(buffer, null, handler);
  152. }
  153. }
  154. } catch (Exception e) {
  155. e.printStackTrace();
  156. }
  157. }
  158. }
  159. private String receive(ByteBuffer attachment) {
  160. return String.valueOf(charset.decode(attachment)).trim();
  161. }
  162. }

ServerMain:

  1. package demo5.aio.server;
  2. /**
  3. * @ClassName:
  4. * @Description:
  5. * @author: hszjj
  6. * @date: 2019/11/23 19:43
  7. */
  8. public class ServerMain {
  9. public static void main(String[] args) {
  10. ChatServer server=new ChatServer();
  11. server.start();
  12. }
  13. }

三、客户端创建

chatClient:

  1. package demo5.aio.client;
  2. import java.io.BufferedReader;
  3. import java.io.Closeable;
  4. import java.io.IOException;
  5. import java.net.InetSocketAddress;
  6. import java.nio.ByteBuffer;
  7. import java.nio.channels.AsynchronousSocketChannel;
  8. import java.nio.charset.Charset;
  9. import java.util.concurrent.ExecutionException;
  10. import java.util.concurrent.Future;
  11. public class ChatClient {
  12. private static final String LOCALHOST = "localhost";
  13. private static final int DEFAULT_PORT = 9999;
  14. private static final String QUIT = "quit";
  15. private static final int BUFFER = 1024;
  16. private Charset charset = Charset.forName("UTF-8");
  17. private int port;
  18. private String host;
  19. private AsynchronousSocketChannel clientChannel;
  20. public ChatClient(String host, int port) {
  21. this.host = host;
  22. this.port = port;
  23. }
  24. public ChatClient() {
  25. this(LOCALHOST, DEFAULT_PORT);
  26. }
  27. private void shutdown(Closeable... closeables) {
  28. try {
  29. for (Closeable shut : closeables) {
  30. if (shut != null) {
  31. shut.close();
  32. }
  33. }
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. public boolean readyToQuit(String str) {
  39. return QUIT.equalsIgnoreCase(str);
  40. }
  41. public void start() {
  42. try {
  43. clientChannel = AsynchronousSocketChannel.open();
  44. Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));
  45. System.out.println("已成功连接到服务器");
  46. future.get();
  47. //处理用户输入
  48. new Thread(new UserInputHander(this)).start();
  49. ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
  50. while (true) {
  51. Future<Integer> readResult = clientChannel.read(buffer);
  52. int result = readResult.get();
  53. if (result <= 0) {
  54. //服务器异常
  55. System.err.println("服务器断开");
  56. shutdown(clientChannel);
  57. System.exit(-1);
  58. } else {
  59. buffer.flip();
  60. String msg = String.valueOf(charset.decode(buffer));
  61. buffer.clear();
  62. System.out.println(msg);
  63. }
  64. }
  65. } catch (IOException e) {
  66. e.printStackTrace();
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. } catch (ExecutionException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. public void send(String msg) {
  74. try {
  75. if (msg.isEmpty()) {
  76. return;
  77. } else {
  78. ByteBuffer buffer = charset.encode(msg);
  79. Future<Integer> future = clientChannel.write(buffer);
  80. future.get();
  81. }
  82. } catch (InterruptedException | ExecutionException e) {
  83. e.printStackTrace();
  84. System.err.println("消息发送失败");
  85. }
  86. }
  87. }

UserInputHander:

  1. package demo5.aio.client;
  2. import demo3.nio1.client.Demo3ChatClient;
  3. import java.io.BufferedReader;
  4. import java.io.IOException;
  5. import java.io.InputStreamReader;
  6. public class UserInputHander implements Runnable {
  7. private ChatClient chatClient;
  8. private BufferedReader reader;
  9. public UserInputHander(ChatClient chatClient) {
  10. this.chatClient = chatClient;
  11. }
  12. @Override
  13. public void run() {
  14. reader = new BufferedReader(new InputStreamReader(System.in));
  15. try {
  16. while (true) {
  17. String msg = reader.readLine();
  18. chatClient.send(msg);
  19. if (chatClient.readyToQuit(msg)) {
  20. break;
  21. }
  22. }
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. } finally {
  26. if (reader != null) {
  27. try {
  28. System.out.println("已断开服务器连接");
  29. reader.close();
  30. } catch (IOException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }
  36. }

ClientMain:

  1. package demo5.aio.client;
  2. /**
  3. * @ClassName:
  4. * @Description:
  5. * @author: hszjj
  6. * @date: 2019/11/23 20:16
  7. */
  8. public class ClientMain {
  9. public static void main(String[] args) {
  10. ChatClient client=new ChatClient();
  11. client.start();
  12. }
  13. }

四、效果演示

image.png