简单总结

6.824 lab 1 MapReduce

image.png

  1. 最近好几个学弟提到了 6.824, 今天下午有时间,我也学习一下.
  2. 花了大概 4 小时做了初版通过了所有测试,
  3. 一小时看代码设计,两个小时编码,一个小时编码加测试. 大概也就 300 行代码.
  4. 总体难度,对我来说正好能学习其中讲授知识部分. 我正好使用 go 语言(快两年),了解 rpc(自己设计过也给 grpc 贡献有代码), 看过一点 MapReduce paper (之前看 chubby spanner 的时候)
  5. 遇到了问题还比较多,其中 IDE 能力不同的问题还挺奇特的,也就现在有.

先花时间整理一下大概有哪些问题,
然后描述设计和编码是遇到的问题和解决方案,
最后展示一下关键代码和测试结果.
希望能够总结一些适合复用的解决方案,提醒自己.也帮助他人学习如何解决相应种类问题.

编码问题

  1. 首先是 vscode 对 golang 的支持,在 你的代码不符合 golang 规范的时候, vscode 对于代码提示的功能非常小,相当于白板编程(那我要 gopls 有什么用?). 比较重要的问题是 代码跳转 用不了,另外是 类型检查 和 拼写错误会较多.

这个时候只能凭经验来补代码提示和库函数运用, 这里我用了 tabnine 这个插件用作代码提示. tabnine 用 ai 的方式预测接下来的输入,对于重复的代码片段很有用,尽量保证了我拼写的正确.
最后迫于更快的代码检查,还是转用了 goland. goland 对于开源项目工作者是免费的,可以了解一下.

  1. go 1.14 版本的问题, 我遇到的问题有两个,
    1. 一个是 从 1.11 开始启用的 go module 后,引用本地包里的文件无法通过编译的问题 如 6.824 中 "../mr" 这种相对目录的方式通不过编译.
    2. 另一个是 goland 2019.3 这个版本,对于 1.14 的 gomodule 是不支持的(因为 go 1.14 刚出),这个在 2019.3.3 修复了

于是我在 6.824 的根目录下go mod init ds 并把 "../mr"改为了 "ds/mr" ,并且在运行调试的时候又转为了 vscode

原有代码和工作要求

解决了环境问题,接下来明晰一下工作要求. 首先 6.824 提供了串行执行的 MapReduce 示例, 对 MapReduce 了解更多请看论文, 对 本 lab 了解更多请直接看最上链接.我们直接简要分析,提出主要问题和解决方案.

我们的主要问题就是,分布式设计和串行化设计的差异和考虑的问题,以及我们的方案,除此之外, go 语法啊, 怎样更优雅更好看, go 的插件工作模式啊,属于其他问题,自己下来日常养成.首先你应该先看 lab, paper, 执行 lab 中的程序直到开始自己编码.

我们看 lab 1 中的 Your Job部分.
要求是, 实现一个系统,其中 master 用于分配 map 和 reduce 任务, worker 负责申请任务并执行.

这里有个特别的参数 nReduce ,他可以理解为我们需要 nReduce 个桶.一个桶负责装一类信息,在这里我们使用桶来保证相同 key 的 keyvalue 对放在同一个文件中等待处理.

问题一:网络通信

通信问题,首先我们分析 master 和 worker 需要通信, 如何通信是首要问题.

解决:

lab 帮我们决定了使用 rpc 通信,同时 master 和 worker 通过 unix domain socket 作为通信方式, 用 http 服务器提供 rpc 服务. unix 通信我写过例子代码. 由于代码编写 lab 中已经完成,理解即可.需要注意的是,

  1. //
  2. // start a thread that listens for RPCs from worker.go
  3. //
  4. func (m *Master) server() {
  5. rpc.Register(m)
  6. rpc.HandleHTTP()
  7. sockname := masterSock()
  8. os.Remove(sockname)
  9. l, e := net.Listen("unix", sockname)
  10. if e != nil {
  11. log.Fatal("listen error:", e)
  12. }
  13. go http.Serve(l, nil)
  14. }

