@@ -22,13 +22,18 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +55,8 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
@@ -79,6 +86,8 @@ import com.google.common.annotations.VisibleForTesting;
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
+
   public static final String SPLIT_MINSIZE_PERNODE =
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK =
@@ -186,6 +195,8 @@ public abstract class CombineFileInputFormat<K, V>
       maxSize = maxSplitSize;
     } else {
       maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+      // If maxSize is not configured, a single split will be generated per
+      // node.
     }
     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
       throw new IOException("Minimum split size pernode " + minSizeNode +
@@ -271,8 +282,8 @@ public abstract class CombineFileInputFormat<K, V>
       new HashMap<OneBlockInfo, String[]>();
 
     // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
-      new HashMap<String, List<OneBlockInfo>>();
+    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
+      new HashMap<String, Set<OneBlockInfo>>();
 
     files = new OneFileInfo[paths.length];
     if (paths.length == 0) {
@@ -292,9 +303,9 @@ public abstract class CombineFileInputFormat<K, V>
   }
 
   @VisibleForTesting
-  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                     HashMap<OneBlockInfo, String[]> blockToNodes,
-                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
+                     Map<OneBlockInfo, String[]> blockToNodes,
+                     Map<String, List<OneBlockInfo>> rackToBlocks,
                      long totLength,
                      long maxSize,
                      long minSizeNode,
@@ -302,83 +313,122 @@ public abstract class CombineFileInputFormat<K, V>
                      List<InputSplit> splits
                     ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
 
-    int numNodes = nodeToBlocks.size();
+    int totalNodes = nodeToBlocks.size();
     long totalLength = totLength;
 
+    Multiset<String> splitsPerNode = HashMultiset.create();
+    Set<String> completedNodes = new HashSet<String>();
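+    // splitsPerNode: number of splits created per node so far; completedNodes:
+    // nodes that need no further node-local assignment passes.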
+
     while(true) {
       // it is allowed for maxSize to be 0. Disable smoothing load for such cases
-      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
-                             ((int) (totalLength/maxSize))/numNodes
-                             : Integer.MAX_VALUE;
-      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
-      numNodes = 0;
-
-      // process all nodes and create splits that are local to a node.
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+
+      // process all nodes and create splits that are local to a node. Generate
+      // one split per node iteration, and walk over nodes multiple times to
+      // distribute the splits across nodes.
+      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
           .entrySet().iterator(); iter.hasNext();) {
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        nodes.add(one.getKey());
-        List<OneBlockInfo> blocksInNode = one.getValue();
+        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
+
+        String node = one.getKey();
+
+        // Skip the node if it has previously been marked as completed.
+        if (completedNodes.contains(node)) {
+          continue;
+        }
+
+        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
 
         // for each block, copy it into validBlocks. Delete it from
         // blockToNodes so that the same block does not appear in
         // two different splits.
-        int splitsInNode = 0;
-        for (OneBlockInfo oneblock : blocksInNode) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-
-            // if the accumulated split size exceeds the maximum, then
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(splits, nodes, validBlocks);
-              totalLength -= curSplitSize;
-              curSplitSize = 0;
-              validBlocks.clear();
-              splitsInNode++;
-              if (splitsInNode == maxSplitsByNodeOnly) {
-                // stop grouping on a node so as not to create
-                // disproportionately more splits on a node because it happens
-                // to have many blocks
-                // consider only these nodes in next round of grouping because
-                // they have leftover blocks that may need to be grouped
-                numNodes++;
-                break;
-              }
-            }
+        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
+        while (oneBlockIter.hasNext()) {
+          OneBlockInfo oneblock = oneBlockIter.next();
+
+          // Remove all blocks which may already have been assigned to other
+          // splits.
+          if (!blockToNodes.containsKey(oneblock)) {
+            oneBlockIter.remove();
+            continue;
+          }
+
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
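+            // The split's only location is this node; all blocks gathered
+            // above are local to it.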
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            curSplitSize = 0;
+
+            splitsPerNode.add(node);
+
+            // Remove entries from blocksInCurrentNode so that we don't walk
+            // these again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            validBlocks.clear();
+
+            // Done creating a single split for this node. Move on to the next
+            // node so that splits are distributed across nodes.
+            break;
           }
+
         }
-        // if there were any blocks left over and their combined size is
-        // larger than minSplitNode, then combine them into one split.
-        // Otherwise add them back to the unprocessed pool. It is likely
-        // that they will be combined with other blocks from the
-        // same rack later on.
-        if (minSizeNode != 0 && curSplitSize >= minSizeNode
-            && splitsInNode == 0) {
-          // haven't created any split on this machine. so its ok to add a
-          // smaller
-          // one for parallelism. Otherwise group it in the rack for balanced
-          // size
-          // create an input split and add it to the splits array
-          addCreatedSplit(splits, nodes, validBlocks);
-          totalLength -= curSplitSize;
-        } else {
-          for (OneBlockInfo oneblock : validBlocks) {
-            blockToNodes.put(oneblock, oneblock.hosts);
+        if (validBlocks.size() != 0) {
+          // This implies that the last few blocks (or all, in case maxSize is
+          // 0) were not part of a split. The node is complete.
+
+          // if there were any blocks left over and their combined size is
+          // larger than minSizeNode, then combine them into one split.
+          // Otherwise add them back to the unprocessed pool. It is likely
+          // that they will be combined with other blocks from the
+          // same rack later on.
+          // This condition also kicks in when max split size is not set. All
+          // blocks on a node will be grouped together into a single split.
+          if (minSizeNode != 0 && curSplitSize >= minSizeNode
+              && splitsPerNode.count(node) == 0) {
+            // No split has been created for this machine yet, so it is ok to
+            // add a smaller one for parallelism. Otherwise group it with the
+            // rack for balanced size. Create an input split and add it to the
+            // splits array.
+            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
+            totalLength -= curSplitSize;
+            splitsPerNode.add(node);
+            // Remove entries from blocksInCurrentNode so we don't walk this again.
+            blocksInCurrentNode.removeAll(validBlocks);
+            // The node is done. This was the last set of blocks for this node.
+          } else {
+            // Put the unplaced blocks back into the pool for later rack-allocation.
+            for (OneBlockInfo oneblock : validBlocks) {
+              blockToNodes.put(oneblock, oneblock.hosts);
+            }
           }
+          validBlocks.clear();
+          curSplitSize = 0;
+          completedNodes.add(node);
+        } else { // No in-flight blocks.
+          if (blocksInCurrentNode.size() == 0) {
+            // Node is done. All of its blocks were fit into node-local splits.
+            completedNodes.add(node);
+          } // else run through the node again.
         }
-        validBlocks.clear();
-        nodes.clear();
-        curSplitSize = 0;
       }
-
-      if(!(numNodes>0 && totalLength>0)) {
+
+      // Check if node-local assignments are complete.
+      if (completedNodes.size() == totalNodes || totalLength == 0) {
+        // All nodes have been walked over and marked as completed or all blocks
+        // have been assigned. The rest should be handled via rack-local assignment.
+        LOG.debug("Terminated node allocation with: completed nodes: "
+            + completedNodes.size() + ", size left: " + totalLength);
         break;
       }
     }
@@ -527,7 +577,7 @@ public abstract class CombineFileInputFormat<K, V>
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                 HashMap<String, Set<String>> rackToNodes,
                 long maxSize)
                 throws IOException {
@@ -598,10 +648,10 @@ public abstract class CombineFileInputFormat<K, V>
 
     @VisibleForTesting
     static void populateBlockInfo(OneBlockInfo[] blocks,
-                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                          HashMap<OneBlockInfo, String[]> blockToNodes,
-                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                          HashMap<String, Set<String>> rackToNodes) {
+                          Map<String, List<OneBlockInfo>> rackToBlocks,
+                          Map<OneBlockInfo, String[]> blockToNodes,
+                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
+                          Map<String, Set<String>> rackToNodes) {
       for (OneBlockInfo oneblock : blocks) {
         // add this block to the block --> node locations map
         blockToNodes.put(oneblock, oneblock.hosts);
@@ -633,9 +683,11 @@ public abstract class CombineFileInputFormat<K, V>
         // add this block to the node --> block map
         for (int j = 0; j < oneblock.hosts.length; j++) {
           String node = oneblock.hosts[j];
-          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
           if (blklist == null) {
-            blklist = new ArrayList<OneBlockInfo>();
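+            // A LinkedHashSet keeps blocks in insertion order, like the List
+            // it replaces, while allowing cheap removal of assigned blocks.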
+            blklist = new LinkedHashSet<OneBlockInfo>();
             nodeToBlocks.put(node, blklist);
           }
           blklist.add(oneblock);
@@ -696,7 +748,7 @@ public abstract class CombineFileInputFormat<K, V>
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
 
-  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                     String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {