一、案例1

要求

  1. 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
  5. 目的:进一步理解 NIO 非阻塞网络编程机制

    1、服务端

    1. public class GroupChatServer {
    2. // 定义属性
    3. private Selector selector;
    4. private ServerSocketChannel listenChannel;
    5. private static final int PORT = 6667;
    6. // 构造器
    7. // 初始化工作
    8. public GroupChatServer() {
    9. try {
    10. // 得到选择器
    11. selector = Selector.open();
    12. // 获取 ServerSocketChannel
    13. listenChannel = ServerSocketChannel.open();
    14. // 绑定端口
    15. listenChannel.socket().bind(new InetSocketAddress(PORT));
    16. // 配置非阻塞模式
    17. listenChannel.configureBlocking(false);
    18. // 将该 listenChannel 注册到 selector
    19. listenChannel.register(selector, SelectionKey.OP_ACCEPT);
    20. } catch (IOException e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. // 监听
    25. public void listen() {
    26. try {
    27. while (true) {
    28. // int count = selector.select(2000);
    29. int count = selector.select();
    30. if (count > 0) { // 有事件处理
    31. // 遍历得到的 SelectionKey 集合
    32. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    33. while (iterator.hasNext()) {
    34. // 取出 selectionKey
    35. SelectionKey key = iterator.next();
    36. // 监听到 accept
    37. if (key.isAcceptable()) {
    38. SocketChannel socketChannel = listenChannel.accept();
    39. socketChannel.configureBlocking(false);
    40. // 将该 socketChannel 注册到 Selector
    41. socketChannel.register(selector, SelectionKey.OP_READ);
    42. // 提示
    43. System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
    44. }
    45. if (key.isReadable()) { //通道发送read事件,即通道可读
    46. // 处理读
    47. readData(key);
    48. }
    49. // 当前的key删除,防止重复处理
    50. iterator.remove();
    51. }
    52. } else {
    53. System.out.println("等待。。。。。");
    54. }
    55. }
    56. } catch (Exception e) {
    57. e.printStackTrace();
    58. } finally {
    59. }
    60. }
    61. // 读取客户端信息
    62. private void readData(SelectionKey key) {
    63. SocketChannel channel = null;
    64. try {
    65. // 取到关联的 channel
    66. channel = (SocketChannel) key.channel();
    67. // 创建 buffer
    68. ByteBuffer buffer = ByteBuffer.allocate(1024);
    69. int count = channel.read(buffer);
    70. // 根据count做处理
    71. if (count > 0) {
    72. String msg = new String(buffer.array());
    73. System.out.println("from 客户端:" + msg);
    74. // 向其他客户端转发消息(排除自己)
    75. sendInfoToOtherClients(msg, channel);
    76. }
    77. } catch (Exception e) {
    78. try {
    79. System.out.println(channel.getRemoteAddress() + "离线");
    80. // 取消注册
    81. key.cancel();
    82. // 关闭通道
    83. channel.close();
    84. } catch (IOException ioException) {
    85. ioException.printStackTrace();
    86. }
    87. }
    88. }
    89. // 转发消息给其他客户
    90. private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
    91. System.out.println("服务器转发消息中...");
    92. // 遍历 所有注册到 selector 上的 SocketChannel,并删除 self
    93. for (SelectionKey key : selector.keys()) {
    94. Channel targetChannel = key.channel();
    95. // 排除自己
    96. if (targetChannel instanceof SocketChannel && targetChannel != self) {
    97. // 转型
    98. SocketChannel dest = (SocketChannel) targetChannel;
    99. // 将 msg 存储到buffer
    100. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
    101. // 将 buffer 数据写入到 通道
    102. System.out.println("转发给" + dest.getRemoteAddress().toString().substring(1) + ",内容:【" + msg + "]");
    103. dest.write(buffer);
    104. }
    105. }
    106. }
    107. public static void main(String[] args) {
    108. GroupChatServer groupChatServer = new GroupChatServer();
    109. groupChatServer.listen();
    110. }
    111. }

    2、客户端

    1. public class GroupChatClient {
    2. private final static String HOST = "127.0.0.1";
    3. private final static int PORT = 6667;
    4. private Selector selector;
    5. private SocketChannel socketChannel;
    6. private String username;
    7. // 构造器,完成初始化工作
    8. public GroupChatClient() throws IOException {
    9. selector = Selector.open();
    10. // 连接服务器
    11. socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
    12. // 设置非阻塞
    13. socketChannel.configureBlocking(false);
    14. socketChannel.register(selector, SelectionKey.OP_READ);
    15. // 得到 username
    16. // username = socketChannel.getLocalAddress().toString().substring(1);
    17. username = socketChannel.getLocalAddress().toString().substring(1);
    18. System.out.println(username + " is ok...");
    19. }
    20. // 向服务器发送消息
    21. public void sendInfo(String info) {
    22. info = username + "说:" + info;
    23. try {
    24. socketChannel.write(ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8)));
    25. } catch (IOException e) {
    26. e.printStackTrace();
    27. }
    28. }
    29. // 读取从服务器端回复的消息
    30. public void readInfo() {
    31. try {
    32. // int readChannels = selector.select(2000);
    33. int readChannels = selector.select();
    34. if (readChannels > 0) {
    35. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    36. while (iterator.hasNext()) {
    37. SelectionKey key = iterator.next();
    38. if (key.isReadable()) {
    39. // 得到相关的通道
    40. SocketChannel socketChannel = (SocketChannel) key.channel();
    41. // 得到一个buffer
    42. ByteBuffer buffer = ByteBuffer.allocate(1024);
    43. // 读取
    44. socketChannel.read(buffer);
    45. String msg = new String(buffer.array());
    46. System.out.println(msg);
    47. }
    48. }
    49. // 删除当前 SelectionKey 防止重复操作
    50. iterator.remove();
    51. } else {
    52. // System.out.println("没有可用的通道");
    53. }
    54. } catch (IOException e) {
    55. e.printStackTrace();
    56. }
    57. }
    58. public static void main(String[] args) throws IOException {
    59. // 启动客户端
    60. GroupChatClient groupChatClient = new GroupChatClient();
    61. // 启动一个线程,每隔三秒,从服务器端读取数据
    62. new Thread(new Runnable() {
    63. @Override
    64. public void run() {
    65. while (true) {
    66. groupChatClient.readInfo();
    67. try {
    68. Thread.sleep(3000);
    69. } catch (InterruptedException e) {
    70. e.printStackTrace();
    71. }
    72. }
    73. }
    74. }).start();
    75. // 发送数据给服务器
    76. Scanner scanner = new Scanner(System.in);
    77. while (scanner.hasNextLine()) {
    78. String s = scanner.nextLine();
    79. groupChatClient.sendInfo(s);
    80. }
    81. }
    82. }