以上是 golang 标准库 rpc 包的便利操作.快速启动了一个 rpc 服务.

问题二:共享文件

存储问题, 我们需要的工作是处理文件啊,文件内容跟着网络到处传输吗?这样网络开销也太大了,我们看过 paper 知道,当年的瓶颈就是网络传输的带宽.

解决:

我们现在实现的版本是在本地执行的一个分布式设计.这是因为 MapReduce 论文中提到, 所有实例应该共享文件系统, paper 中是 GFS, 我们本地设计就是本机的文件系统.同样解决了这个问题.

解决了以上问题,意味着这个系统的数据能具有一致性了.接下来设计该系统的处理逻辑.

问题三:任务切分

我们知道, map 和 reduce 的输入输出都是文件, 我们现在同一文件系统,能够相互交流,我们在交流中携带需要处理的文件路径即可. 根据 MapReduce 的思想,首先我们需要切分任务.

解决:
  1. 我们程序的输入是 n 个文件, go run mrsequential.go wc.so pg*.txt 命令还记得吗, 其中 pg.txt 就是我们的输入文件, 是通配符.意思是下面这 8 个文件名都会输入到我们程序里

image.png for _, filename := range os.Args[2:]用了这行代码读取

那么根据 paper, 这里就有 8 个 map 任务, 每个 map 任务读取一个文件中的内容并处理后输出.

  1. 这里输出需要注意,为什么要给你 nReduce 这个参数啊? 你想你从各个文章中读取的数据 (KV 对), reduce 要怎样工作才能把 map 的输出都 reduce 呢?

我们先看之前 串行化程序 mrsequential.go怎么处理的. 他用 sort.Sort(ByKey(intermediate))排了序,这意味着, Key 一样的 kv 对聚集在一起了, 这样 reduce 工作代码很好写对吧.一溜下来保证相同 key 的 kv 对都能处理到.
而在分布式设计中,我们使用 桶 和 hash key (hash 保证了相同的 key 算出的结果一样)的设计,将相应的 key 保证分在同一组文件中,保证了 reduce 编码的简单.(注意, paper 中说了,系统的目的就是让编码人员只关注 map function 和 reduce function 就能完成工作,所以保证 reduce 编码的简单是非常有意义的)
具体为我们首先对 key 做 hash 编码.将 string 编码为 数字, 并对数字结果 取模 N 运算, N 为传入的 nReduce 值.运算的结果代表了相应 kv 对应该放于第几个桶中,在这里就是归入哪一个输出文件.
综上,我们有 nReduce 个 reduce 任务.

  1. lab 中有指导文件命名方式,所以这里不把他作为单独的一个问题展示,但规范文件命名其实也能简化系统,也是通信消息设计的一部分.假如我们有 2 个 map 任务, nReduce 值为 2. 那么我们的流程如下:
    1. 首先我们执行完 map 任务, 会有 map * nReduce 个文件生成.格式为 mr-tmp-X-Y 其中 X 是 map 的任务号,代表第几个输入文件, Y 是 Reduce 的任务号,也代表着这是第几个桶.

image.png
image.png这四个文件是 paper 中说的中间文件.
b. 执行 reduce 任务.将 reduce 任务号相同的,即一个桶中的数据进行处理,一个桶所有数据处理之后输出文件, 格式为 mr-out-NN 为 reduce 任务号.
image.png

  1. 这里额外提一句, 为什么输出这么多文件就结束了,不用合并吗? 确实不用.

我们查看 test-mr.sh 文件查看测试规则

  1. # generate the correct output
  2. ../mrsequential ../../mrapps/wc.so ../pg*txt || exit 1
  3. sort mr-out-0 > mr-correct-wc.txt
  4. rm -f mr-out*
  5. #......省略部分
  6. sort mr-out* | grep . > mr-wc-all
  7. if cmp mr-wc-all mr-correct-wc.txt
  8. then
  9. echo '---' wc test: PASS
  1. 可以看到测试程序首先 使用 mrsequential 生成正确的解答 排序 后保存在一个文件中
  2. 然后将我们符合 mr-out* 格式的输出文件读取出来整体 排序 后保存为一个文件.
  3. 最后使用 cmp 命令进行判定两个文件是否一致.

