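I use the io package all the time but had never actually read its source. Reading it this time turned up a lot of details I had never noticed.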

io.go

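The interfaces at the top are not worth much comment: they are just combinations of Reader, Writer, and Closer, plus a few other simple interfaces. It is a very low-level package, after all, so what it implements is simple.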


One function I use all the time without ever looking at its source turns out to be quite interesting: io.Copy, which calls copyBuffer. Its source is as follows:

```go
// src/io/io.go ---- line 381
// copyBuffer is the actual implementation of Copy and CopyBuffer.
// if buf is nil, one is allocated.
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw > 0 {
				written += int64(nw)
			}
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}
```

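The first two checks are interesting: if src has a WriteTo method (implements WriterTo), copyBuffer just calls it and is done; if dst has a ReadFrom method (implements ReaderFrom), it calls that instead. In neither case does it run the copy loop itself, which, as the comment says, avoids allocating a buffer and an extra copy.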


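LimitedReader and SectionReader, further down, are easy to understand and do exactly what their names say: LimitedReader reads at most N bytes from the underlying Reader and then reports EOF, and SectionReader reads a fixed section of n bytes starting at offset off from an underlying ReaderAt.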
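A quick sketch of both, using io.LimitReader (which returns an *io.LimitedReader) and io.NewSectionReader over a strings.Reader, which implements io.ReaderAt. It assumes Go 1.16 or later for io.ReadAll; the helper names limited and section are just for this example.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// limited reads s through io.LimitReader, which stops after n bytes.
func limited(s string, n int64) (string, error) {
	b, err := io.ReadAll(io.LimitReader(strings.NewReader(s), n))
	return string(b), err
}

// section reads the n bytes of s starting at offset off; io.NewSectionReader
// needs an io.ReaderAt, which strings.Reader provides.
func section(s string, off, n int64) (string, error) {
	b, err := io.ReadAll(io.NewSectionReader(strings.NewReader(s), off, n))
	return string(b), err
}

func main() {
	a, _ := limited("hello, world", 5)
	b, _ := section("hello, world", 7, 5)
	fmt.Println(a) // hello
	fmt.Println(b) // world
}
```

The *LimitedReader type is also why copyBuffer special-cases it: when N is smaller than 32 KiB, allocating a full 32 KiB buffer would be wasted.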


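Near the end there is also TeeReader: every call to Read also writes whatever was read into the Writer it wraps, presumably much like the tee command (I haven't really used either).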
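A minimal sketch of that behavior, assuming Go 1.16+ for io.ReadAll: reading everything through io.TeeReader leaves an identical copy in the side Writer (a strings.Builder here; the helper name teeRead is made up for the example).

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// teeRead reads s through io.TeeReader; every byte returned by Read
// is also written to the strings.Builder given as the tee's Writer.
func teeRead(s string) (string, string, error) {
	var side strings.Builder
	tr := io.TeeReader(strings.NewReader(s), &side)
	b, err := io.ReadAll(tr)
	return string(b), side.String(), err
}

func main() {
	got, copied, err := teeRead("tee me")
	fmt.Printf("read %q, tee'd %q, err %v\n", got, copied, err) // read "tee me", tee'd "tee me", err <nil>
}
```

A common real-world use of the same idea is passing a hash.Hash as the Writer, so the data is hashed while it is being consumed.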

multi.go

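There are two structs, multiReader and multiWriter, which wrap a slice of Readers and a slice of Writers respectively (built by io.MultiReader and io.MultiWriter). They implement functions that:

  • read data from the Readers in the slice, one after another in order, into a buffer
  • write the contents of a buffer to every Writer in the slice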

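:::info
Note the difference between the two, though: multiReader does not repeat anything (each Reader's content is consumed once, one Reader after another), whereas multiWriter writes the same content to every Writer in the slice.
:::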

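For example (pseudocode, just to convey the idea):

```
{{1, 2}, {3, 4}, {5, 6}} -> [5]buffer{} // buffer == {1, 2, 3, 4, 5}
[2]data{1, 2} -> {{1, 2}, {3, 4}, {5, 6}} // {{1, 2}, {1, 2}, {1, 2}}
```

multiReader reads from its Readers in order, but a single Read call only reads from the current Reader and moves on to the next one once the current one returns EOF. Filling the buffer as in the first line therefore takes several Read calls (io.ReadFull does exactly this), which stop when the buffer is full or every Reader is exhausted.
multiWriter writes the given data to every Writer in the slice, in order. If a Writer returns an error, Write stops and returns that error; if a Writer accepts fewer bytes than it was given without an error, Write stops and returns io.ErrShortWrite.
The source makes the details clearer: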

```go
// src/io/multi.go ---- line 17
func (mr *multiReader) Read(p []byte) (n int, err error) {
	for len(mr.readers) > 0 {
		// Optimization to flatten nested multiReaders (Issue 13558).
		if len(mr.readers) == 1 {
			if r, ok := mr.readers[0].(*multiReader); ok {
				mr.readers = r.readers
				continue
			}
		}
		n, err = mr.readers[0].Read(p)
		if err == EOF {
			// Use eofReader instead of nil to avoid nil panic
			// after performing flatten (Issue 18232).
			mr.readers[0] = eofReader{} // permit earlier GC
			mr.readers = mr.readers[1:]
		}
		if n > 0 || err != EOF {
			if err == EOF && len(mr.readers) > 0 {
				// Don't return EOF yet. More readers remain.
				err = nil
			}
			return
		}
	}
	return 0, EOF
}

// src/io/multi.go ---- line 58
func (t *multiWriter) Write(p []byte) (n int, err error) {
	for _, w := range t.writers {
		n, err = w.Write(p)
		if err != nil {
			return
		}
		if n != len(p) {
			err = ErrShortWrite
			return
		}
	}
	return len(p), nil
}
```

pipe.go

A neat implementation: a single struct wrapped in two ways (PipeReader and PipeWriter). First, the pipe struct:

```go
// src/io/pipe.go ---- line 38
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
	wrMu sync.Mutex // Serializes Write operations
	wrCh chan []byte
	rdCh chan int

	once sync.Once // Protects closing done
	done chan struct{}
	rerr onceError
	werr onceError
}

// src/io/pipe.go ---- line 182
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
	p := &pipe{
		wrCh: make(chan []byte),
		rdCh: make(chan int),
		done: make(chan struct{}),
	}
	return &PipeReader{p}, &PipeWriter{p}
}
```
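As a sketch of "connecting code expecting an io.Writer with code expecting an io.Reader", the example below pipes json.Encoder (which writes to an io.Writer) straight into json.Decoder (which reads from an io.Reader) with no intermediate buffer. The msg type and roundTrip helper are hypothetical; the writing side runs in its own goroutine because each Write blocks until it has been read.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// msg is a hypothetical payload type used only for this example.
type msg struct {
	Name string
}

// roundTrip encodes in on one goroutine and decodes it on the caller's
// goroutine, with io.Pipe connecting the two.
func roundTrip(in msg) (msg, error) {
	pr, pw := io.Pipe()
	go func() {
		// Write blocks until the reading side consumes the data,
		// so the writer must not run on the same goroutine as the reader.
		err := json.NewEncoder(pw).Encode(in)
		// CloseWithError(nil) behaves like Close: the reader then sees io.EOF.
		pw.CloseWithError(err)
	}()
	// Closing the read side unblocks the writer if decoding stops early.
	defer pr.Close()

	var out msg
	err := json.NewDecoder(pr).Decode(&out)
	return out, err
}

func main() {
	out, err := roundTrip(msg{Name: "gopher"})
	fmt.Println(out.Name, err) // gopher <nil>
}
```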

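As the doc comment says, a pipe connects code that expects an io.Reader with code that expects an io.Writer, and it has no internal buffer: Write hands its slice over a channel, and the channels created in Pipe are unbuffered, so each Write blocks until Reads have consumed its data. Reads and Writes therefore have to be paired up in different goroutines; doing both in the same goroutine simply blocks. The read and write code is as follows: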

```go
// src/io/pipe.go ---- line 50
func (p *pipe) Read(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.readCloseError()
	default:
	}

	select {
	case bw := <-p.wrCh:
		nr := copy(b, bw)
		p.rdCh <- nr
		return nr, nil
	case <-p.done:
		return 0, p.readCloseError()
	}
}

// src/io/pipe.go ---- line 84
func (p *pipe) Write(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.writeCloseError()
	default:
		p.wrMu.Lock()
		defer p.wrMu.Unlock()
	}

	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b:
			nw := <-p.rdCh
			b = b[nw:]
			n += nw
		case <-p.done:
			return n, p.writeCloseError()
		}
	}
	return n, nil
}
```