Introduction

In this lab you’ll build a MapReduce system. You’ll implement a worker process that calls application Map and Reduce functions and handles reading and writing files, and a coordinator process that hands out tasks to workers and copes with failed workers. You’ll be building something similar to the MapReduce paper. (Note: this lab uses “coordinator” instead of the paper’s “master”.)

Getting started

First, git clone the 6.824 repository.

We supply you with a simple sequential mapreduce implementation in src/main/mrsequential.go. It runs the maps and reduces one at a time, in a single process. We also provide you with a couple of MapReduce applications: word-count in mrapps/wc.go, and a text indexer in mrapps/indexer.go.


  $ cd ~/6.824
  $ cd src/main
  $ go build -race -buildmode=plugin ../mrapps/wc.go
  $ rm mr-out*
  $ go run -race mrsequential.go wc.so pg*.txt
  $ more mr-out-0
  A 509
  ABOUT 2
  ACT 8
  ...

The -race flag runs the commands with Go's race detector enabled.

mrsequential.go leaves its output in the file mr-out-0. The input is from the text files named pg-xxx.txt.

Feel free to borrow code from mrsequential.go. You should also have a look at mrapps/wc.go to see what MapReduce application code looks like.

Your Job (moderate/hard)

Your job is to implement a distributed MapReduce, consisting of two programs, the coordinator and the worker.

There will be just one coordinator process, and one or more worker processes executing in parallel. In a real system the workers would run on a bunch of different machines, but for this lab you’ll run them all on a single machine.

The workers will talk to the coordinator via RPC. Each worker process will ask the coordinator for a task, read the task’s input from one or more files, execute the task, and write the task’s output to one or more files.
The coordinator should notice if a worker hasn’t completed its task in a reasonable amount of time (for this lab, use ten seconds), and give the same task to a different worker.
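As a concrete starting point, the RPC exchange described above might use message types like the following. This is only a sketch for mr/rpc.go; every name and field here is an illustrative assumption, not something the lab prescribes.

```go
package main

import "fmt"

// TaskArgs is a hypothetical request a worker sends when asking for work.
type TaskArgs struct {
	WorkerID int // capitalized so Go's RPC package will marshal it
}

// TaskReply is a hypothetical answer from the coordinator.
type TaskReply struct {
	Kind    string // e.g. "map", "reduce", "wait", or "exit"
	File    string // input file, for a map task
	TaskNum int
	NReduce int
}

func main() {
	// The coordinator might fill in a reply like this when handing
	// an unstarted map task to a worker.
	reply := TaskReply{Kind: "map", File: "pg-grimm.txt", TaskNum: 3, NReduce: 10}
	fmt.Printf("%v task %v on %v\n", reply.Kind, reply.TaskNum, reply.File)
}
```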

We have given you a little code to start you off. The “main” routines for the coordinator and worker are in main/mrcoordinator.go and main/mrworker.go; don’t change these files. You should put your implementation in mr/coordinator.go, mr/worker.go, and mr/rpc.go.


Here’s how to run your code on the word-count MapReduce application. First, make sure the word-count plugin is freshly built:

$ go build -race -buildmode=plugin ../mrapps/wc.go

In the main directory, run the coordinator.

$ rm mr-out*
$ go run -race mrcoordinator.go pg-*.txt

The pg-*.txt arguments to mrcoordinator.go are the input files; each file corresponds to one “split”, and is the input to one Map task. The -race flag runs go with its race detector.

When the workers and coordinator have finished, look at the output in mr-out-*. When you’ve completed the lab, the sorted union of the output files should match the sequential output, like this:

$ cat mr-out-* | sort | more
A 509
ABOUT 2
ACT 8
...

We supply you with a test script in main/test-mr.sh. The tests check that the wc and indexer MapReduce applications produce the correct output when given the pg-xxx.txt files as input. The tests also check that your implementation runs the Map and Reduce tasks in parallel, and that your implementation recovers from workers that crash while running tasks.


A few rules:

The map phase should divide the intermediate keys into buckets for nReduce reduce tasks, where nReduce is the number of reduce tasks, the argument that main/mrcoordinator.go passes to MakeCoordinator(). So, each mapper needs to create nReduce intermediate files for consumption by the reduce tasks.
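The lab's mr/worker.go ships an ihash helper for exactly this bucketing; a self-contained sketch of how a mapper might pick the bucket for a key (the nReduce value and the keys are illustrative):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the helper provided in mr/worker.go: hash a key so a
// mapper can assign it to one of the nReduce buckets.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 10 // the value passed to MakeCoordinator()
	for _, key := range []string{"a", "about", "act"} {
		fmt.Printf("%q -> bucket %d of %d\n", key, ihash(key)%nReduce, nReduce)
	}
}
```

Because ihash is deterministic, every occurrence of a given key, no matter which mapper sees it, lands in the same reduce bucket.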

The worker implementation should put the output of the X’th reduce task in the file mr-out-X.

A mr-out-X file should contain one line per Reduce function output. The line should be generated with the Go “%v %v” format, called with the key and value. Have a look in main/mrsequential.go for the line commented “this is the correct format”. The test script will fail if your implementation deviates too much from this format.

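A tiny sketch of producing that format; the helper name is made up, but the format string is the "%v %v" one the lab requires:

```go
package main

import "fmt"

// formatLine renders one Reduce output line in the "%v %v" format
// that mrsequential.go marks as "this is the correct format".
func formatLine(key, value string) string {
	return fmt.Sprintf("%v %v\n", key, value)
}

func main() {
	fmt.Print(formatLine("A", "509"))   // A 509
	fmt.Print(formatLine("ABOUT", "2")) // ABOUT 2
}
```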

You can modify mr/worker.go, mr/coordinator.go, and mr/rpc.go. You can temporarily modify other files for testing, but make sure your code works with the original versions; we’ll test with the original versions.

The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.


main/mrcoordinator.go expects mr/coordinator.go to implement a Done() method that returns true when the MapReduce job is completely finished; at that point, mrcoordinator.go will exit.
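A minimal sketch of what Done() might look like; the Coordinator fields here are assumptions for illustration, and the lock matters because RPC handlers mutate the same state concurrently:

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator sketch; field names are illustrative.
type Coordinator struct {
	mu         sync.Mutex
	nReduce    int
	reduceDone int // how many reduce tasks have completed
}

// Done reports whether the whole MapReduce job has finished;
// mrcoordinator.go polls it and exits when it returns true.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.reduceDone == c.nReduce
}

func main() {
	c := &Coordinator{nReduce: 10}
	fmt.Println(c.Done()) // false
	c.reduceDone = 10
	fmt.Println(c.Done()) // true
}
```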

When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the coordinator, it can assume that the coordinator has exited because the job is done, and so the worker can terminate too. Depending on your design, you might also find it helpful to have a “please exit” pseudo-task that the coordinator can give to workers.

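Sketched as code, with call() stubbed out to simulate an exited coordinator (the RPC name and the "Kind" field are illustrative assumptions):

```go
package main

import "fmt"

// call stands in for the RPC helper in mr/worker.go. It is stubbed
// to fail here, as the real one does once the coordinator has exited.
func call(rpcname string, args interface{}, reply interface{}) bool {
	return false
}

func main() {
	for {
		var reply struct{ Kind string }
		if ok := call("Coordinator.GetTask", &struct{}{}, &reply); !ok {
			// Can't reach the coordinator: assume the job is done.
			fmt.Println("coordinator unreachable; worker exiting")
			return
		}
		if reply.Kind == "exit" { // optional "please exit" pseudo-task
			return
		}
		// ... otherwise execute the task and report back ...
	}
}
```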

Hints

The Guidance page has some tips on developing and debugging.

One way to get started is to modify mr/worker.go’s Worker() to send an RPC to the coordinator asking for a task. Then modify the coordinator to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

The application Map and Reduce functions are loaded at run-time using the Go plugin package, from files whose names end in .so.

If you change anything in the mr/ directory, you will probably have to re-build any MapReduce plugins you use, with something like go build -race -buildmode=plugin ../mrapps/wc.go

This lab relies on the workers sharing a file system. That’s straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.

A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.
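For example (the helper name is illustrative; only the mr-X-Y convention comes from the hint):

```go
package main

import "fmt"

// intermediateName follows the suggested mr-X-Y convention:
// X is the map task number, Y the reduce task number.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("mr-%d-%d", mapTask, reduceTask)
}

func main() {
	fmt.Println(intermediateName(3, 7)) // mr-3-7
}
```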

The worker’s map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go’s encoding/json package. To write key/value pairs in JSON format to an open file:

  enc := json.NewEncoder(file)
  for _, kv := ... {
    err := enc.Encode(&kv)

And to read such a file back:

  dec := json.NewDecoder(file)
  for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
      break
    }
    kva = append(kva, kv)
  }
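Putting the two fragments together, a complete round trip might look like this; a bytes.Buffer stands in for the intermediate file, and KeyValue mirrors the type defined in mr/worker.go:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// KeyValue mirrors the type in mr/worker.go.
type KeyValue struct {
	Key   string
	Value string
}

// roundTrip encodes pairs as JSON (one object per line) and decodes
// them back, the way a map task's output is later read by a reduce
// task. A bytes.Buffer stands in for the intermediate file.
func roundTrip(pairs []KeyValue) []KeyValue {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, kv := range pairs {
		if err := enc.Encode(&kv); err != nil {
			panic(err)
		}
	}
	dec := json.NewDecoder(&buf)
	var kva []KeyValue
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			break
		}
		kva = append(kva, kv)
	}
	return kva
}

func main() {
	out := roundTrip([]KeyValue{{"a", "1"}, {"b", "1"}})
	fmt.Println(len(out), out[0].Key) // 2 a
}
```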

The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

You can steal some code from mrsequential.go for reading Map input files, for sorting intermediate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

The coordinator, as an RPC server, will be concurrent; don’t forget to lock shared data.
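For instance, a hypothetical GetTask handler guarding a shared task table; all names here are illustrative, and only the need for the lock comes from the hint:

```go
package main

import (
	"fmt"
	"sync"
)

// Coordinator sketch: Go's net/rpc runs each call in its own
// goroutine, so the shared task table needs the mutex.
type Coordinator struct {
	mu       sync.Mutex
	assigned []bool // one entry per map task
}

type GetTaskReply struct{ TaskNum int }

// GetTask hands out the first unassigned task, or -1 if none is left.
func (c *Coordinator) GetTask(args *struct{}, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	reply.TaskNum = -1
	for i, taken := range c.assigned {
		if !taken {
			c.assigned[i] = true
			reply.TaskNum = i
			break
		}
	}
	return nil
}

func main() {
	c := &Coordinator{assigned: make([]bool, 2)}
	var r GetTaskReply
	c.GetTask(&struct{}{}, &r)
	fmt.Println(r.TaskNum) // 0
	c.GetTask(&struct{}{}, &r)
	fmt.Println(r.TaskNum) // 1
}
```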

Use Go’s race detector, with go build -race and go run -race. test-mr.sh by default runs the tests with the race detector.

Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished. One possibility is for workers to periodically ask the coordinator for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the coordinator to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won’t prevent the coordinator from processing other RPCs.
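A sketch of the first approach, with the RPC stubbed out; the task kinds, the stubbed state, and the sleep interval are all assumptions:

```go
package main

import (
	"fmt"
	"time"
)

var mapsLeft = 1 // stub coordinator state: one unfinished map task

// askForTask stands in for the worker's RPC; "wait" means nothing is
// runnable yet (e.g. reduces can't start until the maps finish).
func askForTask() string {
	if mapsLeft > 0 {
		mapsLeft--
		return "map"
	}
	return "wait"
}

func main() {
	for i := 0; i < 3; i++ {
		switch askForTask() {
		case "wait":
			// Sleep between polls instead of busy-looping; real
			// code might sleep on the order of a second.
			time.Sleep(10 * time.Millisecond)
		case "map":
			fmt.Println("running a map task")
		}
	}
}
```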

The coordinator can’t reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the coordinator wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died (of course, it might not have).

If you choose to implement Backup Tasks (Section 3.6), note that we test that your code doesn’t schedule extraneous tasks when workers execute tasks without crashing. Backup tasks should only be scheduled after some relatively long period of time (e.g., 10s).

To test crash recovery, you can use the mrapps/crash.go application plugin. It randomly exits in the Map and Reduce functions.

To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

test-mr.sh runs all its processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there. You can temporarily modify test-mr.sh to exit after the failing test, so the script does not continue testing (and overwrite the output files).

test-mr-many.sh provides a bare-bones script for running test-mr.sh with a timeout (which is how we’ll test your code). It takes as an argument the number of times to run the tests. You should not run several test-mr.sh instances in parallel because the coordinator will reuse the same socket, causing conflicts.



Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names.

When passing a pointer to a reply struct to the RPC system, the object that *reply points to should be zero-allocated. The code for RPC calls should always look like

  reply := SomeType{}   
  call(..., &reply)

without setting any fields of reply before the call. If you don’t follow this requirement, there will be a problem when you pre-initialize a reply field to the non-default value for that datatype, and the server on which the RPC executes sets that reply field to the default value; you will observe that the write doesn’t appear to take effect, and that on the caller side, the non-default value remains.