06 WordCount入门示例
MapReduce 程序的业务编码分为两个大部分:
- 一部分配置程序的运行信息;
- 一部分编写该MapReduce程序的业务逻辑,并且业务逻辑的 map 阶段和 reduce 阶段的代码分别继 承 Mapper 类和 Reducer 类
MapReduce 程序编写规范
1、用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
2、Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
3、Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
4、Mapper 中的业务逻辑写在 map()方法中
5、map()方法(maptask 进程)对每一个
6、Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
7、Reducer 的业务逻辑写在 reduce()方法中
8、Reducetask 进程对每一组相同 k 的
9、用户自定义的 Mapper 和 Reducer 都要继承各自的父类
10、整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象
MapReduce 示例程序编写及编码规范
1、 该程序有一个 main 方法,来启动任务的运行,其中 job 对象就存储了该程序运行的必要 信息,比如指定 Mapper 类和 Reducer 类 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);
2、 该程序中的 TokenizerMapper 类继承了 Mapper 类
3、 该程序中的 IntSumReducer 类继承了 Reducer 类
代码
package com.etc;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {/*** Mapper静态内部类 简单的统计单词出现次数 KEYIN* 默认情况下,是mapreduce所读取到的一行文本的起始偏移量,Long类型,在hadoop中有其自己的序列化类LongWriteable VALUEIN* 默认情况下,是mapreduce所读取到的一行文本的内容,hadoop中的序列化类型为Text KEYOUT* 是用户自定义逻辑处理完成后输出的KEY,在此处是单词,String VALUEOUT 是用户自定义逻辑输出的value,这里是单词出现的次数,Long** @author root**/public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {// 为变量赋值1,定义整数1,每个单词计数1次private final static IntWritable valueOut = new IntWritable(1);private Text keyOut = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {/** 构造一个用来解析输入value值的StringTokerizer对象 Java默认的分隔符是 空格'' 制表符'\t' 换行符'\n' 回车符'\r'*/StringTokenizer token = new StringTokenizer(value.toString());while (token.hasMoreElements()) {// 返回从当前位置到下一个分隔符的字符串keyOut.set(token.nextToken());// map方法输出键值对时,输出每个被拆分出来的单词,以及计数1context.write(keyOut, valueOut);}}}/*** Reducer静态内部类 第一个Text: 是传入的单词名称,是Mapper中传入的 第二个:LongWritable* 是该单词出现了多少次,这个是mapreduce计算出来的,比如 hello出现了几次 第三个Text: 是输出单词的名称 ,这里是要输出到文本中的内容* 第四个LongWritable: 是输出时显示出现了多少次,这里也是要输出到文本中的内容** @author root**/public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;// 遍历for (IntWritable val : values) {sum += val.get();}// 为变量赋值递增result.set(sum);context.write(key, result);}}/*** 启动任务的运行* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 提供对配置参数的访问Configuration conf = new Configuration();/** hadoop框架中解析命令行参数的基本类。 它能够辨别一些标准的命令行参数,* 能够使应用程序轻易地指定namenode,jobtracker,以及其他额外的配置资源。* 在Hadoop中,用于执行MapReduce任务的机器角色有两个: 一个是JobTracker;另一个是TaskTracker;* JobTracker是用于调度工作的,TaskTracker是用于执行工作的。 一个Hadoop集群中只有一台JobTracker。*/String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");// exit: 0表示正常退出,1表示中断退出,2表示异常退出System.exit(2);}/** import org.apache.hadoop.mapreduce.Job; 存储了该程序运行的必要信息*/Job job = Job.getInstance(conf, "word count");// 使得hadoop可以根据类包,找到jar包在哪里job.setJarByClass(WordCount.class);// 指定Mapper的类job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);// 指定reduce的类job.setReducerClass(IntSumReducer.class);// 设置最终输出的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 指定输入文件的位置,这里为了灵活,接收外部参数for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}// 指定输入文件的位置,这里接收启动参数FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}
运行步骤:
使用
Hdfs.java,长传本地文件到hadoop0运行
WordCount.java,运行参数:hdfs://hadoop0:9000/input2/file1.txt hdfs://hadoop0:9000/input2/file2.txt hdfs://hadoop0:9000/output2/wordcount
- 运行结果

