一、 前言

课题内容:10G的数字大文件,只有4G内存,如何排序?

  1. 创造一个数字大文件,先不创建10G,可以写入10万个数做测试,只有800KB左右!
  2. 对文件进行排序

参考博客:

  1. 归并排序
  2. 外部排序
  3. 超大数据量排序
  4. 大数据排序算法总结
  5. 模拟一个10G超大文件的排序

首先想到归并排序,归并排序是典型的分治算法,它不断地将某个数组分为两个部分,分别对左子数组与右子数组进行排序,然后将两个数组合并为新的有序数组。

一般来说,小文件单机版排序比并发排序更快一些,但是大文件无法一次性读入内存处理,需要分块处理。单机版排序无法做到分布式排序,分布式排序需要联网进行,网络传输跟通道传输非常类似,稍作修改就可以做到在网络上进行排序。

也就是说,大文件做并发更快!
并行和并发是有区别的,依赖操作系统和硬件是否支持。同时,在允许的情况下,使用有缓冲的通道和使用文件读写缓冲,有利于加快程序的处理速度,避免及时阻塞。
大数字文件小内存排序 - 图1

二、 测试排序是否可执行

1. 往通道写入可变参数的整数ArrSort

  1. package arraySort
  2. // 往通道写入可变参数的整数
  3. func ArrSort(a ...int) <-chan int {
  4. out := make(chan int)
  5. go func() {
  6. for _, v := range a {
  7. out <- v
  8. }
  9. close(out)
  10. }()
  11. return out
  12. }

2. 按升序排序InMemSort

对一个整数数组进行排序,系统库中已经提供了排序函数(这个也可以自己写),直接使用即可。

需要注意的是,Go中使用通道,使用不当会造成死锁。需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。

通道使用注意事项详解(这块找找资料附链接)
参考:通道用例大全(go语言白皮书)

通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数来操作。需要留意匿名函数的传参问题,可以去官方文档进行查看(这块找找资料附:链接)。go程运行时,函数调用是瞬间返回的。如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。

  1. package arraySort
  2. /*数组排序
  3. 对一个整数数组进行排序,系统库中已经提供了排序函数,直接使用即可。
  4. 需要注意的是,Go中使用通道,使用不当会造成死锁。
  5. 需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。
  6. 通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数来操作。
  7. 需要留意匿名函数的传参问题,可以去官方文档进行查看。go程运行时,函数调用是瞬间返回的。
  8. 如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。
  9. */
  10. import (
  11. // "fmt"
  12. "sort"
  13. // "time"
  14. )
  15. // var startTime time.Time
  16. // func init() {
  17. // startTime = time.Now()
  18. // }
  19. // 按升序排序
  20. func InMemSort(in <-chan int) <-chan int {
  21. out := make(chan int, 4096)
  22. // 启动一个协程输出排序数据
  23. go func() {
  24. var a []int
  25. for v := range in {
  26. a = append(a, v)
  27. }
  28. // fmt.Println("Read done: ", time.Now().Sub(startTime))
  29. sort.Ints(a)
  30. // fmt.Println("Sort done: ", time.Now().Sub(startTime))
  31. // 将排序好的数据输出至通道中
  32. for _, n := range a {
  33. out <- n
  34. }
  35. close(out)
  36. }()
  37. return out
  38. }

3. 合并两个有序数组Merge

归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。

合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。

  1. package arraySort
  2. /*合并两个有序数组
  3. 归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。
  4. 通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。
  5. 合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。
  6. 这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。
  7. */
  8. import (
  9. // "fmt"
  10. // "time"
  11. )
  12. // 按升序合并两个通道中数据
  13. func Merge(in1, in2 <-chan int) <-chan int {
  14. out := make(chan int, 4096)
  15. go func() {
  16. v1, ok1 := <-in1
  17. v2, ok2 := <-in2
  18. for ok1 || ok2 {
  19. if !ok2 || (ok1 && v1 <= v2) {
  20. out <- v1
  21. v1, ok1 = <-in1
  22. } else {
  23. out <- v2
  24. v2, ok2 = <-in2
  25. }
  26. }
  27. close(out)
  28. // fmt.Println("Merge done: ", time.Now().Sub(startTime))
  29. }()
  30. return out
  31. }

4. 主函数

  1. func sortLogin() {
  2. fmt.Println("======1. 往通道写入可变参数的整数========")
  3. ch1 := arraySort.ArrSort(1, 4, 6, 2, 18, 3, 26, 5) //<-chan int
  4. ch2 := arraySort.ArrSort(100, 400, 600, 200, 1800, 300, 2600, 50)
  5. fmt.Println("======2. 按升序排序========")
  6. fmt.Println("======3. 合并两个有序数组========")
  7. p := arraySort.Merge(arraySort.InMemSort(ch1), arraySort.InMemSort(ch2))
  8. for m := range p {
  9. fmt.Println(m)
  10. }
  11. }

输出如下,可以看到两两递归排序是可行的!
image.png

三 、 生成随机数文件

1. 生成指定数量的随机整数RandomSource

