|
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
String[] hosts) {
|
|
|
return new FileSplit(file, start, length, hosts);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A factory that makes a split which also records the hosts that hold the
|
|
|
+ * data in memory ({@code inMemoryHosts}). Sub-classes can override it to make sub-types
|
|
|
+ */
|
|
|
+ protected FileSplit makeSplit(Path file, long start, long length,
|
|
|
+ String[] hosts, String[] inMemoryHosts) {
|
|
|
+ return new FileSplit(file, start, length, hosts, inMemoryHosts);
|
|
|
+ }
|
|
|
|
|
|
/** Splits files returned by {@link #listStatus(JobConf)} when
|
|
|
* they're too big.*/
|
|
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
|
|
|
long bytesRemaining = length;
|
|
|
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
|
|
|
- String[] splitHosts = getSplitHosts(blkLocations,
|
|
|
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
|
|
|
length-bytesRemaining, splitSize, clusterMap);
|
|
|
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
|
|
|
- splitHosts));
|
|
|
+ splitHosts[0], splitHosts[1]));
|
|
|
bytesRemaining -= splitSize;
|
|
|
}
|
|
|
|
|
|
if (bytesRemaining != 0) {
|
|
|
- String[] splitHosts = getSplitHosts(blkLocations, length
|
|
|
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
|
|
|
- bytesRemaining, bytesRemaining, clusterMap);
|
|
|
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
|
|
|
- splitHosts));
|
|
|
+ splitHosts[0], splitHosts[1]));
|
|
|
}
|
|
|
} else {
|
|
|
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
|
|
|
- splits.add(makeSplit(path, 0, length, splitHosts));
|
|
|
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
|
|
|
+ splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
|
|
|
}
|
|
|
} else {
|
|
|
//Create empty hosts array for zero length files
|
|
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
* @param blkLocations The list of block locations
|
|
|
* @param offset
|
|
|
* @param splitSize
|
|
|
- * @return array of hosts that contribute most to this split
|
|
|
+ * @return an array of hosts that contribute most to this split
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
protected String[] getSplitHosts(BlockLocation[] blkLocations,
|
|
|
+ long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
|
|
|
+ return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
|
|
|
+ clusterMap)[0]; // element 0 is the host list; element 1 holds the cached hosts
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This function identifies and returns the hosts that contribute
|
|
|
+ * most for a given split. For calculating the contribution, rack
|
|
|
+ * locality is treated on par with host locality, so hosts from racks
|
|
|
+ * that contribute the most are preferred over hosts on racks that
|
|
|
+ * contribute less
|
|
|
+ * @param blkLocations The list of block locations
|
|
|
+ * @param offset The offset into the file at which the split starts
|
|
|
+ * @param splitSize The size of the split in bytes
|
|
|
+ * @return two arrays - one of hosts that contribute most to this split, and
|
|
|
+ * one of hosts that contribute most to this split that have the data
|
|
|
+ * cached on them
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
|
|
|
long offset, long splitSize, NetworkTopology clusterMap)
|
|
|
throws IOException {
|
|
|
|
|
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
|
|
|
//If this is the only block, just return
|
|
|
if (bytesInThisBlock >= splitSize) {
|
|
|
- return blkLocations[startIndex].getHosts();
|
|
|
+ return new String[][] { blkLocations[startIndex].getHosts(),
|
|
|
+ blkLocations[startIndex].getCachedHosts() };
|
|
|
}
|
|
|
|
|
|
long bytesInFirstBlock = bytesInThisBlock;
|
|
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
|
|
|
} // for all indices
|
|
|
|
|
|
- return identifyHosts(allTopos.length, racksMap);
|
|
|
+ // We don't yet support cached hosts when the split spans more than one block (bytesInThisBlock < splitSize)
|
|
|
+ return new String[][] { identifyHosts(allTopos.length, racksMap),
|
|
|
+ new String[0]};
|
|
|
}
|
|
|
|
|
|
private String[] identifyHosts(int replicationFactor,
|