BufferedInputStream和BufferedOutputStream分别是FilterInputStream和FilterOutputStream的子类,它们实现了输入输出流的缓存功能,我们来看它们是怎样实现的。
为什么要实现缓存功能呢?我们可以想一下,InputStream中的read和OutputStream中的write方法,是针对每个字节都会执行一次,如果是这种发送方式的话,会有很大的网络开销。BufferedInputStream和BufferedOutputStream能让我们避免对每个字节都执行read和write方法调用。
BufferedOutputStream
内部的缓存数组
BufferedOutputStream会将write等操作映射到内部的OutputStream中,但是它会加上一个字节数组来进行缓存。对于这个字节数组,BufferedOutputStream有两个字段比较重要:
/**
* The internal buffer where data is stored.
*/
protected byte buf[];
/**
* The number of valid bytes in the buffer. This value is always
* in the range <tt>0</tt> through <tt>buf.length</tt>; elements
* <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid
* byte data.
*/
protected int count;
第一个buf就是字节数组,第二个是数组中有效字节的数量,因为刚开始初始化时数组肯定是空的,然后会一个一个地添加,每添加一个count就会加1。所以count的范围是从0到buf.length。
构造方法
因为有了缓存数组,所以BufferedOutputStream的构造方法除了需要内部的OutputStream,还需要初始化内部的缓冲字节数组,它有两个构造方法:
public BufferedOutputStream(OutputStream out) {
this(out, 8192);
}
/**
* Creates a new buffered output stream to write data to the
* specified underlying output stream with the specified buffer
* size.
*
* @param out the underlying output stream.
* @param size the buffer size.
* @exception IllegalArgumentException if size <= 0.
*/
public BufferedOutputStream(OutputStream out, int size) {
super(out);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
第一个是默认内部缓存字节数组的数量是8192,第二个可以自定义这个字节数组的长度。
单字节write方法
然后我们来看BufferedOutputStream是怎么执行write方法的:
public synchronized void write(int b) throws IOException {
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte)b;
}
首先,它会判断count是不是大于等于缓冲数组的长度,也就是缓冲数组是否已满,如果没满,就直接将参数中的字节b添加到缓冲数组,如果缓冲数组已满,就调用flushBuffer方法,flushBuffer方法的如下:
/** Flush the internal buffer */
private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}
它的逻辑也很简单,就是调用内部的OutputStream的write方法将内部缓存数组中的字节全部写入,然后将count重置为0。
所以BufferedOutputStream的write方法逻辑就是首先填充内部的缓存字节数组,字节数组填充满之后直接一次性将这些字节写入,然后重新再填充缓存。
多字节write方法
public synchronized void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
/* If the request length exceeds the size of the output buffer,
flush the output buffer and then write the data directly.
In this way buffered streams will cascade harmlessly. */
flushBuffer();
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
flushBuffer();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}
多字节的write方法的逻辑也是首先判断缓冲字节数组是否已满,未满就将参数字节数组添加到缓冲字节数组中,前提是保证能添加进去,也就是缓冲字节数组当前的剩余长度大于要添加的字节数组的长度,如果不能添加,也要先调用flushBuffer方法来刷新缓存。
整体来说,BufferedOutputStream的缓存方式比较简单也容易理解,相比较而言,BufferedInputStream就相对要复杂一些,我们来看一下它是怎么实现缓存的。
BufferedInputStream
内部的缓存数组
BufferedInputStream与缓存数组相关的有下面三个字段:
private static int DEFAULT_BUFFER_SIZE = 8192;
/**
* The internal buffer array where the data is stored. When necessary,
* it may be replaced by another array of
* a different size.
*/
protected volatile byte buf[];
/**
* The index one greater than the index of the last valid byte in
* the buffer.
* This value is always
* in the range <code>0</code> through <code>buf.length</code>;
* elements <code>buf[0]</code> through <code>buf[count-1]
* </code>contain buffered input data obtained
* from the underlying input stream.
*/
protected int count;
第一个是默认的缓存大小,第二个就是缓存数组,第三个count是缓存数组中有效字节的数量。
read方法的实现
public synchronized int read() throws IOException {
if (pos >= count) {
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
它首先判断pos是否大于等于count,也就是读到的位置是否大于等于缓存数组中字节的数量,如果是,说明缓存数组中的字节已经读完,需要重新填充缓存数组,填充缓存数组是通过调用fill方法实现的,我们来看一下它的逻辑:
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) /* no room left in buffer */
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else { /* grow buffer */
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
逻辑有点多,自己看吧。