|
@@ -29,7 +29,6 @@ import java.util.HashMap;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
-import java.util.Map.Entry;
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -289,6 +288,26 @@ public abstract class CombineFileInputFormat<K, V>
|
|
maxSize, minSizeNode, minSizeRack, splits);
|
|
maxSize, minSizeNode, minSizeRack, splits);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Process all the nodes and create splits that are local to a node.
|
|
|
|
+ * Generate one split per node iteration, and walk over nodes multiple times
|
|
|
|
+ * to distribute the splits across nodes.
|
|
|
|
+ * <p>
|
|
|
|
+ * Note: The order of processing the nodes is undetermined because the
|
|
|
|
+ * implementation of nodeToBlocks is {@link java.util.HashMap} and its order
|
|
|
|
+ * of the entries is undetermined.
|
|
|
|
+ * @param nodeToBlocks Mapping from a node to the list of blocks that
|
|
|
|
+ * it contains.
|
|
|
|
+ * @param blockToNodes Mapping from a block to the nodes on which
|
|
|
|
+ * it has replicas.
|
|
|
|
+ * @param rackToBlocks Mapping from a rack name to the list of blocks it has.
|
|
|
|
+ * @param totLength Total length of the input files.
|
|
|
|
+ * @param maxSize Max size of each split.
|
|
|
|
+ * If set to 0, disable smoothing load.
|
|
|
|
+ * @param minSizeNode Minimum split size per node.
|
|
|
|
+ * @param minSizeRack Minimum split size per rack.
|
|
|
|
+ * @param splits New splits created by this method are added to the list.
|
|
|
|
+ */
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
|
|
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
|
|
Map<OneBlockInfo, String[]> blockToNodes,
|
|
Map<OneBlockInfo, String[]> blockToNodes,
|
|
@@ -309,11 +328,6 @@ public abstract class CombineFileInputFormat<K, V>
|
|
Set<String> completedNodes = new HashSet<String>();
|
|
Set<String> completedNodes = new HashSet<String>();
|
|
|
|
|
|
while(true) {
|
|
while(true) {
|
|
- // it is allowed for maxSize to be 0. Disable smoothing load for such cases
|
|
|
|
-
|
|
|
|
- // process all nodes and create splits that are local to a node. Generate
|
|
|
|
- // one split per node iteration, and walk over nodes multiple times to
|
|
|
|
- // distribute the splits across nodes.
|
|
|
|
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
|
|
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
|
|
.entrySet().iterator(); iter.hasNext();) {
|
|
.entrySet().iterator(); iter.hasNext();) {
|
|
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
|
|
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
|