Requirement

Merge many small files into a single SequenceFile. The SequenceFile stores the small files as key-value pairs: the key is the file path plus file name, and the value is the file content.

Input data

  1. one.txt
  2. two.txt
  3. three.txt

Output data

  1. part-r-00000

Analysis

Small files are typically optimized in one of the following ways:

  1. At data-collection time, merge the small files (or small batches of data) into larger files before uploading them to HDFS
  2. Before the business processing, run a MapReduce job on HDFS to merge the small files
  3. At MapReduce processing time, use CombineTextInputFormat to improve efficiency (a short sketch follows this list)
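
For reference, option 3 only needs two settings in the job driver. Below is a minimal sketch, assuming plain-text small files and the new MapReduce API; the class name CombineSmallFilesConfig, the helper method and the 4 MB split cap are illustrative, while CombineTextInputFormat and setMaxInputSplitSize are standard Hadoop calls.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

class CombineSmallFilesConfig {
    //pack many small text files into fewer splits instead of one split per file
    static void useCombineInput(Job job) {
        job.setInputFormatClass(CombineTextInputFormat.class);
        //upper bound on the size of each combined split; 4 MB is only an example value
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
    }
}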

Implementation

This section implements the second approach above.
The core mechanism of the program:

  1. Define a custom InputFormat
  2. Override the RecordReader so that it reads one complete file at a time and wraps it as a key-value pair
  3. Use SequenceFileOutputFormat at the output stage to write the merged file

The code is as follows

Custom InputFormat

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    //mark each small file as non-splittable so that one file yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    //create the record reader that streams the whole file
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

Custom RecordReader

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    //whether the single file of this split has been read yet
    private boolean processed = false;

    //initialization
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //here one split is one whole file; cast to FileSplit so the file information is accessible later
        this.fileSplit = (FileSplit) split;
        //get the configuration from the task context
        this.conf = context.getConfiguration();
    }

    //read one whole file per call; the framework calls this once per file
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            //buffer that holds the entire file
            byte[] contents = new byte[(int) fileSplit.getLength()];
            //get the file path from the split
            Path file = fileSplit.getPath();
            //get the file system from the path
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                //open an input stream on the file
                in = fs.open(file);
                //read the whole file content into the buffer
                IOUtils.readFully(in, contents, 0, contents.length);
                //hand the content to the output value
                value.set(contents, 0, contents.length);
            } finally {
                //close the input stream; do not close fs, FileSystem instances are cached and shared
                IOUtils.closeStream(in);
            }
            //mark the file as read so the next call returns false
            processed = true;
            return true;
        }
        return false;
    }

    //current key
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    //current value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    //progress of the read
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    //nothing to close here; the stream is closed in nextKeyValue()
    @Override
    public void close() throws IOException {
        // do nothing
    }
}

Define the MapReduce flow

Define the map stage

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

//the map input key/value types match the generic types of the custom RecordReader
class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //get the split for this task from the context
        FileSplit split = (FileSplit) context.getInputSplit();
        //use the file path as the output key
        Path path = split.getPath();
        k.set(path.toString());
    }

    //map() is invoked once per file
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }
}

Define the reduce stage

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
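        //each key (file path) is unique, so each group holds the content of exactly one file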
        for(BytesWritable bytesWritable : values) {
            context.write(key, bytesWritable);
        }
    }
}

Define the driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //tell the framework where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));

        //delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/data/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }
        //tell the framework where the job output goes
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);

    }
}
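
To sanity-check the result, the generated SequenceFile can be read back and each key (the original file path) printed together with the size of its value. This is a minimal verification sketch, not part of the job itself: the class name SequenceFileVerifier is made up, and the part-r-00000 path assumes the default output file name produced by the driver above. Running hadoop fs -text on the output file is another way to dump a SequenceFile in readable form.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileVerifier {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //path of the merged file written by the job above (default output file name assumed)
        Path merged = new Path("/Users/jdxia/Desktop/website/data/output/part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(merged))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            //each record corresponds to one original small file: key = path, value = content
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}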