https://www.bilibili.com/video/BV1gz4y1C7RK

Java BIO 基本介绍

  • Java BIO 就是传统的 java IO 编程,其相关的类和接口在 java.io
  • BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器).

    Java BIO 工作机制

    image.png
    对 BIO 编程流程的梳理
    1) 服务器端启动一个 ServerSocket,注册端口,调用accpet方法监听客户端的Socket连接。
    2) 客户端启动 Socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯

    传统的BIO编程实例回顾

    网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。

传统的同步阻塞模型开发中,服务端ServerSocket负责绑定IP地址,启动监听端口;客户端Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。

基于BIO模式下的通信,客户端 - 服务端是完全同步,完全耦合的。

BIO模式下单发和单收消息

  1. package cn.java.money.bio.demo1;
  2. import java.io.BufferedReader;
  3. import java.io.InputStream;
  4. import java.io.InputStreamReader;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. /**
  8. * 服务端
  9. */
  10. public class ServerDemo {
  11. public static void main(String[] args) throws Exception {
  12. System.out.println("==服务器启动==");
  13. // (1)注册端口
  14. ServerSocket serverSocket = new ServerSocket(8888);
  15. //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
  16. Socket socket = serverSocket.accept();
  17. //(3)从Socket管道中得到一个字节输入流。
  18. InputStream is = socket.getInputStream();
  19. //(4)把字节输入流包装成自己需要的流进行数据的读取。
  20. // InputStreamReader 把字节流转成字符流,把字符流包装为BufferedReader
  21. BufferedReader br = new BufferedReader(new InputStreamReader(is));
  22. //(5)读取数据
  23. //readLine()一直等待客服端完整的一行,如果没有完整的一行且客户端关闭通道,服务端就丢弃已经接受的这行数据,并关闭通道抛出异常
  24. //readLine()读到一行数据以后,会等待下一行数据,如果客户端关闭通道,服务端也关闭并抛出异常 java.net.SocketException: Connection reset
  25. String line;
  26. /*while ((line = br.readLine()) != null) {
  27. System.out.println("服务端收到:" + line);
  28. }*/
  29. //这样处理只是不读取下一行输入了,不会等待,直接结束 没有异常抛出
  30. if ((line = br.readLine()) != null) {
  31. System.out.println("服务端收到:" + line);
  32. }
  33. }
  34. }
  1. package cn.java.money.bio.demo1;
  2. import java.io.OutputStream;
  3. import java.io.PrintStream;
  4. import java.net.Socket;
  5. /**
  6. * 目标: Socket网络编程。
  7. * <p>
  8. * Java提供了一个包:java.net下的类都是用于网络通信。
  9. * Java提供了基于套接字(端口)Socket的网络通信模式,我们基于这种模式就可以直接实现TCP通信。
  10. * 只要用Socket通信,那么就是基于TCP可靠传输通信。
  11. * Socket的使用:
  12. * 构造器:public Socket(String host, int port)
  13. * 方法: public OutputStream getOutputStream():获取字节输出流
  14. * public InputStream getInputStream() :获取字节输入流
  15. * <p>
  16. * ServerSocket的使用:
  17. * 构造器:public ServerSocket(int port)
  18. * <p>
  19. * 小结:
  20. * 通信是很严格的,对方怎么发你就怎么收,对方发多少你就只能收多少!!
  21. */
  22. public class ClientDemo {
  23. public static void main(String[] args) throws Exception {
  24. System.out.println("==客户端启动==");
  25. // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
  26. Socket socket = new Socket("127.0.0.1", 8888);
  27. // (2)从Socket通信管道中得到一个字节输出流。
  28. OutputStream os = socket.getOutputStream();
  29. // (3)把字节流改装成自己需要的流进行数据的发送
  30. PrintStream ps = new PrintStream(os);
  31. // (4)开始发送消息
  32. // ps.print("我是客户端,我想约你吃小龙虾!!!"); // 发送的不是一行数据,么有换行符
  33. ps.println("我是客户端,我想约你吃小龙虾!!!"); // 发送一行数据
  34. ps.flush();
  35. }
  36. }