问题四: 消息设计

我们完成了上述设计之后,开始实际逻辑的构建,我进行了以下设计, master 和 worker 通过 rpc 消息交互来分配任务,任务完成之后 worker 通知 master,并进行下一次任务分配.直到没有任务的时候,worker 退出.(如果没有任务的同时,所有任务都已完成 Master 也会退出)
image.png

  1. 这里看两个设计,一个是 commit 设计,worker 在完成 相应任务之后需要进行一次新的 rpc,这个增加了网络开销和不确定性(除了网络之外,还有安全问题,多了一个可以攻击的渠道)

但是注意 paper 中提到的是 Master 是无法得知 Worker 状态的.这里设计一个新的 rpc 服务来交换 task 是否完成的信息,这是因为我认为, worker 的边界就是收到任务处理任务, master 只需要知道 worker 有没有完成相应任务进行了.不仅如此,我认为这还简化了编码任务.

  1. Worker 退出设计, worker 在获取任务的时候,如果 Master 没有任务给他,怎么办呢? 我们挂起一个循环轮询吗?这个情况, master 是所有任务完成了吗?还是说 master 所有任务已经分配出去?
    1. 如果任务已经完成了,我这个 worker 还要一直轮询占用资源吗? 所以选择退出.
    2. 我们一直轮询会不会造成服务器负载?所以每一次轮询我们间隔 time.Sleep 一段时间
    3. 如果任务都没有完成,只是当前没有任务可以分配,如果 worker 退出了,没有启动 worker 机制或者保活 n 个 worker 的机制(你懂得,就像 kubernetes 的 pods 一样),那么接下来还有任务谁来干活?(考虑当前最后一个任务正在一个 worker 处理,但是这个 worker 不给力,挂了,master 看到这个任务超时了,重新分配给另一个 worker,但是其他 worker 在这个处理时间里都已经收到没事干的消息,都退出了,谁来干呢? 我在这里针对这一个场景(设计一定要针对具体需求) 设计了一个重试机制(巧的是我给 grpc 贡献的代码也是 retry 的代码). 当没有任务时,继续重复轮询 3 次,每次间隔 0.5 s. 尽量保证当前工作量的容错(现在就是处理几个单词)

那么我们有两种 rpc 服务,分别设计 发送和返回消息. 如下
image.png
其中 Workerid 用来区分 worker 是谁,帮助 master 掌控 worker 状态.(是不是你小子领了任务从来不交!)
WorkReply 用于获取任务信息, 如当前 taskid , MapReduce 当前 工作类型是 map 还是 reduce,
BucketNumber 用于指导 map 或者 reduce 任务应读取和输出多少个文件. Isfinished 用于 Master 返回是否所有任务都已完成.

最后每有一个任务,就会有一次提交(或者提交超时),该设计也让 master 状态管理代码逻辑更简洁.

问题五 状态管理

众所周知,分布式一大问题就是时序.虽然状态是万恶之源,但现实世界就是现实世界,熵增不可改变,复杂度只能转移,我们尽量简化即可.
关于这个系统,我们需要管理

  1. map 任务状态: 有多少个 map 任务,完成了多少
  2. reduce 任务状态:有多少个 reduce 任务,完成了多少.同时 reduce 还有在 map 任务都完成后进行
  3. worker 状态: 谁在干话,干了多久?

任务则设计三个状态,

  1. TaskIdle: 任务闲置,没人管 ing o(╥﹏╥)o
  2. TaskWorking: 任务工作中, (你咋知道我在磨洋工?因为你超时了!)
  3. TaskCommit: 任务完成!

Master 则针对每一个 map 存储状态 mapTasks []int,序号代表任务编号,reduce 同理. worker 用 map[string]int这是由于 workerid 在 worker 端生成,难以掌控,所以用 string.

则如下:
image.png

问题六:尽量健壮

我们希望我们的程序能处理任何问题吗?不是的.
但我们至少希望针对目标问题,具有鲁棒性,更强,更强壮.♂!

健壮的代码设计能处理很多的错误.针对意料之外的错误也有一定恢复能力.
之前提到的 重试机制,现在设计的超时机制, 共享锁机制都是增强的设计(当然正常情况下也需要更多开销 😁,但是一旦发生错误,这些开销都是值得的)

所有人都知道,共享数据,并发! 不加锁, 哼哼,就会有及其稀奇古怪,难以定位调试的问题出现.我们在访问这些数据的时候记得加锁.m.mu.Lock() 也不要忘记解锁 m.mu.UnLock()

超时

Master 在每一次分配任务的时候,计时处理,如果 worker 超时依旧没有提交任务,我们认为这个 worker 已经无法完成任务,并将这个任务重新分配给另一个 worker

  1. ctx, _ := context.WithTimeout(context.Background(), m.timeout)
  2. go func() {
  3. select {
  4. case <-ctx.Done():
  5. {
  6. m.mu.Lock()
  7. if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {
  8. m.reduceTasks[k] = TaskIdle
  9. log.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")
  10. }
  11. m.mu.Unlock()
  12. }
  13. }
  14. }()

Code

mr/master.go

  1. package mr
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "net"
  7. "net/http"
  8. "net/rpc"
  9. "os"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. TaskIdle = iota
  15. TaskWorking
  16. TaskCommit
  17. )
  18. type Master struct {
  19. // Your definitions here.
  20. files []string
  21. nReduce int
  22. //init with 0
  23. mapTasks []int
  24. reduceTasks []int
  25. mapCount int
  26. //init with -1
  27. workerCommit map[string]int
  28. allCommited bool
  29. //init with 10 seconds
  30. timeout time.Duration
  31. mu sync.RWMutex
  32. }
  33. // Your code here -- RPC handlers for the worker to call.
  34. func (m *Master) Work(args *WorkArgs, reply *WorkReply) error {
  35. m.mu.Lock()
  36. defer m.mu.Unlock()
  37. // first for map work
  38. for k, v := range m.files {
  39. if m.mapTasks[k] != TaskIdle {
  40. continue
  41. }
  42. reply.Taskid = k
  43. reply.Filename = v
  44. reply.MapReduce = "map"
  45. reply.BucketNumber = m.nReduce
  46. reply.Isfinished = false
  47. m.workerCommit[args.Workerid] = TaskWorking
  48. m.mapTasks[k] = TaskWorking
  49. // log.Println("a worker", args.Workerid, "apply a map task:", *reply)
  50. ctx, _ := context.WithTimeout(context.Background(), m.timeout)
  51. go func() {
  52. select {
  53. case <-ctx.Done():
  54. {
  55. m.mu.Lock()
  56. defer m.mu.Unlock()
  57. if m.workerCommit[args.Workerid] != TaskCommit && m.mapTasks[k] != TaskCommit {
  58. m.mapTasks[k] = TaskIdle
  59. log.Println("[Error]:", "worker:", args.Workerid, "map task:", k, "timeout")
  60. }
  61. }
  62. }
  63. }()
  64. return nil
  65. }
  66. // then dispatch reduce work
  67. for k, v := range m.reduceTasks {
  68. if m.mapCount != len(m.files) {
  69. return nil
  70. }
  71. if v != TaskIdle {
  72. continue
  73. }
  74. reply.Taskid = k
  75. reply.Filename = ""
  76. reply.MapReduce = "reduce"
  77. reply.BucketNumber = len(m.files)
  78. reply.Isfinished = false
  79. m.workerCommit[args.Workerid] = TaskWorking
  80. m.reduceTasks[k] = TaskWorking
  81. ctx, _ := context.WithTimeout(context.Background(), m.timeout)
  82. go func() {
  83. select {
  84. case <-ctx.Done():
  85. {
  86. m.mu.Lock()
  87. if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {
  88. m.reduceTasks[k] = TaskIdle
  89. log.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")
  90. }
  91. m.mu.Unlock()
  92. }
  93. }
  94. }()
  95. log.Println("a worker", args.Workerid, "apply a reduce task:", *reply)
  96. return nil
  97. }
  98. for _, v := range m.workerCommit {
  99. if v == TaskWorking {
  100. reply.Isfinished = false
  101. return nil
  102. }
  103. }
  104. reply.Isfinished = true
  105. return errors.New("worker apply but no tasks to dispatch")
  106. }
  107. func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {
  108. log.Println("a worker", args.Workerid, "commit a "+args.MapReduce+" task:", args.Taskid)
  109. m.mu.Lock()
  110. switch args.MapReduce {
  111. case "map":
  112. {
  113. m.mapTasks[args.Taskid] = TaskCommit
  114. m.workerCommit[args.Workerid] = TaskCommit
  115. m.mapCount++
  116. }
  117. case "reduce":
  118. {
  119. m.reduceTasks[args.Taskid] = TaskCommit
  120. m.workerCommit[args.Workerid] = TaskCommit
  121. }
  122. }
  123. m.mu.Unlock()
  124. log.Println("current", m.mapTasks, m.reduceTasks)
  125. for _, v := range m.mapTasks {
  126. if v != TaskCommit {
  127. return nil
  128. }
  129. }
  130. for _, v := range m.reduceTasks {
  131. if v != TaskCommit {
  132. return nil
  133. }
  134. }
  135. m.allCommited = true
  136. log.Println("all tasks completed")
  137. return nil
  138. }
  139. //
  140. // an example RPC handler.
  141. //
  142. // the RPC argument and reply types are defined in rpc.go.
  143. //
  144. func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
  145. log.Println("a worker")
  146. reply.Y = args.X + 1
  147. return nil
  148. }
  149. //
  150. // start a thread that listens for RPCs from worker.go
  151. //
  152. func (m *Master) server() {
  153. rpc.Register(m)
  154. rpc.HandleHTTP()
  155. //l, e := net.Listen("tcp", ":1234")
  156. sockname := masterSock()
  157. os.Remove(sockname)
  158. l, e := net.Listen("unix", sockname)
  159. if e != nil {
  160. log.Fatal("listen error:", e)
  161. }
  162. go http.Serve(l, nil)
  163. }
  164. //
  165. // main/mrmaster.go calls Done() periodically to find out
  166. // if the entire job has finished.
  167. //
  168. func (m *Master) Done() bool {
  169. // Your code here.
  170. return m.allCommited
  171. }
  172. //
  173. // create a Master.
  174. // main/mrmaster.go calls this function.
  175. // nReduce is the number of reduce tasks to use.
  176. //
  177. func MakeMaster(files []string, nReduce int) *Master {
  178. m := Master{
  179. files: files,
  180. nReduce: nReduce,
  181. mapTasks: make([]int, len(files)),
  182. reduceTasks: make([]int, nReduce),
  183. workerCommit: make(map[string]int),
  184. allCommited: false,
  185. timeout: 10 * time.Second,
  186. }
  187. log.Println("[init] with:", files, nReduce)
  188. m.server()
  189. return &m
  190. }

