底层是使用CombineFileInputFormat的createSplits
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,Map<OneBlockInfo, String[]> blockToNodes,Map<String, List<OneBlockInfo>> rackToBlocks,long totLength,long maxSize,long minSizeNode,long minSizeRack,List<InputSplit> splits) {ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();long curSplitSize = 0;int totalNodes = nodeToBlocks.size();long totalLength = totLength;Multiset<String> splitsPerNode = HashMultiset.create();Set<String> completedNodes = new HashSet<String>();while(true) {// it is allowed for maxSize to be 0. Disable smoothing load for such cases// process all nodes and create splits that are local to a node. Generate// one split per node iteration, and walk over nodes multiple times to// distribute the splits across nodes.for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) {Map.Entry<String, Set<OneBlockInfo>> one = iter.next();String node = one.getKey();// Skip the node if it has previously been marked as completed.if (completedNodes.contains(node)) {continue;}Set<OneBlockInfo> blocksInCurrentNode = one.getValue();// for each block, copy it into validBlocks. 
Delete it from// blockToNodes so that the same block does not appear in// two different splits.Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();while (oneBlockIter.hasNext()) {OneBlockInfo oneblock = oneBlockIter.next();// Remove all blocks which may already have been assigned to other// splits.if(!blockToNodes.containsKey(oneblock)) {oneBlockIter.remove();continue;}validBlocks.add(oneblock);blockToNodes.remove(oneblock);curSplitSize += oneblock.length;// if the accumulated split size exceeds the maximum, then// create this split.if (maxSize != 0 && curSplitSize >= maxSize) {// create an input split and add it to the splits arrayaddCreatedSplit(splits, Collections.singleton(node), validBlocks);totalLength -= curSplitSize;curSplitSize = 0;splitsPerNode.add(node);// Remove entries from blocksInNode so that we don't walk these// again.blocksInCurrentNode.removeAll(validBlocks);validBlocks.clear();// Done creating a single split for this node. Move on to the next// node so that splits are distributed across nodes.break;}}if (validBlocks.size() != 0) {// This implies that the last few blocks (or all in case maxSize=0)// were not part of a split. The node is complete.// if there were any blocks left over and their combined size is// larger than minSplitNode, then combine them into one split.// Otherwise add them back to the unprocessed pool. It is likely// that they will be combined with other blocks from the// same rack later on.// This condition also kicks in when max split size is not set. All// blocks on a node will be grouped together into a single split.if (minSizeNode != 0 && curSplitSize >= minSizeNode&& splitsPerNode.count(node) == 0) {// haven't created any split on this machine. so its ok to add a// smaller one for parallelism. 
Otherwise group it in the rack for// balanced size create an input split and add it to the splits// arrayaddCreatedSplit(splits, Collections.singleton(node), validBlocks);totalLength -= curSplitSize;splitsPerNode.add(node);// Remove entries from blocksInNode so that we don't walk this again.blocksInCurrentNode.removeAll(validBlocks);// The node is done. This was the last set of blocks for this node.} else {// Put the unplaced blocks back into the pool for later rack-allocation.for (OneBlockInfo oneblock : validBlocks) {blockToNodes.put(oneblock, oneblock.hosts);}}validBlocks.clear();curSplitSize = 0;completedNodes.add(node);} else { // No in-flight blocks.if (blocksInCurrentNode.size() == 0) {// Node is done. All blocks were fit into node-local splits.completedNodes.add(node);} // else Run through the node again.}}// Check if node-local assignments are complete.if (completedNodes.size() == totalNodes || totalLength == 0) {// All nodes have been walked over and marked as completed or all blocks// have been assigned. The rest should be handled via rackLock assignment.LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "+ completedNodes.size() + ", size left: " + totalLength);break;}}// if blocks in a rack are below the specified minimum size, then keep them// in 'overflow'. After the processing of all racks is complete, these// overflow blocks will be combined into splits.ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();Set<String> racks = new HashSet<String>();// Process all racks over and over again until there is no more work to do.while (blockToNodes.size() > 0) {// Create one split for this rack before moving over to the next rack.// Come back to this rack after creating a single split for each of the// remaining racks.// Process one rack location at a time, Combine all possible blocks that// reside on this rack as one split. 
(constrained by minimum and maximum// split size).// iterate over all racksfor (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =rackToBlocks.entrySet().iterator(); iter.hasNext();) {Map.Entry<String, List<OneBlockInfo>> one = iter.next();racks.add(one.getKey());List<OneBlockInfo> blocks = one.getValue();// for each block, copy it into validBlocks. Delete it from// blockToNodes so that the same block does not appear in// two different splits.boolean createdSplit = false;for (OneBlockInfo oneblock : blocks) {if (blockToNodes.containsKey(oneblock)) {validBlocks.add(oneblock);blockToNodes.remove(oneblock);curSplitSize += oneblock.length;// if the accumulated split size exceeds the maximum, then// create this split.if (maxSize != 0 && curSplitSize >= maxSize) {// create an input split and add it to the splits arrayaddCreatedSplit(splits, getHosts(racks), validBlocks);createdSplit = true;break;}}}// if we created a split, then just go to the next rackif (createdSplit) {curSplitSize = 0;validBlocks.clear();racks.clear();continue;}if (!validBlocks.isEmpty()) {if (minSizeRack != 0 && curSplitSize >= minSizeRack) {// if there is a minimum size specified, then create a single split// otherwise, store these blocks into overflow data structureaddCreatedSplit(splits, getHosts(racks), validBlocks);} else {// There were a few blocks in this rack that// remained to be processed. 
Keep them in 'overflow' block list.// These will be combined later.overflowBlocks.addAll(validBlocks);}}curSplitSize = 0;validBlocks.clear();racks.clear();}}assert blockToNodes.isEmpty();assert curSplitSize == 0;assert validBlocks.isEmpty();assert racks.isEmpty();// Process all overflow blocksfor (OneBlockInfo oneblock : overflowBlocks) {validBlocks.add(oneblock);curSplitSize += oneblock.length;// This might cause an exiting rack location to be re-added,// but it should be ok.for (int i = 0; i < oneblock.racks.length; i++) {racks.add(oneblock.racks[i]);}// if the accumulated split size exceeds the maximum, then// create this split.if (maxSize != 0 && curSplitSize >= maxSize) {// create an input split and add it to the splits arrayaddCreatedSplit(splits, getHosts(racks), validBlocks);curSplitSize = 0;validBlocks.clear();racks.clear();}}// Process any remaining blocks, if any.if (!validBlocks.isEmpty()) {addCreatedSplit(splits, getHosts(racks), validBlocks);}}
大致过程:
- 先遍历每个node,再遍历该node上的block。遍历时将block放入validBlocks列表,并将列表中所有block的累计大小与max_split阈值进行比较:如果大于等于max_split,则将列表中的block合并成一个split,然后转到下一个node。该node的block遍历结束后,将剩余block的累计大小与min_split(minSizeNode)进行比较:如果大于等于min_split且该node上尚未生成过split,则将剩余的block合并成一个split;否则将剩余的block放回blockToNodes这个map中,等待rack级别进行合并
- 在node遍历结束后,对于同一rack下的剩余block进行合并,合并逻辑同步骤1;累计大小不足minSizeRack的block会先放入overflowBlocks,待所有rack处理完后再统一合并成split。
从上面的分析中可以看出,每个split的大小可能是不一样的。
