一、 前言
课题内容:10G的数字大文件,只有4G内存,如何排序?
- 创造一个数字大文件,先不创建10G,可以写入10万个数做测试,只有800KB左右!
- 对文件进行排序
参考博客:
首先想到归并排序,归并排序是典型的分治算法,它不断地将某个数组分为两个部分,分别对左子数组与右子数组进行排序,然后将两个数组合并为新的有序数组。
一般来说,小文件单机版排序比并发排序更快一些,但是大文件无法一次性读入内存处理,需要分块处理。单机版排序无法做到分布式排序,分布式排序需要联网进行,网络传输跟通道传输非常类似,稍作修改就可以做到在网络上进行排序。
也就是说,大文件做并发更快!
并行和并发是有区别的,依赖操作系统和硬件是否支持。同时,在允许的情况下,使用有缓冲的通道和使用文件读写缓冲,有利于加快程序的处理速度,避免及时阻塞。
二、 测试排序是否可执行
1. 往通道写入可变参数的整数ArrSort
package arraySort
// 往通道写入可变参数的整数
func ArrSort(a ...int) <-chan int {
out := make(chan int)
go func() {
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
2. 按升序排序InMemSort
对一个整数数组进行排序,系统库中已经提供了排序函数
(这个也可以自己写),直接使用即可。
需要注意的是,Go中使用通道,使用不当会造成死锁。需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。
通道使用注意事项详解
(这块找找资料附链接)
参考:通道用例大全(go语言白皮书)
通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数
来操作。需要留意匿名函数的传参问题,可以去官方文档进行查看(这块找找资料附:链接)。go程运行时,函数调用是瞬间返回的。如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。
package arraySort
/*数组排序
对一个整数数组进行排序,系统库中已经提供了排序函数,直接使用即可。
需要注意的是,Go中使用通道,使用不当会造成死锁。
需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。
通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数来操作。
需要留意匿名函数的传参问题,可以去官方文档进行查看。go程运行时,函数调用是瞬间返回的。
如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。
*/
import (
// "fmt"
"sort"
// "time"
)
// var startTime time.Time
// func init() {
// startTime = time.Now()
// }
// 按升序排序
func InMemSort(in <-chan int) <-chan int {
out := make(chan int, 4096)
// 启动一个协程输出排序数据
go func() {
var a []int
for v := range in {
a = append(a, v)
}
// fmt.Println("Read done: ", time.Now().Sub(startTime))
sort.Ints(a)
// fmt.Println("Sort done: ", time.Now().Sub(startTime))
// 将排序好的数据输出至通道中
for _, n := range a {
out <- n
}
close(out)
}()
return out
}
3. 合并两个有序数组Merge
归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。
合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。
package arraySort
/*合并两个有序数组
归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。
通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。
合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。
这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。
*/
import (
// "fmt"
// "time"
)
// 按升序合并两个通道中数据
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 4096)
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
// fmt.Println("Merge done: ", time.Now().Sub(startTime))
}()
return out
}
4. 主函数
func sortLogin() {
fmt.Println("======1. 往通道写入可变参数的整数========")
ch1 := arraySort.ArrSort(1, 4, 6, 2, 18, 3, 26, 5) //<-chan int
ch2 := arraySort.ArrSort(100, 400, 600, 200, 1800, 300, 2600, 50)
fmt.Println("======2. 按升序排序========")
fmt.Println("======3. 合并两个有序数组========")
p := arraySort.Merge(arraySort.InMemSort(ch1), arraySort.InMemSort(ch2))
for m := range p {
fmt.Println(m)
}
}
三 、 生成随机数文件
1. 生成指定数量的随机整数RandomSource
生成指定大小的随机整数数据来测试用。
生成简单整数即可。
package arraySort
/*生成样本数据
生成指定大小的随机整数数据来测试用。
生成简单整数即可。
*/
import (
// "crypto/rand"
"math/rand"
)
// 生成指定数量的随机整数
func RandomSource(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}
/*
rand.Seed(time.Now().UnixNano()) // 设置种子数为当前时间
var src [100]int
fmt.Println("=====生成随机数组====")
for i := 0; i < 100; i++ {
src[i] = rand.Intn(100)
}
*/
2. 用缓冲的方式将数据写入文件WriteSink
写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。
package arraySort
/*写数据到文件
写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。
*/
import (
"encoding/binary"
"io"
)
// 向可写输入写入数据
func WriteSink(writer io.Writer, in <-chan int) {
for m := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(m))
writer.Write(buffer)
}
}
3. 用缓冲的方式读取文件ReadSource
Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,而不是简单的FileSource形式,而是更广泛意义上的可读源。
以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。
package arraySort
/*从源读取数据
Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,
而不是简单的FileSource形式,而是更广泛意义上的可读源。
以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。
如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。
文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。
*/
import (
"encoding/binary"
"io"
)
// 从可读位置读入数据
// 读入指定长度的数据, 为-1时一次性全部读完
func ReadSource(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 4096)
go func() {
buffer := make([]byte, 8)
size := 0
for {
n, err := reader.Read(buffer)
size += n
if n > 0 {
v := int(binary.BigEndian.Uint64(buffer))
// var v int
out <- v
}
if err != nil || (chunkSize != -1 && size >= chunkSize) {
break
}
}
close(out)
}()
return out
}
package arraySort
import (
"encoding/binary"
"io"
)
//从可读位置读入数据,如果剩下的不能占用一次的读取大小就全部读完即chunkSize = -1
func ReadSource(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 4096)
go func() {
buffer := make([]byte, 8)
size := 0
for {
n, err := reader.Read(buffer)
size += n
if n > 0 {
v := int(binary.BigEndian.Uint64(buffer))
out <- v
}
if err != nil || (chunkSize != -1 && size >= chunkSize) {
break
}
}
close(out)
}()
return out
}
4. 主函数
func readFile() {
fmt.Println("======4. 生成指定数量的随机整数========")
random := arraySort.RandomSource(num)
fmt.Println("======5. 用缓冲的方式将数据写入文件========")
fileWrite, errWrite := os.Create(fileInPath)
if errWrite != nil {
panic(errWrite)
}
defer fileWrite.Close()
// 使用缓冲方式写数据,使用缓冲比不适用缓冲快了好几倍
writer := bufio.NewWriter(fileWrite)
// for m := range random {
// fmt.Println(m)
// writer.WriteString(strconv.Itoa(m)+"\n")
// }
arraySort.WriteSink(writer, random)
// 最后数据可能不能写缓冲, 需要强制刷新缓冲
writer.Flush()
fmt.Println("======6.用缓冲的方式读数据========")
fileRead, errRead := os.Open(fileInPath)
if errRead != nil {
panic(errRead)
}
defer fileRead.Close()
// reader := bufio.NewReader(file)
// //循环读取文件内容
// for {
// //读到一个换行就结束
// line, err := reader.ReadString('\n') //注意是字符
// // 代表文件末尾 err!= nil
// if err == io.EOF {
// if len(line) != 0 {
// fmt.Println(line)
// }
// fmt.Println("文件读完了")
// break
// }
// if err != nil {
// fmt.Println("read file failed, err:", err)
// return
// }
// fmt.Print(line)
// }
// 使用有缓冲方式读数据
reader := arraySort.ReadSource(bufio.NewReader(fileRead), -1)
n := 0
for m := range reader {
fmt.Println(m)
n++
// 只输出前10个二进制数字
if n >= 10 {
break
}
}
}
测试输出,只输出前10个二进制数字,可以看到是乱序的!
四、 排序
1. 分片读取文件内容Create
分片的实现,其实就是指定一次性读取多大文件内容而已。实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。
该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。
package arraySort
/*分片读取文件内容
分片的实现,其实就是指定一次性读取多大文件内容而已。
实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,
如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。
该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。
*/
import (
"bufio"
"os"
// "fmt"
)
// 可读入资源切片
func Create(fileName string, fileSize, count int) <-chan int {
chunkSize := fileSize / count
var result []<-chan int
for i := 0; i < count; i++ {
file, err := os.Open(fileName)
if err != nil {
panic(err)
}
// 设置文件偏移量, 并从偏移量开始0处读取文件内容
file.Seek(int64(i*chunkSize), 0)
p := ReadSource(bufio.NewReader(file), chunkSize)
//按升序排序
result = append(result, InMemSort(p))
}
//递归进行两两归并
return MergeN(result...)
}
这里注意有一个递归排序的过程!MergeN
可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。
如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。
package arraySort
/*递归合并数组
可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。
如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。
*/
// 递归进行两两归并
func MergeN(ins ...<-chan int) <-chan int {
// 只有一个通道时直接返回
if len(ins) == 1 {
return ins[0]
}
// 取中点
m := len(ins) / 2
// 递归归并
return Merge(MergeN(ins[:m]...), MergeN(ins[m:]...))
}
2. 将排序数据写入文件Write
排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。
缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。
package arraySort
/*将排序数据写入文件
排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。
缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。
*/
import (
"bufio"
// "fmt"
"os"
// "strconv"
)
// 将排序好的数据写入文件中
func Write(in <-chan int, fileName string) {
file, err := os.Create(fileName)
if err != nil {
panic(err)
}
defer file.Close()
w := bufio.NewWriter(file)
defer w.Flush()
WriteSink(w, in)
// writer := bufio.NewWriter(file)
// for m := range in {
// fmt.Println(m)
// writer.WriteString(strconv.Itoa(m)+"\n")
// }
// // arraySort.WriteSink(writer, random)
// // 最后数据可能不能写缓冲, 需要强制刷新缓冲
// defer writer.Flush()
}
这里注意有一个写数据到文件的过程WriteSink
写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。
package arraySort
/*写数据到文件
写文件基本都类似,也是按二进制方式写入数据到文件。
该处写文件不需要另起协程,当然也可以起协程,效果不明显。
*/
import (
"encoding/binary"
"io"
)
// 向可写输入写入数据
func WriteSink(writer io.Writer, in <-chan int) {
for m := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(m))
writer.Write(buffer)
}
}
3. 打印排序结果
文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。
package arraySort
/*打印排序结果
文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。
排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,
比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。
*/
import (
"fmt"
"os"
)
// 输出文件内容
func Echo(fileName string) {
file, err := os.Open(fileName)
if err != nil {
panic(err)
}
defer file.Close()
p := ReadSource(file, -1)
n := 0
for m := range p {
fmt.Println(m)
n++
if n >= 10 {
break
}
}
}
4. 主函数
func MergerSort() {
fmt.Println("======7. 分片读取文件内容========")
fileContent := arraySort.Create(fileInPath, 800, 4)
// for m := range fileContent {
// fmt.Println(m)
// }
fmt.Println("======8. 将排序数据写入文件========")
arraySort.Write(fileContent, fileOutPath)
fmt.Println("======9. 输出文件内容========")
arraySort.Echo(fileOutPath)
}
排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。
打印10个数测试一下!可以看到已经排序!