mr/worker.go

  1. package mr
  2. import (
  3. "crypto/rand"
  4. "encoding/json"
  5. "fmt"
  6. "hash/fnv"
  7. "io/ioutil"
  8. "log"
  9. "net/rpc"
  10. "os"
  11. "sort"
  12. "strconv"
  13. "time"
  14. )
  15. //
  16. // Map functions return a slice of KeyValue.
  17. //
  18. type KeyValue struct {
  19. Key string
  20. Value string
  21. }
  22. // for sorting by key.
  23. type ByKey []KeyValue
  24. // for sorting by key.
  25. func (a ByKey) Len() int { return len(a) }
  26. func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  27. func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
  28. //
  29. // use ihash(key) % NReduce to choose the reduce
  30. // task number for each KeyValue emitted by Map.
  31. //
  32. func ihash(key string) int {
  33. h := fnv.New32a()
  34. h.Write([]byte(key))
  35. return int(h.Sum32() & 0x7fffffff)
  36. }
  37. func genWorkerID() (uuid string) {
  38. // generate 32 bits timestamp
  39. unix32bits := uint32(time.Now().UTC().Unix())
  40. buff := make([]byte, 12)
  41. numRead, err := rand.Read(buff)
  42. if numRead != len(buff) || err != nil {
  43. panic(err)
  44. }
  45. return fmt.Sprintf("%x-%x-%x-%x-%x-%x\n", unix32bits, buff[0:2], buff[2:4], buff[4:6], buff[6:8], buff[8:])
  46. }
  47. //
  48. // main/mrworker.go calls this function.
  49. //
  50. func Worker(mapf func(string, string) []KeyValue,
  51. reducef func(string, []string) string) {
  52. // Your worker implementation here.
  53. // uncomment to send the Example RPC to the master.
  54. // CallExample()
  55. workerId := genWorkerID()
  56. retry := 3
  57. // for to get tasks
  58. for {
  59. args := WorkArgs{Workerid: workerId}
  60. reply := WorkReply{}
  61. working := call("Master.Work", &args, &reply)
  62. // log.Println(working, reply.Isfinished)
  63. if reply.Isfinished || !working {
  64. log.Println("finished")
  65. return
  66. }
  67. log.Println("task info:", reply)
  68. //working switch map or reduce
  69. switch reply.MapReduce {
  70. case "map":
  71. MapWork(reply, mapf)
  72. retry = 3
  73. case "reduce":
  74. ReduceWork(reply, reducef)
  75. retry = 3
  76. default:
  77. log.Println("error reply: would retry times:", retry)
  78. if retry < 0 {
  79. return
  80. }
  81. retry--
  82. }
  83. commitArgs := CommitArgs{Workerid: workerId, Taskid: reply.Taskid, MapReduce: reply.MapReduce}
  84. commitReply := CommitReply{}
  85. _ = call("Master.Commit", &commitArgs, &commitReply)
  86. time.Sleep(500 * time.Millisecond)
  87. }
  88. }
  89. // get tasks and write to mr-tmp-taskid-reduceid
  90. func MapWork(task WorkReply, mapf func(string, string) []KeyValue) {
  91. // check task info
  92. file, err := os.Open(task.Filename)
  93. if err != nil {
  94. log.Fatalf("cannot open %v", task.Filename)
  95. }
  96. content, err := ioutil.ReadAll(file)
  97. if err != nil {
  98. log.Fatalf("cannot read %v", task.Filename)
  99. }
  100. kva := mapf(task.Filename, string(content))
  101. sort.Sort(ByKey(kva))
  102. // create file buckets
  103. tmpName := "mr-tmp-" + strconv.Itoa(task.Taskid)
  104. var fileBucket = make(map[int]*json.Encoder)
  105. for i := 0; i < task.BucketNumber; i++ {
  106. ofile, _ := os.Create(tmpName + "-" + strconv.Itoa(i))
  107. fileBucket[i] = json.NewEncoder(ofile)
  108. defer ofile.Close()
  109. }
  110. for _, kv := range kva {
  111. key := kv.Key
  112. reduce_idx := ihash(key) % task.BucketNumber
  113. err := fileBucket[reduce_idx].Encode(&kv)
  114. if err != nil {
  115. log.Fatal("Unable to write to file")
  116. }
  117. }
  118. }
  119. // get reduce task and reduce all reduce id = task.Taskid files.
  120. func ReduceWork(task WorkReply, reducef func(string, []string) string) {
  121. //check task info
  122. intermediate := []KeyValue{}
  123. // read mr-tmp n files to add inermediate then write
  124. for mapTaskNumber := 0; mapTaskNumber < task.BucketNumber; mapTaskNumber++ {
  125. filename := "mr-tmp-" + strconv.Itoa(mapTaskNumber) + "-" + strconv.Itoa(task.Taskid)
  126. f, err := os.Open(filename)
  127. if err != nil {
  128. log.Fatal("Unable to read from: ", filename)
  129. }
  130. defer f.Close()
  131. decoder := json.NewDecoder(f)
  132. var kv KeyValue
  133. for decoder.More() {
  134. err := decoder.Decode(&kv)
  135. if err != nil {
  136. log.Fatal("Json decode failed, ", err)
  137. }
  138. intermediate = append(intermediate, kv)
  139. }
  140. }
  141. sort.Sort(ByKey(intermediate))
  142. // write to "mr-out-Y" Y is reduce task id
  143. i := 0
  144. ofile, err := os.Create("mr-out-" + strconv.Itoa(task.Taskid+1))
  145. if err != nil {
  146. log.Fatal("Unable to create file: ", ofile)
  147. }
  148. defer ofile.Close()
  149. log.Println("complete to ", task.Taskid, "start to write in to ", ofile)
  150. for i < len(intermediate) {
  151. j := i + 1
  152. for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
  153. j++
  154. }
  155. values := []string{}
  156. for k := i; k < j; k++ {
  157. values = append(values, intermediate[k].Value)
  158. }
  159. output := reducef(intermediate[i].Key, values)
  160. // this is the correct format for each line of Reduce output.
  161. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
  162. i = j
  163. }
  164. ofile.Close()
  165. }
  166. //
  167. // example function to show how to make an RPC call to the master.
  168. //
  169. // the RPC argument and reply types are defined in rpc.go.
  170. //
  171. func CallExample() {
  172. // declare an argument structure.
  173. args := ExampleArgs{}
  174. // fill in the argument(s).
  175. args.X = 99
  176. // declare a reply structure.
  177. reply := ExampleReply{}
  178. // send the RPC request, wait for the reply.
  179. call("Master.Example", &args, &reply)
  180. // reply.Y should be 100.
  181. fmt.Printf("reply.Y %v\n", reply.Y)
  182. }
  183. //
  184. // send an RPC request to the master, wait for the response.
  185. // usually returns true.
  186. // returns false if something goes wrong.
  187. //
  188. func call(rpcname string, args interface{}, reply interface{}) bool {
  189. // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
  190. sockname := masterSock()
  191. c, err := rpc.DialHTTP("unix", sockname)
  192. if err != nil {
  193. log.Fatal("dialing:", err)
  194. }
  195. defer c.Close()
  196. err = c.Call(rpcname, args, reply)
  197. if err == nil {
  198. return true
  199. }
  200. fmt.Println(err)
  201. return false
  202. }

