Requirement
Merge many small files into a single SequenceFile. The SequenceFile stores the small files as records, using each file's path plus name as the key and the file's contents as the value.
Input data
one.txt, two.txt, three.txt
Output data
part-r-00000
Analysis
Small files are generally optimized in one of the following ways:
- At data-collection time, merge small files (or small batches of data) into large files before uploading them to HDFS.
- Before business processing, run a MapReduce program on HDFS to merge the small files.
- At MapReduce time, use CombineTextInputFormat to pack many small files into fewer splits and improve efficiency (see the sketch after this list).
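For the third option, the driver swaps in CombineTextInputFormat instead of a custom InputFormat. Below is a minimal sketch, assuming the Hadoop 2.x `mapreduce` API; the 4 MB cap is an arbitrary example value, and note that the mapper would then receive `<LongWritable, Text>` records rather than whole files:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Inside a driver, after Job.getInstance(): pack many small files into
// fewer splits instead of generating one split per file.
job.setInputFormatClass(CombineTextInputFormat.class);
// Cap each combined split at 4 MB (example value; tune to your block size).
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
```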
Implementation
This section implements the second approach above.
The core mechanism of the program:
Define a custom InputFormat with its own RecordReader, so that each read consumes one complete file and wraps it as a single key-value pair; on output, use SequenceFileOutputFormat to write the merged file.
The code is as follows.
Custom InputFormat
```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Mark every file as non-splittable, so each small file becomes
    // exactly one split and therefore one key-value pair.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    // Create the reader that turns one split (one file) into one record.
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
```
Custom RecordReader
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    // Whether this split's (single) file has been read yet.
    private boolean processed = false;

    // Initialization: keep the split and the job configuration.
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Because isSplitable() returns false, each split is one whole file;
        // cast to FileSplit so we can access the file's path and length.
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    // Read one whole file per call; with N input files this runs once per split.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Buffer sized to hold the entire file.
            byte[] contents = new byte[(int) fileSplit.getLength()];
            // Get the file path from the split, then the file system from the path.
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                // Open the file and copy all of its bytes into the buffer.
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                // Close only the input stream; the FileSystem instance comes
                // from a shared cache and should not be closed here.
                IOUtils.closeStream(in);
            }
            // Mark the file as consumed so the next call returns false.
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        // Either nothing or everything has been read.
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to close.
    }
}
```
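Note that `nextKeyValue()` buffers the entire file in memory before handing it to the mapper, and a `BytesWritable` holds at most `Integer.MAX_VALUE` bytes, so this reader is only appropriate for the small-file scenario this job targets.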
Define the MapReduce flow
Define the map flow
```java
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// The mapper's input types are whatever the custom RecordReader produces.
class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get this task's split from the context; since each split is one file,
        // its path plus name becomes the output key.
        FileSplit split = (FileSplit) context.getInputSplit();
        Path path = split.getPath();
        k.set(path.toString());
    }

    // With N input files, map() runs N times (one record per file).
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }
}
```
Define the reduce flow
```java
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key (a file path) carries exactly one value (that file's bytes),
        // so the reducer simply writes the pairs through.
        for (BytesWritable bytesWritable : values) {
            context.write(key, bytesWritable);
        }
    }
}
```
Define the driver
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);
        // Read whole files with the custom InputFormat; write a SequenceFile.
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // Tell the framework where the input files live.
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));
        // Delete the output directory if it already exists.
        Path out = new Path("/Users/jdxia/Desktop/website/data/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }
        // Tell the framework where to write the results.
        FileOutputFormat.setOutputPath(job, out);
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
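To verify the merge, you can dump the keys (file paths) and value sizes from the generated part-r-00000. The following is a minimal sketch using Hadoop's `SequenceFile.Reader`; the class name `SequenceFileDump` and its argument handling are illustrative additions, not part of the job above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical helper: prints one line per record of the merged SequenceFile,
// so you can confirm there is one key-value pair per original input file.
public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // e.g. /Users/jdxia/Desktop/website/data/output/part-r-00000
        Path path = new Path(args[0]);
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " => " + value.getLength() + " bytes");
            }
        }
    }
}
```

Alternatively, `hadoop fs -text` can print a SequenceFile's key-value pairs directly from the command line.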
