Mapper类方法

  1. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  2. public Mapper() {
  3. }
  4. protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  5. }
  6. protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  7. context.write(key, value);
  8. }
  9. protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  10. }
  11. /**
  12. * mapper阶段调用run方法执行maptask任务
  13. * 首先调用setup方法 -- 可做准备工作
  14. * 再次调用map方法 -- 具体业务逻辑
  15. * 最后调用cleanup方法 -- 资源释放等
  16. * 注意:每个分片都会调用一次run方法,见下图
  17. * context -- 上下文
  18. * job的conf配置信息
  19. * 文件解析器的reader -- LineReportReader
  20. * 当前执行的文件分片
  21. * job的相关信息,jobId、状态等
  22. */
  23. public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  24. this.setup(context);
  25. try {
  26. while(context.nextKeyValue()) {
  27. this.map(context.getCurrentKey(), context.getCurrentValue(), context);
  28. }
  29. } finally {
  30. this.cleanup(context);
  31. }
  32. }
  33. public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  34. public Context() {
  35. }
  36. }

image.png
image.png

Reducer类方法

  1. public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  2. public Reducer() {
  3. }
  4. protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  5. }
  6. protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  7. Iterator i$ = values.iterator();
  8. while(i$.hasNext()) {
  9. VALUEIN value = i$.next();
  10. context.write(key, value);
  11. }
  12. }
  13. protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  14. }
  15. /**
  16. * reducer阶段调用run方法执行reducetask任务
  17. * 首先调用setup方法 -- 可做准备工作
  18. * 再次调用reduce方法 -- 具体业务逻辑
  19. * 最后调用cleanup方法 -- 资源释放等
  20. *
  21. */
  22. public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  23. this.setup(context);
  24. try {
  25. while(context.nextKey()) {
  26. this.reduce(context.getCurrentKey(), context.getValues(), context);
  27. Iterator<VALUEIN> iter = context.getValues().iterator();
  28. if (iter instanceof ValueIterator) {
  29. ((ValueIterator)iter).resetBackupStore();
  30. }
  31. }
  32. } finally {
  33. this.cleanup(context);
  34. }
  35. }
  36. public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  37. public Context() {
  38. }
  39. }
  40. }

image.png