06 WordCount入门示例

MapReduce 程序的业务编码分为两个大部分:

  • 一部分配置程序的运行信息;
  • 一部分编写该MapReduce程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类

MapReduce 程序编写规范

1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)

2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)

3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)

4、Mapper 中的业务逻辑写在 map()方法中

5、map()方法(maptask 进程)对每一个调用一次

6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式

7、Reducer 的业务逻辑写在 reduce()方法中

8、Reducetask 进程对每一组相同 k 的组调用一次 reduce()方法

9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类

10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

MapReduce 示例程序编写及编码规范

1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);

2、 该程序中的 TokenizerMapper 类继承了 Mapper 类

3、 该程序中的 IntSumReducer 类继承了 Reducer 类

代码

  1. package com.etc;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. public class WordCount {
  15. /**
  16. * Mapper静态内部类 简单的统计单词出现次数 KEYIN
  17. * 默认情况下,是mapreduce所读取到的一行文本的起始偏移量,Long类型,在hadoop中有其自己的序列化类LongWriteable VALUEIN
  18. * 默认情况下,是mapreduce所读取到的一行文本的内容,hadoop中的序列化类型为Text KEYOUT
  19. * 是用户自定义逻辑处理完成后输出的KEY,在此处是单词,String VALUEOUT 是用户自定义逻辑输出的value,这里是单词出现的次数,Long
  20. *
  21. * @author root
  22. *
  23. */
  24. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  25. // 为变量赋值1,定义整数1,每个单词计数1次
  26. private final static IntWritable valueOut = new IntWritable(1);
  27. private Text keyOut = new Text();
  28. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  29. /*
  30. * 构造一个用来解析输入value值的StringTokerizer对象 Java默认的分隔符是 空格'' 制表符'\t' 换行符'\n' 回车符'\r'
  31. */
  32. StringTokenizer token = new StringTokenizer(value.toString());
  33. while (token.hasMoreElements()) {
  34. // 返回从当前位置到下一个分隔符的字符串
  35. keyOut.set(token.nextToken());
  36. // map方法输出键值对时,输出每个被拆分出来的单词,以及计数1
  37. context.write(keyOut, valueOut);
  38. }
  39. }
  40. }
  41. /**
  42. * Reducer静态内部类 第一个Text: 是传入的单词名称,是Mapper中传入的 第二个:LongWritable
  43. * 是该单词出现了多少次,这个是mapreduce计算出来的,比如 hello出现了几次 第三个Text: 是输出单词的名称 ,这里是要输出到文本中的内容
  44. * 第四个LongWritable: 是输出时显示出现了多少次,这里也是要输出到文本中的内容
  45. *
  46. * @author root
  47. *
  48. */
  49. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  50. private IntWritable result = new IntWritable();
  51. public void reduce(Text key, Iterable<IntWritable> values, Context context)
  52. throws IOException, InterruptedException {
  53. int sum = 0;
  54. // 遍历
  55. for (IntWritable val : values) {
  56. sum += val.get();
  57. }
  58. // 为变量赋值递增
  59. result.set(sum);
  60. context.write(key, result);
  61. }
  62. }
  63. /**
  64. * 启动任务的运行
  65. * @param args
  66. * @throws Exception
  67. */
  68. public static void main(String[] args) throws Exception {
  69. // 提供对配置参数的访问
  70. Configuration conf = new Configuration();
  71. /*
  72. * hadoop框架中解析命令行参数的基本类。 它能够辨别一些标准的命令行参数,
  73. * 能够使应用程序轻易地指定namenode,jobtracker,以及其他额外的配置资源。
  74. * 在Hadoop中,用于执行MapReduce任务的机器角色有两个: 一个是JobTracker;另一个是TaskTracker;
  75. * JobTracker是用于调度工作的,TaskTracker是用于执行工作的。 一个Hadoop集群中只有一台JobTracker。
  76. */
  77. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  78. if (otherArgs.length < 2) {
  79. System.err.println("Usage: wordcount <in> [<in>...] <out>");
  80. // exit: 0表示正常退出,1表示中断退出,2表示异常退出
  81. System.exit(2);
  82. }
  83. /*
  84. * import org.apache.hadoop.mapreduce.Job; 存储了该程序运行的必要信息
  85. */
  86. Job job = Job.getInstance(conf, "word count");
  87. // 使得hadoop可以根据类包,找到jar包在哪里
  88. job.setJarByClass(WordCount.class);
  89. // 指定Mapper的类
  90. job.setMapperClass(TokenizerMapper.class);
  91. job.setCombinerClass(IntSumReducer.class);
  92. // 指定reduce的类
  93. job.setReducerClass(IntSumReducer.class);
  94. // 设置最终输出的类型
  95. job.setOutputKeyClass(Text.class);
  96. job.setOutputValueClass(IntWritable.class);
  97. // 指定输入文件的位置,这里为了灵活,接收外部参数
  98. for (int i = 0; i < otherArgs.length - 1; ++i) {
  99. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  100. }
  101. // 指定输入文件的位置,这里接收启动参数
  102. FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
  103. System.exit(job.waitForCompletion(true) ? 0 : 1);
  104. }
  105. }

运行步骤:

  1. 使用Hdfs.java,长传本地文件到hadoop0

  2. 运行WordCount.java,运行参数:

    1. hdfs://hadoop0:9000/input2/file1.txt hdfs://hadoop0:9000/input2/file2.txt hdfs://hadoop0:9000/output2/wordcount
  1. 运行结果

06 WordCount入门示例 - 图1