BufferedInputStream和BufferedOutputStream分别是FilterInputStream和FilterOutputStream的子类,它们实现了输入输出流的缓存功能,我们来看它们是怎样实现的。
为什么要实现缓存功能呢?我们可以想一下,InputStream中的read和OutputStream中的write方法,是针对每个字节都会执行一次,如果是这种发送方式的话,会有很大的网络开销。BufferedInputStream和BufferedOutputStream能让我们避免对每个字节都执行read和write方法调用。

BufferedOutputStream

内部的缓存数组

BufferedOutputStream会将write等操作映射到内部的OutputStream中,但是它会加上一个字节数组来进行缓存。对于这个字节数组,BufferedOutputStream有两个字段比较重要:

  1. /**
  2. * The internal buffer where data is stored.
  3. */
  4. protected byte buf[];
  5. /**
  6. * The number of valid bytes in the buffer. This value is always
  7. * in the range <tt>0</tt> through <tt>buf.length</tt>; elements
  8. * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid
  9. * byte data.
  10. */
  11. protected int count;

第一个buf就是字节数组,第二个是数组中有效字节的数量,因为刚开始初始化时数组肯定是空的,然后会一个一个地添加,每添加一个count就会加1。所以count的范围是从0到buf.length。

构造方法

因为有了缓存数组,所以BufferedOutputStream的构造方法除了需要内部的OutputStream,还需要初始化内部的缓冲字节数组,它有两个构造方法:

  1. public BufferedOutputStream(OutputStream out) {
  2. this(out, 8192);
  3. }
  4. /**
  5. * Creates a new buffered output stream to write data to the
  6. * specified underlying output stream with the specified buffer
  7. * size.
  8. *
  9. * @param out the underlying output stream.
  10. * @param size the buffer size.
  11. * @exception IllegalArgumentException if size &lt;= 0.
  12. */
  13. public BufferedOutputStream(OutputStream out, int size) {
  14. super(out);
  15. if (size <= 0) {
  16. throw new IllegalArgumentException("Buffer size <= 0");
  17. }
  18. buf = new byte[size];
  19. }

第一个是默认内部缓存字节数组的数量是8192,第二个可以自定义这个字节数组的长度。

单字节write方法

然后我们来看BufferedOutputStream是怎么执行write方法的:

  1. public synchronized void write(int b) throws IOException {
  2. if (count >= buf.length) {
  3. flushBuffer();
  4. }
  5. buf[count++] = (byte)b;
  6. }

首先,它会判断count是不是大于等于缓冲数组的长度,也就是缓冲数组是否已满,如果没满,就直接将参数中的字节b添加到缓冲数组,如果缓冲数组已满,就调用flushBuffer方法,flushBuffer方法的如下:

  1. /** Flush the internal buffer */
  2. private void flushBuffer() throws IOException {
  3. if (count > 0) {
  4. out.write(buf, 0, count);
  5. count = 0;
  6. }
  7. }

它的逻辑也很简单,就是调用内部的OutputStream的write方法将内部缓存数组中的字节全部写入,然后将count重置为0。
所以BufferedOutputStream的write方法逻辑就是首先填充内部的缓存字节数组,字节数组填充满之后直接一次性将这些字节写入,然后重新再填充缓存。

多字节write方法

  1. public synchronized void write(byte b[], int off, int len) throws IOException {
  2. if (len >= buf.length) {
  3. /* If the request length exceeds the size of the output buffer,
  4. flush the output buffer and then write the data directly.
  5. In this way buffered streams will cascade harmlessly. */
  6. flushBuffer();
  7. out.write(b, off, len);
  8. return;
  9. }
  10. if (len > buf.length - count) {
  11. flushBuffer();
  12. }
  13. System.arraycopy(b, off, buf, count, len);
  14. count += len;
  15. }

多字节的write方法的逻辑也是首先判断缓冲字节数组是否已满,未满就将参数字节数组添加到缓冲字节数组中,前提是保证能添加进去,也就是缓冲字节数组当前的剩余长度大于要添加的字节数组的长度,如果不能添加,也要先调用flushBuffer方法来刷新缓存。
整体来说,BufferedOutputStream的缓存方式比较简单也容易理解,相比较而言,BufferedInputStream就相对要复杂一些,我们来看一下它是怎么实现缓存的。

BufferedInputStream

内部的缓存数组

BufferedInputStream与缓存数组相关的有下面三个字段:

  1. private static int DEFAULT_BUFFER_SIZE = 8192;
  2. /**
  3. * The internal buffer array where the data is stored. When necessary,
  4. * it may be replaced by another array of
  5. * a different size.
  6. */
  7. protected volatile byte buf[];
  8. /**
  9. * The index one greater than the index of the last valid byte in
  10. * the buffer.
  11. * This value is always
  12. * in the range <code>0</code> through <code>buf.length</code>;
  13. * elements <code>buf[0]</code> through <code>buf[count-1]
  14. * </code>contain buffered input data obtained
  15. * from the underlying input stream.
  16. */
  17. protected int count;

第一个是默认的缓存大小,第二个就是缓存数组,第三个count是缓存数组中有效字节的数量。

read方法的实现

  1. public synchronized int read() throws IOException {
  2. if (pos >= count) {
  3. fill();
  4. if (pos >= count)
  5. return -1;
  6. }
  7. return getBufIfOpen()[pos++] & 0xff;
  8. }

它首先判断pos是否大于等于count,也就是读到的位置是否大于等于缓存数组中字节的数量,如果是,说明缓存数组中的字节已经读完,需要重新填充缓存数组,填充缓存数组是通过调用fill方法实现的,我们来看一下它的逻辑:

  1. private void fill() throws IOException {
  2. byte[] buffer = getBufIfOpen();
  3. if (markpos < 0)
  4. pos = 0; /* no mark: throw away the buffer */
  5. else if (pos >= buffer.length) /* no room left in buffer */
  6. if (markpos > 0) { /* can throw away early part of the buffer */
  7. int sz = pos - markpos;
  8. System.arraycopy(buffer, markpos, buffer, 0, sz);
  9. pos = sz;
  10. markpos = 0;
  11. } else if (buffer.length >= marklimit) {
  12. markpos = -1; /* buffer got too big, invalidate mark */
  13. pos = 0; /* drop buffer contents */
  14. } else if (buffer.length >= MAX_BUFFER_SIZE) {
  15. throw new OutOfMemoryError("Required array size too large");
  16. } else { /* grow buffer */
  17. int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
  18. pos * 2 : MAX_BUFFER_SIZE;
  19. if (nsz > marklimit)
  20. nsz = marklimit;
  21. byte nbuf[] = new byte[nsz];
  22. System.arraycopy(buffer, 0, nbuf, 0, pos);
  23. if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
  24. // Can't replace buf if there was an async close.
  25. // Note: This would need to be changed if fill()
  26. // is ever made accessible to multiple threads.
  27. // But for now, the only way CAS can fail is via close.
  28. // assert buf == null;
  29. throw new IOException("Stream closed");
  30. }
  31. buffer = nbuf;
  32. }
  33. count = pos;
  34. int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
  35. if (n > 0)
  36. count = n + pos;
  37. }

逻辑有点多,自己看吧。