例子

接上面的单词统计

image.png

WorldCountCombiner类

  1. package com.hadooprpc;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. public class WorldCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
  7. @Override
  8. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  9. int count = 0;
  10. for (IntWritable v : values) {
  11. count += v.get();
  12. }
  13. context.write(key,new IntWritable(count));
  14. }
  15. }

WorldCountDriver类

  1. package com.hadooprpc;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. /**
  12. * 本类是客户端用来指定WorldCount job程序运行时所需要的很多参数
  13. * 比如:指定那个类作为map阶段的业务逻辑,那个类作为reduce阶段的业务逻辑类
  14. * 指定那个组件作为数据的读取组件,数据结果输出组件
  15. * ....
  16. * 以及其他各种所需要的参数
  17. */
  18. public class WorldCountDriver {
  19. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  20. Configuration conf = new Configuration();
  21. //设置权限,也可以在vm那边伪造
  22. System.setProperty("HADOOP_USER_NAME", "root");
  23. conf.set("fs.defaultFS","hdfs://master:9000");
  24. conf.set("mapreduce.framework.name","yarn");
  25. conf.set("yarn.resourcemanager.hostname","master");
  26. Job job = Job.getInstance(conf);
  27. //告诉框架,我们程序的位置
  28. // job.setJar("/root/wordCount.jar");
  29. //上面这样写,不好,换了路径又要重新写,我们改为用他的类加载器加载他自己
  30. job.setJarByClass(WorldCountDriver.class);
  31. //告诉框架,我们程序所用的mapper类和reduce类是什么
  32. job.setMapperClass(WorldCountMapper.class);
  33. job.setReducerClass(WorldCountReducer.class);
  34. //告诉框架,我们程序所用的mapper类和reduce类是什么
  35. job.setMapperClass(WorldCountMapper.class);
  36. job.setReducerClass(WorldCountReducer.class);
  37. //告诉框架我们程序输出的类型
  38. job.setMapOutputKeyClass(Text.class);
  39. job.setMapOutputValueClass(IntWritable.class);
  40. job.setOutputKeyClass(Text.class);
  41. job.setOutputValueClass(IntWritable.class);
  42. //设置combainer
  43. job.setCombinerClass(WorldCountCombiner.class);
  44. //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么
  45. //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件
  46. job.setInputFormatClass(TextInputFormat.class);
  47. //告诉框架,我们要处理的数据文件在那个路径下
  48. FileInputFormat.setInputPaths(job,new Path("/worldCount/input/"));
  49. //告诉框架我们的处理结果要输出到什么地方
  50. FileOutputFormat.setOutputPath(job,new Path("/worldCount/output/"));
  51. //这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了
  52. // job.submit();
  53. //提交后,然后等待服务器端返回值,看是不是true
  54. boolean res = job.waitForCompletion(true);
  55. //设置成功就退出码为0
  56. System.exit(res?0:1);
  57. }
  58. }

其他类和上面案列一样