Introduction

The input files of a MapReduce job are usually stored in HDFS. Input formats include line-based log files, binary files, and so on. These files are often large, tens of GB or more, so how does MapReduce read this data?

Common implementations of the InputFormat interface include:
TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormats.

TextInputFormat

TextInputFormat is the default InputFormat. Each record is one line of input.
The key is of type LongWritable and stores the byte offset of the line within the whole file; the value is the content of the line, excluding any line terminators (newline or carriage return).

Here is an example: suppose a split contains the following 4 text records.

  1. Rich learning form
  2. Intelligent learning engine
  3. Learning more convenient
  4. From the real demand for more close to the enterprise

Each record is represented as the following key/value pairs:

  1. (0, Rich learning form)
  2. (19, Intelligent learning engine)
  3. (47, Learning more convenient)
  4. (72, From the real demand for more close to the enterprise)

Clearly, the keys are not line numbers. In general it is hard to obtain line numbers, because a file is divided into splits by bytes, not by lines.
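For comparison with the KeyValueTextInputFormat code below, a minimal sketch of a mapper driven by the default TextInputFormat might look like this (the class name TextOffsetMapper is made up for illustration); the input key type is LongWritable (the byte offset) and the input value type is Text (the line content):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    // Illustrative only: a mapper that simply echoes the (offset, line) pairs
    // handed to it by TextInputFormat, e.g. (0, Rich learning form).
    public class TextOffsetMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // key: byte offset of this line in the file; value: the line itself
            context.write(key, value);
        }
    }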

KeyValueTextInputFormat

Each line is one record, split by a separator into a key and a value.

The separator can be set in the driver class with:

    conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

The default separator is a tab (\t).

Here is an example. The input is a split containing 4 records, where ---> represents a (horizontal) tab character.

  1. line1 ---> Rich learning form
  2. line2 ---> Intelligent learning engine
  3. line3 ---> Learning more convenient
  4. line4 ---> From the real demand for more close to the enterprise

Each record is represented as the following key/value pairs:

  1. (line1, Rich learning form)
  2. (line2, Intelligent learning engine)
  3. (line3, Learning more convenient)
  4. (line4, From the real demand for more close to the enterprise)

In this case, the key is the Text sequence that precedes the tab character on each line.

Code

map

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {

        Text k = new Text();
        LongWritable v = new LongWritable();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // set the output key to the input key (the text before the separator)
            k.set(key);
            // count this key once
            v.set(1);
            // emit the key/value pair
            context.write(k, v);
        }
    }

reducer

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0L;
            // sum up the counts for this key
            for (LongWritable value : values) {
                count += value.get();
            }
            v.set(count);
            // emit the key and its total count
            context.write(key, v);
        }
    }

driver

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class MyDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            // set the key/value separator (here a space instead of the default tab)
            conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
            // get the job object
            Job job = Job.getInstance(conf);
            // set the jar location and associate the mapper and reducer
            job.setJarByClass(MyDriver.class);
            // tell the framework which mapper and reducer classes to use
            job.setMapperClass(KVTextMapper.class);
            job.setReducerClass(KVTextReducer.class);
            // tell the framework the output types of the program
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // tell the framework which input component to use for reading data;
            // TextInputFormat is the built-in component for reading plain text,
            // here we use KeyValueTextInputFormat instead
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            // job.setOutputFormatClass(TextOutputFormat.class);
            // tell the framework where the input data lives
            FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));
            // tell the framework where to write the results
            FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output/"));
            // submit the job and wait for it to finish, checking the return value
            boolean res = job.waitForCompletion(true);
            // exit with code 0 on success
            System.exit(res ? 0 : 1);
        }
    }
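As a rough usage sketch (the file content below is an assumption, chosen to match the space separator set in the driver), if the input directory contained a file such as:

    line1 Rich learning form
    line1 Intelligent learning engine
    line2 Learning more convenient

the mapper would emit (line1, 1), (line1, 1), (line2, 1), and after the reducer sums the counts the job output file (part-r-00000, tab-separated) should contain something like:

    line1	2
    line2	1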

NLineInputFormat

With NLineInputFormat, the InputSplit processed by each map task is no longer determined by HDFS block boundaries, but by the number of lines N specified for NLineInputFormat. That is, total number of input lines / N = number of splits; if it does not divide evenly, number of splits = quotient + 1.
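In other words, the number of splits is the total line count divided by N, rounded up. A tiny sketch of that calculation in plain Java (the variable names are just for illustration):

    // splits produced for a file of totalLines lines, with linesPerSplit lines
    // per split (integer ceiling division)
    long totalLines = 4;
    long linesPerSplit = 3;
    long numSplits = (totalLines + linesPerSplit - 1) / linesPerSplit;  // = 2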

Here is an example, again using the 4 lines of input above:

  1. Rich learning form
  2. Intelligent learning engine
  3. Learning more convenient
  4. From the real demand for more close to the enterprise

For example, if N is 2, each input split contains two lines and 2 map tasks are started. One mapper receives the first two lines:

  1. (0, Rich learning form)
  2. (19, Intelligent learning engine)

The other mapper receives the last two lines:

  1. (47, Learning more convenient)
  2. (72, From the real demand for more close to the enterprise)

The keys and values here are the same as those generated by TextInputFormat.

Code

map

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private Text k = new Text();
        private LongWritable v = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // get one line
            String line = value.toString();
            // split the line into words
            String[] splited = line.split(" ");
            // emit (word, 1) for each word
            for (int i = 0; i < splited.length; i++) {
                k.set(splited[i]);
                context.write(k, v);
            }
        }
    }

reducer

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        LongWritable v = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0L;
            // sum up the counts for this key
            for (LongWritable value : values) {
                count += value.get();
            }
            v.set(count);
            // emit the key and its total count
            context.write(key, v);
        }
    }

driver

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class NLineDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // get the job object
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // put three lines into each InputSplit
            NLineInputFormat.setNumLinesPerSplit(job, 3);
            // use NLineInputFormat to read the records
            job.setInputFormatClass(NLineInputFormat.class);
            // set the jar location and associate the mapper and reducer
            job.setJarByClass(NLineDriver.class);
            job.setMapperClass(NLineMapper.class);
            job.setReducerClass(NLineReducer.class);
            // set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // set the final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // tell the framework where the input data lives
            FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));
            // tell the framework where to write the results
            FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output/"));
            // job.submit() is not used here: once the job is submitted that way the
            // client disconnects and we can no longer watch it;
            // job.submit();
            // waitForCompletion submits the job and waits for the result
            boolean res = job.waitForCompletion(true);
            // exit with code 0 on success
            System.exit(res ? 0 : 1);
        }
    }

Number of splits in the output

    number of splits
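With the 4-line sample input above and 3 lines per split, 4 / 3 does not divide evenly, so 2 splits should be created; assuming the standard Hadoop job-submission log, the console output would then contain a line roughly like:

    number of splits:2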