|
@@ -214,6 +214,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
|
|
|
// generate splits
|
|
// generate splits
|
|
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
|
|
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
|
|
|
|
+ NetworkTopology clusterMap = new NetworkTopology();
|
|
for (FileStatus file: files) {
|
|
for (FileStatus file: files) {
|
|
Path path = file.getPath();
|
|
Path path = file.getPath();
|
|
FileSystem fs = path.getFileSystem(job);
|
|
FileSystem fs = path.getFileSystem(job);
|
|
@@ -226,7 +227,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
long bytesRemaining = length;
|
|
long bytesRemaining = length;
|
|
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
|
|
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
|
|
String[] splitHosts = getSplitHosts(blkLocations,
|
|
String[] splitHosts = getSplitHosts(blkLocations,
|
|
- length-bytesRemaining, splitSize);
|
|
|
|
|
|
+ length-bytesRemaining, splitSize, clusterMap);
|
|
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
|
|
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
|
|
splitHosts));
|
|
splitHosts));
|
|
bytesRemaining -= splitSize;
|
|
bytesRemaining -= splitSize;
|
|
@@ -237,7 +238,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
blkLocations[blkLocations.length-1].getHosts()));
|
|
blkLocations[blkLocations.length-1].getHosts()));
|
|
}
|
|
}
|
|
} else if (length != 0) {
|
|
} else if (length != 0) {
|
|
- String[] splitHosts = getSplitHosts(blkLocations,0,length);
|
|
|
|
|
|
+ String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
|
|
splits.add(new FileSplit(path, 0, length, splitHosts));
|
|
splits.add(new FileSplit(path, 0, length, splitHosts));
|
|
} else {
|
|
} else {
|
|
//Create empty hosts array for zero length files
|
|
//Create empty hosts array for zero length files
|
|
@@ -417,7 +418,8 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
protected String[] getSplitHosts(BlockLocation[] blkLocations,
|
|
protected String[] getSplitHosts(BlockLocation[] blkLocations,
|
|
- long offset, long splitSize) throws IOException {
|
|
|
|
|
|
+ long offset, long splitSize, NetworkTopology clusterMap)
|
|
|
|
+ throws IOException {
|
|
|
|
|
|
int startIndex = getBlockIndex(blkLocations, offset);
|
|
int startIndex = getBlockIndex(blkLocations, offset);
|
|
|
|
|
|
@@ -442,7 +444,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
long bytesInLastBlock = bytesInThisBlock;
|
|
long bytesInLastBlock = bytesInThisBlock;
|
|
int endIndex = index - 1;
|
|
int endIndex = index - 1;
|
|
|
|
|
|
- NetworkTopology clusterMap = new NetworkTopology();
|
|
|
|
Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
|
|
Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
|
|
Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
|
|
Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
|
|
String [] allTopos = new String[0];
|
|
String [] allTopos = new String[0];
|
|
@@ -486,6 +487,11 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
if (node == null) {
|
|
if (node == null) {
|
|
node = new NodeBase(topo);
|
|
node = new NodeBase(topo);
|
|
clusterMap.add(node);
|
|
clusterMap.add(node);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ nodeInfo = hostsMap.get(node);
|
|
|
|
+
|
|
|
|
+ if (nodeInfo == null) {
|
|
nodeInfo = new NodeInfo(node);
|
|
nodeInfo = new NodeInfo(node);
|
|
hostsMap.put(node,nodeInfo);
|
|
hostsMap.put(node,nodeInfo);
|
|
parentNode = node.getParent();
|
|
parentNode = node.getParent();
|