一、为什么要切片?——切片决定Map任务并行度
- 数据块:HDFS物理上把数据分成长度相等的块存储
- 数据切片:在逻辑上对数据进行切片处理,与物理分块无关
- 数据切片大小越小,即数据切片越多,map任务并行度越高
- 在数据切片大小不等于数据块大小的整数倍时,会涉及跨节点io,影响性能
- 每个切片分配一个map任务并行实例处理
- 默认情况下切片大小等于block大小
- 切片时不考虑整体,针对每个文件单独切片
二、如何切片?——Job提交流程源码解析
1.源码
//drive类
waitForCompletion();
//连接流程
submit();
// 1.建立连接
connect();
// 1)由配置信息创建集群对象
new Cluster(getConfiguration());
// (1)判断是本地yarn还是远程yarn
initialize(jobTrackerAddr, conf);
// 2.提交job
submitter.submitJobInternal(Job.this, cluster);
// 1)创建给集群提交数据的stag路径
JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取jobId,并创建Job路径
JobId jobId = submitClient.getNewJobID();
// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, submitJobDir);
input.getSplits(job);
// 5)向stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2.流程图
3. 解析
- 程序先找到你数据存储的目录
- 开始遍历处理(规划切片)目录下的每一个文件
- 遍历第一个文件ss.txt
- 获取文件大小fs.sizeOf(ss.txt)
- 计算切片大小
- 默认情况下,切片大小=blocksize
- 开始切片(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划到本块中)
- 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSplit()方法中完成
- InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
- 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
4. 配置切片大小参数
切片大小 = Math.max(minSize, Math.min(maxSize, blocksize))
mapreduce.input.fileinputformat.split.minsize=1,默认值1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue,默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize
maxsize:调到小于blocksize,则可以让切片变小,且切片大小等于这个值
minsize:调到大于blocksize,则可以让切片变大,且切片大小等于这个值