1. Common Input Formats

1.1 TextInputFormat
Splitting: uses the default split strategy.
RecordReader: LineRecordReader — processes one line at a time; the key is the byte offset of the line within the file, and the value is the line's content.
key: LongWritable
value: Text
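As a standalone illustration (plain Java, no Hadoop dependency; `OffsetLines` is a hypothetical helper, not a Hadoop class), the offset-as-key pairing that LineRecordReader produces can be sketched like this:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetLines {
    // Returns "offset\tline" records: the key is the byte offset of the
    // line's first character, the value is the line's content, mimicking
    // the (LongWritable, Text) pairs LineRecordReader emits.
    public static List<String> read(String text) {
        List<String> records = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n", -1)) {
            // Skip only the phantom empty line after a trailing '\n'
            if (!line.isEmpty() || offset < text.length()) {
                records.add(offset + "\t" + line);
            }
            offset += line.getBytes().length + 1; // +1 for the '\n' terminator
        }
        return records;
    }
}
```

For input "hello\nworld", the records are "0\thello" and "6\tworld" — the second line starts at byte 6 because "hello" plus its newline occupies bytes 0–5.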
1.2 NLineInputFormat
Splitting: every N lines form one split.
RecordReader: LineRecordReader — same as above: one line per call, key is the line's byte offset, value is the line's content.
key: LongWritable
value: Text
Use case: when the per-line processing logic is complex and takes a long time, so smaller splits (and therefore more map tasks) are desirable.
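The N-lines-per-split grouping can be sketched without Hadoop (`NLineGrouping` is a hypothetical helper, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class NLineGrouping {
    // Groups lines into splits of at most n lines each, the way
    // NLineInputFormat assigns N lines of input to each split.
    public static List<List<String>> split(List<String> lines, int n) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < lines.size(); i += n) {
            // The last split may hold fewer than n lines
            splits.add(new ArrayList<>(lines.subList(i, Math.min(i + n, lines.size()))));
        }
        return splits;
    }
}
```

In a real job the line count is configured on NLineInputFormat itself, e.g. via the mapreduce.input.lineinputformat.linespermap property.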
1.3 KeyValueTextInputFormat
RecordReader: KeyValueLineRecordReader — for text files; splits each line at a separator character into a key and a value.
If the separator is not found, the whole line becomes the key and the value is the empty string.
The default separator is \t; it can be changed via the parameter mapreduce.input.keyvaluelinerecordreader.key.value.separator (the separator is a single byte, so only one character is allowed).
Splitting: uses the default split strategy.
RR: KeyValueLineRecordReader
key: Text
value: Text
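The separator handling described above — split at the first separator; if it is absent, the whole line is the key and the value is empty — can be mimicked in plain Java (`KeyValueSplit` is a hypothetical sketch, not the Hadoop class):

```java
public class KeyValueSplit {
    // Splits a line at the first occurrence of the separator character,
    // mirroring KeyValueLineRecordReader's behavior: with no separator,
    // the whole line becomes the key and the value is the empty string.
    public static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }
}
```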
1.4 CombineTextInputFormat
Purpose: changes the traditional split strategy by packing multiple small files into one split.
Suited to scenarios with many small files.
RecordReader: LineRecordReader — one line per call; key is the line's byte offset, value is the line's content.
key: LongWritable
value: Text
Splitting: first determine the maximum split size maxSize, set via the parameter mapreduce.input.fileinputformat.split.maxsize.
Procedure:
a. File by file, divide each file into parts:
  ① if the remaining (uncut) size of the file <= maxSize, the whole remainder becomes one part;
  ② if maxSize < remaining size <= 2 * maxSize, the remainder is divided evenly into two parts;
  ③ if remaining size > 2 * maxSize, cut off a part of maxSize and repeat with the remainder.
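The per-file part division in steps ①–③ can be sketched as follows (a simplified illustration in plain Java; the real CombineFileInputFormat additionally merges parts into splits by node and rack, which is omitted here):

```java
import java.util.ArrayList;
import java.util.List;

public class CombinePartition {
    // Divides one file of the given length into part sizes by the three rules:
    //   remaining <= maxSize               -> one part
    //   maxSize < remaining <= 2 * maxSize -> two roughly equal halves
    //   remaining > 2 * maxSize            -> cut off maxSize and repeat
    public static List<Long> parts(long length, long maxSize) {
        List<Long> parts = new ArrayList<>();
        long remaining = length;
        while (remaining > 2 * maxSize) {       // rule ③
            parts.add(maxSize);
            remaining -= maxSize;
        }
        if (remaining > maxSize) {              // rule ②
            parts.add(remaining / 2);
            parts.add(remaining - remaining / 2);
        } else if (remaining > 0) {             // rule ①
            parts.add(remaining);
        }
        return parts;
    }
}
```

For example, a 10 MB file with maxSize = 4 MB yields parts of 4, 3, and 3 MB: one 4 MB cut (rule ③), then the 6 MB remainder is halved (rule ②).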

2. Job Submission Flow Analysis

1. Entry point (set a breakpoint here): job.waitForCompletion(true);
2. This calls submit():

  public void submit()
          throws IOException, InterruptedException, ClassNotFoundException {
      ensureState(JobState.DEFINE);
      // 1. Check whether to use the new API
      setUseNewAPI();
      connect();
      final JobSubmitter submitter =
              getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
      status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
          public JobStatus run() throws IOException, InterruptedException,
                  ClassNotFoundException {
              // 2. Perform the actual submission
              return submitter.submitJobInternal(Job.this, cluster);
          }
      });
  }

3. Inside the submission: computing splits (writeNewSplits)

  // Compute the splits
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
      Configuration conf = job.getConfiguration();
      // Instantiate the input format configured in the driver (inputFormatClass) via reflection
      InputFormat<?, ?> input =
              ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
      // The call of interest
      List<InputSplit> splits = input.getSplits(job);
      T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

4. Core logic: FileInputFormat.getSplits

  public List<InputSplit> getSplits(JobContext job) throws IOException {
      // getFormatMinSplitSize() returns 1; getMinSplitSize(job) reads
      // mapreduce.input.fileinputformat.split.minsize (default 0)
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // result: 1
      // mapreduce.input.fileinputformat.split.maxsize; defaults to Long.MAX_VALUE if unset
      long maxSize = getMaxSplitSize(job);

      // generate splits
      List<InputSplit> splits = new ArrayList<InputSplit>();
      // List the metadata of all input files
      List<FileStatus> files = listStatus(job);
      for (FileStatus file : files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
              BlockLocation[] blkLocations;
              if (file instanceof LocatedFileStatus) {
                  blkLocations = ((LocatedFileStatus) file).getBlockLocations();
              } else {
                  FileSystem fs = path.getFileSystem(job.getConfiguration());
                  blkLocations = fs.getFileBlockLocations(file, 0, length);
              }
              // Whether this file format supports splitting (format-dependent)
              if (isSplitable(job, path)) {
                  long blockSize = file.getBlockSize();
                  /*
                   * protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
                   *     return Math.max(minSize, Math.min(maxSize, blockSize));
                   * }
                   * To get splits larger than the block size, raise minSize;
                   * to get smaller splits, lower maxSize below blockSize.
                   */
                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                  long bytesRemaining = length;
                  // Stop splitting once the remainder is <= 1.1 * splitSize (SPLIT_SLOP = 1.1)
                  while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                      splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                              blkLocations[blkIndex].getHosts(),
                              blkLocations[blkIndex].getCachedHosts()));
                      bytesRemaining -= splitSize;
                  }

                  // The remainder becomes one final split
                  if (bytesRemaining != 0) {
                      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                      splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                              blkLocations[blkIndex].getHosts(),
                              blkLocations[blkIndex].getCachedHosts()));
                  }
              } else { // not splitable: the whole file goes into a single split
                  splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
              }
          } else {
              // Create empty hosts array for zero length files
              splits.add(makeSplit(path, 0, length, new String[0]));
          }
      }
      return splits;
  }

Tuning the split size:
To make splits larger than the block size, raise minSize: mapreduce.input.fileinputformat.split.minsize
To make splits smaller than the block size, lower maxSize below blockSize: mapreduce.input.fileinputformat.split.maxsize
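Putting the two knobs together: computeSplitSize below uses the exact formula shown in getSplits, and SPLIT_SLOP is the same 1.1 constant; `SplitMath.countSplits` is a simplified stand-in for the splitting loop, assuming a single splittable file:

```java
public class SplitMath {
    static final double SPLIT_SLOP = 1.1; // same constant FileInputFormat uses

    // Identical formula to FileInputFormat.computeSplitSize
    public static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Counts how many splits the getSplits loop would produce for one file:
    // keep cutting splitSize while the remainder exceeds 1.1 * splitSize,
    // then the remainder (if any) forms one last split.
    public static int countSplits(long length, long splitSize) {
        int n = 0;
        long remaining = length;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            remaining -= splitSize;
            n++;
        }
        return remaining != 0 ? n + 1 : n;
    }
}
```

With the defaults (minSize = 1, maxSize = Long.MAX_VALUE), the split size equals the block size; raising minSize above blockSize enlarges splits, while lowering maxSize below blockSize shrinks them. The slop also means a 130-byte file with a 128-byte split size stays one split, since 130/128 ≈ 1.02 <= 1.1.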