mr/rpc.go

  1. package mr
  2. //
  3. // RPC definitions.
  4. //
  5. // remember to capitalize all names.
  6. //
  7. import (
  8. "os"
  9. "strconv"
  10. )
  11. //
  12. // example to show how to declare the arguments
  13. // and reply for an RPC.
  14. //
  15. type ExampleArgs struct {
  16. X int
  17. }
  18. type ExampleReply struct {
  19. Y int
  20. }
  21. // Add your RPC definitions here.
  22. type WorkArgs struct {
  23. Workerid string
  24. }
  25. type WorkReply struct {
  26. Isfinished bool
  27. Taskid int
  28. Filename string
  29. MapReduce string
  30. BucketNumber int
  31. }
  32. type CommitArgs struct {
  33. Workerid string
  34. Taskid int
  35. MapReduce string
  36. }
  37. type CommitReply struct {
  38. IsOK bool
  39. }
  40. // Cook up a unique-ish UNIX-domain socket name
  41. // in /var/tmp, for the master.
  42. // Can't use the current directory since
  43. // Athena AFS doesn't support UNIX-domain sockets.
  44. func masterSock() string {
  45. s := "/var/tmp/824-mr-"
  46. s += strconv.Itoa(os.Getuid())
  47. return s
  48. }

测试

