

InputFormat常见的实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat以及自定义InputFormat等。




  1. Rich learning form
  2. Intelligent learning engine
  3. Learning more convenient
  4. From the real demand for more close to the enterprise


  1. (0, Rich learning form)
  2. (19, Intelligent learning engine)
  3. (47, Learning more convenient)
  4. (72, From the real demand for more close to the enterprise)



每一行均为一条记录,被分隔符分割为key和value(下方输入示例中的 ---> 即表示分隔符)。可以在驱动类中通过如下设置来指定分隔符:


  1. conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");



  1. line1 ---> Rich learning form
  2. line2 ---> Intelligent learning engine
  3. line3 ---> Learning more convenient
  4. line4 ---> From the real demand for more close to the enterprise


  1. (line1, Rich learning form)
  2. (line2, Intelligent learning engine)
  3. (line3, Learning more convenient)
  4. (line4, From the real demand for more close to the enterprise)




  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Mapper;
  4. import java.io.IOException;
  5. public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {
  6. Text k = new Text();
  7. LongWritable v = new LongWritable();
  8. @Override
  9. protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
  10. //设置key和value
  11. k.set(key);
  12. //设置key的个数
  13. v.set(1);
  14. //写出
  15. context.write(k, v);
  16. }
  17. }


  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  6. LongWritable v = new LongWritable();
  7. @Override
  8. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  9. long count = 0l;
  10. //汇总统计
  11. for (LongWritable value : values) {
  12. count += value.get();
  13. }
  14. v.set(count);
  15. //输出
  16. context.write(key, v);
  17. }
  18. }


  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
  8. import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. public class MyDriver {
  12. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  13. Configuration conf = new Configuration();
  14. //设置切割符
  15. conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
  16. //获取job对象
  17. Job job = Job.getInstance(conf);
  18. //设置jar包关系,关联mapper和reducer
  19. job.setJarByClass(MyDriver.class);
  20. //告诉框架,我们程序所用的mapper类和reduce类是什么
  21. job.setMapperClass(KVTextMapper.class);
  22. job.setReducerClass(KVTextReducer.class);
  23. //告诉框架我们程序输出的类型
  24. job.setMapOutputKeyClass(Text.class);
  25. job.setMapOutputValueClass(LongWritable.class);
  26. job.setOutputKeyClass(Text.class);
  27. job.setOutputValueClass(LongWritable.class);
  28. //告诉框架,我们程序使用的数据读取组件,结果输出所用的组件是什么
  29. //TextInputFormat是mapreduce程序中内置的一种读取数据组件,准备的叫做读取文本的输入组件
  30. job.setInputFormatClass(KeyValueTextInputFormat.class);
  31. //job.setOutputFormatClass(TextOutputFormat.class);
  32. //告诉框架,我们要处理的数据文件在那个路径下
  33. FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input/"));
  34. //告诉框架我们的处理结果要输出到什么地方
  35. FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output/"));
  36. //提交后,然后等待服务器端返回值,看是不是true
  37. boolean res = job.waitForCompletion(true);
  38. //设置成功就退出码为0
  39. System.exit(res?0:1);
  40. }
  41. }




  1. Rich learning form
  2. Intelligent learning engine
  3. Learning more convenient
  4. From the real demand for more close to the enterprise


  1. (0, Rich learning form)
  2. (19, Intelligent learning engine)


  1. (47, Learning more convenient)
  2. (72, From the real demand for more close to the enterprise)




  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Mapper;
  4. import java.io.IOException;
  5. public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  6. private Text k = new Text();
  7. private LongWritable v = new LongWritable(1);
  8. @Override
  9. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  10. //获取一行
  11. String line = value.toString();
  12. //切割
  13. String[] splited = line.split(" ");
  14. //循环写出
  15. for (int i = 0; i < splited.length; i++) {
  16. k.set(splited[i]);
  17. context.write(k, v);
  18. }
  19. }
  20. }


  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  6. LongWritable v = new LongWritable();
  7. @Override
  8. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  9. long count= 0l;
  10. //汇总
  11. for (LongWritable value : values) {
  12. count += value.get();
  13. }
  14. v.set(count);
  15. //输出
  16. context.write(key, v);
  17. }
  18. }


  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. public class NLineDriver {
  11. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  12. //获取job对象
  13. Configuration conf = new Configuration();
  14. Job job = Job.getInstance(conf);
  15. //设置每个切片InputSplit中划分三条记录
  16. NLineInputFormat.setNumLinesPerSplit(job, 3);
  17. //使用NLineInputFormat处理记录
  18. job.setInputFormatClass(NLineInputFormat.class);
  19. //设置jar包位置,关联mapper和reducer
  20. job.setJarByClass(NLineDriver.class);
  21. job.setMapperClass(NLineMapper.class);
  22. job.setReducerClass(NLineReducer.class);
  23. //设置map输出kv类型
  24. job.setMapOutputKeyClass(Text.class);
  25. job.setMapOutputValueClass(LongWritable.class);
  26. //设置最终输出kv类型
  27. job.setOutputKeyClass(Text.class);
  28. job.setOutputValueClass(LongWritable.class);
  29. //告诉框架,我们要处理的数据文件在那个路径下
  30. FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/data/input/"));
  31. //告诉框架我们的处理结果要输出到什么地方
  32. FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/data/output/"));
  33. //这边不用submit,因为一提交就和我这个没关系了,我这就断开了就看不见了
  34. // job.submit();
  35. //提交后,然后等待服务器端返回值,看是不是true
  36. boolean res = job.waitForCompletion(true);
  37. //设置成功就退出码为0
  38. System.exit(res?0:1);
  39. }
  40. }


  1. number of splits