Requirement
Merge multiple small files into a single SequenceFile. The SequenceFile stores the small files with each file's path (including its name) as the key and the file's contents as the value.
Input data
one.txt
two.txt
three.txt
Output data
part-r-00000
Analysis
Small-file optimization generally comes down to one of the following approaches:
- At data-collection time, merge small files or small batches of data into larger files before uploading them to HDFS.
- Before business processing, run a MapReduce job on HDFS to merge the small files.
- At MapReduce processing time, use CombineTextInputFormat to improve efficiency (see the sketch after this list).
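For the third approach, a minimal driver-side sketch is shown below. It assumes a Job object named job has already been created (as in the driver later in this section), and the 4 MB split limit is an illustrative value, not one taken from this example.
// Illustrative sketch: pack many small text files into fewer map splits.
// Assumes this runs in the driver after Job job = Job.getInstance(conf);
// requires org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.
job.setInputFormatClass(CombineTextInputFormat.class);
// Upper bound on the size of a combined split, in bytes (assumed value: 4 MB)
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);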
Implementation
This section implements the second approach above.
The core mechanism of the program:
- Define a custom InputFormat.
- Override the RecordReader so that it reads one complete file at a time and wraps it as a single key-value pair.
- Use SequenceFileOutputFormat on the output side to write the merged file.
The code is as follows.
Custom InputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    // Mark every small file as non-splittable so that each file becomes exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    // Create the record reader that reads one whole file at a time
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
Custom RecordReader
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    // Whether the file has already been read
    private boolean processed = false;

    // Initialization: keep the split and configuration for later use
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Files are non-splittable, so each split is exactly one file; cast so we can read its path and length
        this.fileSplit = (FileSplit) split;
        // Keep the job configuration from the task context
        this.conf = context.getConfiguration();
    }

    // Read one whole file per call; the framework calls this once per record, i.e. once per file
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Buffer large enough to hold the whole file
            byte[] contents = new byte[(int) fileSplit.getLength()];
            // File path from the split
            Path file = fileSplit.getPath();
            // File system that owns the path
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                // Open the file and copy its full contents into the buffer
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                // Hand the bytes to the current value
                value.set(contents, 0, contents.length);
            } finally {
                // Close only the input stream; the FileSystem instance is cached and shared, so it is not closed here
                IOUtils.closeStream(in);
            }
            // Mark the file as consumed so the next call returns false
            processed = true;
            return true;
        }
        return false;
    }

    // Current key: the reader itself emits NullWritable; the mapper builds the real key from the file path
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    // Current value: the raw bytes of the file
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // Progress: either the file has been read (1.0) or it has not (0.0)
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    // Nothing to clean up; the stream is closed in nextKeyValue()
    @Override
    public void close() throws IOException {
        // do nothing
    }
}
Define the MapReduce processing flow
Define the map phase
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// The mapper's input key/value types must match the generic types of the custom RecordReader
class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the split for this task from the context
        FileSplit split = (FileSplit) context.getInputSplit();
        // Use the file path as the output key
        Path path = split.getPath();
        k.set(path.toString());
    }

    // map() runs once per key-value pair, i.e. once per small file
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }
}
Define the reduce phase
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each key (file path) carries one value (the file bytes); write them straight through
        for (BytesWritable bytesWritable : values) {
            context.write(key, bytesWritable);
        }
    }
}
Define the driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // Use the custom input format and write the result as a SequenceFile
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Tell the framework where the input files live
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input/"));

        // Delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/data/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // Tell the framework where to write the merged result
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
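To verify the merged output, the SequenceFile can be inspected with `hdfs dfs -text .../part-r-00000`, or read back with a small standalone reader such as the sketch below. This is illustrative only: the class name SequenceFileDump is made up, and the hard-coded path mirrors the output directory used in the driver above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Illustrative reader: prints each stored file path and the size of its contents
public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/Users/jdxia/Desktop/website/data/output/part-r-00000");
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // Iterate over every (file path, file bytes) record in the merged SequenceFile
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}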