* PASSED ALL TESTS

  1. ArideMacBook-Air:main abser$ go run mrmaster.go pg*.txt
  2. 2020/03/06 18:24:19 [init] with: [pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt] 10
  3. 2020/03/06 18:24:19 rpc.Register: method "Done" has 1 input parameters; needs exactly three
  4. 2020/03/06 18:24:24 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  5. commit a map task: 0
  6. 2020/03/06 18:24:24 current [2 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]
  7. 2020/03/06 18:24:25 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  8. commit a map task: 1
  9. 2020/03/06 18:24:25 current [2 2 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]
  10. 2020/03/06 18:24:26 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  11. commit a map task: 2
  12. 2020/03/06 18:24:26 current [2 2 2 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]
  13. 2020/03/06 18:24:28 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  14. commit a map task: 3
  15. 2020/03/06 18:24:28 current [2 2 2 2 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]
  16. 2020/03/06 18:24:29 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  17. commit a map task: 4
  18. 2020/03/06 18:24:29 current [2 2 2 2 2 0 0 0] [0 0 0 0 0 0 0 0 0 0]
  19. 2020/03/06 18:24:30 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  20. commit a map task: 5
  21. 2020/03/06 18:24:30 current [2 2 2 2 2 2 0 0] [0 0 0 0 0 0 0 0 0 0]
  22. 2020/03/06 18:24:31 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  23. commit a map task: 6
  24. 2020/03/06 18:24:31 current [2 2 2 2 2 2 2 0] [0 0 0 0 0 0 0 0 0 0]
  25. 2020/03/06 18:24:32 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  26. commit a map task: 7
  27. 2020/03/06 18:24:32 current [2 2 2 2 2 2 2 2] [0 0 0 0 0 0 0 0 0 0]
  28. 2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  29. apply a reduce task: {false 0 reduce 8}
  30. 2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  31. commit a reduce task: 0
  32. 2020/03/06 18:24:33 current [2 2 2 2 2 2 2 2] [2 0 0 0 0 0 0 0 0 0]
  33. 2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  34. apply a reduce task: {false 1 reduce 8}
  35. 2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  36. commit a reduce task: 1
  37. 2020/03/06 18:24:33 current [2 2 2 2 2 2 2 2] [2 2 0 0 0 0 0 0 0 0]
  38. 2020/03/06 18:24:34 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  39. apply a reduce task: {false 2 reduce 8}
  40. 2020/03/06 18:24:34 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  41. commit a reduce task: 2
  42. 2020/03/06 18:24:34 current [2 2 2 2 2 2 2 2] [2 2 2 0 0 0 0 0 0 0]
  43. 2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  44. apply a reduce task: {false 3 reduce 8}
  45. 2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  46. commit a reduce task: 3
  47. 2020/03/06 18:24:35 current [2 2 2 2 2 2 2 2] [2 2 2 2 0 0 0 0 0 0]
  48. 2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  49. apply a reduce task: {false 4 reduce 8}
  50. 2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  51. commit a reduce task: 4
  52. 2020/03/06 18:24:36 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 0 0 0 0 0]
  53. 2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  54. apply a reduce task: {false 5 reduce 8}
  55. 2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  56. commit a reduce task: 5
  57. 2020/03/06 18:24:36 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 0 0 0 0]
  58. 2020/03/06 18:24:37 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  59. apply a reduce task: {false 6 reduce 8}
  60. 2020/03/06 18:24:37 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  61. commit a reduce task: 6
  62. 2020/03/06 18:24:37 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 0 0 0]
  63. 2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  64. apply a reduce task: {false 7 reduce 8}
  65. 2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  66. commit a reduce task: 7
  67. 2020/03/06 18:24:38 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 0 0]
  68. 2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  69. apply a reduce task: {false 8 reduce 8}
  70. 2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  71. commit a reduce task: 8
  72. 2020/03/06 18:24:38 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 2 0]
  73. 2020/03/06 18:24:39 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  74. apply a reduce task: {false 9 reduce 8}
  75. 2020/03/06 18:24:39 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9
  76. commit a reduce task: 9
  77. 2020/03/06 18:24:39 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 2 2]

结语

没想到写文档和编码是一比一的时间关系, 文档之后需要在表达上再做修改,同时相应的知识点再加上一些 wiki 或者 博客知识链接.希望学弟学妹能有所得.

另外遇到了更多问题请联系我,我会共同解决.