生成指定大小的随机整数数据来测试用。
生成简单整数即可。

  1. package arraySort
  2. /*生成样本数据
  3. 生成指定大小的随机整数数据来测试用。
  4. 生成简单整数即可。
  5. */
  6. import (
  7. // "crypto/rand"
  8. "math/rand"
  9. )
  10. // 生成指定数量的随机整数
  11. func RandomSource(count int) <-chan int {
  12. out := make(chan int)
  13. go func() {
  14. for i := 0; i < count; i++ {
  15. out <- rand.Int()
  16. }
  17. close(out)
  18. }()
  19. return out
  20. }
  21. /*
  22. rand.Seed(time.Now().UnixNano()) // 设置种子数为当前时间
  23. var src [100]int
  24. fmt.Println("=====生成随机数组====")
  25. for i := 0; i < 100; i++ {
  26. src[i] = rand.Intn(100)
  27. }
  28. */

2. 用缓冲的方式将数据写入文件WriteSink

写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。

  1. package arraySort
  2. /*写数据到文件
  3. 写文件基本都类似,也是按二进制方式写入数据到文件。
  4. 该处写文件不需要另起协程,当然也可以起协程,效果不明显。
  5. */
  6. import (
  7. "encoding/binary"
  8. "io"
  9. )
  10. // 向可写输入写入数据
  11. func WriteSink(writer io.Writer, in <-chan int) {
  12. for m := range in {
  13. buffer := make([]byte, 8)
  14. binary.BigEndian.PutUint64(buffer, uint64(m))
  15. writer.Write(buffer)
  16. }
  17. }

3. 用缓冲的方式读取文件ReadSource

Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,而不是简单的FileSource形式,而是更广泛意义上的可读源。

以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。

  1. package arraySort
  2. /*从源读取数据
  3. Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,
  4. 而不是简单的FileSource形式,而是更广泛意义上的可读源。
  5. 以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。
  6. 如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。
  7. 文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。
  8. */
  9. import (
  10. "encoding/binary"
  11. "io"
  12. )
  13. // 从可读位置读入数据
  14. // 读入指定长度的数据, 为-1时一次性全部读完
  15. func ReadSource(reader io.Reader, chunkSize int) <-chan int {
  16. out := make(chan int, 4096)
  17. go func() {
  18. buffer := make([]byte, 8)
  19. size := 0
  20. for {
  21. n, err := reader.Read(buffer)
  22. size += n
  23. if n > 0 {
  24. v := int(binary.BigEndian.Uint64(buffer))
  25. // var v int
  26. out <- v
  27. }
  28. if err != nil || (chunkSize != -1 && size >= chunkSize) {
  29. break
  30. }
  31. }
  32. close(out)
  33. }()
  34. return out
  35. }
  36. package arraySort
  37. import (
  38. "encoding/binary"
  39. "io"
  40. )
  41. //从可读位置读入数据,如果剩下的不能占用一次的读取大小就全部读完即chunkSize = -1
  42. func ReadSource(reader io.Reader, chunkSize int) <-chan int {
  43. out := make(chan int, 4096)
  44. go func() {
  45. buffer := make([]byte, 8)
  46. size := 0
  47. for {
  48. n, err := reader.Read(buffer)
  49. size += n
  50. if n > 0 {
  51. v := int(binary.BigEndian.Uint64(buffer))
  52. out <- v
  53. }
  54. if err != nil || (chunkSize != -1 && size >= chunkSize) {
  55. break
  56. }
  57. }
  58. close(out)
  59. }()
  60. return out
  61. }

4. 主函数

  1. func readFile() {
  2. fmt.Println("======4. 生成指定数量的随机整数========")
  3. random := arraySort.RandomSource(num)
  4. fmt.Println("======5. 用缓冲的方式将数据写入文件========")
  5. fileWrite, errWrite := os.Create(fileInPath)
  6. if errWrite != nil {
  7. panic(errWrite)
  8. }
  9. defer fileWrite.Close()
  10. // 使用缓冲方式写数据,使用缓冲比不适用缓冲快了好几倍
  11. writer := bufio.NewWriter(fileWrite)
  12. // for m := range random {
  13. // fmt.Println(m)
  14. // writer.WriteString(strconv.Itoa(m)+"\n")
  15. // }
  16. arraySort.WriteSink(writer, random)
  17. // 最后数据可能不能写缓冲, 需要强制刷新缓冲
  18. writer.Flush()
  19. fmt.Println("======6.用缓冲的方式读数据========")
  20. fileRead, errRead := os.Open(fileInPath)
  21. if errRead != nil {
  22. panic(errRead)
  23. }
  24. defer fileRead.Close()
  25. // reader := bufio.NewReader(file)
  26. // //循环读取文件内容
  27. // for {
  28. // //读到一个换行就结束
  29. // line, err := reader.ReadString('\n') //注意是字符
  30. // // 代表文件末尾 err!= nil
  31. // if err == io.EOF {
  32. // if len(line) != 0 {
  33. // fmt.Println(line)
  34. // }
  35. // fmt.Println("文件读完了")
  36. // break
  37. // }
  38. // if err != nil {
  39. // fmt.Println("read file failed, err:", err)
  40. // return
  41. // }
  42. // fmt.Print(line)
  43. // }
  44. // 使用有缓冲方式读数据
  45. reader := arraySort.ReadSource(bufio.NewReader(fileRead), -1)
  46. n := 0
  47. for m := range reader {
  48. fmt.Println(m)
  49. n++
  50. // 只输出前10个二进制数字
  51. if n >= 10 {
  52. break
  53. }
  54. }
  55. }

