1. Common Input Formats
1.1 TextInputFormat (uses the default split strategy)
RecordReader: LineRecordReader — processes one line at a time, using the line's starting byte offset in the file as the key and the line's content as the value.
LongWritable key
Text value
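The (offset, line) record shape described above can be sketched with a small pure-Java simulation. This is illustrative code only, not Hadoop's actual LineRecordReader; the class name LineOffsetDemo is made up, and it assumes '\n' line separators and single-byte characters:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LineOffsetDemo {
    // For each line, the key is the line's starting byte offset
    // and the value is the line's content, mimicking TextInputFormat records.
    static Map<Long, String> readLines(String content) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : content.split("\n", -1)) {
            // skip the empty trailing entry produced by a final '\n'
            if (!line.isEmpty() || offset < content.length()) {
                records.put(offset, line);
            }
            offset += line.length() + 1; // +1 for the '\n' (single-byte chars assumed)
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(readLines("hello\nworld\nhi"));
        // {0=hello, 6=world, 12=hi}
    }
}
```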
1.2 NLineInputFormat
RecordReader: LineRecordReader — every N lines form one split; each record is still a single line, with the line's starting byte offset as the key and the line's content as the value.
LongWritable key
Text value
Use case: when the per-line processing logic is complex and slow to execute, smaller splits yield more map tasks and better parallelism.
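A driver configuration sketch for NLineInputFormat (a fragment only — the surrounding driver code that creates `job` is omitted, and N = 3 is an arbitrary example value):

```java
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

// Inside the driver, after the Job has been created:
// every 3 input lines form one split, so e.g. a 9-line file yields 3 map tasks
NLineInputFormat.setNumLinesPerSplit(job, 3);
job.setInputFormatClass(NLineInputFormat.class);
```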
1.3 KeyValueTextInputFormat
RecordReader: KeyValueLineRecordReader — for text files; splits each line into a key and a value at the first occurrence of a separator character.
If the separator is not found, the whole line becomes the key and the value is an empty string.
The default separator is \t; it can be changed via the parameter mapreduce.input.keyvaluelinerecordreader.key.value.separator (the separator is a single byte, i.e. only one character).
Split strategy: the default one
RR: KeyValueLineRecordReader
Text key
Text value
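The splitting rule above (split at the first separator; no separator means the whole line is the key and the value is empty) can be sketched in plain Java. This is a simulation for illustration, not Hadoop's KeyValueLineRecordReader; the class name KeyValueSplitDemo is made up:

```java
public class KeyValueSplitDemo {
    // Returns {key, value}: the line is split at the FIRST occurrence of sep;
    // if sep is absent, the whole line is the key and the value is "".
    static String[] split(String line, char sep) {
        int pos = line.indexOf(sep);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("name\tzhangsan", '\t');
        System.out.println(kv[0] + " -> " + kv[1]); // name -> zhangsan
        String[] noSep = split("justkey", '\t');
        System.out.println("key=[" + noSep[0] + "] value=[" + noSep[1] + "]"); // key=[justkey] value=[]
    }
}
```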
1.4 CombineTextInputFormat
Purpose: changes the traditional split strategy by packing multiple small files into a single split.
Suited to scenarios with many small files.
RecordReader: LineRecordReader — processes one line at a time, using the line's starting byte offset as the key and the line's content as the value.
LongWritable key
Text value
Splitting: first determine the maximum split size maxSize, set via the parameter mapreduce.input.fileinputformat.split.maxsize.
Procedure: a. File by file, divide each file into parts:
  ① if the size of the file's remaining (unsplit) portion <= maxSize, the whole remaining portion becomes 1 part;
  ② if maxSize < remaining portion <= 2 * maxSize, split the remaining portion evenly into 2 parts;
  ③ if remaining portion > 2 * maxSize, cut off a maxSize-sized piece as one part and continue with the remainder.
b. Then group consecutive parts together until each group reaches at least maxSize; each group becomes one split.
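A driver configuration sketch for CombineTextInputFormat (a fragment — the driver code that creates `job` is omitted, and the 4 MB cap is an arbitrary example value):

```java
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Inside the driver: pack small files together, capping each split at 4 MB
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB
```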
3. Job Submission Flow Analysis
1. Debugging entry point (set a breakpoint here): job.waitForCompletion(true);
2. submit() is invoked:
public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // 1. Use the new API where possible
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
                ClassNotFoundException {
            // 2. Run the actual submission
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
}
3. Analysis of the submission — JobSubmitter eventually computes the input splits:
// computes the splits
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // instantiate the input format configured in the driver (inputFormatClass) via reflection
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // the splits are computed here
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
4. Core logic — FileInputFormat.getSplits:
public List<InputSplit> getSplits(JobContext job) throws IOException {
    // getFormatMinSplitSize() returns 1; getMinSplitSize(job) reads the parameter
    // mapreduce.input.fileinputformat.split.minsize, so minSize is 1 unless that parameter is raised
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // reads mapreduce.input.fileinputformat.split.maxsize; Long.MAX_VALUE if unset
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // fetch the metadata of all input files
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // check whether this file format supports splitting
            // (format-dependent, e.g. some compressed formats are not splittable)
            if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                /**
                 * protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                 *     return Math.max(minSize, Math.min(maxSize, blockSize));
                 * }
                 * To make splits larger than the block size, raise minSize;
                 * to make them smaller, lower maxSize below blockSize.
                 */
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                long bytesRemaining = length;
                // keep cutting while the remainder exceeds SPLIT_SLOP (1.1) split sizes
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }
                // the remainder (at most 1.1 * splitSize) becomes the last split
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splittable: the whole file goes into a single split
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                        blkLocations[0].getCachedHosts()));
            }
        } else {
            // create an empty hosts array for zero-length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    return splits;
}
Tuning the split size:
To make splits larger than the block size, raise minSize: mapreduce.input.fileinputformat.split.minsize
To make splits smaller than the block size, lower maxSize below blockSize: mapreduce.input.fileinputformat.split.maxsize
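The two formulas above can be exercised with a pure-Java simulation (no Hadoop dependency): computeSplitSize mirrors the method shown in getSplits, and splitLengths replays the SPLIT_SLOP (1.1) loop. The class name SplitMathDemo is made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitMathDemo {
    static final double SPLIT_SLOP = 1.1;

    // same formula as FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // replays the getSplits while-loop: cut splitSize-sized pieces while the
    // remainder exceeds 1.1 split sizes, then keep the remainder whole
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            lengths.add(bytesRemaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // defaults (minSize = 1, maxSize = Long.MAX_VALUE): split size = block size
        System.out.println(computeSplitSize(128 * mb, 1, Long.MAX_VALUE) / mb);        // 128
        // lowering maxSize below the block size shrinks the split
        System.out.println(computeSplitSize(128 * mb, 1, 64 * mb) / mb);               // 64
        // raising minSize above the block size grows the split
        System.out.println(computeSplitSize(128 * mb, 256 * mb, Long.MAX_VALUE) / mb); // 256
        // a 300 MB file with 128 MB splits -> 128 + 128 + 44 MB (44/128 <= 1.1)
        System.out.println(splitLengths(300 * mb, 128 * mb).size()); // 3
        // a 129 MB file is NOT cut: 129/128 ≈ 1.008 <= 1.1, so one split
        System.out.println(splitLengths(129 * mb, 128 * mb).size()); // 1
    }
}
```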