Server
BIO Server:
/**
 * Blocking-I/O (BIO) chat server.
 *
 * <p>Accepts TCP connections on a single port and hands each one to a
 * {@link ChatHandler} running on a fixed pool of 10 threads. Because every
 * handler blocks on its socket for the lifetime of the connection, at most 10
 * clients are served at once; further accepted connections wait in the
 * executor's queue until a handler finishes.
 *
 * <p>The client registry is guarded by this object's monitor: every method
 * that reads or mutates it is {@code synchronized}.
 */
public class ChatServer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChatServer.class);

    private final ServerSocket serverSocket;

    /**
     * Connected clients and the writer used to push messages to each of them.
     * Keyed by the socket itself (identity) rather than by remote port: two
     * clients on different hosts may share the same ephemeral port number, and
     * a port-keyed map would silently replace one with the other.
     */
    private final Map<Socket, Writer> clients = new HashMap<>();

    private final ExecutorService service = Executors.newFixedThreadPool(10);

    /**
     * Creates a server listening on the default port 8888.
     *
     * @throws IOException if the server socket cannot be opened or bound
     */
    public ChatServer() throws IOException {
        this(8888);
    }

    /**
     * Creates a server listening on the given port.
     *
     * @param port TCP port to bind to
     * @throws IOException if the server socket cannot be opened or bound
     */
    public ChatServer(int port) throws IOException {
        serverSocket = new ServerSocket();
        boolean bound = false;
        try {
            serverSocket.bind(new InetSocketAddress(port));
            bound = true;
        } finally {
            if (!bound) {
                // Binding failed: release what was already acquired before the
                // exception propagates to the caller.
                service.shutdown();
                try {
                    serverSocket.close();
                } catch (IOException closeFailure) {
                    logger.error(closeFailure.getMessage(), closeFailure);
                }
            }
        }
        logger.info("服务端已经启动,正在监听{}端口", port);
    }

    /**
     * Registers a newly connected client so that it receives forwarded messages.
     * A {@code null} socket is ignored.
     *
     * @param socket the client's connection
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public synchronized void addClient(Socket socket) throws IOException {
        if (Objects.isNull(socket)) {
            return;
        }
        Writer writer = new BufferedWriter(
                new OutputStreamWriter(socket.getOutputStream()));
        clients.put(socket, writer);
        logger.info("客户端{}已经连入", socket.getPort());
    }

    /**
     * Unregisters a client that has gone offline. The socket itself is not
     * closed here. A {@code null} socket is ignored.
     *
     * @param socket the client's connection
     */
    public synchronized void removeClient(Socket socket) {
        if (Objects.isNull(socket)) {
            return;
        }
        clients.remove(socket);
        logger.info("客户端{}已经下线", socket.getPort());
    }

    /**
     * Forwards a message to every registered client except the sender.
     * A failure to write to one client is logged and does not prevent delivery
     * to the remaining clients.
     *
     * @param socket the sender's connection; if {@code null}, the message is
     *               delivered to every registered client
     * @param msg    the text to write, including any line terminator the
     *               receivers expect
     */
    public synchronized void forwardMessage(Socket socket, String msg) {
        for (Map.Entry<Socket, Writer> entry : clients.entrySet()) {
            Socket client = entry.getKey();
            if (client == socket) {
                continue;
            }
            try {
                Writer writer = entry.getValue();
                writer.write(msg);
                writer.flush();
            } catch (IOException e) {
                logger.error("消息转发给{}失败", client.getPort(), e);
            }
        }
    }

    /**
     * Accept loop: waits for connections and dispatches each to a handler.
     * The loop ends when {@code accept()} fails (for example because the server
     * socket was closed) or when the thread's interrupt flag is found set
     * between two accepts; the server is then shut down.
     */
    @Override
    public void run() {
        try {
            // Thread.interrupted() reads and clears the interrupt flag.
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();
                service.execute(new ChatHandler(socket, this));
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } finally {
            shutdown();
        }
    }

    /** Releases the listening socket, the worker pool and all client connections. */
    private void shutdown() {
        try {
            serverSocket.close();
            logger.info("服务端已经关闭");
        } catch (IOException e) {
            logger.error("服务端关闭失败!");
            logger.error(e.getMessage(), e);
        }
        // Runs even if closing the server socket failed.
        service.shutdownNow();
        // Interrupting a thread does not unblock a blocking socket read, so the
        // handlers are released by closing their sockets instead.
        closeClientConnections();
    }

    /** Closes the socket of every registered client; failures are logged. */
    private synchronized void closeClientConnections() {
        for (Socket client : clients.keySet()) {
            try {
                client.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
ChatHandler:
/**
 * Serves one client connection on behalf of a {@link ChatServer}: registers the
 * client, relays every line it sends to the other clients, and unregisters it
 * and closes the connection when the session ends.
 */
public class ChatHandler implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChatHandler.class);

    private final Socket socket;
    private final ChatServer chatServer;

    /**
     * @param socket     the accepted client connection this handler owns
     * @param chatServer the server used to register the client and forward messages
     */
    public ChatHandler(Socket socket, ChatServer chatServer) {
        this.socket = socket;
        this.chatServer = chatServer;
    }

    /**
     * Reads lines from the client until the stream ends, an I/O error occurs,
     * or the client sends "quit" (case-insensitive). Every line, including the
     * quit line, is forwarded to the other clients before the quit check.
     */
    @Override
    public void run() {
        try {
            // Register the newly connected user.
            chatServer.addClient(socket);

            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            String msg;
            while ((msg = reader.readLine()) != null) {
                String fwdMsg = "客户端" + socket.getPort() + "发送的消息:" + msg;
                logger.info(fwdMsg);
                // readLine() strips the terminator; re-append it so receivers
                // that read line by line see a complete line.
                chatServer.forwardMessage(socket, fwdMsg + "\n");
                if ("Quit".equalsIgnoreCase(msg)) {
                    break;
                }
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } finally {
            logger.info("用户{}退出", socket.getPort());
            chatServer.removeClient(socket);
            // This handler owns the accepted socket; close it so the
            // connection's resources are released once the session is over.
            try {
                socket.close();
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
Client
/**
 * Console chat client.
 *
 * <p>{@link #run()} prints every line received from the server to standard
 * output, while a helper thread reads lines from standard input and sends them
 * to the server. Typing "quit" (case-insensitive) ends the session.
 */
public class ChatClient implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChatClient.class);

    private final Socket socket;
    private final BufferedReader reader;
    private final BufferedWriter writer;

    /**
     * Connects to the chat server at the given address.
     *
     * @param endpoint address of the server
     * @throws IOException if the connection or its streams cannot be established
     */
    public ChatClient(InetSocketAddress endpoint) throws IOException {
        socket = new Socket();
        boolean connected = false;
        try {
            socket.connect(endpoint);
            logger.info("客户端连接成功");
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream()));
            connected = true;
        } finally {
            if (!connected) {
                // Do not leak the socket when construction fails part-way.
                closeSocket();
            }
        }
    }

    /**
     * Connects to a server on the local host, port 8888.
     *
     * @throws IOException if the connection cannot be established
     */
    public ChatClient() throws IOException {
        this(new InetSocketAddress(Inet4Address.getLocalHost(), 8888));
    }

    /**
     * Sends a message to the server. Does nothing if the connection is already
     * closed or its output side is shut down. If the message, ignoring any
     * trailing line terminator, is "quit" (case-insensitive), the connection is
     * closed after the message has been flushed.
     *
     * @param msg the text to send, including the line terminator the server
     *            expects
     * @throws IOException if writing to the connection fails
     */
    public void send(String msg) throws IOException {
        if (socket.isClosed() || socket.isOutputShutdown()) {
            return;
        }
        writer.write(msg);
        writer.flush();
        // Callers pass the line terminator along with the text, so it has to be
        // removed before comparing; otherwise the quit check can never match.
        if ("quit".equalsIgnoreCase(stripLineTerminator(msg))) {
            this.close();
        }
    }

    /** Returns {@code text} without any trailing CR/LF characters. */
    private static String stripLineTerminator(String text) {
        int end = text.length();
        while (end > 0 && (text.charAt(end - 1) == '\n' || text.charAt(end - 1) == '\r')) {
            end--;
        }
        return text.substring(0, end);
    }

    /**
     * Reads the next line from the server.
     *
     * @return the line, or {@code null} if the stream has ended or the
     *         connection is no longer readable
     * @throws IOException if reading fails
     */
    private String receive() throws IOException {
        if (socket.isClosed() || socket.isInputShutdown()) {
            return null;
        }
        return reader.readLine();
    }

    /** Closes the connection to the server; a failure to close is logged. */
    public void close() {
        closeSocket();
    }

    /** Closes the socket; private so the constructor never calls an overridable method. */
    private void closeSocket() {
        try {
            socket.close();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Starts the console input thread once, then prints every line received
     * from the server until the stream ends, the connection fails, or this
     * thread's interrupt flag is found set. The connection is closed on exit.
     */
    @Override
    public void run() {
        // Start the input thread exactly once. Starting it inside the receive
        // loop would spawn a new thread on every pass, and after the server has
        // closed the connection readLine() returns null immediately each time.
        Thread inputThread = new Thread(new UserInputHandler(this));
        // Daemon, so a thread blocked on standard input does not keep the JVM
        // alive after the connection is gone.
        inputThread.setDaemon(true);
        inputThread.start();

        try {
            String msg;
            while (!Thread.interrupted() && (msg = receive()) != null) {
                // Print the message received from the server.
                System.out.println(msg);
            }
        } catch (IOException e) {
            // A read failing because this client closed its own socket (for
            // example after "quit") is the normal way out; anything else is an
            // error worth reporting.
            if (!socket.isClosed()) {
                logger.error(e.getMessage(), e);
            }
        } finally {
            closeSocket();
        }
    }

    /** Reads lines from standard input and sends each one to the server. */
    private static class UserInputHandler implements Runnable {
        private final ChatClient client;

        public UserInputHandler(ChatClient client) {
            this.client = client;
        }

        /**
         * Sends every console line, terminated by a newline, until standard
         * input ends, sending fails, or the user types "quit"
         * (case-insensitive). The client connection is closed on exit.
         */
        @Override
        public void run() {
            try {
                BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
                String msg;
                while ((msg = console.readLine()) != null) {
                    client.send(msg + "\n");
                    if ("quit".equalsIgnoreCase(msg)) {
                        break;
                    }
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            } finally {
                client.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ChatClient client = new ChatClient();
        new Thread(client).start();
    }
}