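I use the io library all the time, but I had never actually sat down and read its source. Reading it this time, I found a lot that I hadn't noticed before.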
io.go
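There isn't much to say about the interfaces at the top: they are just the various combinations of Reader, Writer, and Closer, plus a few other simple interfaces. It is a very low-level library, after all, so what it implements is fairly simple.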
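One function I use constantly without ever having looked at its source turns out to be rather interesting: io.Copy. It calls copyBuffer, whose source is as follows: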
// src/io/io.go ---- line 381
// copyBuffer is the actual implementation of Copy and CopyBuffer.
// if buf is nil, one is allocated.
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
	if buf == nil {
		size := 32 * 1024
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
	for {
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw > 0 {
				written += int64(nw)
			}
			if ew != nil {
				err = ew
				break
			}
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}
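The two checks at the top are interesting. If src has a WriteTo method, that method is called and does the whole job; otherwise, if dst has a ReadFrom method, that one is called instead. In either case the manual copy loop never runs.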
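To make the fast path visible, here is a minimal sketch. The types `spySource` and `readOnly` and the function `runCopies` are hypothetical names invented for this illustration; they are not part of the io package. The destination is a `strings.Builder`, which has no ReadFrom method, so only the source decides which path io.Copy takes.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// spySource (hypothetical) wraps a strings.Reader and records whether
// io.Copy went through its WriteTo method.
type spySource struct {
	r           *strings.Reader
	usedWriteTo bool
}

func (s *spySource) Read(p []byte) (int, error) { return s.r.Read(p) }

// WriteTo makes *spySource satisfy io.WriterTo.
func (s *spySource) WriteTo(w io.Writer) (int64, error) {
	s.usedWriteTo = true
	return s.r.WriteTo(w)
}

// readOnly (hypothetical) exposes nothing but Read, which hides the
// WriteTo method of whatever it wraps.
type readOnly struct{ r io.Reader }

func (o readOnly) Read(p []byte) (int, error) { return o.r.Read(p) }

// runCopies copies once from a source with WriteTo and once from the
// same kind of source hidden behind readOnly, and reports which of the
// two copies went through WriteTo.
func runCopies() (fastUsed, slowUsed bool, err error) {
	var dst strings.Builder

	fast := &spySource{r: strings.NewReader("hello")}
	if _, err = io.Copy(&dst, fast); err != nil {
		return false, false, err
	}

	slow := &spySource{r: strings.NewReader("hello")}
	if _, err = io.Copy(&dst, readOnly{slow}); err != nil {
		return false, false, err
	}
	return fast.usedWriteTo, slow.usedWriteTo, nil
}

func main() {
	fastUsed, slowUsed, err := runCopies()
	fmt.Println(fastUsed, slowUsed, err)
}
```

Wrapping a value in a struct that only has Read is a common way to force the generic loop, for instance when a wrapper needs to observe every chunk that passes through.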
The LimitedReader and SectionReader that come next are easy to understand; they do what their names say.
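As a quick illustration of those two types, the sketch below reads a prefix through io.LimitReader and a middle slice through io.NewSectionReader. The helper names `firstN` and `section` are made up for this example.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// firstN reads at most n bytes of s through an io.LimitedReader
// (created by io.LimitReader).
func firstN(s string, n int64) (string, error) {
	lr := io.LimitReader(strings.NewReader(s), n)
	b, err := io.ReadAll(lr)
	return string(b), err
}

// section reads the n bytes of s that start at offset off through an
// io.SectionReader. strings.Reader is used because it implements
// io.ReaderAt, which NewSectionReader requires.
func section(s string, off, n int64) (string, error) {
	sr := io.NewSectionReader(strings.NewReader(s), off, n)
	b, err := io.ReadAll(sr)
	return string(b), err
}

func main() {
	a, _ := firstN("hello, world", 5)
	b, _ := section("hello, world", 7, 5)
	fmt.Println(a) // hello
	fmt.Println(b) // world
}
```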
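Toward the end there is also TeeReader: every call to Read also writes the data it read into the internal Writer, so presumably it works much like the tee command (I haven't really used either of them).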
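A small sketch of that behaviour, assuming a `bytes.Buffer` as the mirror destination; `teeCopy` is a hypothetical helper name.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// teeCopy drains s through an io.TeeReader and returns both what the
// caller read and what ended up in the mirror buffer.
func teeCopy(s string) (string, string, error) {
	var mirror bytes.Buffer
	tr := io.TeeReader(strings.NewReader(s), &mirror)
	b, err := io.ReadAll(tr)
	return string(b), mirror.String(), err
}

func main() {
	read, mirrored, err := teeCopy("tee me")
	fmt.Println(read, "|", mirrored, "|", err)
}
```

One place this could be handy is hashing or logging a stream while some other consumer reads it.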
multi.go
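There are two structs, multiReader and multiWriter, which are a slice of Readers and a slice of Writers respectively. They implement functions for:
- reading data from the multiple Readers in the slice (traversed in order) into a buffer;
- writing the contents of a buffer to multiple Writers in one call.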
:::info
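Note the difference between the two, though: multiReader never delivers the same data twice (each Reader is consumed in turn), whereas multiWriter writes the same content to every Writer in the slice.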
:::
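For example (pseudocode, just to convey the idea; the first line shows the buffer after reading repeatedly until it is full):
{{1, 2}, {3, 4}, {5, 6}} -> [5]buffer{} // buffer == {1, 2, 3, 4, 5}
[2]data{1, 2} -> {{1, 2}, {3, 4}, {5, 6}} // {{1, 2}, {1, 2}, {1, 2}}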
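That is, multiReader reads from the Readers in order. A single Read call returns as soon as the current Reader yields some data, so filling a buffer that spans several Readers takes repeated calls (for example through io.ReadFull), which go on until the buffer is full or every Reader is exhausted.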
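multiWriter, on the other hand, takes the given data and writes that same data to every Writer in the slice in turn. If a Writer returns an error, the write stops there and that error is returned; if a Writer accepts fewer bytes than it was given without reporting an error, the result is io.ErrShortWrite.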
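The pseudocode can be turned into something runnable roughly like this. The helpers `readMulti` and `writeMulti` are hypothetical names for the sketch; the byte values mirror the example above.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// newThree builds a MultiReader over {1, 2}, {3, 4}, {5, 6}.
func newThree() io.Reader {
	return io.MultiReader(
		bytes.NewReader([]byte{1, 2}),
		bytes.NewReader([]byte{3, 4}),
		bytes.NewReader([]byte{5, 6}),
	)
}

// readMulti returns how many bytes one plain Read delivers into a
// 5-byte buffer, and what io.ReadFull collects into a 5-byte buffer.
func readMulti() (int, []byte, error) {
	single, err := newThree().Read(make([]byte, 5))
	if err != nil {
		return 0, nil, err
	}
	full := make([]byte, 5)
	if _, err := io.ReadFull(newThree(), full); err != nil {
		return 0, nil, err
	}
	return single, full, nil
}

// writeMulti writes p once through a MultiWriter over two buffers and
// returns what each buffer received.
func writeMulti(p []byte) ([]byte, []byte, error) {
	var a, b bytes.Buffer
	_, err := io.MultiWriter(&a, &b).Write(p)
	return a.Bytes(), b.Bytes(), err
}

func main() {
	single, full, _ := readMulti()
	fmt.Println(single, full) // 2 [1 2 3 4 5]

	a, b, _ := writeMulti([]byte{1, 2})
	fmt.Println(a, b) // [1 2] [1 2]
}
```

The first line of output shows that one Read only drains the current Reader, while io.ReadFull keeps calling Read until the buffer is full.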
The details are clearer from the source itself:
// src/io/multi.go ---- line 17
func (mr *multiReader) Read(p []byte) (n int, err error) {
	for len(mr.readers) > 0 {
		// Optimization to flatten nested multiReaders (Issue 13558).
		if len(mr.readers) == 1 {
			if r, ok := mr.readers[0].(*multiReader); ok {
				mr.readers = r.readers
				continue
			}
		}
		n, err = mr.readers[0].Read(p)
		if err == EOF {
			// Use eofReader instead of nil to avoid nil panic
			// after performing flatten (Issue 18232).
			mr.readers[0] = eofReader{} // permit earlier GC
			mr.readers = mr.readers[1:]
		}
		if n > 0 || err != EOF {
			if err == EOF && len(mr.readers) > 0 {
				// Don't return EOF yet. More readers remain.
				err = nil
			}
			return
		}
	}
	return 0, EOF
}

// src/io/multi.go ---- line 58
func (t *multiWriter) Write(p []byte) (n int, err error) {
	for _, w := range t.writers {
		n, err = w.Write(p)
		if err != nil {
			return
		}
		if n != len(p) {
			err = ErrShortWrite
			return
		}
	}
	return len(p), nil
}
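The `n != len(p)` branch in multiWriter.Write can be triggered with a deliberately misbehaving Writer. The sketch below uses a hypothetical `cappedWriter` that silently truncates; a well-behaved io.Writer is expected to return a non-nil error whenever it writes fewer bytes than it was given, so this type exists only to exercise that branch.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// cappedWriter (hypothetical) accepts at most max bytes per call and
// reports the shortfall only through the returned count, not an error.
type cappedWriter struct {
	max int
	buf bytes.Buffer
}

func (c *cappedWriter) Write(p []byte) (int, error) {
	if len(p) > c.max {
		p = p[:c.max]
	}
	return c.buf.Write(p)
}

// shortWriteDemo writes p through a MultiWriter whose second writer is
// capped. It reports whether the error is io.ErrShortWrite and how many
// bytes reached the third writer.
func shortWriteDemo(p []byte, max int) (bool, int) {
	var first, third bytes.Buffer
	second := &cappedWriter{max: max}
	_, err := io.MultiWriter(&first, second, &third).Write(p)
	return errors.Is(err, io.ErrShortWrite), third.Len()
}

func main() {
	fmt.Println(shortWriteDemo([]byte("abcd"), 2)) // true 0
	fmt.Println(shortWriteDemo([]byte("abcd"), 8)) // false 4
}
```

Because the loop returns at the first problem, the writers after the failing one never see the data, which is why the third buffer stays empty in the first call.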
pipe.go
A very cool implementation: a single struct wrapped in two different ways. First, the pipe struct:
// src/io/pipe.go ---- line 38
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
	wrMu sync.Mutex // Serializes Write operations
	wrCh chan []byte
	rdCh chan int
	once sync.Once // Protects closing done
	done chan struct{}
	rerr onceError
	werr onceError
}

// src/io/pipe.go ---- line 182
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
	p := &pipe{
		wrCh: make(chan []byte),
		rdCh: make(chan int),
		done: make(chan struct{}),
	}
	return &PipeReader{p}, &PipeWriter{p}
}
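As the comment says, it is meant to connect code that expects an io.Reader with code that expects an io.Writer, and it has no internal buffer. The slice is handed over through a channel, and that channel is created unbuffered when the pipe is made, so reads and writes must be paired up in different goroutines; otherwise they block. The read and write code is as follows: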
// src/io/pipe.go ---- line 50
func (p *pipe) Read(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.readCloseError()
	default:
	}
	select {
	case bw := <-p.wrCh:
		nr := copy(b, bw)
		p.rdCh <- nr
		return nr, nil
	case <-p.done:
		return 0, p.readCloseError()
	}
}

// src/io/pipe.go ---- line 84
func (p *pipe) Write(b []byte) (n int, err error) {
	select {
	case <-p.done:
		return 0, p.writeCloseError()
	default:
		p.wrMu.Lock()
		defer p.wrMu.Unlock()
	}
	for once := true; once || len(b) > 0; once = false {
		select {
		case p.wrCh <- b:
			nw := <-p.rdCh
			b = b[nw:]
			n += nw
		case <-p.done:
			return n, p.writeCloseError()
		}
	}
	return n, nil
}
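Here is a minimal usage sketch of io.Pipe that shows the pairing across goroutines, and how a single Write can be consumed by several Reads when the read buffer is smaller than the written data. `pipeDemo` is a hypothetical helper written for this illustration.

```go
package main

import (
	"fmt"
	"io"
)

// pipeDemo writes msg into a pipe from a separate goroutine and drains
// it in the caller with a buffer of bufSize bytes. It returns the
// reassembled text and the number of Read calls that delivered data.
func pipeDemo(msg string, bufSize int) (string, int, error) {
	pr, pw := io.Pipe()

	go func() {
		// Write blocks until the reading side has consumed every byte.
		_, err := pw.Write([]byte(msg))
		// With a nil error this behaves like Close: the reader sees io.EOF.
		pw.CloseWithError(err)
	}()

	var out []byte
	reads := 0
	buf := make([]byte, bufSize)
	for {
		n, err := pr.Read(buf)
		if n > 0 {
			out = append(out, buf[:n]...)
			reads++
		}
		if err == io.EOF {
			return string(out), reads, nil
		}
		if err != nil {
			return string(out), reads, err
		}
	}
}

func main() {
	out, reads, err := pipeDemo("hello pipe", 4)
	fmt.Println(out, reads, err) // hello pipe 3 <nil>
}
```

Each Read receives the not-yet-consumed remainder of the written slice and reports back how much it copied, so a 10-byte Write with a 4-byte read buffer is satisfied by three Reads of 4, 4, and 2 bytes. Calling Write and Read from the same goroutine without a partner on the other end would block forever.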