参考 《Netty 权威指南》
传统的 BIO 编程
网络编程的基本模型是 Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的 IP 地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
在基于传统同步阻塞模型开发中,ServerSocket 负责绑定 IP 地址,启动监听端口,Socket 负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。
BIO 通信模型图
采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是经典的一请求一应答通信模式,如下图所示。
同步阻塞 I/O 服务端通信模型(一客户端一线程)
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈 1:1 的正比关系,由于线程是 Java 虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。
BIO 案例分析
下面通过一个简单的案例来看看 Java BIO 是怎么编写服务端和客户端的。案例实现的功能是:客户端向服务端发送一个需要计算的表达式的指令,服务端解析表达式内容,计算出结果返回给客户端。
源码地址:https://gitee.com/yin_jw/demo/tree/master/netty-demo
服务端
public class Server {
// 默认端口号
private static final int DEFAULT_SERVER_PORT = 8888;
// 服务端
private static ServerSocket serverSocket;
/**
* 启动服务
*
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (serverSocket != null) {
return;
}
try {
serverSocket = new ServerSocket(DEFAULT_SERVER_PORT);
System.out.println("服务已启动,端口号:" + DEFAULT_SERVER_PORT);
while (true) {
Socket socket = serverSocket.accept();
// 调用服务处理类处理
new Thread(new ServerHandler(socket)).start();
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
serverSocket = null;
}
}
}
Server 端通过一个无限循环来监听客户端的连接,如果没有客户端接入,则主线程阻塞在 ServerSocket 的 accept 操作上。
当有新的客户端接入的时候,以 Socket 为参数构造 ServerHandler 对象,ServerHandler 是一个 Runnable,使用它创建一个客户端线程处理这条 Socket 链路。
ServerHandler 主要是通过 Socket 对象获得输入输出流,处理客户端请求。
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
super();
this.socket = socket;
}
@Override
public void run() {
BufferedReader br = null;
PrintWriter pw = null;
try {
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw = new PrintWriter(socket.getOutputStream(), true);
while (true) {
// 接收客户端的表达式消息
String expression = null;
if ((expression = br.readLine()) == null) {
break;
}
System.out.println("服务端收到消息:" + expression);
// 计算表达式,返回计算机结果给客户端
double result = calculate(expression);
pw.println(result);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IoUtils.close(br, pw, socket);
}
}
/**
* 计算客户端的表达式
*
* @param expression
*/
private double calculate(String expression) {
double result = 0;
int index = 0;
// 加
if ((index = expression.indexOf("+")) != -1) {
double one = Double.parseDouble(expression.substring(0, index).trim());
double two = Double.parseDouble(expression.substring(index + 1, expression.length()).trim());
result = one + two;
}
// 减
if ((index = expression.indexOf("-")) != -1) {
double one = Double.parseDouble(expression.substring(0, index).trim());
double two = Double.parseDouble(expression.substring(index + 1, expression.length()).trim());
result = one - two;
}
// 乘
if ((index = expression.indexOf("*")) != -1) {
double one = Double.parseDouble(expression.substring(0, index).trim());
double two = Double.parseDouble(expression.substring(index + 1, expression.length()).trim());
result = one * two;
}
// 除
if ((index = expression.indexOf("/")) != -1) {
double one = Double.parseDouble(expression.substring(0, index).trim());
double two = Double.parseDouble(expression.substring(index + 1, expression.length()).trim());
result = one / two;
}
return result;
}
}
客户端
客户端通过 Socket 对象连接服务端,通过输入、输出流与服务端通信。
public class Client {
private static final String DEFAULT_SERVER_IP = "127.0.0.1";
private static final int DEFAULT_SERVER_PORT = 8888;
public static void main(String[] args) throws Exception {
char[] op = {'+', '-', '*', '/'};
Random random = new Random();
for (int i = 0; i < 5; i++) {
Client.send(random.nextInt(100) + "" + op[random.nextInt(4)] + random.nextInt(100));
Thread.sleep(1000);
System.out.println();
}
}
/**
* 客户端发送消息
*
* @param expression 表达式
*/
public static void send(String expression) {
Socket socket = null;
PrintWriter pw = null;
BufferedReader br = null;
try {
socket = new Socket(DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT);
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw = new PrintWriter(socket.getOutputStream(), true);
// 发送表达式
pw.println(expression);
// 接收计算结果
String result = br.readLine();
System.out.println("表达式:" + expression);
System.out.println("服务端计算结果:" + result);
} catch (IOException e) {
e.printStackTrace();
} finally {
IoUtils.close(br, pw, socket);
}
}
}
通过上面的案例,我们发现,BIO 主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理一个客户端连接。在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入的场景。
伪异步 I/O编程
伪异步 I/O 模型图
为了改进一线程一连接的模型结构,引入线程池工具,后端通过一个线程池来处理多个客户端的请求接入。通过线程池可以灵活地调配线程资源,控制线程数量,防止由于海量并发接入导致服务端线程耗尽。
伪异步 I/O 模型图如下所示。
当有新的客户端接入时,将客户端的 Socket 封装成一个 Task(该任务实现 java.lang.Runnable 接口)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
伪异步 I/O 案例分析
我们只需要对服务端代码进行一些改造,引入线程池工具,就可以实现伪异步 I/O。
public class Server {
// 默认端口号
private static final int DEFAULT_SERVER_PORT = 8888;
// 服务端
private static ServerSocket serverSocket;
/**
* 启动服务
*
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (serverSocket != null) {
return;
}
try {
serverSocket = new ServerSocket(DEFAULT_SERVER_PORT);
System.out.println("服务已启动,端口号:" + DEFAULT_SERVER_PORT);
while (true) {
Socket socket = serverSocket.accept();
// 调用服务处理类处理
Executor executor = getThreadPool(50, 10000);
executor.execute(new ServerHandler(socket));
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
serverSocket = null;
}
}
/**
* 获取线程池
*
* @param maxPoolSize
* @param queueSize
* @return
*/
public static Executor getThreadPool(int maxPoolSize, int queueSize) {
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize,
120L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueSize));
}
}
在处理客户端请求的时候,不再是一个请求创建一个线程了,而是通过线程池来统一管理。由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,都不会导致线程个数过多,相比传统的一连接一线程的模式是一种改良。
伪异步I/O弊端分析
因为伪异步 I/O 底层的通信依然采用同步阻塞模型,因此它依然存在弊端,我们看下面两个 Java 同步 I/O 的 API 说明。
我们先来看输入流 InputStream 的 read() 方法:
/**
* Reads some number of bytes from the input stream and stores them into
* the buffer array <code>b</code>. The number of bytes actually read is
* returned as an integer. This method blocks until input data is
* available, end of file is detected, or an exception is thrown.
*
* <p> If the length of <code>b</code> is zero, then no bytes are read and
* <code>0</code> is returned; otherwise, there is an attempt to read at
* least one byte. If no byte is available because the stream is at the
* end of the file, the value <code>-1</code> is returned; otherwise, at
* least one byte is read and stored into <code>b</code>.
*
* <p> The first byte read is stored into element <code>b[0]</code>, the
* next one into <code>b[1]</code>, and so on. The number of bytes read is,
* at most, equal to the length of <code>b</code>. Let <i>k</i> be the
* number of bytes actually read; these bytes will be stored in elements
* <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>,
* leaving elements <code>b[</code><i>k</i><code>]</code> through
* <code>b[b.length-1]</code> unaffected.
*
* <p> The <code>read(b)</code> method for class <code>InputStream</code>
* has the same effect as: <pre><code> read(b, 0, b.length) </code></pre>
*
* @param b the buffer into which the data is read.
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached.
* @exception IOException If the first byte cannot be read for any reason
* other than the end of the file, if the input stream has been closed, or
* if some other I/O error occurs.
* @exception NullPointerException if <code>b</code> is <code>null</code>.
* @see java.io.InputStream#read(byte[], int, int)
*/
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
注意这句话:
This method blocks until input data is available, end of file is detected, or an exception is thrown.
当对 Socket 的输入流进行读取操作的时候,它会一直阻塞下去,直到发生如下三种事件。
- 有数据可读。
- 可用数据已经读取完毕。
- 发生空指针或者 I/O 异常。
这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输较慢时,读取输入流一方的通信线程将被长时间阻塞,如果对方要60s才能够将数据发送完成,读取一方的 I/O 线程也将会被同步阻塞60s,在此期间,其他接入消息只能在消息队列中排队。
下面我们接着对输出流 OutputStream 的 write() 方法进行分析,还是看 JDK I/O 类库输出流的 API 文档,然后结合文档说明进行故障分析。
/**
* Writes <code>b.length</code> bytes from the specified byte array
* to this output stream. The general contract for <code>write(b)</code>
* is that it should have exactly the same effect as the call
* <code>write(b, 0, b.length)</code>.
*
* @param b the data.
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#write(byte[], int, int)
*/
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
当调用 OutputStream 的 write 方法写输出流的时候,它将会被阻塞,直到所有要发送的字节全部写入完毕,或者发生异常。学习过 TCP/IP 相关知识的人都知道,当消息的接收方处理缓慢的时候,将不能及时地从 TCP 缓冲区读取数据,这将会导致发送方的 TCP window size 不断减小,直到为 0,双方处于 Keep-Alive 状态,消息发送方将不能再向 TCP 缓冲区写入消息,这时如果采用的是同步阻塞 I/O, write 操作将会被无限期阻塞,直到 TCP window size 大于0或者发生 I/O 异常。
通过对输入和输出流的 API 文档进行分析,我们了解到读和写操作都是同步阻塞的,阻塞的时间取决于对方 l/O 线程的处理速度和网络 I/O 的传输速度。本质上来讲,我们无法保证生产环境的网络状况和对端的应用程序能足够快,如果我们的应用程序依赖对方的处理速度,它的可靠性就非常差。也许在实验室进行的性能测试结果令人满意,但是一旦上线运行,面对恶劣的网络环境和良莠不齐的第三方系统,问题就会如火山一样喷发。
伪异步 l/O 实际上仅仅是对之前 I/O 线程模型的一个简单优化,它无法从根本上解決同步 I/O 导致的通信线程阻塞问题。下面我们就简单分析下通信对方返回应答时间过长会引起的级联故障。
- 服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms。
- 采用伪异步 I/O 的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
- 假如所有的可用线程都被故障服务器阻塞,那后续所有的 I/O 消息都将在队列中排队。
- 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
- 由于前端只有一个 Accptor 线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
- 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。
作者:殷建卫 链接:https://www.yuque.com/yinjianwei/vyrvkf/ito2g7 来源:殷建卫 - 架构笔记 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。