在 Go 中,输入和输出操作是通过使用原语来实现的,这些原语将数据建模为可以从其 read(读取)或 written(写入)的字节流。为此,Go io package 提供了接口 io.Reader io.Writer,分别用于数据输入和输出操作,如下图所示:

image.png

Go 自带很多 API,支持来自内存结构、文件、网络连接等资源的流式 IO。本篇文章主要介绍如何使用接口 io.Reader 和 io.Writer 创建能够使用自定义实现以及标准库中的接口进行流式处理数据的 Go 程序。

io.Reader

一个由 io.Reader 接口表示的 reader,从某个源读取数据到传输缓冲区,在那里它可以被流式传输和消费,如下图所示:

image.png

对于一个类型来说,它必须实现来自接口 io.Reader 的方法 Read(p []byte) (如下图所示)。

  1. type Reader interface {
  2. Read(p []byte) (n int, err error)
  3. }

Read() 方法的实现应该返回读取的字节数,如果发生错误,则返回错误。如果数据源已经用完了它的内容,Read 应该返回 io.EOF。

Reading 规则

  1. 如果可能的话,Read() 会将 len(p) 读入 p。

  2. 在 Read() 调用后,n 可能小于 len(p)。

  3. 当出错时,Read() 可能仍然会返回缓冲区 p 中的 n 个字节,例如,从一个突然关闭的 TCP 套接字中读取。根据你的用途,你可以选择保留 p 中的字节或者重试。

  4. 当 Read() 用尽可用数据时,读取器可能会返回一个非零的 n 和 err=io.EOF。然而,根据不同的实现,读取器可以选择在流的末尾返回一个非零的 n 和 err = nil。在这种情况下,任何后续的读取必须返回 n=0,err=io.EOF。

  5. 最后,对 Read() 的调用,如果返回 n=0,err=nil,并不意味着 EOF(结束),因为下一次对 Read() 的调用可能返回更多的数据。

正如你所看到的,直接从 reader 中正确地读取一个流是很棘手的。幸运的是,标准库中的 readers 遵循了合理的方法,使之易于流式传输。然而,在使用 readers 之前,请查阅它的文档。


readers 的数据流

直接从读取器流式传输数据是很容易的。方法 Read 被设计成在一个循环中调用,每一次迭代,它都会从源中读取一大块数据,并将其放入缓冲区 p 中。这个循环将继续下去,直到方法返回一个 io.EOF 错误。

下面是一个简单的例子,它使用 strings.NewReader(string) 创建的字符串 reader,从字符串源中流式传输字节值:

  1. func main() {
  2. reader := strings.NewReader("Clear is better than clever")
  3. p := make([]byte, 4)
  4. for {
  5. n, err := reader.Read(p)
  6. if err == io.EOF {
  7. break
  8. }
  9. fmt.Println(string(p[:n]))
  10. }
  11. }

上面的源代码用 make([]byte,4) 创建了一个 4 字节长的传输缓冲区 p。缓冲区有意保持小于字符串源的长度。这是为了演示如何正确地从比缓冲区大的源流式传输数据块。

Update:Reddit 上有人指出了之前的有一个 bug。代码永远不会捕捉到 non-nil err != io.EOF 的实例。以下是对代码的修正。

  1. func main() {
  2. reader := strings.NewReader("Clear is better than clever")
  3. p := make([]byte, 4)
  4. for {
  5. n, err := reader.Read(p)
  6. if err != nil{
  7. if err == io.EOF {
  8. fmt.Println(string(p[:n])) //should handle any remainding bytes.
  9. break
  10. }
  11. fmt.Println(err)
  12. os.Exit(1)
  13. }
  14. fmt.Println(string(p[:n]))
  15. }
  16. }


实现自定义的 io.Reader

