一、BIO编程模型概述

Acceptor:接受客户的请求
Client:客户端发送的请求

问题引出:

如果像前一个Socket和ServerSocket的模型一样设计
在Client发送请求的时候,Acceptor接受了以后,就不能再接受更多的请求了,就变成了自言自语模式了

解决方法:

因此引出了的Handler来处理请求,于是再发送请求,Acceptor就可以接受请求了。
形成模型如下图:
image.png
这样的模型即BIO传统模型

主要功能

  1. 基于BIO模型
  2. 支持多人同时在线
  3. 每个用户的发言都被转发给其他用户

    架构设计

    首先需要一个主线程来做Acceptor
    其次需要另外一个线程来做Handler,来进行数据处理和数据据读写
    每一个客户都要对应一个Handler
    服务器端需要存储所有客户信息,才可以把消息广播给所有客户
    客户端不能在输入的时候阻塞接收信息

    二、代码实现

    客户端:

  1. package demo2.client;
  2. import java.io.*;
  3. import java.net.Socket;
  4. public class ChatClient {
  5. private final String DEFAULT_IP="localhost";
  6. private final int DEFAULT_PORT=9999;
  7. private final String QUIT="quit";
  8. private BufferedReader reader;
  9. private BufferedWriter writer;
  10. private Socket socket;
  11. public void send(String msg) throws IOException {
  12. if(msg!=null&&!socket.isOutputShutdown()){
  13. writer.write(msg+"\n");
  14. writer.flush();
  15. }
  16. }
  17. public String receive() throws IOException {
  18. String msg=null;
  19. if (!socket.isInputShutdown()){
  20. msg=reader.readLine();
  21. }
  22. return msg;
  23. }
  24. public boolean isQuit(String msg){
  25. return QUIT.equalsIgnoreCase(msg);
  26. }
  27. public void close(){
  28. try {
  29. if (writer != null) {
  30. writer.close();
  31. }
  32. if (reader != null) {
  33. reader.close();
  34. }
  35. if (socket != null) {
  36. System.out.println("客户端["+socket.getPort()+"]关闭");
  37. socket.close();
  38. }
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. public void start(){
  44. try {
  45. socket=new Socket(DEFAULT_IP,DEFAULT_PORT);
  46. System.out.println("客户端["+DEFAULT_IP+"]成功启动");
  47. writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
  48. reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
  49. new Thread(new UserInputHander(this)).start();
  50. String msg=null;
  51. while ((msg=receive())!=null){
  52. System.out.println(msg);
  53. }
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }finally {
  57. close();
  58. }
  59. }
  60. }

线程部分(与输出不同线程):

  1. package demo2.client;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. public class UserInputHander implements Runnable {
  6. private ChatClient chatClient;
  7. private BufferedReader reader;
  8. public UserInputHander(ChatClient chatClient) {
  9. this.chatClient = chatClient;
  10. }
  11. @Override
  12. public void run() {
  13. reader = new BufferedReader(new InputStreamReader(System.in));
  14. try {
  15. while (true) {
  16. String msg = reader.readLine();
  17. chatClient.send(msg);
  18. if (chatClient.isQuit(msg)) {
  19. break;
  20. }
  21. }
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. } finally {
  25. if (reader != null) {
  26. try {
  27. reader.close();
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. }

主函数:

  1. package demo2.client;
  2. public class ClientMain {
  3. public static void main(String[] args) {
  4. ChatClient chatClient=new ChatClient();
  5. chatClient.start();
  6. }
  7. }

服务器:

  1. package demo2.server;
  2. import java.io.*;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. public class ChatServer {
  8. private ServerSocket serverSocket;
  9. private final int DEFAULT_PORT=9999;
  10. private final String QUIT="quit";
  11. private Map<Integer, Writer> connectedClients;
  12. public ChatServer(){
  13. connectedClients=new HashMap<>();
  14. }
  15. public synchronized void addClient(Socket socket) throws IOException {
  16. if(socket!=null){
  17. int port=socket.getPort();
  18. BufferedWriter writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
  19. connectedClients.put(port,writer);
  20. System.out.println("客户端["+port+"]已连接到服务器");
  21. }
  22. }
  23. public synchronized void removeClient(Socket socket) throws IOException {
  24. if(socket!=null) {
  25. int port=socket.getPort();
  26. if (connectedClients.containsKey(port)) {
  27. connectedClients.get(port).close();
  28. connectedClients.remove(port);
  29. System.out.println("客户端[" + port + "]已断开连接");
  30. }
  31. }
  32. }
  33. public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
  34. if (socket!=null) {
  35. for (Integer port : connectedClients.keySet()) {
  36. if (port != socket.getPort()) {
  37. Writer writer = connectedClients.get(port);
  38. writer.write(msg);
  39. writer.flush();
  40. }
  41. }
  42. }
  43. }
  44. public synchronized void shutdownClient() throws IOException {
  45. if (serverSocket!=null){
  46. serverSocket.close();
  47. System.out.println("服务器已关闭");
  48. }
  49. }
  50. public boolean isQuit(String msg){
  51. return QUIT.equalsIgnoreCase(msg);
  52. }
  53. public void start(){
  54. try {
  55. serverSocket=new ServerSocket(DEFAULT_PORT);
  56. System.out.println("启动服务器["+DEFAULT_PORT+"]成功");
  57. while (true){
  58. Socket socket=serverSocket.accept();
  59. new Thread(new ChatHander(socket,this)).start();
  60. }
  61. } catch (IOException e) {
  62. e.printStackTrace();
  63. }finally {
  64. try {
  65. shutdownClient();
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }

线程部份(处理用户信息):

  1. package demo2.server;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.net.Socket;
  6. public class ChatHander implements Runnable{
  7. private ChatServer chatServer;
  8. private Socket socket;
  9. private BufferedReader reader;
  10. public ChatHander(Socket socket,ChatServer chatServer){
  11. this.chatServer=chatServer;
  12. this.socket=socket;
  13. }
  14. @Override
  15. public void run() {
  16. try {
  17. chatServer.addClient(socket);
  18. reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
  19. String msg=null;
  20. int port=socket.getPort();
  21. while ((msg=reader.readLine())!=null) {
  22. if (chatServer.isQuit(msg)) {
  23. break;
  24. }
  25. String nwmsg = "客户端[" + port + "]:" + msg + "\n";
  26. System.out.print(nwmsg);
  27. chatServer.forwardMessage(socket, nwmsg);
  28. }
  29. } catch (IOException e) {
  30. e.printStackTrace();
  31. }finally {
  32. try {
  33. if (socket != null) {
  34. chatServer.removeClient(socket);
  35. }
  36. if (reader != null) {
  37. reader.close();
  38. }
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. }

主函数:

  1. package demo2.server;
  2. public class ServerMain {
  3. public static void main(String[] args) {
  4. ChatServer chatServer=new ChatServer();
  5. chatServer.start();
  6. }
  7. }

三、代码的效果

image.png

四、伪异步IO编程

随着客户端增加和退出,会产生线程的调度和资源的浪费,回忆线程池的知识,这里引入线程池的做法来解决资源的问题。

功能建模:

image.png
如果Client4用户想要加入群聊时,线程池却没有空余线程,Client则等待其他用户退出时才可以加入。

代码实现:

这里我只对服务器端进行修改,因为客户端的资源浪费少之又少,就不做阐述了。
在ChatServer中增加属性:
private ExecutorService executorService;
修改构造方法:
设置5个线程。

  1. public ChatServer(){
  2. executorService= Executors.newFixedThreadPool(5);
  3. connectedClients=new HashMap<>();
  4. }

修改start方法:

  1. public void start(){
  2. try {
  3. serverSocket=new ServerSocket(DEFAULT_PORT);
  4. System.out.println("启动服务器["+DEFAULT_PORT+"]成功");
  5. while (true){
  6. Socket socket=serverSocket.accept();
  7. executorService.execute(new ChatHander(socket,this));
  8. }
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. }finally {
  12. try {
  13. shutdownClient();
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

这样我们就实现了简单的BIO的多人聊天室。