一、BIO编程模型概述
Acceptor:接受客户的请求
Client:客户端发送的请求
问题引出:
如果像前一个Socket和ServerSocket的模型一样设计
在Client发送请求的时候,Acceptor接受了以后,就不能再接受更多的请求了,就变成了自言自语模式了
解决方法:
因此引出了的Handler来处理请求,于是再发送请求,Acceptor就可以接受请求了。
形成模型如下图:
这样的模型即BIO传统模型
主要功能
- 基于BIO模型
- 支持多人同时在线
- 每个用户的发言都被转发给其他用户
架构设计
首先需要一个主线程来做Acceptor
其次需要另外一个线程来做Handler,来进行数据处理和数据据读写
每一个客户都要对应一个Handler
服务器端需要存储所有客户信息,才可以把消息广播给所有客户
客户端不能在输入的时候阻塞接收信息二、代码实现
客户端:
package demo2.client;import java.io.*;import java.net.Socket;public class ChatClient {private final String DEFAULT_IP="localhost";private final int DEFAULT_PORT=9999;private final String QUIT="quit";private BufferedReader reader;private BufferedWriter writer;private Socket socket;public void send(String msg) throws IOException {if(msg!=null&&!socket.isOutputShutdown()){writer.write(msg+"\n");writer.flush();}}public String receive() throws IOException {String msg=null;if (!socket.isInputShutdown()){msg=reader.readLine();}return msg;}public boolean isQuit(String msg){return QUIT.equalsIgnoreCase(msg);}public void close(){try {if (writer != null) {writer.close();}if (reader != null) {reader.close();}if (socket != null) {System.out.println("客户端["+socket.getPort()+"]关闭");socket.close();}} catch (IOException e) {e.printStackTrace();}}public void start(){try {socket=new Socket(DEFAULT_IP,DEFAULT_PORT);System.out.println("客户端["+DEFAULT_IP+"]成功启动");writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));new Thread(new UserInputHander(this)).start();String msg=null;while ((msg=receive())!=null){System.out.println(msg);}} catch (IOException e) {e.printStackTrace();}finally {close();}}}
线程部分(与输出不同线程):
package demo2.client;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class UserInputHander implements Runnable {private ChatClient chatClient;private BufferedReader reader;public UserInputHander(ChatClient chatClient) {this.chatClient = chatClient;}@Overridepublic void run() {reader = new BufferedReader(new InputStreamReader(System.in));try {while (true) {String msg = reader.readLine();chatClient.send(msg);if (chatClient.isQuit(msg)) {break;}}} catch (IOException e) {e.printStackTrace();} finally {if (reader != null) {try {reader.close();} catch (IOException e) {e.printStackTrace();}}}}}
主函数:
package demo2.client;public class ClientMain {public static void main(String[] args) {ChatClient chatClient=new ChatClient();chatClient.start();}}
服务器:
package demo2.server;import java.io.*;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;public class ChatServer {private ServerSocket serverSocket;private final int DEFAULT_PORT=9999;private final String QUIT="quit";private Map<Integer, Writer> connectedClients;public ChatServer(){connectedClients=new HashMap<>();}public synchronized void addClient(Socket socket) throws IOException {if(socket!=null){int port=socket.getPort();BufferedWriter writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));connectedClients.put(port,writer);System.out.println("客户端["+port+"]已连接到服务器");}}public synchronized void removeClient(Socket socket) throws IOException {if(socket!=null) {int port=socket.getPort();if (connectedClients.containsKey(port)) {connectedClients.get(port).close();connectedClients.remove(port);System.out.println("客户端[" + port + "]已断开连接");}}}public synchronized void forwardMessage(Socket socket,String msg) throws IOException {if (socket!=null) {for (Integer port : connectedClients.keySet()) {if (port != socket.getPort()) {Writer writer = connectedClients.get(port);writer.write(msg);writer.flush();}}}}public synchronized void shutdownClient() throws IOException {if (serverSocket!=null){serverSocket.close();System.out.println("服务器已关闭");}}public boolean isQuit(String msg){return QUIT.equalsIgnoreCase(msg);}public void start(){try {serverSocket=new ServerSocket(DEFAULT_PORT);System.out.println("启动服务器["+DEFAULT_PORT+"]成功");while (true){Socket socket=serverSocket.accept();new Thread(new ChatHander(socket,this)).start();}} catch (IOException e) {e.printStackTrace();}finally {try {shutdownClient();} catch (IOException e) {e.printStackTrace();}}}}
线程部份(处理用户信息):
package demo2.server;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.Socket;public class ChatHander implements Runnable{private ChatServer chatServer;private Socket socket;private BufferedReader reader;public ChatHander(Socket socket,ChatServer chatServer){this.chatServer=chatServer;this.socket=socket;}@Overridepublic void run() {try {chatServer.addClient(socket);reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));String msg=null;int port=socket.getPort();while ((msg=reader.readLine())!=null) {if (chatServer.isQuit(msg)) {break;}String nwmsg = "客户端[" + port + "]:" + msg + "\n";System.out.print(nwmsg);chatServer.forwardMessage(socket, nwmsg);}} catch (IOException e) {e.printStackTrace();}finally {try {if (socket != null) {chatServer.removeClient(socket);}if (reader != null) {reader.close();}} catch (IOException e) {e.printStackTrace();}}}}
主函数:
package demo2.server;public class ServerMain {public static void main(String[] args) {ChatServer chatServer=new ChatServer();chatServer.start();}}
三、代码的效果
四、伪异步IO编程
随着客户端增加和退出,会产生线程的调度和资源的浪费,回忆线程池的知识,这里引入线程池的做法来解决资源的问题。
功能建模:

如果Client4用户想要加入群聊时,线程池却没有空余线程,Client则等待其他用户退出时才可以加入。
代码实现:
这里我只对服务器端进行修改,因为客户端的资源浪费少之又少,就不做阐述了。
在ChatServer中增加属性:private ExecutorService executorService;
修改构造方法:
设置5个线程。
public ChatServer(){executorService= Executors.newFixedThreadPool(5);connectedClients=new HashMap<>();}
修改start方法:
public void start(){try {serverSocket=new ServerSocket(DEFAULT_PORT);System.out.println("启动服务器["+DEFAULT_PORT+"]成功");while (true){Socket socket=serverSocket.accept();executorService.execute(new ChatHander(socket,this));}} catch (IOException e) {e.printStackTrace();}finally {try {shutdownClient();} catch (IOException e) {e.printStackTrace();}}}
这样我们就实现了简单的BIO的多人聊天室。
