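I use the io package all the time, but I had never actually sat down and read its source. Reading it this time turned up a lot of things I hadn't noticed before.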
io.go
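There isn't much to say about the interfaces at the top: they are mostly combinations of Reader, Writer, and Closer, plus a few other simple interfaces. It is a very low-level package, after all, so what it implements is all fairly simple.
There is one function I use all the time without ever having looked at its source, and it turns out to be quite interesting: io.Copy. It calls copyBuffer, whose source is as follows: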

    // src/io/io.go ---- line 381
    // copyBuffer is the actual implementation of Copy and CopyBuffer.
    // if buf is nil, one is allocated.
    func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
    	// If the reader has a WriteTo method, use it to do the copy.
    	// Avoids an allocation and a copy.
    	if wt, ok := src.(WriterTo); ok {
    		return wt.WriteTo(dst)
    	}
    	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
    	if rt, ok := dst.(ReaderFrom); ok {
    		return rt.ReadFrom(src)
    	}
    	if buf == nil {
    		size := 32 * 1024
    		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
    			if l.N < 1 {
    				size = 1
    			} else {
    				size = int(l.N)
    			}
    		}
    		buf = make([]byte, size)
    	}
    	for {
    		nr, er := src.Read(buf)
    		if nr > 0 {
    			nw, ew := dst.Write(buf[0:nr])
    			if nw > 0 {
    				written += int64(nw)
    			}
    			if ew != nil {
    				err = ew
    				break
    			}
    			if nr != nw {
    				err = ErrShortWrite
    				break
    			}
    		}
    		if er != nil {
    			if er != EOF {
    				err = er
    			}
    			break
    		}
    	}
    	return written, err
    }

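The first two checks are interesting: if src has a WriteTo method, copyBuffer simply calls it and is done; if dst has a ReadFrom method, it calls that instead. In both cases it never falls through to the manual copy loop.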
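To see the WriteTo shortcut in action, here is a minimal sketch. The `countingSource` type and the `copyViaWriterTo` helper are hypothetical names made up for this illustration; the type implements both Read and WriteTo and counts how often each is called, so we can check which path io.Copy takes.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// countingSource is a hypothetical Reader that also implements io.WriterTo
// and records which of its methods io.Copy ends up calling.
type countingSource struct {
	data         string
	readCalls    int
	writeToCalls int
}

func (s *countingSource) Read(p []byte) (int, error) {
	s.readCalls++
	if s.data == "" {
		return 0, io.EOF
	}
	n := copy(p, s.data)
	s.data = s.data[n:]
	return n, nil
}

func (s *countingSource) WriteTo(w io.Writer) (int64, error) {
	s.writeToCalls++
	n, err := io.WriteString(w, s.data)
	s.data = s.data[n:]
	return int64(n), err
}

// copyViaWriterTo copies text into a strings.Builder with io.Copy and
// reports the copied text plus how often Read and WriteTo were called.
func copyViaWriterTo(text string) (out string, reads, writeTos int) {
	src := &countingSource{data: text}
	var dst strings.Builder
	if _, err := io.Copy(&dst, src); err != nil {
		panic(err)
	}
	return dst.String(), src.readCalls, src.writeToCalls
}

func main() {
	out, reads, writeTos := copyViaWriterTo("hello")
	fmt.Println(out, reads, writeTos) // hello 0 1
}
```

Read is never called here: the type assertion on WriterTo succeeds first, so the buffer is not even allocated.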
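The LimitedReader and SectionReader that follow are easy to understand and do what their names say: the former stops after N bytes, the latter reads a fixed section of an underlying ReaderAt.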
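A small sketch of both, using the public constructors io.LimitReader and io.NewSectionReader. The helper names `readLimited` and `readSection` are made up for this example, and io.ReadAll assumes Go 1.16 or newer.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readLimited returns at most n bytes from the start of s.
func readLimited(s string, n int64) string {
	b, err := io.ReadAll(io.LimitReader(strings.NewReader(s), n))
	if err != nil {
		panic(err)
	}
	return string(b)
}

// readSection returns up to n bytes of s starting at offset off.
func readSection(s string, off, n int64) string {
	// strings.Reader implements io.ReaderAt, which SectionReader requires.
	b, err := io.ReadAll(io.NewSectionReader(strings.NewReader(s), off, n))
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(readLimited("hello world", 5))    // hello
	fmt.Println(readSection("hello world", 6, 5)) // world
}
```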
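At the very end there is TeeReader: every call to Read also writes the bytes it read into the Writer it wraps, so it presumably works much like the tee command (I haven't really used either).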
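A minimal sketch of that behaviour, assuming Go 1.16 or newer for io.ReadAll. The helper name `teeCopy` is hypothetical; it reads a string through io.TeeReader and returns both what the caller read and what was mirrored into the side buffer.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// teeCopy reads s through a TeeReader and returns the bytes the caller
// read together with the bytes mirrored into the side Writer.
func teeCopy(s string) (read, mirrored string) {
	var mirror bytes.Buffer
	tee := io.TeeReader(strings.NewReader(s), &mirror)
	b, err := io.ReadAll(tee)
	if err != nil {
		panic(err)
	}
	return string(b), mirror.String()
}

func main() {
	read, mirrored := teeCopy("some data")
	fmt.Println(read == mirrored) // true
}
```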
multi.go
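There are two structs, multiReader and multiWriter, which wrap a slice of Readers and a slice of Writers respectively. They implement methods for:
- reading from the Readers in the slice, in order, into a buffer
- writing the contents of a buffer to every Writer in the slice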
:::info
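Note the difference between the two, though: multiReader does not repeat data (each Reader's content is consumed once, one Reader after another), whereas multiWriter writes the same content to every Writer in the slice.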
:::
Take the following case, for example (pseudocode, just to get the idea across):

    {{1, 2}, {3, 4}, {5, 6}} -> [5]buffer{}    // buffer == {1, 2, 3, 4, 5}
    [2]data{1, 2} -> {{1, 2}, {3, 4}, {5, 6}}  // {{1, 2}, {1, 2}, {1, 2}}

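That is, multiReader reads from the Readers in order until the buffer is full or every Reader is exhausted. Note that this describes repeated reads (for example via io.ReadFull): a single Read call returns as soon as one Reader yields data, so it never spans two Readers.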
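The reading half of the pseudocode can be sketched with the public io.MultiReader constructor. The helper names `newSource`, `singleRead`, and `fullRead` are made up for this example; it contrasts one Read call with io.ReadFull on the same 5-byte buffer.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// newSource builds a MultiReader over {1, 2}, {3, 4}, {5, 6}.
func newSource() io.Reader {
	return io.MultiReader(
		bytes.NewReader([]byte{1, 2}),
		bytes.NewReader([]byte{3, 4}),
		bytes.NewReader([]byte{5, 6}),
	)
}

// singleRead performs exactly one Read into a 5-byte buffer.
func singleRead() []byte {
	buf := make([]byte, 5)
	n, err := newSource().Read(buf)
	if err != nil {
		panic(err)
	}
	return buf[:n]
}

// fullRead keeps calling Read (via io.ReadFull) until the buffer is full.
func fullRead() []byte {
	buf := make([]byte, 5)
	if _, err := io.ReadFull(newSource(), buf); err != nil {
		panic(err)
	}
	return buf
}

func main() {
	fmt.Println(singleRead()) // [1 2]
	fmt.Println(fullRead())   // [1 2 3 4 5]
}
```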
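multiWriter, on the other hand, writes the given data to every Writer in the slice. If a Writer accepts fewer bytes than it was given without reporting an error, multiWriter returns io.ErrShortWrite.
The details are clearer in the source itself: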
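The writing half, including the short-write case, might look like the sketch below. `shortWriter`, `fanOut`, and `shortWrite` are hypothetical names introduced only for this illustration; `shortWriter` deliberately accepts a single byte per call without returning an error.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// shortWriter is a hypothetical Writer that accepts only the first byte
// of each call and reports no error.
type shortWriter struct{}

func (shortWriter) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	return 1, nil
}

// fanOut writes data to three buffers through io.MultiWriter and returns
// what each buffer ended up holding.
func fanOut(data []byte) [3][]byte {
	var a, b, c bytes.Buffer
	if _, err := io.MultiWriter(&a, &b, &c).Write(data); err != nil {
		panic(err)
	}
	return [3][]byte{a.Bytes(), b.Bytes(), c.Bytes()}
}

// shortWrite reports whether a short write surfaces as io.ErrShortWrite.
func shortWrite(data []byte) bool {
	_, err := io.MultiWriter(shortWriter{}).Write(data)
	return errors.Is(err, io.ErrShortWrite)
}

func main() {
	fmt.Println(fanOut([]byte{1, 2}))     // [[1 2] [1 2] [1 2]]
	fmt.Println(shortWrite([]byte{1, 2})) // true
}
```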

    // src/io/multi.go ---- line 17
    func (mr *multiReader) Read(p []byte) (n int, err error) {
    	for len(mr.readers) > 0 {
    		// Optimization to flatten nested multiReaders (Issue 13558).
    		if len(mr.readers) == 1 {
    			if r, ok := mr.readers[0].(*multiReader); ok {
    				mr.readers = r.readers
    				continue
    			}
    		}
    		n, err = mr.readers[0].Read(p)
    		if err == EOF {
    			// Use eofReader instead of nil to avoid nil panic
    			// after performing flatten (Issue 18232).
    			mr.readers[0] = eofReader{} // permit earlier GC
    			mr.readers = mr.readers[1:]
    		}
    		if n > 0 || err != EOF {
    			if err == EOF && len(mr.readers) > 0 {
    				// Don't return EOF yet. More readers remain.
    				err = nil
    			}
    			return
    		}
    	}
    	return 0, EOF
    }

    // src/io/multi.go ---- line 58
    func (t *multiWriter) Write(p []byte) (n int, err error) {
    	for _, w := range t.writers {
    		n, err = w.Write(p)
    		if err != nil {
    			return
    		}
    		if n != len(p) {
    			err = ErrShortWrite
    			return
    		}
    	}
    	return len(p), nil
    }

pipe.go
A very cool implementation: one struct with two different wrappers around it. First, the pipe struct:

    // src/io/pipe.go ---- line 38
    // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    type pipe struct {
    	wrMu sync.Mutex // Serializes Write operations
    	wrCh chan []byte
    	rdCh chan int

    	once sync.Once // Protects closing done
    	done chan struct{}
    	rerr onceError
    	werr onceError
    }

    // src/io/pipe.go ---- line 182
    // Pipe creates a synchronous in-memory pipe.
    // It can be used to connect code expecting an io.Reader
    // with code expecting an io.Writer.
    //
    // Reads and Writes on the pipe are matched one to one
    // except when multiple Reads are needed to consume a single Write.
    // That is, each Write to the PipeWriter blocks until it has satisfied
    // one or more Reads from the PipeReader that fully consume
    // the written data.
    // The data is copied directly from the Write to the corresponding
    // Read (or Reads); there is no internal buffering.
    //
    // It is safe to call Read and Write in parallel with each other or with Close.
    // Parallel calls to Read and parallel calls to Write are also safe:
    // the individual calls will be gated sequentially.
    func Pipe() (*PipeReader, *PipeWriter) {
    	p := &pipe{
    		wrCh: make(chan []byte),
    		rdCh: make(chan int),
    		done: make(chan struct{}),
    	}
    	return &PipeReader{p}, &PipeWriter{p}
    }

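As the comment says, Pipe connects code that expects an io.Reader with code that expects an io.Writer, and it has no internal buffer. Data is handed over as a slice through a channel, and that channel is created unbuffered, so reads and writes must be paired up in different goroutines; otherwise the call blocks. The read and write code is as follows: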
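A minimal usage sketch of that pairing, assuming Go 1.16 or newer for io.ReadAll. The helper name `pipeRoundTrip` is hypothetical: the writer runs in its own goroutine while the calling goroutine reads until EOF.

```go
package main

import (
	"fmt"
	"io"
)

// pipeRoundTrip sends msg through an io.Pipe and returns what the reader
// side received.
func pipeRoundTrip(msg string) string {
	pr, pw := io.Pipe()
	go func() {
		// Write blocks until the reader has consumed all of msg,
		// which is why it must run in a separate goroutine.
		_, err := io.WriteString(pw, msg)
		// With a nil err the reader sees io.EOF once the data is drained.
		pw.CloseWithError(err)
	}()
	b, err := io.ReadAll(pr)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(pipeRoundTrip("through the pipe")) // through the pipe
}
```

Calling the write and the read from the same goroutine would deadlock, since neither side of the unbuffered channel could make progress.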

    // src/io/pipe.go ---- line 50
    func (p *pipe) Read(b []byte) (n int, err error) {
    	select {
    	case <-p.done:
    		return 0, p.readCloseError()
    	default:
    	}

    	select {
    	case bw := <-p.wrCh:
    		nr := copy(b, bw)
    		p.rdCh <- nr
    		return nr, nil
    	case <-p.done:
    		return 0, p.readCloseError()
    	}
    }

    // src/io/pipe.go ---- line 84
    func (p *pipe) Write(b []byte) (n int, err error) {
    	select {
    	case <-p.done:
    		return 0, p.writeCloseError()
    	default:
    		p.wrMu.Lock()
    		defer p.wrMu.Unlock()
    	}

    	for once := true; once || len(b) > 0; once = false {
    		select {
    		case p.wrCh <- b:
    			nw := <-p.rdCh
    			b = b[nw:]
    			n += nw
    		case <-p.done:
    			return n, p.writeCloseError()
    		}
    	}
    	return n, nil
    }

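The loop in Write is what lets one Write satisfy several Reads: each Read reports how many bytes it copied through rdCh, and Write resends the remainder. A rough sketch of that effect follows; the helper name `chunkedRead` is hypothetical and it assumes bufSize is greater than zero.

```go
package main

import (
	"fmt"
	"io"
)

// chunkedRead performs a single Write of msg and reads it back with a
// buffer of bufSize bytes (assumed > 0). It returns the chunk seen by
// each Read and the byte count reported by the one Write.
func chunkedRead(msg string, bufSize int) (chunks []string, written int) {
	pr, pw := io.Pipe()
	done := make(chan int)
	go func() {
		n, _ := pw.Write([]byte(msg)) // one Write, possibly many Reads
		pw.Close()
		done <- n
	}()

	buf := make([]byte, bufSize)
	for {
		n, err := pr.Read(buf)
		if n > 0 {
			chunks = append(chunks, string(buf[:n]))
		}
		if err != nil {
			break // io.EOF once the writer has closed
		}
	}
	return chunks, <-done
}

func main() {
	chunks, written := chunkedRead("abcdef", 4)
	fmt.Println(chunks, written) // [abcd ef] 6
}
```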
