一、为什么要切片?——切片决定Map任务并行度
- 数据块:HDFS物理上把数据分成长度相等的块存储
- 数据切片:在逻辑上对数据进行切片处理,与物理分块无关
- 数据切片大小越小,即数据切片越多,map任务并行度越高
- 在数据切片大小不等于数据块大小的整数倍时,会涉及跨节点io,影响性能
- 每个切片分配一个map任务并行实例处理
- 默认情况下切片大小等于block大小
- 切片时不考虑整体,针对每个文件单独切片
二、如何切片?——Job提交流程源码解析
1.源码
//drive类waitForCompletion();//连接流程submit();// 1.建立连接connect();// 1)由配置信息创建集群对象new Cluster(getConfiguration());// (1)判断是本地yarn还是远程yarninitialize(jobTrackerAddr, conf);// 2.提交jobsubmitter.submitJobInternal(Job.this, cluster);// 1)创建给集群提交数据的stag路径JobSubmissionFiles.getStagingDir(cluster, conf);// 2)获取jobId,并创建Job路径JobId jobId = submitClient.getNewJobID();// 3)拷贝jar包到集群copyAndConfigureFiles(job, submitJobDir);rUploader.uploadFiles(job, jobSubmitDir);// 4)计算切片,生成切片规划文件writeSplits(job, submitJobDir);maps = writeNewSplits(job, submitJobDir);input.getSplits(job);// 5)向stag路径写XML配置文件writeConf(conf, submitJobFile);conf.writeXml(out);// 6)提交job,返回提交状态status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2.流程图
3. 解析
- 程序先找到你数据存储的目录
- 开始遍历处理(规划切片)目录下的每一个文件
- 遍历第一个文件ss.txt
- 获取文件大小fs.sizeOf(ss.txt)
- 计算切片大小
- 默认情况下,切片大小=blocksize
- 开始切片(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划到本块中)
- 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSplit()方法中完成
- InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
- 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
4. 配置切片大小参数
切片大小 = Math.max(minSize, Math.min(maxSize, blocksize))
mapreduce.input.fileinputformat.split.minsize=1,默认值1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue,默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize
maxsize:调到小于blocksize,则可以让切片变小,且切片大小等于这个值
minsize:调到大于blocksize,则可以让切片变大,且切片大小等于这个值
