一、MapReduce的执行大致流程
1、InputFormat数据读取流程
1.1、切片
- 切片的概念:从文件的逻辑上进行大小的切分,一个切片多大,将来一个MapTask的处理的数据就多大
- 一个切片就会产生一个MapTask
- 切片时只考虑文件本身,不考虑数据的整体集
- 切片大小和切块大小默认是一致的,这样设计的目的是为了避免将来切片读取数据的时候有跨机器的情况
1.2、InputFormat的体系结构
-- FileInputFormat InputFormat的子实现类,实现切片逻辑
实现了getSplits() 负责切片
-- TextInputFormat FileInputFormat的子实现类, 实现读取数据的逻辑
createRecordReader() 返回一个RecordReader,在RecordReader中实现了
读取数据的方式:安行读取。
-- CombineFileInputFormat FileInputFormat的子实现类,此类中也实现了
一套切片逻辑 (处理:适用于小文件计算场景。)
// 切片源码分析
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
// minSize = 1(默认情况)
// 但是我们也可以通过改变mapreduce.input.fileinputformat.split.minsize 配置项来改变minSize大小
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// maxSize = Long的最大值(默认情况)
// 但是我们也可以通过改变mapreduce.input.fileinputformat.split.maxsize 配置项来改变maxSize大小
long maxSize = getMaxSplitSize(job);
// 管理最终切完片的对象的集合 最终返回的就是此集合
List<InputSplit> splits = new ArrayList<InputSplit>();
// 获取当前文件的详情
List<FileStatus> files = listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
// 遍历获取到的文件列表,一次按照文件为单位进行切片
for (FileStatus file: files) {
// 如果是忽略文件以及是文件夹就不进行切片
if (ignoreDirs && file.isDirectory()) {
continue;
}
// 获取文件的路径
Path path = file.getPath();
// 获取文件的内容大小
long length = file.getLen();
// 如果不是空文件 继续切片
if (length != 0) {
// 获取文件的具体的块信息
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判断是否要进行切片(主要判断当前文件是否是压缩文件,有一些压缩文件时不能够进行切片)
if (isSplitable(job, path)) {
// 获取HDFS中的数据块的大小
long blockSize = file.getBlockSize();
// 计算切片的大小--> 128M 默认情况下永远都是块大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-- 内部方法:
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
long bytesRemaining = length;
// 判断当前的文件的剩余内容是否要继续切片 SPLIT_SLOP = 1.1
// 判断公式:bytesRemaining)/splitSize > SPLIT_SLOP
// 用文件的剩余大小/切片大小 > 1.1 才继续切片(这样做的目的是为了让我们每一个MapTask处理的数据更加均衡)
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// 如果最后文件还有剩余且不足一个切片大小,最后再形成最后的一个切片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
2、Shuffle工作机制