简单总结

最近好几个学弟提到了 6.824, 今天下午有时间,我也学习一下.花了大概 4 小时做了初版通过了所有测试,一小时看代码设计,两个小时编码,一个小时编码加测试. 大概也就 300 行代码.总体难度,对我来说正好能学习其中讲授知识部分. 我正好使用 go 语言(快两年),了解 rpc(自己设计过也给 grpc 贡献有代码), 看过一点 MapReduce paper (之前看 chubby spanner 的时候)遇到了问题还比较多,其中 IDE 能力不同的问题还挺奇特的,也就现在有.
先花时间整理一下大概有哪些问题,
然后描述设计和编码是遇到的问题和解决方案,
最后展示一下关键代码和测试结果.
希望能够总结一些适合复用的解决方案,提醒自己.也帮助他人学习如何解决相应种类问题.
编码问题
- 首先是 vscode 对 golang 的支持,在 你的代码不符合 golang 规范的时候, vscode 对于代码提示的功能非常小,相当于白板编程(那我要 gopls 有什么用?). 比较重要的问题是 代码跳转 用不了,另外是 类型检查 和 拼写错误会较多.
这个时候只能凭经验来补代码提示和库函数运用, 这里我用了 tabnine 这个插件用作代码提示. tabnine 用 ai 的方式预测接下来的输入,对于重复的代码片段很有用,尽量保证了我拼写的正确.
最后迫于更快的代码检查,还是转用了 goland. goland 对于开源项目工作者是免费的,可以了解一下.
- go 1.14 版本的问题, 我遇到的问题有两个,
- 一个是 从 1.11 开始启用的 go module 后,引用本地包里的文件无法通过编译的问题 如 6.824 中
"../mr"这种相对目录的方式通不过编译. - 另一个是 goland 2019.3 这个版本,对于 1.14 的 gomodule 是不支持的(因为 go 1.14 刚出),这个在 2019.3.3 修复了
- 一个是 从 1.11 开始启用的 go module 后,引用本地包里的文件无法通过编译的问题 如 6.824 中
于是我在 6.824 的根目录下go mod init ds 并把 "../mr"改为了 "ds/mr" ,并且在运行调试的时候又转为了 vscode
原有代码和工作要求
解决了环境问题,接下来明晰一下工作要求. 首先 6.824 提供了串行执行的 MapReduce 示例, 对 MapReduce 了解更多请看论文, 对 本 lab 了解更多请直接看最上链接.我们直接简要分析,提出主要问题和解决方案.
我们的主要问题就是,分布式设计和串行化设计的差异和考虑的问题,以及我们的方案,除此之外, go 语法啊, 怎样更优雅更好看, go 的插件工作模式啊,属于其他问题,自己下来日常养成.首先你应该先看 lab, paper, 执行 lab 中的程序直到开始自己编码.
我们看 lab 1 中的 Your Job部分.
要求是, 实现一个系统,其中 master 用于分配 map 和 reduce 任务, worker 负责申请任务并执行.
这里有个特别的参数 nReduce ,他可以理解为我们需要 nReduce 个桶.一个桶负责装一类信息,在这里我们使用桶来保证相同 key 的 keyvalue 对放在同一个文件中等待处理.
问题一:网络通信
通信问题,首先我们分析 master 和 worker 需要通信, 如何通信是首要问题.
解决:
lab 帮我们决定了使用 rpc 通信,同时 master 和 worker 通过 unix domain socket 作为通信方式, 用 http 服务器提供 rpc 服务. unix 通信我写过例子代码. 由于代码编写 lab 中已经完成,理解即可.需要注意的是,
//// start a thread that listens for RPCs from worker.go//func (m *Master) server() {rpc.Register(m)rpc.HandleHTTP()sockname := masterSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)}
以上是 golang 标准库 rpc 包的便利操作.快速启动了一个 rpc 服务.
问题二:共享文件
存储问题, 我们需要的工作是处理文件啊,文件内容跟着网络到处传输吗?这样网络开销也太大了,我们看过 paper 知道,当年的瓶颈就是网络传输的带宽.
解决:
我们现在实现的版本是在本地执行的一个分布式设计.这是因为 MapReduce 论文中提到, 所有实例应该共享文件系统, paper 中是 GFS, 我们本地设计就是本机的文件系统.同样解决了这个问题.
解决了以上问题,意味着这个系统的数据能具有一致性了.接下来设计该系统的处理逻辑.
问题三:任务切分
我们知道, map 和 reduce 的输入输出都是文件, 我们现在同一文件系统,能够相互交流,我们在交流中携带需要处理的文件路径即可. 根据 MapReduce 的思想,首先我们需要切分任务.
解决:
- 我们程序的输入是 n 个文件,
go run mrsequential.go wc.so pg*.txt命令还记得吗, 其中 pg.txt 就是我们的输入文件, 是通配符.意思是下面这 8 个文件名都会输入到我们程序里
for _, filename := range os.Args[2:]用了这行代码读取
那么根据 paper, 这里就有 8 个 map 任务, 每个 map 任务读取一个文件中的内容并处理后输出.
- 这里输出需要注意,为什么要给你 nReduce 这个参数啊? 你想你从各个文章中读取的数据 (KV 对), reduce 要怎样工作才能把 map 的输出都 reduce 呢?
我们先看之前 串行化程序 mrsequential.go怎么处理的. 他用 sort.Sort(ByKey(intermediate))排了序,这意味着, Key 一样的 kv 对聚集在一起了, 这样 reduce 工作代码很好写对吧.一溜下来保证相同 key 的 kv 对都能处理到.
而在分布式设计中,我们使用 桶 和 hash key (hash 保证了相同的 key 算出的结果一样)的设计,将相应的 key 保证分在同一组文件中,保证了 reduce 编码的简单.(注意, paper 中说了,系统的目的就是让编码人员只关注 map function 和 reduce function 就能完成工作,所以保证 reduce 编码的简单是非常有意义的)
具体为我们首先对 key 做 hash 编码.将 string 编码为 数字, 并对数字结果 取模 N 运算, N 为传入的 nReduce 值.运算的结果代表了相应 kv 对应该放于第几个桶中,在这里就是归入哪一个输出文件.
综上,我们有 nReduce 个 reduce 任务.
- lab 中有指导文件命名方式,所以这里不把他作为单独的一个问题展示,但规范文件命名其实也能简化系统,也是通信消息设计的一部分.假如我们有 2 个 map 任务, nReduce 值为 2. 那么我们的流程如下:
- 首先我们执行完 map 任务, 会有 map * nReduce 个文件生成.格式为
mr-tmp-X-Y其中 X 是 map 的任务号,代表第几个输入文件, Y 是 Reduce 的任务号,也代表着这是第几个桶.
- 首先我们执行完 map 任务, 会有 map * nReduce 个文件生成.格式为

