一、为什么要切片?——切片决定Map任务并行度

  • 数据块:HDFS物理上把数据分成长度相等的块存储
  • 数据切片:在逻辑上对数据进行切片处理,与物理分块无关
  • 数据切片大小越小,即数据切片越多,map任务并行度越高
  • 在数据切片大小不等于数据块大小的整数倍时,会涉及跨节点io,影响性能
  • 每个切片分配一个map任务并行实例处理
  • 默认情况下切片大小等于block大小
  • 切片时不考虑整体,针对每个文件单独切片

    二、如何切片?——Job提交流程源码解析

    1.源码

    1. //drive类
    2. waitForCompletion();
    3. //连接流程
    4. submit();
    5. // 1.建立连接
    6. connect();
    7. // 1)由配置信息创建集群对象
    8. new Cluster(getConfiguration());
    9. // (1)判断是本地yarn还是远程yarn
    10. initialize(jobTrackerAddr, conf);
    11. // 2.提交job
    12. submitter.submitJobInternal(Job.this, cluster);
    13. // 1)创建给集群提交数据的stag路径
    14. JobSubmissionFiles.getStagingDir(cluster, conf);
    15. // 2)获取jobId,并创建Job路径
    16. JobId jobId = submitClient.getNewJobID();
    17. // 3)拷贝jar包到集群
    18. copyAndConfigureFiles(job, submitJobDir);
    19. rUploader.uploadFiles(job, jobSubmitDir);
    20. // 4)计算切片,生成切片规划文件
    21. writeSplits(job, submitJobDir);
    22. maps = writeNewSplits(job, submitJobDir);
    23. input.getSplits(job);
    24. // 5)向stag路径写XML配置文件
    25. writeConf(conf, submitJobFile);
    26. conf.writeXml(out);
    27. // 6)提交job,返回提交状态
    28. status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

    2.流程图

    3. 解析

  1. 程序先找到你数据存储的目录
  2. 开始遍历处理(规划切片)目录下的每一个文件
  3. 遍历第一个文件ss.txt
    1. 获取文件大小fs.sizeOf(ss.txt)
    2. 计算切片大小
    3. 默认情况下,切片大小=blocksize
    4. 开始切片(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划到本块中)
    5. 将切片信息写到一个切片规划文件中
    6. 整个切片的核心过程在getSplit()方法中完成
    7. InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
  4. 提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数

    4. 配置切片大小参数

    切片大小 = Math.max(minSize, Math.min(maxSize, blocksize))
    mapreduce.input.fileinputformat.split.minsize=1,默认值1
    mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue,默认值Long.MAXValue
    因此,默认情况下,切片大小=blocksize
    maxsize:调到小于blocksize,则可以让切片变小,且切片大小等于这个值
    minsize:调到大于blocksize,则可以让切片变大,且切片大小等于这个值