Case 1

Analysis

(Figure omitted.) The inverted index is built in two MapReduce passes: a first pass counts each word's occurrences per source file and emits `word--filename count` lines (this is Case 2 below), and a second pass, shown here, regroups those lines by word so that each word maps to the list of files it appears in, e.g. `hello c.txt--1 b.txt--2 a.txt--1`.

Prepare the data

```
hello--a.txt 1
hello--b.txt 2
hello--c.txt 1
allen--b.txt 2
jerry--a.txt 2
allen--a.txt 1
jerry--c.txt 2
```

Code

```java
package com.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IndexStepTwo {

    public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split on whitespace; step one's TextOutputFormat separates key and value with a tab.
            String[] fields = line.split("\\s+");
            String wordFile = fields[0];   // e.g. hello--a.txt
            String count = fields[1];      // e.g. 1
            String[] split = wordFile.split("--");
            String word = split[0];
            String file = split[1];
            k.set(word);
            v.set(file + "--" + count);
            context.write(k, v);           // k: hello  v: a.txt--1
        }
    }

    public static class IndexStepTwoReduce extends Reducer<Text, Text, Text, Text> {
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all file--count entries for this word into one line.
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append(" ");
            }
            v.set(sb.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexStepTwo.class);

        // Tell the framework which mapper and reducer classes this job uses.
        job.setMapperClass(IndexStepTwoMapper.class);
        job.setReducerClass(IndexStepTwoReduce.class);

        // Tell the framework the output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // A combiner can be configured here.
        job.setCombinerClass(IndexStepTwoReduce.class);

        // Tell the framework which input and output components to use.
        // TextInputFormat is MapReduce's built-in component for reading plain text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework which path the input data lives under.
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input/"));

        // Delete the output directory if it already exists.
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // Tell the framework where to write the results.
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
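
As a quick sanity check of the mapper's two-stage parsing, here is a minimal standalone sketch; the `ParseCheck` class is a hypothetical helper of mine, not part of the original job:

```java
// Hypothetical helper illustrating the two-stage split used by IndexStepTwoMapper.
public class ParseCheck {
    public static void main(String[] args) {
        String line = "hello--a.txt\t1";           // one line of step one's output
        String[] fields = line.split("\\s+");      // ["hello--a.txt", "1"]
        String[] wordFile = fields[0].split("--"); // ["hello", "a.txt"]
        // Prints "hello -> a.txt--1", the key/value pair the mapper emits.
        System.out.println(wordFile[0] + " -> " + wordFile[1] + "--" + fields[1]);
    }
}
```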

Results

The .crc files in the output directory are checksum files that Hadoop writes alongside the data; the actual results are in the part file (see the sketch after the listing for reading it programmatically).

```
allen a.txt--1 b.txt--2
hello c.txt--1 b.txt--2 a.txt--1
jerry c.txt--2 a.txt--2
```
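
To inspect the output without the shell, here is a minimal sketch using the Hadoop FileSystem API; the `ReadOutput` class name is mine, and the part-file path assumes the default single reducer:

```java
package com.index;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // part-r-00000 is the default name of the first reducer's output file.
        Path part = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/part-r-00000");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(part)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```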

Case 2

Prerequisite for the code

If the raw data looks like this:

(Figure omitted: raw text files such as a.txt, b.txt, and c.txt, each containing plain space-separated words.)

then it first has to be transformed into this:

```
hello--a.txt 1
hello--b.txt 2
hello--c.txt 1
allen--b.txt 2
jerry--a.txt 2
allen--a.txt 1
jerry--c.txt 2
```

Code

```java
package com.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IndexStepOne {

    public static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            // The input split tells us which file this line came from.
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().getName();
            // Emit key: word--filename, value: 1
            for (String word : words) {
                k.set(word + "--" + filename);
                context.write(k, v);
            }
        }
    }

    public static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s for each word--filename key.
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexStepOne.class);

        // Tell the framework which mapper and reducer classes this job uses.
        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReducer.class);

        // Tell the framework the output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // A combiner can be configured here.
        job.setCombinerClass(IndexStepOneReducer.class);

        // Tell the framework which input and output components to use.
        // TextInputFormat is MapReduce's built-in component for reading plain text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework which path the input data lives under.
        FileInputFormat.setInputPaths(job, new Path("D:/index/input"));

        // Tell the framework where to write the results.
        FileOutputFormat.setOutputPath(job, new Path("D:/index/output-1"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
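
The two cases are meant to run back to back: IndexStepOne produces the `word--filename count` lines that IndexStepTwo consumes. Here is a minimal chaining sketch; the `IndexDriver` class and the `output-2` path are my assumptions, and both job classes above must be on the classpath:

```java
package com.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path rawInput = new Path("D:/index/input");       // raw word files
        Path stepOneOut = new Path("D:/index/output-1");  // word--file <TAB> count
        Path stepTwoOut = new Path("D:/index/output-2");  // word <TAB> file--count ...

        // Step one: count each word per file.
        Job job1 = Job.getInstance(conf, "index step one");
        job1.setJarByClass(IndexStepOne.class);
        job1.setMapperClass(IndexStepOne.IndexStepOneMapper.class);
        job1.setReducerClass(IndexStepOne.IndexStepOneReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job1, rawInput);
        FileOutputFormat.setOutputPath(job1, stepOneOut);
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Step two: regroup the per-file counts by word.
        Job job2 = Job.getInstance(conf, "index step two");
        job2.setJarByClass(IndexStepTwo.class);
        job2.setMapperClass(IndexStepTwo.IndexStepTwoMapper.class);
        job2.setReducerClass(IndexStepTwo.IndexStepTwoReduce.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job2, stepOneOut);
        FileOutputFormat.setOutputPath(job2, stepTwoOut);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
```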