1、从hadoop安装程序里找到wordcount代码如下

    1. /**
    2. * Licensed to the Apache Software Foundation (ASF) under one
    3. * or more contributor license agreements. See the NOTICE file
    4. * distributed with this work for additional information
    5. * regarding copyright ownership. The ASF licenses this file
    6. * to you under the Apache License, Version 2.0 (the
    7. * "License"); you may not use this file except in compliance
    8. * with the License. You may obtain a copy of the License at
    9. *
    10. * http://www.apache.org/licenses/LICENSE-2.0
    11. *
    12. * Unless required by applicable law or agreed to in writing, software
    13. * distributed under the License is distributed on an "AS IS" BASIS,
    14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15. * See the License for the specific language governing permissions and
    16. * limitations under the License.
    17. */
    18. package org.apache.hadoop.examples;
    19. import java.io.IOException;
    20. import java.util.StringTokenizer;
    21. import org.apache.hadoop.conf.Configuration;
    22. import org.apache.hadoop.fs.Path;
    23. import org.apache.hadoop.io.IntWritable;
    24. import org.apache.hadoop.io.Text;
    25. import org.apache.hadoop.mapreduce.Job;
    26. import org.apache.hadoop.mapreduce.Mapper;
    27. import org.apache.hadoop.mapreduce.Reducer;
    28. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    29. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    30. import org.apache.hadoop.util.GenericOptionsParser;
    31. public class WordCount {
    32. public static class TokenizerMapper
    33. extends Mapper<Object, Text, Text, IntWritable>{
    34. private final static IntWritable one = new IntWritable(1);
    35. private Text word = new Text();
    36. public void map(Object key, Text value, Context context
    37. ) throws IOException, InterruptedException {
    38. StringTokenizer itr = new StringTokenizer(value.toString());
    39. while (itr.hasMoreTokens()) {
    40. word.set(itr.nextToken());
    41. context.write(word, one);
    42. }
    43. }
    44. }
    45. public static class IntSumReducer
    46. extends Reducer<Text,IntWritable,Text,IntWritable> {
    47. private IntWritable result = new IntWritable();
    48. public void reduce(Text key, Iterable<IntWritable> values,
    49. Context context
    50. ) throws IOException, InterruptedException {
    51. int sum = 0;
    52. for (IntWritable val : values) {
    53. sum += val.get();
    54. }
    55. result.set(sum);
    56. context.write(key, result);
    57. }
    58. }
    59. public static void main(String[] args) throws Exception {
    60. Configuration conf = new Configuration();
    61. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    62. if (otherArgs.length != 2) {
    63. System.err.println("Usage: wordcount ");
    64. System.exit(2);
    65. }
    66. Job job = new Job(conf, "word count");
    67. job.setJarByClass(WordCount.class);
    68. job.setMapperClass(TokenizerMapper.class);
    69. job.setCombinerClass(IntSumReducer.class);
    70. job.setReducerClass(IntSumReducer.class);
    71. job.setOutputKeyClass(Text.class);
    72. job.setOutputValueClass(IntWritable.class);
    73. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    74. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    75. System.exit(job.waitForCompletion(true) ? 0 : 1);
    76. }
    77. }

    2.WordCount代码详解
      如何运行它,这里不做累述了,大伙可以百度下,网上这方面的资料很多。这里的实例代码是使用新的api,大家可能在很多书籍里看到讲解mapreduce的WordCount实例都是老版本的api,这里我不给出老版本的api,因为老版本的api不太建议使用了,大家做开发最好使用新版本的api,新版本api和旧版本api有区别在哪里:
    新的api放在:org.apache.hadoop.mapreduce,旧版api放在:org.apache.hadoop.mapred
    新版api使用虚类,而旧版的使用的是接口,虚类更加利于扩展,这个是一个经验,大家可以好好学习下hadoop的这个经验。
      其他还有很多区别,都是说明新版本api的优势,因为我提倡使用新版api,这里就不讲这些,因为没必要再用旧版本,因此这种比较也没啥意义了。

    下面我对代码做简单的讲解,大家看到要写一个mapreduce程序,我们的实现一个map函数和reduce函数。我们看看map的方法:

    1. public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}

    这里有三个参数,前面两个Object key, Text value就是输入的key和value,第三个参数Context context这是可以记录输入的key和value,例如:context.write(word, one);此外context还会记录map运算的状态。
      对于reduce函数的方法:

    1. public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {…}

    reduce函数的输入也是一个key/value的形式,不过它的value是一个迭代器的形式Iterable values,也就是说reduce的输入是一个key对应一组的值的value,reduce也有context和map的context作用一致。

    至于计算的逻辑就是程序员自己去实现了。

    下面就是main函数的调用了,这个我要详细讲述下,首先是:

    1. Configuration conf = new Configuration();

    运行mapreduce程序前都要初始化Configuration,该类主要是读取mapreduce系统配置信息,这些信息包括hdfs还有mapreduce,也就是安装hadoop时候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解为啥要这么做,这个是没有深入思考mapreduce计算框架造成,我们程序员开发mapreduce时候只是在填空,在map函数和reduce函数里编写实际进行的业务逻辑,其它的工作都是交给mapreduce框架自己操作的,但是至少我们要告诉它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而这些信息就在conf包下的配置文件里。

    接下来的代码是:

    1. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) {
    2. System.err.println("Usage: wordcount ");
    3. System.exit(2);
    4. }

    If的语句好理解,就是运行WordCount程序时候一定是两个参数,如果不是就会报错退出。至于第一句里的GenericOptionsParser类,它是用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值,其实平时开发里我们不太常用它,而是让类实现Tool接口,然后再main函数里使用ToolRunner运行程序,而ToolRunner内部会调用GenericOptionsParser。

    接下来的代码是:

    1. Job job = new Job(conf, "word count");
    2. job.setJarByClass(WordCount.class);
    3. job.setMapperClass(TokenizerMapper.class);
    4. job.setCombinerClass(IntSumReducer.class);
    5. job.setReducerClass(IntSumReducer.class);

    第一行就是在构建一个job,在mapreduce框架里一个mapreduce任务也叫mapreduce作业也叫做一个mapreduce的job,而具体的map和reduce运算就是task了,这里我们构建一个job,构建时候有两个参数,一个是conf这个就不累述了,一个是这个job的名称。

    第二行就是装载程序员编写好的计算程序,例如我们的程序类名就是WordCount了。这里我要做下纠正,虽然我们编写mapreduce程序只需要实现map函数和reduce函数,但是实际开发我们要实现三个类,第三个类是为了配置mapreduce如何运行map和reduce函数,准确的说就是构建一个mapreduce能执行的job了,例如WordCount类。

    第三行和第五行就是装载map函数和reduce函数实现类了,这里多了个第四行,这个是装载Combiner类,这个我后面讲mapreduce运行机制时候会讲述,其实本例去掉第四行也没有关系,但是使用了第四行理论上运行效率会更好。

    接下来的代码:

    1. job.setOutputKeyClass(Text.class);
    2. job.setOutputValueClass(IntWritable.class);

    这个是定义输出的key/value的类型,也就是最终存储在hdfs上结果文件的key/value的类型。

    最后的代码是:

    1. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    2. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    3. System.exit(job.waitForCompletion(true) ? 0 : 1);

    第一行就是构建输入的数据文件,第二行是构建输出的数据文件,最后一行如果job运行成功了,我们的程序就会正常退出。