上一节使用了标准库中现有的 IO reader 实现,现在让我们看看如何编写我们自己的实现。下面是一个简单的 io.Reader 的实现,它可以从流中过滤掉非字母字符。

  1. type alphaReader struct {
  2. src string
  3. cur int
  4. }
  5. func newAlphaReader(src string) *alphaReader {
  6. return &alphaReader{src: src}
  7. }
  8. func alpha(r byte) byte {
  9. if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
  10. return r
  11. }
  12. return 0
  13. }
  14. func (a *alphaReader) Read(p []byte) (int, error) {
  15. if a.cur >= len(a.src) {
  16. return 0, io.EOF
  17. }
  18. x := len(a.src) - a.cur
  19. n, bound := 0, 0
  20. if x >= len(p) {
  21. bound = len(p)
  22. } else if x <= len(p) {
  23. bound = x
  24. }
  25. buf := make([]byte, bound)
  26. for n < bound {
  27. if char := alpha(a.src[a.cur]); char != 0 {
  28. buf[n] = char
  29. }
  30. n++
  31. a.cur++
  32. }
  33. copy(p, buf)
  34. return n, nil
  35. }
  36. func main() {
  37. reader := newAlphaReader("Hello! It's 9am, where is the sun?")
  38. p := make([]byte, 4)
  39. for {
  40. n, err := reader.Read(p)
  41. if err == io.EOF {
  42. break
  43. }
  44. fmt.Print(string(p[:n]))
  45. }
  46. fmt.Println()
  47. }

执行该程序时,它将打印:

  1. $> go run alpha_reader.go
  2. HelloItsamwhereisthesun


链式 Readers

标准库中已经实现了许多 readers。使用一个 reader 作为另一个 reader 的数据源是一个常见的用法。这种 readers 链接允许一个 reader 重用另一个 reader 的逻辑,就像下面的源码片段一样,它更新了 alphaReader 以接受一个 io.Reader 作为它的源。通过将流管理问题推送给 root reader,这降低了代码的复杂性。

https://github.com/vladimirvivien/learning-go/blob/master/tutorial/io/alpha_reader3.go

这种方法的另一个优点是,alphaReader 现在能够从任何 reader 实现中读取。例如,下面的代码段显示了如何将 alphaReader 与 os.File 源结合起来,从文件中过滤出非字母字符。

  1. func main() {
  2. // use an os.File as source for alphaReader
  3. file, err := os.Open("./alpha_reader3.go")
  4. if err != nil {
  5. fmt.Println(err)
  6. os.Exit(1)
  7. }
  8. defer file.Close()
  9. reader := newAlphaReader(file)
  10. p := make([]byte, 4)
  11. for {
  12. n, err := reader.Read(p)
  13. if err == io.EOF {
  14. break
  15. }
  16. fmt.Print(string(p[:n]))
  17. }
  18. fmt.Println()
  19. }

https://github.com/vladimirvivien/learning-go/blob/master/tutorial/io/alpha_reader3.go


io.Writer

由接口 io.Writer 表示的 writer,从缓冲区流式传输数据并将其写入目标资源,如下图所示:

image.png

所有的流写入器都必须实现接口 io.Writer 中的方法 Write(p []byte)(如下所示)。该方法被设计为从缓冲区 p 中读取数据并将其写入指定的目标资源。

  1. type Writer interface {
  2. Write(p []byte) (n int, err error)
  3. }


Write() 方法的实现应该返回写入的字节数,或者在发生任何错误时返回一个错误。

使用 writers

标准库自带了许多预先实现的 io.Writer 类型。直接使用写入器工作很简单,如下面的代码片段所示,它使用 bytes.Buffer 类型作为 io.Writer 将数据写入内存缓冲区。

  1. func main() {
  2. proverbs := []string{
  3. "Channels orchestrate mutexes serialize",
  4. "Cgo is not Go",
  5. "Errors are values",
  6. "Don't panic",
  7. }
  8. var writer bytes.Buffer
  9. for _, p := range proverbs {
  10. n, err := writer.Write([]byte(p))
  11. if err != nil {
  12. fmt.Println(err)
  13. os.Exit(1)
  14. }
  15. if n != len(p) {
  16. fmt.Println("failed to write data")
  17. os.Exit(1)
  18. }
  19. }
  20. fmt.Println(writer.String())
  21. }


实现自定义的 io.Writer

