image.png

Server端

  /**
   * Single-threaded, non-blocking NIO group-chat server.
   *
   * <p>One {@link Selector} multiplexes the listening channel and every connected client.
   * Whatever a client sends is printed and forwarded to all other connected clients.
   */
  public class Server {
    /** TCP port the server listens on. */
    private static final int PORT = 6667;
    /** Capacity, in bytes, of the buffer used for a single read. */
    private static final int BUFFER_SIZE = 1024;
    /** Charset used to decode incoming bytes and encode forwarded messages. */
    private static final java.nio.charset.Charset CHARSET =
        java.nio.charset.StandardCharsets.UTF_8;

    private final Selector selector;
    private final ServerSocketChannel listenChannel;

    /**
     * Opens the selector, binds the listening channel to {@link #PORT}, switches it to
     * non-blocking mode and registers it for accept events.
     *
     * @throws IllegalStateException if the selector or listening socket cannot be set up;
     *     failing here avoids a later NullPointerException in {@link #listen()}
     */
    public Server() {
      Selector openedSelector = null;
      ServerSocketChannel openedChannel = null;
      try {
        openedSelector = Selector.open();
        openedChannel = ServerSocketChannel.open();
        openedChannel.socket().bind(new InetSocketAddress(PORT));
        openedChannel.configureBlocking(false);
        openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
      } catch (Exception e) {
        // Release whatever was opened before reporting the failure.
        closeQuietly(openedChannel);
        closeQuietly(openedSelector);
        throw new IllegalStateException("Failed to start chat server on port " + PORT, e);
      }
      selector = openedSelector;
      listenChannel = openedChannel;
    }

    /**
     * Runs the event loop: waits up to two seconds for ready channels, then dispatches each
     * selected key. Returns only when the selector itself fails; all channels are closed
     * before returning.
     */
    public void listen() {
      try {
        while (true) {
          if (selector.select(2000) == 0) {
            continue; // timed out with nothing ready
          }
          Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
          while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // Remove first so a failure while handling cannot leave a stale key behind.
            iterator.remove();
            handleKey(key);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        shutdown();
      }
    }

    /** Dispatches one selected key to the accept and/or read handler. */
    private void handleKey(SelectionKey key) throws IOException {
      if (!key.isValid()) {
        return;
      }
      if (key.isAcceptable()) {
        acceptClient();
      }
      if (key.isValid() && key.isReadable()) {
        readData(key);
      }
    }

    /** Accepts one pending connection and registers it with the selector for reads. */
    private void acceptClient() throws IOException {
      SocketChannel socketChannel = listenChannel.accept();
      if (socketChannel == null) {
        return; // non-blocking accept found no pending connection
      }
      try {
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println(socketChannel.getRemoteAddress() + "上线了");
      } catch (IOException e) {
        // A problem with one new connection must not take the whole server down.
        e.printStackTrace();
        closeQuietly(socketChannel);
      }
    }

    /**
     * Reads one chunk of data from a client and forwards it to the other clients.
     * The client is disconnected when the read fails or reports end-of-stream.
     *
     * @param key the selection key whose channel is ready for reading
     */
    private void readData(SelectionKey key) {
      SocketChannel channel = (SocketChannel) key.channel();
      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
      int count;
      try {
        count = channel.read(buffer);
      } catch (IOException e) {
        disconnect(key, channel);
        return;
      }
      if (count < 0) {
        // End-of-stream: the peer closed the connection. Without this the key would stay
        // readable forever and the event loop would spin.
        disconnect(key, channel);
        return;
      }
      if (count == 0) {
        return;
      }
      buffer.flip();
      String msg = decodeMessage(buffer);
      System.out.println("form 客户端" + msg);
      sendInfoToaOtherClients(msg, channel);
    }

    /**
     * Decodes the remaining bytes of {@code buffer} (position to limit) as UTF-8 text, so
     * unused buffer capacity never ends up in the message.
     *
     * @param buffer a buffer already flipped for reading
     * @return the decoded text
     */
    static String decodeMessage(ByteBuffer buffer) {
      return CHARSET.decode(buffer).toString();
    }

    /**
     * Forwards a message to every connected client except the sender. A client whose
     * channel fails during the write is disconnected; the sender is unaffected.
     *
     * @param msg the message to forward
     * @param self the sender's channel, which is skipped
     */
    private void sendInfoToaOtherClients(String msg, SocketChannel self) {
      System.out.println("服务器转发消息");
      byte[] payload = msg.getBytes(CHARSET);
      for (SelectionKey key : selector.keys()) {
        Channel channel = key.channel();
        if (!(channel instanceof SocketChannel) || channel == self || !key.isValid()) {
          continue;
        }
        SocketChannel dest = (SocketChannel) channel;
        try {
          ByteBuffer buffer = ByteBuffer.wrap(payload);
          // A non-blocking write may accept only part of the buffer; keep writing until
          // the whole message is handed over. NOTE(review): this spins while a peer's send
          // buffer is full; queueing with OP_WRITE would be the scalable alternative.
          while (buffer.hasRemaining()) {
            dest.write(buffer);
          }
        } catch (IOException e) {
          disconnect(key, dest);
        }
      }
    }

    /** Announces that a client went offline, cancels its key and closes its channel. */
    private void disconnect(SelectionKey key, SocketChannel channel) {
      try {
        System.out.println(channel.getRemoteAddress() + "离线了");
      } catch (IOException e) {
        // The address lookup failed; cleanup below must still happen.
        e.printStackTrace();
      }
      key.cancel();
      closeQuietly(channel);
    }

    /** Closes every registered channel and then the selector. */
    private void shutdown() {
      if (selector.isOpen()) {
        for (SelectionKey key : selector.keys()) {
          closeQuietly(key.channel());
        }
      }
      closeQuietly(listenChannel);
      closeQuietly(selector);
    }

    /** Closes a resource, reporting but not propagating any failure; null is ignored. */
    private static void closeQuietly(AutoCloseable resource) {
      if (resource == null) {
        return;
      }
      try {
        resource.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    /** Starts the server and runs its event loop. */
    public static void main(String[] args) {
      Server server = new Server();
      server.listen();
    }
  }

Client端

  /**
   * Console chat client for the NIO group-chat server.
   *
   * <p>Connects to {@link #HOST}:{@link #PORT}, sends every line typed on standard input and
   * prints whatever the server forwards from other clients.
   */
  public class Client1 {
    /** Server address. */
    private final static String HOST = "127.0.0.1";
    /** Server port. */
    private final static int PORT = 6667;
    /** Capacity, in bytes, of the buffer used for a single read. */
    private static final int BUFFER_SIZE = 1024;

    private final Selector selector;
    private final SocketChannel socketChannel;
    /** Local socket address without the leading '/', used as the display name. */
    private final String username;

    /**
     * Connects to the server, switches the channel to non-blocking mode and registers it
     * with a new selector for read events.
     *
     * @throws Exception if the selector cannot be opened or the connection fails; anything
     *     opened so far is closed before the exception is rethrown
     */
    public Client1() throws Exception {
      Selector openedSelector = Selector.open();
      SocketChannel openedChannel = null;
      String localName;
      try {
        openedChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
        openedChannel.configureBlocking(false);
        openedChannel.register(openedSelector, SelectionKey.OP_READ);
        localName = openedChannel.getLocalAddress().toString().substring(1);
      } catch (Exception e) {
        closeQuietly(openedChannel);
        closeQuietly(openedSelector);
        throw e;
      }
      selector = openedSelector;
      socketChannel = openedChannel;
      username = localName;
      System.out.println(username + "is ok");
    }

    /**
     * Sends a message to the server, prefixed with this client's name.
     *
     * @param info the message text
     */
    public void sendInfo(String info) {
      info = username + "说:" + info;
      try {
        socketChannel.write(ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8)));
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    /**
     * Blocks until the channel is ready, then prints the data the server sent. Every
     * selected key is removed from the selected set as it is processed.
     */
    public void readInfo() {
      try {
        if (selector.select() == 0) {
          return;
        }
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          // Must happen inside the loop so each processed key is removed.
          iterator.remove();
          if (key.isValid() && key.isReadable()) {
            readFrom(key);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    /** Reads one chunk from the key's channel and prints it; closes the channel on EOF or error. */
    private void readFrom(SelectionKey key) throws Exception {
      SocketChannel channel = (SocketChannel) key.channel();
      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
      int count;
      try {
        count = channel.read(buffer);
      } catch (Exception e) {
        // A broken connection would otherwise stay selected and fail on every call.
        key.cancel();
        closeQuietly(channel);
        throw e;
      }
      if (count < 0) {
        // End-of-stream: the server closed the connection.
        key.cancel();
        closeQuietly(channel);
        System.err.println("Connection closed by server");
        return;
      }
      if (count == 0) {
        return;
      }
      buffer.flip();
      // trim() is kept so padding sent by a server that forwards whole buffers is dropped.
      System.out.println(decodeMessage(buffer).trim());
    }

    /**
     * Decodes the remaining bytes of {@code buffer} (position to limit) as UTF-8 text.
     *
     * @param buffer a buffer already flipped for reading
     * @return the decoded text
     */
    static String decodeMessage(ByteBuffer buffer) {
      return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    /** Closes a resource, reporting but not propagating any failure; null is ignored. */
    private static void closeQuietly(AutoCloseable resource) {
      if (resource == null) {
        return;
      }
      try {
        resource.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    /**
     * Starts the client: a background thread prints incoming messages while the main thread
     * sends each line read from standard input.
     */
    public static void main(String[] args) throws Exception {
      Client1 client1 = new Client1();
      // Reader thread: poll for server data, pausing one second between polls.
      new Thread(() -> {
        while (true) {
          client1.readInfo();
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Restore the interrupt flag and stop instead of looping on a dead sleep.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }).start();
      // Main thread: forward every input line to the server.
      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNextLine()) {
        String s = scanner.nextLine();
        client1.sendInfo(s);
      }
    }
  }