测试输出,只输出前10个二进制数字,可以看到是乱序的!
image.png

四、 排序

1. 分片读取文件内容Create

分片的实现,其实就是指定一次性读取多大文件内容而已。实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。

该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。

  1. package arraySort
  2. /*分片读取文件内容
  3. 分片的实现,其实就是指定一次性读取多大文件内容而已。
  4. 实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,
  5. 如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。
  6. 该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。
  7. */
  8. import (
  9. "bufio"
  10. "os"
  11. // "fmt"
  12. )
  13. // 可读入资源切片
  14. func Create(fileName string, fileSize, count int) <-chan int {
  15. chunkSize := fileSize / count
  16. var result []<-chan int
  17. for i := 0; i < count; i++ {
  18. file, err := os.Open(fileName)
  19. if err != nil {
  20. panic(err)
  21. }
  22. // 设置文件偏移量, 并从偏移量开始0处读取文件内容
  23. file.Seek(int64(i*chunkSize), 0)
  24. p := ReadSource(bufio.NewReader(file), chunkSize)
  25. //按升序排序
  26. result = append(result, InMemSort(p))
  27. }
  28. //递归进行两两归并
  29. return MergeN(result...)
  30. }

这里注意有一个递归排序的过程!MergeN

可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。
如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。

  1. package arraySort
  2. /*递归合并数组
  3. 可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。
  4. 如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。
  5. */
  6. // 递归进行两两归并
  7. func MergeN(ins ...<-chan int) <-chan int {
  8. // 只有一个通道时直接返回
  9. if len(ins) == 1 {
  10. return ins[0]
  11. }
  12. // 取中点
  13. m := len(ins) / 2
  14. // 递归归并
  15. return Merge(MergeN(ins[:m]...), MergeN(ins[m:]...))
  16. }

2. 将排序数据写入文件Write

排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。
缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。

  1. package arraySort
  2. /*将排序数据写入文件
  3. 排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。
  4. 缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。
  5. */
  6. import (
  7. "bufio"
  8. // "fmt"
  9. "os"
  10. // "strconv"
  11. )
  12. // 将排序好的数据写入文件中
  13. func Write(in <-chan int, fileName string) {
  14. file, err := os.Create(fileName)
  15. if err != nil {
  16. panic(err)
  17. }
  18. defer file.Close()
  19. w := bufio.NewWriter(file)
  20. defer w.Flush()
  21. WriteSink(w, in)
  22. // writer := bufio.NewWriter(file)
  23. // for m := range in {
  24. // fmt.Println(m)
  25. // writer.WriteString(strconv.Itoa(m)+"\n")
  26. // }
  27. // // arraySort.WriteSink(writer, random)
  28. // // 最后数据可能不能写缓冲, 需要强制刷新缓冲
  29. // defer writer.Flush()
  30. }

这里注意有一个写数据到文件的过程WriteSink

写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。

  1. package arraySort
  2. /*写数据到文件
  3. 写文件基本都类似,也是按二进制方式写入数据到文件。
  4. 该处写文件不需要另起协程,当然也可以起协程,效果不明显。
  5. */
  6. import (
  7. "encoding/binary"
  8. "io"
  9. )
  10. // 向可写输入写入数据
  11. func WriteSink(writer io.Writer, in <-chan int) {
  12. for m := range in {
  13. buffer := make([]byte, 8)
  14. binary.BigEndian.PutUint64(buffer, uint64(m))
  15. writer.Write(buffer)
  16. }
  17. }

3. 打印排序结果

文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。

  1. package arraySort
  2. /*打印排序结果
  3. 文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。
  4. 排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,
  5. 比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。
  6. */
  7. import (
  8. "fmt"
  9. "os"
  10. )
  11. // 输出文件内容
  12. func Echo(fileName string) {
  13. file, err := os.Open(fileName)
  14. if err != nil {
  15. panic(err)
  16. }
  17. defer file.Close()
  18. p := ReadSource(file, -1)
  19. n := 0
  20. for m := range p {
  21. fmt.Println(m)
  22. n++
  23. if n >= 10 {
  24. break
  25. }
  26. }
  27. }

4. 主函数

  1. func MergerSort() {
  2. fmt.Println("======7. 分片读取文件内容========")
  3. fileContent := arraySort.Create(fileInPath, 800, 4)
  4. // for m := range fileContent {
  5. // fmt.Println(m)
  6. // }
  7. fmt.Println("======8. 将排序数据写入文件========")
  8. arraySort.Write(fileContent, fileOutPath)
  9. fmt.Println("======9. 输出文件内容========")
  10. arraySort.Echo(fileOutPath)
  11. }

排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。
image.png
打印10个数测试一下!可以看到已经排序!