本节的代码展示了如何实现一个名为 chanWriter 的自定义 io.Writer,它将其内容以字节序列的形式写入 Go channel。


  1. type chanWriter struct {
  2. ch chan byte
  3. }
  4. func newChanWriter() *chanWriter {
  5. return &chanWriter{make(chan byte, 1024)}
  6. }
  7. func (w *chanWriter) Chan() <-chan byte {
  8. return w.ch
  9. }
  10. func (w *chanWriter) Write(p []byte) (int, error) {
  11. n := 0
  12. for _, b := range p {
  13. w.ch <- b
  14. n++
  15. }
  16. return n, nil
  17. }
  18. func (w *chanWriter) Close() error {
  19. close(w.ch)
  20. return nil
  21. }
  22. func main() {
  23. writer := newChanWriter()
  24. go func() {
  25. defer writer.Close()
  26. writer.Write([]byte("Stream "))
  27. writer.Write([]byte("me!"))
  28. }()
  29. for c := range writer.Chan() {
  30. fmt.Printf("%c", c)
  31. }
  32. fmt.Println()
  33. }

要使用 writer,代码只需在函数 main() 中调用方法 writer.Write() (在一个单独的 goroutine 中)。因为 chanWriter 也实现了接口 io.Closer,所以调用方法 writer.Close() 来正确关闭 channel,以避免访问 channel 时出现死锁。

IO 有用的类型和包

如前所述,Go 标准库中自带了许多有用的函数和其他类型,可以方便地处理流式 IO。

os.File

os.File 类型表示本地系统上的一个文件,它同时实现了 io.Reader 和 io.Writer,因此可以在任何流 IO 上下文中使用。它同时实现了 io.Reader 和 io.Writer,因此,可以在任何流式 IO 上下文中使用。例如,下面的例子显示了如何将连续的字符串片段直接写入一个文件。

  1. func main() {
  2. proverbs := []string{
  3. "Channels orchestrate mutexes serialize\n",
  4. "Cgo is not Go\n",
  5. "Errors are values\n",
  6. "Don't panic\n",
  7. }
  8. file, err := os.Create("./proverbs.txt")
  9. if err != nil {
  10. fmt.Println(err)
  11. os.Exit(1)
  12. }
  13. defer file.Close()
  14. for _, p := range proverbs {
  15. n, err := file.Write([]byte(p))
  16. if err != nil {
  17. fmt.Println(err)
  18. os.Exit(1)
  19. }
  20. if n != len(p) {
  21. fmt.Println("failed to write data")
  22. os.Exit(1)
  23. }
  24. }
  25. fmt.Println("file write done")
  26. }

相反,类型 io.File 可以作为 reader,从本地文件系统流式传输文件内容。例如,下面的源代码段读取一个文件并打印其内容。

  1. func main() {
  2. file, err := os.Open("./proverbs.txt")
  3. if err != nil {
  4. fmt.Println(err)
  5. os.Exit(1)
  6. }
  7. defer file.Close()
  8. p := make([]byte, 4)
  9. for {
  10. n, err := file.Read(p)
  11. if err == io.EOF {
  12. break
  13. }
  14. fmt.Print(string(p[:n]))
  15. }
  16. }


标准输出,输入和错误

os package 公开了三个变量,os.Stdout、os.Stdin 和 os.Stderr,它们的类型是 *os.File,分别代表操作系统的标准输出、输入和错误的文件句柄。例如,下面的源代码片段直接打印到标准输出。


  1. func main() {
  2. proverbs := []string{
  3. "Channels orchestrate mutexes serialize\n",
  4. "Cgo is not Go\n",
  5. "Errors are values\n",
  6. "Don't panic\n",
  7. }
  8. for _, p := range proverbs {
  9. n, err := os.Stdout.Write([]byte(p))
  10. if err != nil {
  11. fmt.Println(err)
  12. os.Exit(1)
  13. }
  14. if n != len(p) {
  15. fmt.Println("failed to write data")
  16. os.Exit(1)
  17. }
  18. }
  19. }


io.Copy()

函数 io.Copy() 可以轻松地将数据从源 reader 流到目标 writer。它抽象出了 for-loop 模式(我们目前已经看到了),并正确处理 io.EOF 和字节数。

