    WordCountMapper

    package com.mapreduce;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /*
     * KEYIN:    byte offset of the current line, LongWritable (Hadoop's counterpart of Java's long)
     * VALUEIN:  text content of the split, Text (counterpart of Java's String)
     * KEYOUT:   output key, a single word (Text)
     * VALUEOUT: output value (always 1, written as {"word", 1}), IntWritable (counterpart of Java's int)
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text outkey = new Text();
        private IntWritable outvalue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1. Split the input text (value) into individual words.
            String data = value.toString();
            // Regex delimiters: commas, periods, and whitespace.
            String[] words = data.split("[,\\.\\s]+");
            // 2. Emit each word as a (key, value) pair: (word, 1).
            for (String word : words) {
                outkey.set(word);
                context.write(outkey, outvalue);
            }
        }
    }
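
    To check what the split regex produces before running a full job, here is a minimal plain-Java sketch (no Hadoop required; the sample sentence is made up for illustration):

    public class SplitDemo {
        public static void main(String[] args) {
            // Same regex as the mapper: split on runs of commas, periods, or whitespace.
            String line = "hello, world. hello"; // hypothetical input line
            for (String word : line.split("[,\\.\\s]+")) {
                System.out.println("(" + word + ", 1)");
            }
            // Prints (hello, 1), (world, 1), (hello, 1): one pair per word,
            // exactly what the mapper emits via context.write().
        }
    }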

    WordCountReducer

    package com.mapreduce;

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /*
     * KEYIN:    reduce input key (a single word, Text)
     * VALUEIN:  reduce input value (the word's count, IntWritable)
     * KEYOUT:   reduce output key (a single word, Text)
     * VALUEOUT: reduce output value (the word's total count, IntWritable)
     */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outvalue = new IntWritable();

        /*
         * The framework groups the mapper output by key before calling reduce:
         *   key, {value1, value2, ..., valueN}
         *   e.g. THOUGHTS, {1, 1}
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outvalue.set(sum);
            context.write(key, outvalue);
        }
    }
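
    Logically, shuffle plus reduce amounts to grouping the mapper's (word, 1) pairs by key and summing each group. A minimal plain-Java sketch of that idea (the sample words are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class ReduceDemo {
        public static void main(String[] args) {
            // Hypothetical mapper output keys after the split step.
            String[] words = {"hello", "world", "hello"};
            Map<String, Integer> counts = new HashMap<>();
            for (String w : words) {
                counts.merge(w, 1, Integer::sum); // group by key and sum, like shuffle + reduce
            }
            System.out.println(counts); // e.g. {hello=2, world=1}
        }
    }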

    WordCountDriver

    package com.mapreduce;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Cluster usage: hadoop jar xxx.jar demo.wordcount /input /output
    public class WordCountDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Create the job object.
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2. Set the jar by locating the driver class.
            job.setJarByClass(WordCountDriver.class);
            // 3. Wire up the Mapper and the Reducer.
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            // 4. Set the map output key/value types.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5. Set the final output key/value types.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // 6. Set the input path (FileInputFormat) and the output path (FileOutputFormat).
            FileInputFormat.setInputPaths(job, new Path("D:\\eclipse\\HdfsDemo\\input"));
            FileOutputFormat.setOutputPath(job, new Path("D:\\eclipse\\HdfsDemo\\output"));
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileOutputFormat.setOutputPath(job, new Path(args[1])); // the output path must not already exist
            // 7. Submit the job.
            boolean result = job.waitForCompletion(true); // true: print progress while running
            System.exit(result ? 0 : 1); // 0: normal exit, 1: abnormal exit
        }
    }
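
    Because the output path must not already exist (FileOutputFormat fails the job otherwise), it can be convenient during repeated local testing to delete a stale output directory first. A minimal sketch using Hadoop's FileSystem API, to be placed inside main() before step 6; this is an optional addition, not part of the original driver:

    // Optional cleanup: remove the old output directory so reruns don't fail.
    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
    Path output = new Path("D:\\eclipse\\HdfsDemo\\output");
    if (fs.exists(output)) {
        fs.delete(output, true); // true: delete recursively
    }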

    // Local test (screenshot omitted)

    // View the results
    hadoop fs -cat /output/part-r-00000
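
    Each reducer writes one part-r-NNNNN file; with the single default reducer that is part-r-00000. Under the default TextOutputFormat every line is a tab-separated key/value pair, so the file looks like this (hypothetical contents):

    hello	2
    world	1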

    // Test on the Hadoop cluster
    hadoop jar hdp.jar /wordcount /output
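
    Here /wordcount is the HDFS input directory and /output is the output directory, which must not exist yet. The main class name can be omitted, as in this command, only when the jar's manifest specifies a Main-Class; otherwise pass it explicitly, as in the driver's usage comment (hadoop jar xxx.jar demo.wordcount /input /output).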