1. Common Input Formats
1.1 TextInputFormat — the default split strategy
RecordReader: LineRecordReader, which processes one line at a time, using the byte offset of the line as the key and the line contents as the value.
LongWritable key
Text value
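The (offset, line) pairing above can be illustrated with a small stand-alone sketch. This is NOT Hadoop code — `OffsetDemo` and `keys()` are hypothetical names used only to show how a LineRecordReader-style reader would derive the LongWritable key (the line's starting byte offset) from raw file content:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Hadoop source): computes the byte offset
// that a LineRecordReader-style reader would emit as each line's key.
public class OffsetDemo {
    // Returns the starting byte offset of every line in the content,
    // assuming '\n' line endings (one byte per newline).
    public static List<Long> keys(String content) {
        List<Long> keys = new ArrayList<>();
        long offset = 0;
        for (String line : content.split("\n")) {
            keys.add(offset);
            offset += line.length() + 1; // line bytes plus the '\n'
        }
        return keys;
    }

    public static void main(String[] args) {
        // "hi\nworld": line "hi" starts at offset 0, "world" at offset 3
        System.out.println(keys("hi\nworld"));
    }
}
```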
1.2 NLineInputFormat
RecordReader: LineRecordReader. Every N lines form one split; it still processes one line at a time, using the byte offset of the line as the key and the line contents as the value.
LongWritable key
Text value
Use case: when the per-line processing logic is complex and takes a long time to run (more splits means more map tasks in parallel).
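Since each split holds at most N lines, a file with M lines yields ceil(M / N) splits. A minimal sketch of that arithmetic (the class and method names here are hypothetical, not Hadoop API):

```java
// Hypothetical sketch: NLineInputFormat puts every n lines into one
// split, so a file with totalLines lines yields ceil(totalLines / n) splits.
public class NLineDemo {
    public static int numSplits(int totalLines, int n) {
        return (totalLines + n - 1) / n; // ceiling division
    }

    public static void main(String[] args) {
        // 10 lines, 3 per split -> 4 splits (3 + 3 + 3 + 1)
        System.out.println(numSplits(10, 3));
    }
}
```

In a real driver, N is configured with `NLineInputFormat.setNumLinesPerSplit(job, n)`.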
1.3 KeyValueTextInputFormat
RecordReader: KeyValueLineRecordReader. For text files: splits each line at a separator character into a key and a value.
If the separator is not found, the whole line becomes the key and the value is the empty string.
The default separator is \t; it can be changed via the parameter mapreduce.input.keyvaluelinerecordreader.key.value.separator (the separator is a byte, so it can only be a single character).
Split strategy: the default split strategy
RR: KeyValueLineRecordReader
Text key
Text value
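The parsing rule above — split at the first separator, or use the whole line as the key with an empty value — can be sketched in plain Java. `KVParseDemo` is a hypothetical stand-in, not the actual KeyValueLineRecordReader source:

```java
// Hypothetical sketch of the KeyValueLineRecordReader rule:
// split at the FIRST separator; if absent, key = whole line, value = "".
public class KVParseDemo {
    public static String[] parse(String line, char sep) {
        int i = line.indexOf(sep);
        if (i < 0) {
            return new String[]{line, ""};
        }
        return new String[]{line.substring(0, i), line.substring(i + 1)};
    }

    public static void main(String[] args) {
        String[] kv = parse("name\tAlice", '\t');
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```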
1.4 CombineTextInputFormat
Purpose: changes the traditional split strategy by packing multiple small files into one split.
Suitable when there are many small files.
RecordReader: LineRecordReader, which processes one line at a time, using the byte offset of the line as the key and the line contents as the value.
LongWritable key
Text value
Splitting: first determine the maximum split size maxSize, set via the parameter mapreduce.input.fileinputformat.split.maxsize.
Process: a. Working file by file, divide each file into one or more parts:
① if the as-yet-unsplit portion of the file is <= maxSize, the whole remaining portion becomes 1 part;
② if maxSize < remaining portion <= 2 * maxSize, split the remaining portion evenly into 2 parts;
③ if the remaining portion > 2 * maxSize, cut off a maxSize-sized chunk as one part and repeat on the rest.
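Rules ①–③ above can be sketched as a loop over one file's size. This is a hypothetical simulation of the described partitioning (sizes in arbitrary units), not the actual CombineTextInputFormat source:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-file partitioning rules ①-③ above.
public class CombinePartsDemo {
    public static List<Long> parts(long fileSize, long maxSize) {
        List<Long> parts = new ArrayList<>();
        long remaining = fileSize;
        while (remaining > 2 * maxSize) {   // rule ③: carve off maxSize, continue
            parts.add(maxSize);
            remaining -= maxSize;
        }
        if (remaining > maxSize) {          // rule ②: split evenly into 2 parts
            parts.add(remaining / 2);
            parts.add(remaining - remaining / 2);
        } else if (remaining > 0) {         // rule ①: keep as a single part
            parts.add(remaining);
        }
        return parts;
    }

    public static void main(String[] args) {
        // A 10-unit file with maxSize 4 -> parts [4, 3, 3]
        System.out.println(parts(10, 4));
    }
}
```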
3. Submission Flow Analysis
1. Breakpoint entry: job.waitForCompletion(true);
2. submit() is executed:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // 1. Make sure the new API is used
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // 2. Run the actual submission
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
}
3. Analysis of the submit path
// Compute the splits
int writeNewSplits(JobContext job, Path jobSubmitDir)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Instantiate the InputFormat configured in the driver (inputFormatClass) via reflection
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // The split computation happens here
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    ...
4. Core logic: FileInputFormat.getSplits()
public List<InputSplit> getSplits(JobContext job) throws IOException {
    // getFormatMinSplitSize() = 1; the config key
    // mapreduce.input.fileinputformat.split.minsize defaults to 0, so minSize is 1 by default
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // mapreduce.input.fileinputformat.split.maxsize; Long.MAX_VALUE if not set
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // List the metadata of all input files
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // Check whether this file format supports splitting (format-dependent)
            if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                /*
                 * protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                 *     // To make splits larger than the block, raise minSize;
                 *     // to make them smaller, lower maxSize below blockSize
                 *     return Math.max(minSize, Math.min(maxSize, blockSize));
                 * }
                 */
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                // Stop splitting once the remainder is no more than 1.1x the split size
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }
                // The remainder becomes one final split
                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }
            } else { // not splitable: the whole file goes into a single split
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                        blkLocations[0].getCachedHosts()));
            }
        } else {
            // Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    return splits;
}
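The while-loop condition with SPLIT_SLOP (1.1) is worth simulating, because it explains why a file slightly larger than the split size still produces only one split. This is a hypothetical stand-alone sketch of that loop, not Hadoop source:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the getSplits() loop: keep cutting splitSize-sized
// splits while the remainder exceeds 1.1x splitSize, then emit the remainder.
public class SplitLoopDemo {
    static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    public static List<Long> splitSizes(long length, long splitSize) {
        List<Long> sizes = new ArrayList<>();
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            sizes.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            sizes.add(bytesRemaining);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 130 MB file with 128 MB splits: 130/128 is about 1.016 <= 1.1,
        // so the whole file stays in ONE split instead of 128 + 2
        System.out.println(splitSizes(130, 128));
    }
}
```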
Tuning the split size:
To make splits LARGER than the block size, raise minSize above blockSize: mapreduce.input.fileinputformat.split.minsize
To make splits SMALLER than the block size, lower maxSize below blockSize: mapreduce.input.fileinputformat.split.maxsize
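Both tuning rules follow directly from the formula max(minSize, min(maxSize, blockSize)) shown in the getSplits() walkthrough. A quick check of the three cases (the wrapper class name here is hypothetical; the formula itself is from the source above):

```java
// Same formula as FileInputFormat.computeSplitSize() quoted above;
// the class name is just a placeholder for this demo.
public class SplitSizeDemo {
    public static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128; // e.g. MB
        // defaults (minSize = 1, maxSize = Long.MAX_VALUE) -> split = blockSize
        System.out.println(computeSplitSize(blockSize, 1, Long.MAX_VALUE));
        // raise minSize above blockSize -> larger splits (256)
        System.out.println(computeSplitSize(blockSize, 256, Long.MAX_VALUE));
        // lower maxSize below blockSize -> smaller splits (64)
        System.out.println(computeSplitSize(blockSize, 1, 64));
    }
}
```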