下面显示的是以前程序的一个简化版本,它将内存中 proberbs reader 的内容复制到 writer file 中:

  1. func main() {
  2. proverbs := new(bytes.Buffer)
  3. proverbs.WriteString("Channels orchestrate mutexes serialize\n")
  4. proverbs.WriteString("Cgo is not Go\n")
  5. proverbs.WriteString("Errors are values\n")
  6. proverbs.WriteString("Don't panic\n")
  7. file, err := os.Create("./proverbs.txt")
  8. if err != nil {
  9. fmt.Println(err)
  10. os.Exit(1)
  11. }
  12. defer file.Close()
  13. // copy from reader data into writer file
  14. if _, err := io.Copy(file, proverbs); err != nil {
  15. fmt.Println(err)
  16. os.Exit(1)
  17. }
  18. fmt.Println("file created")
  19. }

同理,我们可以重写一个之前的程序,使用 io.Copy() 函数从文件中读取并打印到标准输出,如下图所示:

  1. func main() {
  2. file, err := os.Open("./proverbs.txt")
  3. if err != nil {
  4. fmt.Println(err)
  5. os.Exit(1)
  6. }
  7. defer file.Close()
  8. if _, err := io.Copy(os.Stdout, file); err != nil {
  9. fmt.Println(err)
  10. os.Exit(1)
  11. }
  12. }


io.WriteString()

该函数提供了将字符串值写入指定 writer 方便的语法糖。

  1. func main() {
  2. file, err := os.Create("./magic_msg.txt")
  3. if err != nil {
  4. fmt.Println(err)
  5. os.Exit(1)
  6. }
  7. defer file.Close()
  8. if _, err := io.WriteString(file, "Go is fun!"); err != nil {
  9. fmt.Println(err)
  10. os.Exit(1)
  11. }
  12. }


writers 和 readers 管道

io.PipeWriter 和 io.PipeReader 两个类型模拟了内存管道中的 IO 操作。数据被写入管道的写入端,并在管道的读取端使用单独的 goroutines 进行读取。下面使用 io.Pipe() 创建管道 reader/writer 对,然后用它将数据从缓冲区 proverbs 复制到 io.Stdout:


  1. func main() {
  2. proverbs := new(bytes.Buffer)
  3. proverbs.WriteString("Channels orchestrate mutexes serialize\n")
  4. proverbs.WriteString("Cgo is not Go\n")
  5. proverbs.WriteString("Errors are values\n")
  6. proverbs.WriteString("Don't panic\n")
  7. piper, pipew := io.Pipe()
  8. // write in writer end of pipe
  9. go func() {
  10. defer pipew.Close()
  11. io.Copy(pipew, proverbs)
  12. }()
  13. // read from reader end of pipe.
  14. io.Copy(os.Stdout, piper)
  15. piper.Close()
  16. }


缓冲 IO

Go 通过 bufio package 支持缓冲 IO,这使得它可以很容易地处理文本内容。例如,下面的程序逐行读取以 ‘\n’ 分隔的文件内容:

  1. func main() {
  2. file, err := os.Open("./planets.txt")
  3. if err != nil {
  4. fmt.Println(err)
  5. os.Exit(1)
  6. }
  7. defer file.Close()
  8. reader := bufio.NewReader(file)
  9. for {
  10. line, err := reader.ReadString('\n')
  11. if err != nil {
  12. if err == io.EOF {
  13. break
  14. } else {
  15. fmt.Println(err)
  16. os.Exit(1)
  17. }
  18. }
  19. fmt.Print(line)
  20. }
  21. }


Util package

ioutil 包是 io 的一个子包,它为 IO 提供了几个方便的函数。例如,下面使用函数 ReadFile 将一个文件的内容加载到一个 []byte 中。

  1. package main
  2. import (
  3. "io/ioutil"
  4. ...
  5. )
  6. func main() {
  7. bytes, err := ioutil.ReadFile("./planets.txt")
  8. if err != nil {
  9. fmt.Println(err)
  10. os.Exit(1)
  11. }
  12. fmt.Printf("%s", bytes)
  13. }


原文链接

https://medium.com/learning-the-go-programming-language/streaming-io-in-go-d93507931185