Mapper类方法
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public Mapper() {}protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {context.write(key, value);}protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {}/*** mapper阶段调用run方法执行maptask任务* 首先调用setup方法 -- 可做准备工作* 再次调用map方法 -- 具体业务逻辑* 最后调用cleanup方法 -- 资源释放等* 注意:每个分片都会调用一次run方法,见下图* context -- 上下文* job的conf配置信息* 文件解析器的reader -- LineReportReader* 当前执行的文件分片* job的相关信息,jobId、状态等*/public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {this.setup(context);try {while(context.nextKeyValue()) {this.map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {this.cleanup(context);}}public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {public Context() {}}
Reducer类方法
/**
 * Base class for the reduce side of a job.
 *
 * <p>The framework drives a reduce task through {@link #run(Context)}, which
 * calls {@link #setup(Context)}, then {@link #reduce(Object, Iterable, Context)}
 * for every key supplied by the context, and finally {@link #cleanup(Context)}.
 * Subclasses override the hooks they need; the defaults make this class an
 * identity reducer.
 *
 * @param <KEYIN>    type of the input keys
 * @param <VALUEIN>  type of the input values
 * @param <KEYOUT>   type of the emitted keys
 * @param <VALUEOUT> type of the emitted values
 */
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    public Reducer() {
    }

    /**
     * Hook invoked by {@link #run(Context)} before the first key is reduced.
     * Does nothing by default; override it to do preparation work.
     *
     * @param context the task context
     * @throws IOException          declared for overriding implementations
     * @throws InterruptedException declared for overriding implementations
     */
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    /**
     * Handles one key together with all of its values. The default
     * implementation writes every value to the context under the same key;
     * override it with the actual business logic.
     *
     * @param key     the current input key
     * @param values  the values associated with {@code key}
     * @param context the task context used to emit output
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @SuppressWarnings("unchecked")
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // The identity reduction is only type-correct when the input and output
        // types coincide, hence the explicit unchecked casts.
        for (VALUEIN value : values) {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    }

    /**
     * Hook invoked by {@link #run(Context)} after the last key has been
     * reduced. Does nothing by default; override it to release resources.
     *
     * @param context the task context
     * @throws IOException          declared for overriding implementations
     * @throws InterruptedException declared for overriding implementations
     */
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }

    /**
     * Entry point called during the reduce phase to execute a reduce task.
     *
     * <ol>
     *   <li>calls {@code setup} first - preparation work;</li>
     *   <li>then calls {@code reduce} for each key - the business logic;</li>
     *   <li>finally calls {@code cleanup} - resource release and similar.</li>
     * </ol>
     *
     * @param context the task context supplying the keys and their values
     * @throws IOException          if reading input or writing output fails
     * @throws InterruptedException if the task is interrupted
     */
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
                // After each key, reset the backup store when the values
                // iterator is a ValueIterator.
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator<VALUEIN>) iter).resetBackupStore();
                }
            }
        } finally {
            // cleanup runs even when reduce throws.
            cleanup(context);
        }
    }

    /**
     * Context type handed to the reducer's methods; it exposes the
     * {@code ReduceContext} operations for this reducer's type parameters.
     */
    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}


