一、BIO编程模型概述
Acceptor:接受客户的请求
Client:客户端发送的请求
问题引出:
如果像前一个Socket和ServerSocket的模型一样设计
在Client发送请求的时候,Acceptor接受了以后,就不能再接受更多的请求了,就变成了自言自语模式了
解决方法:
因此引出了的Handler来处理请求,于是再发送请求,Acceptor就可以接受请求了。
形成模型如下图:
这样的模型即BIO传统模型
主要功能
- 基于BIO模型
- 支持多人同时在线
- 每个用户的发言都被转发给其他用户
架构设计
首先需要一个主线程来做Acceptor
其次需要另外一个线程来做Handler,来进行数据处理和数据据读写
每一个客户都要对应一个Handler
服务器端需要存储所有客户信息,才可以把消息广播给所有客户
客户端不能在输入的时候阻塞接收信息二、代码实现
客户端:
package demo2.client;
import java.io.*;
import java.net.Socket;
public class ChatClient {
private final String DEFAULT_IP="localhost";
private final int DEFAULT_PORT=9999;
private final String QUIT="quit";
private BufferedReader reader;
private BufferedWriter writer;
private Socket socket;
public void send(String msg) throws IOException {
if(msg!=null&&!socket.isOutputShutdown()){
writer.write(msg+"\n");
writer.flush();
}
}
public String receive() throws IOException {
String msg=null;
if (!socket.isInputShutdown()){
msg=reader.readLine();
}
return msg;
}
public boolean isQuit(String msg){
return QUIT.equalsIgnoreCase(msg);
}
public void close(){
try {
if (writer != null) {
writer.close();
}
if (reader != null) {
reader.close();
}
if (socket != null) {
System.out.println("客户端["+socket.getPort()+"]关闭");
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void start(){
try {
socket=new Socket(DEFAULT_IP,DEFAULT_PORT);
System.out.println("客户端["+DEFAULT_IP+"]成功启动");
writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
new Thread(new UserInputHander(this)).start();
String msg=null;
while ((msg=receive())!=null){
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
close();
}
}
}
线程部分(与输出不同线程):
package demo2.client;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class UserInputHander implements Runnable {
private ChatClient chatClient;
private BufferedReader reader;
public UserInputHander(ChatClient chatClient) {
this.chatClient = chatClient;
}
@Override
public void run() {
reader = new BufferedReader(new InputStreamReader(System.in));
try {
while (true) {
String msg = reader.readLine();
chatClient.send(msg);
if (chatClient.isQuit(msg)) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
主函数:
package demo2.client;
public class ClientMain {
public static void main(String[] args) {
ChatClient chatClient=new ChatClient();
chatClient.start();
}
}
服务器:
package demo2.server;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class ChatServer {
private ServerSocket serverSocket;
private final int DEFAULT_PORT=9999;
private final String QUIT="quit";
private Map<Integer, Writer> connectedClients;
public ChatServer(){
connectedClients=new HashMap<>();
}
public synchronized void addClient(Socket socket) throws IOException {
if(socket!=null){
int port=socket.getPort();
BufferedWriter writer=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
connectedClients.put(port,writer);
System.out.println("客户端["+port+"]已连接到服务器");
}
}
public synchronized void removeClient(Socket socket) throws IOException {
if(socket!=null) {
int port=socket.getPort();
if (connectedClients.containsKey(port)) {
connectedClients.get(port).close();
connectedClients.remove(port);
System.out.println("客户端[" + port + "]已断开连接");
}
}
}
public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
if (socket!=null) {
for (Integer port : connectedClients.keySet()) {
if (port != socket.getPort()) {
Writer writer = connectedClients.get(port);
writer.write(msg);
writer.flush();
}
}
}
}
public synchronized void shutdownClient() throws IOException {
if (serverSocket!=null){
serverSocket.close();
System.out.println("服务器已关闭");
}
}
public boolean isQuit(String msg){
return QUIT.equalsIgnoreCase(msg);
}
public void start(){
try {
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("启动服务器["+DEFAULT_PORT+"]成功");
while (true){
Socket socket=serverSocket.accept();
new Thread(new ChatHander(socket,this)).start();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
shutdownClient();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
线程部份(处理用户信息):
package demo2.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
public class ChatHander implements Runnable{
private ChatServer chatServer;
private Socket socket;
private BufferedReader reader;
public ChatHander(Socket socket,ChatServer chatServer){
this.chatServer=chatServer;
this.socket=socket;
}
@Override
public void run() {
try {
chatServer.addClient(socket);
reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg=null;
int port=socket.getPort();
while ((msg=reader.readLine())!=null) {
if (chatServer.isQuit(msg)) {
break;
}
String nwmsg = "客户端[" + port + "]:" + msg + "\n";
System.out.print(nwmsg);
chatServer.forwardMessage(socket, nwmsg);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if (socket != null) {
chatServer.removeClient(socket);
}
if (reader != null) {
reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
主函数:
package demo2.server;
public class ServerMain {
public static void main(String[] args) {
ChatServer chatServer=new ChatServer();
chatServer.start();
}
}
三、代码的效果
四、伪异步IO编程
随着客户端增加和退出,会产生线程的调度和资源的浪费,回忆线程池的知识,这里引入线程池的做法来解决资源的问题。
功能建模:
如果Client4用户想要加入群聊时,线程池却没有空余线程,Client则等待其他用户退出时才可以加入。
代码实现:
这里我只对服务器端进行修改,因为客户端的资源浪费少之又少,就不做阐述了。
在ChatServer中增加属性:private ExecutorService executorService;
修改构造方法:
设置5个线程。
public ChatServer(){
executorService= Executors.newFixedThreadPool(5);
connectedClients=new HashMap<>();
}
修改start方法:
public void start(){
try {
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("启动服务器["+DEFAULT_PORT+"]成功");
while (true){
Socket socket=serverSocket.accept();
executorService.execute(new ChatHander(socket,this));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
shutdownClient();
} catch (IOException e) {
e.printStackTrace();
}
}
}
这样我们就实现了简单的BIO的多人聊天室。