image.png

Server

BIO Server:

  1. public class ChatServer implements Runnable {
  2. private static final Logger logger = LoggerFactory.getLogger(ChatServer.class);
  3. private ServerSocket serverSocket;
  4. private Map<Integer, Writer> clients;
  5. private ExecutorService service = Executors.newFixedThreadPool(10);
  6. public ChatServer() throws IOException {
  7. this(8888);
  8. }
  9. public ChatServer(int port) throws IOException {
  10. clients = new HashMap<>();
  11. // 创建ServerSocket
  12. serverSocket = new ServerSocket();
  13. // 绑定端口
  14. serverSocket.bind(new InetSocketAddress(port));
  15. logger.info("服务端已经启动,正在监听{}端口", port);
  16. }
  17. /**
  18. * 客户端连接
  19. *
  20. * @param socket
  21. * @throws IOException
  22. */
  23. public synchronized void addClient(Socket socket) throws IOException {
  24. if (Objects.nonNull(socket)) {
  25. // 使用客户端端口作为key
  26. int port = socket.getPort();
  27. // 使用客户端输出流作为value
  28. BufferedWriter writer = new BufferedWriter(
  29. new OutputStreamWriter(socket.getOutputStream()));
  30. clients.put(port, writer);
  31. logger.info("客户端{}已经连入", port);
  32. }
  33. }
  34. /**
  35. * 客户端下线方法
  36. *
  37. * @param socket
  38. */
  39. public synchronized void removeClient(Socket socket) {
  40. if (Objects.nonNull(socket)) {
  41. int port = socket.getPort();
  42. // 移除客户端
  43. clients.remove(port);
  44. logger.info("客户端{}已经下线", port);
  45. }
  46. }
  47. /**
  48. * 服务端消息转发
  49. *
  50. * @param socket
  51. * @param msg
  52. */
  53. public synchronized void forwardMessage(Socket socket, String msg) {
  54. // 实现消息转发
  55. clients.forEach((port, writer) -> {
  56. if (port != socket.getPort()) {
  57. try {
  58. writer.write(msg);
  59. writer.flush();
  60. } catch (IOException e) {
  61. logger.error("消息转发给{}失败", port);
  62. logger.error(e.getMessage(), e);
  63. }
  64. }
  65. });
  66. }
  67. @Override
  68. public void run() {
  69. try {
  70. // Thread.interrupted():查询中断标记,并清除标记
  71. while (!Thread.interrupted()) {
  72. // 等待客户端连接
  73. Socket socket = serverSocket.accept();
  74. service.execute(new ChatHandler(socket, this));
  75. }
  76. } catch (IOException e) {
  77. e.printStackTrace();
  78. } finally {
  79. try {
  80. serverSocket.close();
  81. service.shutdownNow();
  82. logger.info("服务端已经关闭");
  83. } catch (IOException e) {
  84. logger.error("服务端关闭失败!");
  85. logger.error(e.getMessage(), e);
  86. }
  87. }
  88. }
  89. }

ChatHandler:

  1. public class ChatHandler implements Runnable {
  2. private static final Logger logger = LoggerFactory.getLogger(ChatServer.class);
  3. private Socket socket;
  4. private ChatServer chatServer;
  5. public ChatHandler(Socket socket, ChatServer chatServer) {
  6. this.socket = socket;
  7. this.chatServer = chatServer;
  8. }
  9. @Override
  10. public void run() {
  11. try {
  12. // 存储新上线用户
  13. chatServer.addClient(socket);
  14. // 读取用户发送的消息
  15. BufferedReader reader = new BufferedReader(
  16. new InputStreamReader(socket.getInputStream()));
  17. System.out.println(socket);
  18. String msg = null;
  19. while ((msg = reader.readLine()) != null) {
  20. System.out.println("hello");
  21. String fwdMsg = "客户端" + socket.getPort() + "发送的消息:" + msg;
  22. System.out.println(fwdMsg);
  23. logger.info(fwdMsg);
  24. // 进行消息转发
  25. chatServer.forwardMessage(socket, fwdMsg + "\n");
  26. // 判断用户是否准备退出
  27. if ("Quit".equalsIgnoreCase(msg)) {
  28. break;
  29. }
  30. }
  31. } catch (IOException e) {
  32. logger.error(e.getMessage(), e);
  33. } finally {
  34. logger.info("用户{}退出", socket.getPort());
  35. chatServer.removeClient(socket);
  36. }
  37. }
  38. }

Client

  1. public class ChatClient implements Runnable {
  2. private static final Logger logger = LoggerFactory.getLogger(ChatClient.class);
  3. private Socket socket;
  4. private BufferedReader reader;
  5. private BufferedWriter writer;
  6. public ChatClient(InetSocketAddress endpoint) throws IOException {
  7. // 创建Socket
  8. socket = new Socket();
  9. // 连接服务器
  10. socket.connect(endpoint);
  11. logger.info("客户端连接成功");
  12. reader = new BufferedReader(
  13. new InputStreamReader(socket.getInputStream()));
  14. writer = new BufferedWriter(
  15. new OutputStreamWriter(socket.getOutputStream()));
  16. }
  17. public ChatClient() throws IOException {
  18. this(new InetSocketAddress(Inet4Address.getLocalHost(), 8888));
  19. }
  20. public void send(String msg) throws IOException {
  21. if (!socket.isOutputShutdown()) {
  22. writer.write(msg);
  23. writer.flush();
  24. if ("quit".equalsIgnoreCase(msg)) {
  25. this.close();
  26. }
  27. }
  28. }
  29. private String receive() throws IOException {
  30. String msg = null;
  31. if (!socket.isInputShutdown()) {
  32. msg = reader.readLine();
  33. }
  34. return msg;
  35. }
  36. public void close() {
  37. if (socket != null) {
  38. try {
  39. socket.close();
  40. } catch (IOException e) {
  41. logger.error(e.getMessage(), e);
  42. }
  43. }
  44. }
  45. @Override
  46. public void run() {
  47. try {
  48. while (!Thread.interrupted()) {
  49. new Thread(new UserInputHandler(this)).start();
  50. String msg = null;
  51. while ((msg = receive()) != null) {
  52. // 输出从服务端收到的消息
  53. System.out.println(msg);
  54. }
  55. }
  56. } catch (IOException e) {
  57. e.printStackTrace();
  58. } finally {
  59. try {
  60. socket.close();
  61. } catch (IOException e) {
  62. logger.error(e.getMessage(), e);
  63. }
  64. }
  65. }
  66. private class UserInputHandler implements Runnable {
  67. private ChatClient client;
  68. public UserInputHandler(ChatClient client) {
  69. this.client = client;
  70. }
  71. @Override
  72. public void run() {
  73. try {
  74. BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
  75. String msg = null;
  76. while ((msg = reader.readLine()) != null){
  77. client.send(msg + "\n");
  78. if ("quit".equalsIgnoreCase(msg)) {
  79. break;
  80. }
  81. }
  82. } catch (IOException e) {
  83. e.printStackTrace();
  84. } finally {
  85. client.close();
  86. }
  87. }
  88. }
  89. public static void main(String[] args) throws IOException {
  90. ChatClient client = new ChatClient();
  91. new Thread(client).start();
  92. }
  93. }