底层是使用CombineFileInputFormat的createSplits

    // Core split-construction routine (pasted from Hadoop's CombineFileInputFormat).
    // Three phases: (1) node-local pass — cut at most one split per node per
    // iteration so splits are spread across nodes; (2) rack-local pass — combine
    // leftover blocks rack by rack; (3) overflow pass — combine any still-unplaced
    // blocks into final splits.
    // NOTE(review): the leading "1.", "2.", ... tokens are listing numbers from the
    // paste, not Java. OneBlockInfo, addCreatedSplit, getHosts, LOG and the Guava
    // Multiset/HashMultiset come from the enclosing class — not visible here.
    1. void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
    2. Map<OneBlockInfo, String[]> blockToNodes,
    3. Map<String, List<OneBlockInfo>> rackToBlocks,
    4. long totLength,
    5. long maxSize,
    6. long minSizeNode,
    7. long minSizeRack,
    8. List<InputSplit> splits
    9. ) {
    10. ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    11. long curSplitSize = 0;
    12. int totalNodes = nodeToBlocks.size();
    // totalLength tracks the number of bytes not yet assigned to any split.
    13. long totalLength = totLength;
    // splitsPerNode counts splits already cut per node; a node is allowed at most
    // one sub-maxSize "small" split (enforced in the minSizeNode branch below).
    14. Multiset<String> splitsPerNode = HashMultiset.create();
    15. Set<String> completedNodes = new HashSet<String>();
    16. while(true) {
    17. // it is allowed for maxSize to be 0. Disable smoothing load for such cases
    18. // process all nodes and create splits that are local to a node. Generate
    19. // one split per node iteration, and walk over nodes multiple times to
    20. // distribute the splits across nodes.
    21. for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
    22. .entrySet().iterator(); iter.hasNext();) {
    23. Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
    24. String node = one.getKey();
    25. // Skip the node if it has previously been marked as completed.
    26. if (completedNodes.contains(node)) {
    27. continue;
    28. }
    29. Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
    30. // for each block, copy it into validBlocks. Delete it from
    31. // blockToNodes so that the same block does not appear in
    32. // two different splits.
    33. Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
    34. while (oneBlockIter.hasNext()) {
    35. OneBlockInfo oneblock = oneBlockIter.next();
    36. // Remove all blocks which may already have been assigned to other
    37. // splits.
    38. if(!blockToNodes.containsKey(oneblock)) {
    39. oneBlockIter.remove();
    40. continue;
    41. }
    42. validBlocks.add(oneblock);
    43. blockToNodes.remove(oneblock);
    44. curSplitSize += oneblock.length;
    45. // if the accumulated split size exceeds the maximum, then
    46. // create this split.
    47. if (maxSize != 0 && curSplitSize >= maxSize) {
    48. // create an input split and add it to the splits array
    49. addCreatedSplit(splits, Collections.singleton(node), validBlocks);
    50. totalLength -= curSplitSize;
    51. curSplitSize = 0;
    52. splitsPerNode.add(node);
    53. // Remove entries from blocksInNode so that we don't walk these
    54. // again.
    55. blocksInCurrentNode.removeAll(validBlocks);
    56. validBlocks.clear();
    57. // Done creating a single split for this node. Move on to the next
    58. // node so that splits are distributed across nodes.
    59. break;
    60. }
    61. }
    62. if (validBlocks.size() != 0) {
    63. // This implies that the last few blocks (or all in case maxSize=0)
    64. // were not part of a split. The node is complete.
    65. // if there were any blocks left over and their combined size is
    66. // larger than minSplitNode, then combine them into one split.
    67. // Otherwise add them back to the unprocessed pool. It is likely
    68. // that they will be combined with other blocks from the
    69. // same rack later on.
    70. // This condition also kicks in when max split size is not set. All
    71. // blocks on a node will be grouped together into a single split.
    72. if (minSizeNode != 0 && curSplitSize >= minSizeNode
    73. && splitsPerNode.count(node) == 0) {
    74. // haven't created any split on this machine. so its ok to add a
    75. // smaller one for parallelism. Otherwise group it in the rack for
    76. // balanced size create an input split and add it to the splits
    77. // array
    78. addCreatedSplit(splits, Collections.singleton(node), validBlocks);
    79. totalLength -= curSplitSize;
    80. splitsPerNode.add(node);
    81. // Remove entries from blocksInNode so that we don't walk this again.
    82. blocksInCurrentNode.removeAll(validBlocks);
    83. // The node is done. This was the last set of blocks for this node.
    84. } else {
    85. // Put the unplaced blocks back into the pool for later rack-allocation.
    86. for (OneBlockInfo oneblock : validBlocks) {
    87. blockToNodes.put(oneblock, oneblock.hosts);
    88. }
    89. }
    90. validBlocks.clear();
    91. curSplitSize = 0;
    92. completedNodes.add(node);
    93. } else { // No in-flight blocks.
    94. if (blocksInCurrentNode.size() == 0) {
    95. // Node is done. All blocks were fit into node-local splits.
    96. completedNodes.add(node);
    97. } // else Run through the node again.
    98. }
    99. }
    100. // Check if node-local assignments are complete.
    101. if (completedNodes.size() == totalNodes || totalLength == 0) {
    102. // All nodes have been walked over and marked as completed or all blocks
    103. // have been assigned. The rest should be handled via rackLock assignment.
    104. LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
    105. + completedNodes.size() + ", size left: " + totalLength);
    106. break;
    107. }
    108. }
    109. // if blocks in a rack are below the specified minimum size, then keep them
    110. // in 'overflow'. After the processing of all racks is complete, these
    111. // overflow blocks will be combined into splits.
    112. ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    113. Set<String> racks = new HashSet<String>();
    114. // Process all racks over and over again until there is no more work to do.
    115. while (blockToNodes.size() > 0) {
    116. // Create one split for this rack before moving over to the next rack.
    117. // Come back to this rack after creating a single split for each of the
    118. // remaining racks.
    119. // Process one rack location at a time, Combine all possible blocks that
    120. // reside on this rack as one split. (constrained by minimum and maximum
    121. // split size).
    122. // iterate over all racks
    123. for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
    124. rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    125. Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    126. racks.add(one.getKey());
    127. List<OneBlockInfo> blocks = one.getValue();
    128. // for each block, copy it into validBlocks. Delete it from
    129. // blockToNodes so that the same block does not appear in
    130. // two different splits.
    131. boolean createdSplit = false;
    132. for (OneBlockInfo oneblock : blocks) {
    133. if (blockToNodes.containsKey(oneblock)) {
    134. validBlocks.add(oneblock);
    135. blockToNodes.remove(oneblock);
    136. curSplitSize += oneblock.length;
    137. // if the accumulated split size exceeds the maximum, then
    138. // create this split.
    139. if (maxSize != 0 && curSplitSize >= maxSize) {
    140. // create an input split and add it to the splits array
    141. addCreatedSplit(splits, getHosts(racks), validBlocks);
    142. createdSplit = true;
    143. break;
    144. }
    145. }
    146. }
    147. // if we created a split, then just go to the next rack
    148. if (createdSplit) {
    149. curSplitSize = 0;
    150. validBlocks.clear();
    151. racks.clear();
    152. continue;
    153. }
    154. if (!validBlocks.isEmpty()) {
    155. if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
    156. // if there is a minimum size specified, then create a single split
    157. // otherwise, store these blocks into overflow data structure
    158. addCreatedSplit(splits, getHosts(racks), validBlocks);
    159. } else {
    160. // There were a few blocks in this rack that
    161. // remained to be processed. Keep them in 'overflow' block list.
    162. // These will be combined later.
    163. overflowBlocks.addAll(validBlocks);
    164. }
    165. }
    166. curSplitSize = 0;
    167. validBlocks.clear();
    168. racks.clear();
    169. }
    170. }
    // By now every block is either inside a split or parked in overflowBlocks,
    // and the per-pass accumulators must have been reset.
    171. assert blockToNodes.isEmpty();
    172. assert curSplitSize == 0;
    173. assert validBlocks.isEmpty();
    174. assert racks.isEmpty();
    175. // Process all overflow blocks
    176. for (OneBlockInfo oneblock : overflowBlocks) {
    177. validBlocks.add(oneblock);
    178. curSplitSize += oneblock.length;
    179. // This might cause an exiting rack location to be re-added,
    180. // but it should be ok.
    181. for (int i = 0; i < oneblock.racks.length; i++) {
    182. racks.add(oneblock.racks[i]);
    183. }
    184. // if the accumulated split size exceeds the maximum, then
    185. // create this split.
    186. if (maxSize != 0 && curSplitSize >= maxSize) {
    187. // create an input split and add it to the splits array
    188. addCreatedSplit(splits, getHosts(racks), validBlocks);
    189. curSplitSize = 0;
    190. validBlocks.clear();
    191. racks.clear();
    192. }
    193. }
    194. // Process any remaining blocks, if any.
    195. if (!validBlocks.isEmpty()) {
    196. addCreatedSplit(splits, getHosts(racks), validBlocks);
    197. }
    198. }

    大致过程:

    1. 先遍历每个node,接着遍历每个node的block,在遍历中,将block放入validBlocks数组中,然后将数组中所有block的大小之和与max_split阈值进行比较,如果达到max_split则将数组中的block取出,合并成一个split。在block遍历结束后,将剩余block的总大小与min_split进行比较,如果不小于min_split且该node尚未产生过split,则将剩余的block合并成一个split,否则将剩余的block放回blockToNodes进行缓存,等待rack级别进行合并
    2. 在node遍历结束后,对于同一rack下的剩余block进行合并,合并逻辑同步骤1;rack级别合并后仍不满足最小大小要求的block会被放入overflow列表,最后统一合并成若干split。

    从上面的分析中可以看出,每个split的大小可能是不一样的。