小结

  • 在以上通信中,服务端会一致等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态。
  • 同时服务端是按照行获取消息的,这意味着客户端也必须按照行进行消息的发送,否则服务端将进入等待消息的阻塞状态

    BIO模式下多发和多收消息

    在前面的案例中,只能实现客户端发送消息,服务端接收消息,并不能实现反复的收消息和反复的发消息,我 们只需要在客户端案例中,加上反复按照行发送消息的逻辑即可!案例代码如下: ```java package cn.java.money.bio.demo2;

import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket;

/**

  • 服务端 */ public class ServerDemo {

    public static void main(String[] args) throws Exception {

    1. System.out.println("==服务器的启动==");
    2. //(1)注册端口
    3. ServerSocket serverSocket = new ServerSocket(8888);
    4. //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
    5. Socket socket = serverSocket.accept();
    6. //(3)从Socket管道中得到一个字节输入流。
    7. InputStream is = socket.getInputStream();
    8. //(4)把字节输入流包装成 自己需要的流进行数据的读取。
    9. BufferedReader br = new BufferedReader(new InputStreamReader(is));
    10. //(5)读取数据 readLine 阻塞等待
    11. String line;
    12. while ((line = br.readLine()) != null) {
    13. System.out.println("服务端收到:" + line);
    14. }

    } } java package cn.java.money.bio.demo2;

import java.io.OutputStream; import java.io.PrintStream; import java.net.Socket; import java.util.Scanner;

/* 功能1:客户端可以反复发消息,服务端可以反复收消息 小结: 通信是很严格的,对方怎么发你就怎么收,对方发多少你就只能收多少!! / public class ClientDemo {

  1. public static void main(String[] args) throws Exception {
  2. System.out.println("==客户端的启动==");
  3. // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
  4. Socket socket = new Socket("127.0.0.1",8888);
  5. // (2)从Socket通信管道中得到一个字节输出流。
  6. OutputStream os = socket.getOutputStream();
  7. // (3)把字节流改装成自己需要的流进行数据的发送
  8. PrintStream ps = new PrintStream(os);
  9. // (4)开始发送消息
  10. Scanner sc = new Scanner(System.in);
  11. while(true){
  12. System.out.print("请说:");
  13. // 阻塞等待输入
  14. String msg = sc.nextLine();
  15. ps.println(msg);
  16. ps.flush();
  17. }
  18. }

}

  1. 本案例中确实可以实现客户端多发服务端多收** 但是服务端只能处理一个客户端的请求,因为服务端是单线程的**。**一次只能与一个客户端进行消息通信**。
  2. <a name="MTBgF"></a>
  3. ### BIO模式下接收多个客户端
  4. 在上述的案例中,**一个服务端只能接收一个客户端的通信请求,那么如果服务端需要处理很多个客户端的消**<br />**息通信请求应该如何处理呢,此时我们就需要在服务端引入线程了,也就是说客户端每发起一个请求,服务**<br />**端就创建一个新的线程来处理这个客户端的请求,这样就实现了一个客户端一个线程的模型,**图解模式如下:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/12535721/1634175658544-cb038e8c-9861-4819-95bb-3de98025ca55.png#clientId=ucdd030a1-3ace-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=379&id=ue80a1adb&margin=%5Bobject%20Object%5D&name=image.png&originHeight=556&originWidth=730&originalType=binary&ratio=1&rotation=0&showTitle=false&size=86855&status=done&style=none&taskId=uf7b6905e-83b0-4a33-9a85-8665e82f414&title=&width=498)
  5. ```java
  6. package cn.java.money.bio.demo3;
  7. import java.io.BufferedReader;
  8. import java.io.InputStream;
  9. import java.io.InputStreamReader;
  10. import java.net.ServerSocket;
  11. import java.net.Socket;
  12. /**
  13. * 服务端
  14. */
  15. public class ServerDemo {
  16. public static void main(String[] args) throws Exception {
  17. System.out.println("==服务器的启动==");
  18. // (1)注册端口
  19. ServerSocket serverSocket = new ServerSocket(7777);
  20. while (true) {
  21. //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
  22. Socket socket = serverSocket.accept();
  23. new ServerReadThread(socket).start();
  24. System.out.println(socket.getRemoteSocketAddress() + "上线了!");
  25. }
  26. }
  27. }
  28. class ServerReadThread extends Thread {
  29. private Socket socket;
  30. public ServerReadThread(Socket socket) {
  31. this.socket = socket;
  32. }
  33. @Override
  34. public void run() {
  35. try {
  36. //(3)从Socket管道中得到一个字节输入流。
  37. InputStream is = socket.getInputStream();
  38. //(4)把字节输入流包装成自己需要的流进行数据的读取。
  39. BufferedReader br = new BufferedReader(new InputStreamReader(is));
  40. //(5)读取数据
  41. String line;
  42. while ((line = br.readLine()) != null) {
  43. System.out.println("服务端收到:" + socket.getRemoteSocketAddress() + ":" + line);
  44. }
  45. } catch (Exception e) {
  46. System.out.println(socket.getRemoteSocketAddress() + "下线了!");
  47. }
  48. }
  49. }
  1. package cn.java.money.bio.demo3;
  2. import java.io.OutputStream;
  3. import java.io.PrintStream;
  4. import java.net.Socket;
  5. import java.util.Scanner;
  6. /**
  7. * 功能1:客户端可以反复发,一个服务端可以接收无数个客户端的消息!!
  8. * 小结:
  9. * 服务器如果想要接收多个客户端,那么必须引入线程,一个客户端一个线程处理!!
  10. */
  11. public class ClientDemo {
  12. public static void main(String[] args) throws Exception {
  13. System.out.println("==客户端的启动==");
  14. // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
  15. Socket socket = new Socket("127.0.0.1", 7777);
  16. // (2)从Socket通信管道中得到一个字节输出流。
  17. OutputStream os = socket.getOutputStream();
  18. // (3)把字节流改装成自己需要的流进行数据的发送
  19. PrintStream ps = new PrintStream(os);
  20. // (4)开始发送消息
  21. Scanner sc = new Scanner(System.in);
  22. while (true) {
  23. System.out.print("请说:");
  24. String msg = sc.nextLine();
  25. ps.println(msg);
  26. ps.flush();
  27. }
  28. }
  29. }
  • 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
  • 每个线程都会占用栈空间和CPU资源;
  • 并不是每个socket都进行IO操作,无意义的线程处理;
  • 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。(无限制的创建线程的问题)

    伪异步I/O编程

    在上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

