image.png
Client1加入聊天室,当selector监听到了accept信号之后,就会做handles的处理,将客户端注册到selector上
当客户向服务端发送了信息之后,就会认为socketchannel就会被触发read事件。
image.png
读取出来之后,转发给其他的客户端
但selector是阻塞的,如果没有监听到任何的信息,selector就会阻塞住。
如果再有一个客户端加入,selector就会监听到accept,然后加入这个客户端的channel,并监听这个channel的read事件。
image.png
也就是一个客户端对应一个channel,selector负责监听各个channel的信号。
一个selector对象在一个线程里面监听处理多个通道

代码演示

先来演示ServerSocket这个Channel的

  1. import java.io.Closeable;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.nio.charset.Charset;
  7. import java.util.Set;
  8. public class ChatServer {
  9. private static final int DEFAULT_PORT = 8888;
  10. private static final String QUIT = "quit";
  11. private static final int BUFFERE = 1024;
  12. private ServerSocketChannel server;
  13. private Selector selector;
  14. private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFERE);
  15. private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFERE);
  16. private Charset charset = Charset.forName("UTF-8");
  17. private int port;
  18. public ChatServer(){
  19. this(DEFAULT_PORT);
  20. }
  21. public ChatServer(int port){
  22. this.port = port;
  23. }
  24. private void start(){
  25. try {
  26. server = ServerSocketChannel.open();
  27. server.configureBlocking(false);
  28. server.socket().bind(new InetSocketAddress(port));
  29. selector = Selector.open();
  30. server.register(selector, SelectionKey.OP_ACCEPT);
  31. System.out.println("启动服务器,监听端口:" + port + "...");
  32. while (true){
  33. selector.select();
  34. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  35. // 遍历每一个触发的事件
  36. for (SelectionKey key : selectionKeys){
  37. // 处理被触发的事件
  38. handles(key);
  39. }
  40. selectionKeys.clear();
  41. }
  42. } catch (IOException e) {
  43. e.printStackTrace();
  44. }finally {
  45. close(selector);
  46. }
  47. }
  48. private void handles(SelectionKey key) throws IOException {
  49. // ACCEPT事件 —— 和客户端建立了连接 ServerSocketChannel上的
  50. //将客户端绑定selector,这样后面selector才能监听客户端的channel上的消息
  51. if (key.isAcceptable()){
  52. SocketChannel client = (SocketChannel) key.channel();
  53. // SocketChannel client = server.accept();
  54. client.configureBlocking(false);
  55. client.register(selector,SelectionKey.OP_READ);
  56. System.out.println(getClientName(client) + "已连接");
  57. }
  58. // READ事件 —— 客户端发送了消息给服务器端 SocketChannel上的
  59. else if (key.isReadable()){
  60. SocketChannel client = (SocketChannel) key.channel();
  61. // 往Channel中读取信息
  62. String fwdMsg = receive(client);
  63. if (fwdMsg.isEmpty()){
  64. //客户端异常 取消这个通道
  65. key.cancel();
  66. selector.wakeup();
  67. }else{
  68. //转发数据到其他客户端
  69. forwardMessage(client, fwdMsg);
  70. //检查用户是否退出
  71. if (readyToQuit(fwdMsg)){
  72. key.cancel();
  73. selector.wakeup();
  74. System.out.println(getClientName(client) + "已断开");
  75. }
  76. }
  77. }
  78. }
  79. private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
  80. for (SelectionKey key : selector.keys()){
  81. Channel connectedClient = key.channel();
  82. if (connectedClient instanceof ServerSocketChannel){
  83. continue;
  84. }
  85. if (key.isValid() && !client.equals(connectedClient)){
  86. //发送消息
  87. wBuffer.clear();
  88. wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));
  89. wBuffer.flip();
  90. while (wBuffer.hasRemaining()){
  91. ((SocketChannel)connectedClient).write(wBuffer);
  92. }
  93. }
  94. }
  95. }
  96. private String receive(SocketChannel client) throws IOException {
  97. rBuffer.clear();
  98. // rBuffer读取SocketChannel中的消息
  99. while (client.read(rBuffer) > 0);
  100. rBuffer.flip();
  101. return String.valueOf(charset.decode(rBuffer));
  102. }
  103. private String getClientName(SocketChannel client){
  104. return "客户端[" + client.socket().getPort() + "]";
  105. }
  106. private boolean readyToQuit(String msg){
  107. return QUIT.equals(msg);
  108. }
  109. private void close(Closeable closeable){
  110. if (closeable != null){
  111. try{
  112. closeable.close();
  113. }catch (IOException e){
  114. e.printStackTrace();
  115. }
  116. }
  117. }
  118. public static void main(String[] args) {
  119. ChatServer chatServer = new ChatServer(7777);
  120. chatServer.start();
  121. }
  122. }