image.png

NioServer.java

  1. package io.torey.niotest;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.nio.charset.Charset;
  7. import java.util.Iterator;
  8. import java.util.Set;
  9. /**
  10. * NIO服务器端
  11. */
  12. public class NioServer {
  13. public static void main(String[] args){
  14. NioServer nioServer = new NioServer();
  15. try {
  16. nioServer.start();
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. /**
  22. * 启动
  23. */
  24. public void start() throws IOException{
  25. //1 创建Selector
  26. //2 通过ServerSocketChannel创建channel通道
  27. //3 为channel通道绑定监听端口
  28. //4 重点:设置channel为非阻塞模式
  29. //5 将channel注册到selector上,监听连接事件
  30. //6 循环等待新接入的连接
  31. //7 根据就绪状态,调用对应方法处理业务逻辑
  32. //1 创建Selector
  33. Selector selector = Selector.open();
  34. //2 通过ServerSocketChannel创建channel通道
  35. ServerSocketChannel serverSocketChannel= ServerSocketChannel.open();
  36. //3 为channel通道绑定监听端口
  37. serverSocketChannel.bind(new InetSocketAddress(8000));
  38. //4 重点:设置channel为非阻塞模式
  39. serverSocketChannel.configureBlocking(false);
  40. //5 将channel注册到selector上,监听连接事件
  41. serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
  42. System.out.println("服务器启动成功!!");
  43. //6 循环等待新接入的连接
  44. while (true) {
  45. //这个方法是阻塞方法,获取可用的channel数量
  46. int readyChannels= selector.select();
  47. if (readyChannels==0) {
  48. continue;
  49. }
  50. //获取可用channel的集合
  51. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  52. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  53. while (iterator.hasNext()) {
  54. //selectionKey的实例
  55. SelectionKey selectionKey = iterator.next();
  56. //移除Set中当前的selectionKey,重点
  57. iterator.remove();
  58. //7 根据就绪状态,调用对应方法处理业务逻辑
  59. //如果是 接入事件
  60. if (selectionKey.isAcceptable()) {
  61. acceptHandler(serverSocketChannel,selector);
  62. }
  63. //如果是 可读事件
  64. if (selectionKey.isReadable()) {
  65. readHandler(selectionKey,selector);
  66. }
  67. }
  68. }
  69. }
  70. /**
  71. * 接入事件处理器
  72. * @param serverSocketChannel
  73. * @param selector
  74. * @throws IOException
  75. */
  76. private void acceptHandler(ServerSocketChannel serverSocketChannel,Selector selector) throws IOException{
  77. //如果要是接入事件,创建socketChannel
  78. //将socketChannel设置为非阻塞工作模式
  79. //将channel注册到selector上,监听 可读事件
  80. //回复客户端提示信息
  81. System.out.println("有一个客户端进来了...");
  82. //如果要是接入事件,创建socketChannel
  83. SocketChannel socketChannel = serverSocketChannel.accept();
  84. //将socketChannel设置为非阻塞工作模式
  85. socketChannel.configureBlocking(false);
  86. //将channel注册到selector上,监听 可读事件
  87. socketChannel.register(selector,SelectionKey.OP_READ);
  88. //回复客户端提示信息
  89. socketChannel.write(Charset.forName("UTF-8")
  90. .encode("你与聊天室里其他人都不是朋友关系,请注意隐私安全!!"));
  91. }
  92. /**
  93. * 可读事件处理器
  94. */
  95. private void readHandler(SelectionKey selectionKey,Selector selector) throws IOException{
  96. //要从selectionKey中获取到已经就绪的channel
  97. //创建buffer
  98. //循环读取客户端请求信息
  99. //将channel再次注册到selector上,监听他的可读事件
  100. //将客户端发送的请求信息,广播给其他客户端
  101. //要从selectionKey中获取到已经就绪的channel
  102. SocketChannel socketChannel= (SocketChannel)selectionKey.channel();
  103. //创建buffer
  104. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  105. //循环读取客户端请求信息
  106. StringBuilder reques=new StringBuilder();
  107. while (socketChannel.read(byteBuffer) > 0) {
  108. //切换buffer为读模式
  109. byteBuffer.flip();
  110. //读取buffer中的内容
  111. reques.append(Charset.forName("UTF-8").decode(byteBuffer));
  112. }
  113. //将channel再次注册到selector上,监听他的可读事件
  114. socketChannel.register(selector,SelectionKey.OP_READ);
  115. //将客户端发送的请求信息,广播给其他客户端
  116. if (reques.length()>0) {
  117. String requesStr = reques.toString();
  118. System.out.println("收到客户端消息:"+requesStr);
  119. broadCast(selector,socketChannel,requesStr);
  120. }
  121. }
  122. /**
  123. * 广播给其他客户端
  124. */
  125. private void broadCast(Selector selector,SocketChannel sourceChannel,String request){
  126. //获取到所有已接入的客户端channel
  127. //循环向所有channel广播消息
  128. //获取到所有已接入的客户端channel
  129. Set<SelectionKey> selectionKeySet = selector.keys();
  130. selectionKeySet.forEach(selectionKey -> {
  131. SelectableChannel targetChannel = selectionKey.channel();
  132. //需要将发送消息的channel剔除掉,自己发的消息,广播的时候不在给自己发消息
  133. if (targetChannel instanceof SocketChannel
  134. && targetChannel!=sourceChannel) {
  135. try {
  136. //将消息发送到其他客户端
  137. ((SocketChannel) targetChannel).write(Charset.forName("UTF-8").encode(request));
  138. } catch (IOException e) {
  139. e.printStackTrace();
  140. }
  141. }
  142. });
  143. //循环向所有channel广播消息
  144. }
  145. }

NioClientHandler.java

  1. package io.torey.niotest;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.nio.charset.Charset;
  10. import java.util.Iterator;
  11. import java.util.Set;
  12. /**
  13. * 服务器端线程类,专门接收服务器端响应
  14. */
  15. public class NioClientHandler implements Runnable {
  16. private Selector selector;
  17. public NioClientHandler(Selector selector) {
  18. this.selector = selector;
  19. }
  20. @Override
  21. public void run() {
  22. try{
  23. while (true) {
  24. //这个方法是阻塞方法,获取可用的channel数量
  25. int readyChannels= selector.select();
  26. if (readyChannels==0) {
  27. continue;
  28. }
  29. //获取可用channel的集合
  30. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  31. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  32. while (iterator.hasNext()) {
  33. //selectionKey的实例
  34. SelectionKey selectionKey = iterator.next();
  35. //移除Set中当前的selectionKey,重点
  36. iterator.remove();
  37. //7 根据就绪状态,调用对应方法处理业务逻辑
  38. //如果是 可读事件
  39. if (selectionKey.isReadable()) {
  40. readHandler(selectionKey,selector);
  41. }
  42. }
  43. }}catch (Exception ex){
  44. ex.printStackTrace();
  45. }
  46. }
  47. /**
  48. * 可读事件处理器
  49. */
  50. private void readHandler(SelectionKey selectionKey,Selector selector) throws IOException {
  51. //要从selectionKey中获取到已经就绪的channel
  52. //创建buffer
  53. //循环读取服务器端请求信息
  54. //将channel再次注册到selector上,监听他的可读事件
  55. //将服务器端发送的请求信息,广播给其他服务器端
  56. //要从selectionKey中获取到已经就绪的channel
  57. SocketChannel socketChannel= (SocketChannel)selectionKey.channel();
  58. //创建buffer
  59. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  60. //循环读取服务器端请求信息
  61. StringBuilder response=new StringBuilder();
  62. while (socketChannel.read(byteBuffer) > 0) {
  63. //切换buffer为读模式
  64. byteBuffer.flip();
  65. //读取buffer中的内容
  66. response.append(Charset.forName("UTF-8").decode(byteBuffer));
  67. }
  68. //将channel再次注册到selector上,监听他的可读事件
  69. socketChannel.register(selector,SelectionKey.OP_READ);
  70. //将服务器端发送的请求信息,打印到本地
  71. if (response.length()>0) {
  72. System.out.println("收到服务器端消息:"+response.toString());
  73. }
  74. }
  75. }

NioClient.java

  1. package io.torey.niotest;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.channels.SelectionKey;
  5. import java.nio.channels.Selector;
  6. import java.nio.channels.SocketChannel;
  7. import java.nio.charset.Charset;
  8. import java.util.Scanner;
  9. /**
  10. * NIO客户端
  11. */
  12. public class NioClient {
  13. /**
  14. * 启动
  15. */
  16. public void start(String nickname) throws IOException {
  17. //连接服务器端
  18. //向服务器端发送数据
  19. //接收服务器端相应
  20. //连接服务器端
  21. SocketChannel socketChannel = SocketChannel.open(
  22. new InetSocketAddress("127.0.0.1", 8000));
  23. //接收服务器端相应
  24. //新开一个线程,专门负责来接收服务器端的相应数据
  25. Selector selector = Selector.open();
  26. socketChannel.configureBlocking(false);
  27. socketChannel.register(selector,SelectionKey.OP_READ);
  28. new Thread(new NioClientHandler(selector)).start();
  29. //向服务器端发送数据
  30. Scanner scanner = new Scanner(System.in);
  31. while (scanner.hasNextLine()) {
  32. String request =nickname+":"+ scanner.nextLine();
  33. if (null!= request&&request.length()>0) {
  34. socketChannel.write(Charset.forName("UTF-8").encode(request));
  35. }
  36. }
  37. }
  38. }

AClient.java

  1. package io.torey.niotest;
  2. import java.io.IOException;
  3. public class AClient {
  4. public static void main(String[] args) {
  5. NioClient client=new NioClient();
  6. try {
  7. client.start("客户端A");
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }

BClient.java

  1. package io.torey.niotest;
  2. import java.io.IOException;
  3. public class AClient {
  4. public static void main(String[] args) {
  5. NioClient client=new NioClient();
  6. try {
  7. client.start("客户端B");
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. }

测试截图

image.png
image.png
image.png