接下来我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
image.png
反过来讲,我们并发的能力是有限的,这样是保护了服务端。

  1. package cn.java.money.bio.demo4;
  2. import java.io.BufferedReader;
  3. import java.io.InputStream;
  4. import java.io.InputStreamReader;
  5. import java.io.Reader;
  6. import java.net.ServerSocket;
  7. import java.net.Socket;
  8. public class Server {
  9. public static void main(String[] args) {
  10. try {
  11. System.out.println("----------服务端启动成功------------");
  12. ServerSocket ss = new ServerSocket(9999);
  13. // 一个服务端只需要对应一个线程池
  14. HandlerSocketThreadPool handlerSocketThreadPool = new HandlerSocketThreadPool(3, 1000);
  15. // 客户端可能有很多个
  16. while (true) {
  17. Socket socket = ss.accept(); // 阻塞式的!
  18. System.out.println("有人上线了!!");
  19. // 每次收到一个客户端的socket请求,都需要为这个客户端分配一个
  20. // 独立的线程 专门负责对这个客户端的通信!!
  21. handlerSocketThreadPool.execute(new ReaderClientRunnable(socket));
  22. }
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. class ReaderClientRunnable implements Runnable {
  29. private Socket socket;
  30. public ReaderClientRunnable(Socket socket) {
  31. this.socket = socket;
  32. }
  33. @Override
  34. public void run() {
  35. try {
  36. InputStream is = socket.getInputStream();
  37. // 转成一个缓冲字符流
  38. Reader fr = new InputStreamReader(is);
  39. BufferedReader br = new BufferedReader(fr);
  40. // 一行一行的读取数据
  41. String line = null;
  42. while ((line = br.readLine()) != null) { // 阻塞式的!!
  43. System.out.println("服务端收到了数据:" + line);
  44. }
  45. } catch (Exception e) {
  46. System.out.println("有人下线了");
  47. }
  48. }
  49. }
  1. package cn.java.money.bio.demo4;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. // 线程池处理类
  7. public class HandlerSocketThreadPool {
  8. // 线程池
  9. private ExecutorService executor;
  10. public HandlerSocketThreadPool(int maxPoolSize, int queueSize) {
  11. this.executor = new ThreadPoolExecutor(
  12. 3, // 8
  13. maxPoolSize,
  14. 120L,
  15. TimeUnit.SECONDS,
  16. new ArrayBlockingQueue<Runnable>(queueSize));
  17. }
  18. public void execute(Runnable task) {
  19. this.executor.execute(task);
  20. }
  21. }
  1. package cn.java.money.bio.demo4;
  2. import java.io.BufferedReader;
  3. import java.io.InputStreamReader;
  4. import java.io.OutputStream;
  5. import java.io.PrintWriter;
  6. import java.net.Socket;
  7. public class Client {
  8. public static void main(String[] args) {
  9. try {
  10. // 1.建立一个与服务端的Socket对象:套接字
  11. Socket socket = new Socket("127.0.0.1", 9999);
  12. // 2.从socket管道中获取一个输出流,写数据给服务端
  13. OutputStream os = socket.getOutputStream();
  14. // 3.把输出流包装成一个打印流
  15. PrintWriter pw = new PrintWriter(os);
  16. // 4.反复接收用户的输入
  17. BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
  18. String line;
  19. while ((line = br.readLine()) != null) {
  20. pw.println(line);
  21. pw.flush();
  22. }
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }

伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题。

如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续socket的i/o消息都将在队列中排队。新的Socket请求将被拒绝,客户端会发生大量连接超时。

基于BIO形式下的文件上传

支持任意类型文件形式的上传。

  1. package cn.java.money.bio.demo5;
  2. import java.net.ServerSocket;
  3. import java.net.Socket;
  4. /**
  5. 目标:服务端开发,可以实现接收客户端的任意类型文件,并保存到服务端磁盘。
  6. */
  7. public class Server {
  8. public static void main(String[] args) {
  9. try{
  10. ServerSocket ss = new ServerSocket(8888);
  11. while (true){
  12. Socket socket = ss.accept();
  13. // 交给一个独立的线程来处理与这个客户端的文件通信需求。
  14. new ServerReaderThread(socket).start();
  15. }
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  1. package cn.java.money.bio.demo5;
  2. import java.io.DataInputStream;
  3. import java.io.FileOutputStream;
  4. import java.io.OutputStream;
  5. import java.net.Socket;
  6. import java.util.UUID;
  7. public class ServerReaderThread extends Thread {
  8. private Socket socket;
  9. public ServerReaderThread(Socket socket){
  10. this.socket = socket;
  11. }
  12. @Override
  13. public void run() {
  14. try{
  15. // 1、得到一个数据输入流读取客户端发送过来的数据
  16. DataInputStream dis = new DataInputStream(socket.getInputStream());
  17. // 2、读取客户端发送过来的文件类型
  18. String suffix = dis.readUTF();
  19. System.out.println("服务端已经成功接收到了文件类型:" + suffix);
  20. // 3、定义一个字节输出管道负责把客户端发来的文件数据写出去
  21. OutputStream os = new FileOutputStream("d:\\"+ UUID.randomUUID().toString()+suffix);
  22. // 4、从数据输入流中读取文件数据,写出到字节输出流中去
  23. byte[] buffer = new byte[1024];
  24. int len;
  25. while((len = dis.read(buffer)) > 0){
  26. os.write(buffer,0, len);
  27. }
  28. os.close();
  29. System.out.println("服务端接收文件保存成功!");
  30. }catch (Exception e){
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  1. package cn.java.money.bio.demo5;
  2. import java.io.DataOutputStream;
  3. import java.io.FileInputStream;
  4. import java.io.InputStream;
  5. import java.net.Socket;
  6. /**
  7. 目标:实现客户端上传任意类型的文件数据给服务端保存起来。
  8. */
  9. public class Client {
  10. public static void main(String[] args) {
  11. try(
  12. InputStream is = new FileInputStream("d:\\aa.jpg");
  13. ){
  14. // 1、请求与服务端的Socket链接
  15. Socket socket = new Socket("127.0.0.1" , 8888);
  16. // 2、把字节输出流包装成一个数据输出流
  17. DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
  18. // 3、先发送上传文件的后缀给服务端
  19. dos.writeUTF(".jpg");
  20. // 4、把文件数据发送给服务端进行接收
  21. byte[] buffer = new byte[1024];
  22. int len;
  23. while((len = is.read(buffer)) > 0 ){
  24. dos.write(buffer , 0 , len);
  25. }
  26. dos.flush();
  27. Thread.sleep(10000);
  28. }catch (Exception e){
  29. e.printStackTrace();
  30. }
  31. }
  32. }

客户端怎么发,服务端就怎么接收。

Java BIO模式下的 端口转发思想

需求:需要实现一个客户端的消息可以发送给所有的客户端去接收。(群聊实现)
image.png
端口转发思想 在BIO模式下的运用。

端口转发思想案例:
服务端代码如下:

  1. package cn.java.money.bio.demo6;
  2. import java.net.ServerSocket;
  3. import java.net.Socket;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. /**
  7. * 1. 注册端口
  8. * 2. 接受客户端的socket连接,交给一个独立的线程来处理
  9. * 3. 把当前连接的客户端socket存入一个在线socket集合中
  10. * 4. 接受客户端的消息,然后推送给当前所有在线的socket接受
  11. */
  12. public class Server {
  13. public static List<Socket> allSocketOnLine = new ArrayList<>();
  14. public static void main(String[] args) {
  15. try {
  16. ServerSocket ss = new ServerSocket(9999);
  17. while (true) {
  18. //接受连接
  19. Socket socket = ss.accept();
  20. //保存连接到 集合
  21. allSocketOnLine.add(socket);
  22. //为当前连接成功的socket分配一个独立的线程来处理与之通信
  23. new ServerReaderThread(socket).start();
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  1. package cn.java.money.bio.demo6;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.io.PrintStream;
  6. import java.net.Socket;
  7. public class ServerReaderThread extends Thread {
  8. private Socket socket;
  9. public ServerReaderThread(Socket socket) {
  10. this.socket = socket;
  11. }
  12. @Override
  13. public void run() {
  14. try (BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
  15. String msg;
  16. //这里收到消息 再转发到其他端口,体现的就是端口转发的思想
  17. while ((msg = br.readLine()) != null) {
  18. sendMsgToAllClient(msg);
  19. }
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. System.out.println("当前socket下线");
  23. Server.allSocketOnLine.remove(socket);
  24. }
  25. }
  26. // 把当前客户端发来的消息推送给 所有在线的客户端
  27. private void sendMsgToAllClient(String msg) throws IOException {
  28. for (Socket sk : Server.allSocketOnLine) {
  29. PrintStream printStream = new PrintStream(sk.getOutputStream());
  30. printStream.println(msg);
  31. printStream.flush();
  32. }
  33. }
  34. }

基于BIO模式下即时通信项目(TODO)