这四个文件是 paper 中说的中间文件.
b. 执行 reduce 任务.将 reduce 任务号相同的,即一个桶中的数据进行处理,一个桶所有数据处理之后输出文件, 格式为 mr-out-NN 为 reduce 任务号.
- 这里额外提一句, 为什么输出这么多文件就结束了,不用合并吗? 确实不用.
我们查看 test-mr.sh 文件查看测试规则
# generate the correct output../mrsequential ../../mrapps/wc.so ../pg*txt || exit 1sort mr-out-0 > mr-correct-wc.txtrm -f mr-out*#......省略部分sort mr-out* | grep . > mr-wc-allif cmp mr-wc-all mr-correct-wc.txtthenecho '---' wc test: PASS
- 可以看到测试程序首先 使用 mrsequential 生成正确的解答 排序 后保存在一个文件中
- 然后将我们符合
mr-out*格式的输出文件读取出来整体 排序 后保存为一个文件. - 最后使用 cmp 命令进行判定两个文件是否一致.
问题四: 消息设计
我们完成了上述设计之后,开始实际逻辑的构建,我进行了以下设计, master 和 worker 通过 rpc 消息交互来分配任务,任务完成之后 worker 通知 master,并进行下一次任务分配.直到没有任务的时候,worker 退出.(如果没有任务的同时,所有任务都已完成 Master 也会退出)
- 这里看两个设计,一个是 commit 设计,worker 在完成 相应任务之后需要进行一次新的 rpc,这个增加了网络开销和不确定性(除了网络之外,还有安全问题,多了一个可以攻击的渠道)
但是注意 paper 中提到的是 Master 是无法得知 Worker 状态的.这里设计一个新的 rpc 服务来交换 task 是否完成的信息,这是因为我认为, worker 的边界就是收到任务处理任务, master 只需要知道 worker 有没有完成相应任务进行了.不仅如此,我认为这还简化了编码任务.
- Worker 退出设计, worker 在获取任务的时候,如果 Master 没有任务给他,怎么办呢? 我们挂起一个循环轮询吗?这个情况, master 是所有任务完成了吗?还是说 master 所有任务已经分配出去?
- 如果任务已经完成了,我这个 worker 还要一直轮询占用资源吗? 所以选择退出.
- 我们一直轮询会不会造成服务器负载?所以每一次轮询我们间隔 time.Sleep 一段时间
- 如果任务都没有完成,只是当前没有任务可以分配,如果 worker 退出了,没有启动 worker 机制或者保活 n 个 worker 的机制(你懂得,就像 kubernetes 的 pods 一样),那么接下来还有任务谁来干活?(考虑当前最后一个任务正在一个 worker 处理,但是这个 worker 不给力,挂了,master 看到这个任务超时了,重新分配给另一个 worker,但是其他 worker 在这个处理时间里都已经收到没事干的消息,都退出了,谁来干呢? 我在这里针对这一个场景(设计一定要针对具体需求) 设计了一个重试机制(巧的是我给 grpc 贡献的代码也是 retry 的代码). 当没有任务时,继续重复轮询 3 次,每次间隔 0.5 s. 尽量保证当前工作量的容错(现在就是处理几个单词)
那么我们有两种 rpc 服务,分别设计 发送和返回消息. 如下
其中 Workerid 用来区分 worker 是谁,帮助 master 掌控 worker 状态.(是不是你小子领了任务从来不交!)
WorkReply 用于获取任务信息, 如当前 taskid , MapReduce 当前 工作类型是 map 还是 reduce,
BucketNumber 用于指导 map 或者 reduce 任务应读取和输出多少个文件. Isfinished 用于 Master 返回是否所有任务都已完成.
最后每有一个任务,就会有一次提交(或者提交超时),该设计也让 master 状态管理代码逻辑更简洁.
问题五 状态管理
众所周知,分布式一大问题就是时序.虽然状态是万恶之源,但现实世界就是现实世界,熵增不可改变,复杂度只能转移,我们尽量简化即可.
关于这个系统,我们需要管理
- map 任务状态: 有多少个 map 任务,完成了多少
- reduce 任务状态:有多少个 reduce 任务,完成了多少.同时 reduce 还有在 map 任务都完成后进行
- worker 状态: 谁在干话,干了多久?
任务则设计三个状态,
- TaskIdle: 任务闲置,没人管 ing o(╥﹏╥)o
- TaskWorking: 任务工作中, (你咋知道我在磨洋工?因为你超时了!)
- TaskCommit: 任务完成!
Master 则针对每一个 map 存储状态 mapTasks []int,序号代表任务编号,reduce 同理. worker 用 map[string]int这是由于 workerid 在 worker 端生成,难以掌控,所以用 string.
则如下:
问题六:尽量健壮
我们希望我们的程序能处理任何问题吗?不是的.
但我们至少希望针对目标问题,具有鲁棒性,更强,更强壮.♂!
健壮的代码设计能处理很多的错误.针对意料之外的错误也有一定恢复能力.
之前提到的 重试机制,现在设计的超时机制, 共享锁机制都是增强的设计(当然正常情况下也需要更多开销 😁,但是一旦发生错误,这些开销都是值得的)
锁
所有人都知道,共享数据,并发! 不加锁, 哼哼,就会有及其稀奇古怪,难以定位调试的问题出现.我们在访问这些数据的时候记得加锁.m.mu.Lock() 也不要忘记解锁 m.mu.UnLock()
超时
Master 在每一次分配任务的时候,计时处理,如果 worker 超时依旧没有提交任务,我们认为这个 worker 已经无法完成任务,并将这个任务重新分配给另一个 worker
ctx, _ := context.WithTimeout(context.Background(), m.timeout)go func() {select {case <-ctx.Done():{m.mu.Lock()if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {m.reduceTasks[k] = TaskIdlelog.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")}m.mu.Unlock()}}}()
Code
mr/master.go
package mrimport ("context""errors""log""net""net/http""net/rpc""os""sync""time")const (TaskIdle = iotaTaskWorkingTaskCommit)type Master struct {// Your definitions here.files []stringnReduce int//init with 0mapTasks []intreduceTasks []intmapCount int//init with -1workerCommit map[string]intallCommited bool//init with 10 secondstimeout time.Durationmu sync.RWMutex}// Your code here -- RPC handlers for the worker to call.func (m *Master) Work(args *WorkArgs, reply *WorkReply) error {m.mu.Lock()defer m.mu.Unlock()// first for map workfor k, v := range m.files {if m.mapTasks[k] != TaskIdle {continue}reply.Taskid = kreply.Filename = vreply.MapReduce = "map"reply.BucketNumber = m.nReducereply.Isfinished = falsem.workerCommit[args.Workerid] = TaskWorkingm.mapTasks[k] = TaskWorking// log.Println("a worker", args.Workerid, "apply a map task:", *reply)ctx, _ := context.WithTimeout(context.Background(), m.timeout)go func() {select {case <-ctx.Done():{m.mu.Lock()defer m.mu.Unlock()if m.workerCommit[args.Workerid] != TaskCommit && m.mapTasks[k] != TaskCommit {m.mapTasks[k] = TaskIdlelog.Println("[Error]:", "worker:", args.Workerid, "map task:", k, "timeout")}}}}()return nil}// then dispatch reduce workfor k, v := range m.reduceTasks {if m.mapCount != len(m.files) {return nil}if v != TaskIdle {continue}reply.Taskid = kreply.Filename = ""reply.MapReduce = "reduce"reply.BucketNumber = len(m.files)reply.Isfinished = falsem.workerCommit[args.Workerid] = TaskWorkingm.reduceTasks[k] = TaskWorkingctx, _ := context.WithTimeout(context.Background(), m.timeout)go func() {select {case <-ctx.Done():{m.mu.Lock()if m.workerCommit[args.Workerid] != TaskCommit && m.reduceTasks[k] != TaskCommit {m.reduceTasks[k] = TaskIdlelog.Println("[Error]:", "worker:", args.Workerid, "reduce task:", k, "timeout")}m.mu.Unlock()}}}()log.Println("a worker", args.Workerid, "apply a reduce task:", *reply)return nil}for _, v := range m.workerCommit {if v == TaskWorking {reply.Isfinished = falsereturn nil}}reply.Isfinished = truereturn errors.New("worker apply but no tasks to dispatch")}func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {log.Println("a worker", args.Workerid, "commit a "+args.MapReduce+" task:", args.Taskid)m.mu.Lock()switch args.MapReduce {case "map":{m.mapTasks[args.Taskid] = TaskCommitm.workerCommit[args.Workerid] = TaskCommitm.mapCount++}case "reduce":{m.reduceTasks[args.Taskid] = TaskCommitm.workerCommit[args.Workerid] = TaskCommit}}m.mu.Unlock()log.Println("current", m.mapTasks, m.reduceTasks)for _, v := range m.mapTasks {if v != TaskCommit {return nil}}for _, v := range m.reduceTasks {if v != TaskCommit {return nil}}m.allCommited = truelog.Println("all tasks completed")return nil}//// an example RPC handler.//// the RPC argument and reply types are defined in rpc.go.//func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {log.Println("a worker")reply.Y = args.X + 1return nil}//// start a thread that listens for RPCs from worker.go//func (m *Master) server() {rpc.Register(m)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := masterSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)}//// main/mrmaster.go calls Done() periodically to find out// if the entire job has finished.//func (m *Master) Done() bool {// Your code here.return m.allCommited}//// create a Master.// main/mrmaster.go calls this function.// nReduce is the number of reduce tasks to use.//func MakeMaster(files []string, nReduce int) *Master {m := Master{files: files,nReduce: nReduce,mapTasks: make([]int, len(files)),reduceTasks: make([]int, nReduce),workerCommit: make(map[string]int),allCommited: false,timeout: 10 * time.Second,}log.Println("[init] with:", files, nReduce)m.server()return &m}
mr/worker.go
package mrimport ("crypto/rand""encoding/json""fmt""hash/fnv""io/ioutil""log""net/rpc""os""sort""strconv""time")//// Map functions return a slice of KeyValue.//type KeyValue struct {Key stringValue string}// for sorting by key.type ByKey []KeyValue// for sorting by key.func (a ByKey) Len() int { return len(a) }func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }//// use ihash(key) % NReduce to choose the reduce// task number for each KeyValue emitted by Map.//func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)}func genWorkerID() (uuid string) {// generate 32 bits timestampunix32bits := uint32(time.Now().UTC().Unix())buff := make([]byte, 12)numRead, err := rand.Read(buff)if numRead != len(buff) || err != nil {panic(err)}return fmt.Sprintf("%x-%x-%x-%x-%x-%x\n", unix32bits, buff[0:2], buff[2:4], buff[4:6], buff[6:8], buff[8:])}//// main/mrworker.go calls this function.//func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.// uncomment to send the Example RPC to the master.// CallExample()workerId := genWorkerID()retry := 3// for to get tasksfor {args := WorkArgs{Workerid: workerId}reply := WorkReply{}working := call("Master.Work", &args, &reply)// log.Println(working, reply.Isfinished)if reply.Isfinished || !working {log.Println("finished")return}log.Println("task info:", reply)//working switch map or reduceswitch reply.MapReduce {case "map":MapWork(reply, mapf)retry = 3case "reduce":ReduceWork(reply, reducef)retry = 3default:log.Println("error reply: would retry times:", retry)if retry < 0 {return}retry--}commitArgs := CommitArgs{Workerid: workerId, Taskid: reply.Taskid, MapReduce: reply.MapReduce}commitReply := CommitReply{}_ = call("Master.Commit", &commitArgs, &commitReply)time.Sleep(500 * time.Millisecond)}}// get tasks and write to mr-tmp-taskid-reduceidfunc MapWork(task WorkReply, mapf func(string, string) []KeyValue) {// check task infofile, err := os.Open(task.Filename)if err != nil {log.Fatalf("cannot open %v", task.Filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", task.Filename)}kva := mapf(task.Filename, string(content))sort.Sort(ByKey(kva))// create file bucketstmpName := "mr-tmp-" + strconv.Itoa(task.Taskid)var fileBucket = make(map[int]*json.Encoder)for i := 0; i < task.BucketNumber; i++ {ofile, _ := os.Create(tmpName + "-" + strconv.Itoa(i))fileBucket[i] = json.NewEncoder(ofile)defer ofile.Close()}for _, kv := range kva {key := kv.Keyreduce_idx := ihash(key) % task.BucketNumbererr := fileBucket[reduce_idx].Encode(&kv)if err != nil {log.Fatal("Unable to write to file")}}}// get reduce task and reduce all reduce id = task.Taskid files.func ReduceWork(task WorkReply, reducef func(string, []string) string) {//check task infointermediate := []KeyValue{}// read mr-tmp n files to add inermediate then writefor mapTaskNumber := 0; mapTaskNumber < task.BucketNumber; mapTaskNumber++ {filename := "mr-tmp-" + strconv.Itoa(mapTaskNumber) + "-" + strconv.Itoa(task.Taskid)f, err := os.Open(filename)if err != nil {log.Fatal("Unable to read from: ", filename)}defer f.Close()decoder := json.NewDecoder(f)var kv KeyValuefor decoder.More() {err := decoder.Decode(&kv)if err != nil {log.Fatal("Json decode failed, ", err)}intermediate = append(intermediate, kv)}}sort.Sort(ByKey(intermediate))// write to "mr-out-Y" Y is reduce task idi := 0ofile, err := os.Create("mr-out-" + strconv.Itoa(task.Taskid+1))if err != nil {log.Fatal("Unable to create file: ", ofile)}defer ofile.Close()log.Println("complete to ", task.Taskid, "start to write in to ", ofile)for i < len(intermediate) {j := i + 1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, intermediate[k].Value)}output := reducef(intermediate[i].Key, values)// this is the correct format for each line of Reduce output.fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)i = j}ofile.Close()}//// example function to show how to make an RPC call to the master.//// the RPC argument and reply types are defined in rpc.go.//func CallExample() {// declare an argument structure.args := ExampleArgs{}// fill in the argument(s).args.X = 99// declare a reply structure.reply := ExampleReply{}// send the RPC request, wait for the reply.call("Master.Example", &args, &reply)// reply.Y should be 100.fmt.Printf("reply.Y %v\n", reply.Y)}//// send an RPC request to the master, wait for the response.// usually returns true.// returns false if something goes wrong.//func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := masterSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false}
mr/rpc.go
package mr//// RPC definitions.//// remember to capitalize all names.//import ("os""strconv")//// example to show how to declare the arguments// and reply for an RPC.//type ExampleArgs struct {X int}type ExampleReply struct {Y int}// Add your RPC definitions here.type WorkArgs struct {Workerid string}type WorkReply struct {Isfinished boolTaskid intFilename stringMapReduce stringBucketNumber int}type CommitArgs struct {Workerid stringTaskid intMapReduce string}type CommitReply struct {IsOK bool}// Cook up a unique-ish UNIX-domain socket name// in /var/tmp, for the master.// Can't use the current directory since// Athena AFS doesn't support UNIX-domain sockets.func masterSock() string {s := "/var/tmp/824-mr-"s += strconv.Itoa(os.Getuid())return s}
测试
* PASSED ALL TESTS
ArideMacBook-Air:main abser$ go run mrmaster.go pg*.txt2020/03/06 18:24:19 [init] with: [pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt] 102020/03/06 18:24:19 rpc.Register: method "Done" has 1 input parameters; needs exactly three2020/03/06 18:24:24 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 02020/03/06 18:24:24 current [2 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:25 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 12020/03/06 18:24:25 current [2 2 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:26 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 22020/03/06 18:24:26 current [2 2 2 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:28 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 32020/03/06 18:24:28 current [2 2 2 2 0 0 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:29 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 42020/03/06 18:24:29 current [2 2 2 2 2 0 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:30 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 52020/03/06 18:24:30 current [2 2 2 2 2 2 0 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:31 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 62020/03/06 18:24:31 current [2 2 2 2 2 2 2 0] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:32 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a map task: 72020/03/06 18:24:32 current [2 2 2 2 2 2 2 2] [0 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 0 reduce 8}2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 02020/03/06 18:24:33 current [2 2 2 2 2 2 2 2] [2 0 0 0 0 0 0 0 0 0]2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 1 reduce 8}2020/03/06 18:24:33 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 12020/03/06 18:24:33 current [2 2 2 2 2 2 2 2] [2 2 0 0 0 0 0 0 0 0]2020/03/06 18:24:34 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 2 reduce 8}2020/03/06 18:24:34 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 22020/03/06 18:24:34 current [2 2 2 2 2 2 2 2] [2 2 2 0 0 0 0 0 0 0]2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 3 reduce 8}2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 32020/03/06 18:24:35 current [2 2 2 2 2 2 2 2] [2 2 2 2 0 0 0 0 0 0]2020/03/06 18:24:35 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 4 reduce 8}2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 42020/03/06 18:24:36 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 0 0 0 0 0]2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 5 reduce 8}2020/03/06 18:24:36 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 52020/03/06 18:24:36 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 0 0 0 0]2020/03/06 18:24:37 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 6 reduce 8}2020/03/06 18:24:37 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 62020/03/06 18:24:37 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 0 0 0]2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 7 reduce 8}2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 72020/03/06 18:24:38 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 0 0]2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 8 reduce 8}2020/03/06 18:24:38 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 82020/03/06 18:24:38 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 2 0]2020/03/06 18:24:39 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9apply a reduce task: {false 9 reduce 8}2020/03/06 18:24:39 a worker 5e6224d8-32c6-36e0-9994-644d-95dc9da9commit a reduce task: 92020/03/06 18:24:39 current [2 2 2 2 2 2 2 2] [2 2 2 2 2 2 2 2 2 2]
结语
没想到写文档和编码是一比一的时间关系, 文档之后需要在表达上再做修改,同时相应的知识点再加上一些 wiki 或者 博客知识链接.希望学弟学妹能有所得.
另外遇到了更多问题请联系我,我会共同解决.
