|
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.input;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.LinkedHashSet;
|
|
|
import java.util.HashSet;
|
|
@@ -38,7 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.fs.LocatedFileStatus;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
@@ -215,46 +214,33 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
}
|
|
|
|
|
|
// all the files in input set
|
|
|
- Path[] paths = FileUtil.stat2Paths(
|
|
|
- listStatus(job).toArray(new FileStatus[0]));
|
|
|
+ List<FileStatus> stats = listStatus(job);
|
|
|
List<InputSplit> splits = new ArrayList<InputSplit>();
|
|
|
- if (paths.length == 0) {
|
|
|
+ if (stats.size() == 0) {
|
|
|
return splits;
|
|
|
}
|
|
|
|
|
|
- // Convert them to Paths first. This is a costly operation and
|
|
|
- // we should do it first, otherwise we will incur doing it multiple
|
|
|
- // times, one time each for each pool in the next loop.
|
|
|
- List<Path> newpaths = new LinkedList<Path>();
|
|
|
- for (int i = 0; i < paths.length; i++) {
|
|
|
- FileSystem fs = paths[i].getFileSystem(conf);
|
|
|
- Path p = fs.makeQualified(paths[i]);
|
|
|
- newpaths.add(p);
|
|
|
- }
|
|
|
-
|
|
|
// In one single iteration, process all the paths in a single pool.
|
|
|
// Processing one pool at a time ensures that a split contains paths
|
|
|
// from a single pool only.
|
|
|
for (MultiPathFilter onepool : pools) {
|
|
|
- ArrayList<Path> myPaths = new ArrayList<Path>();
|
|
|
+ ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
|
|
|
|
|
|
// pick one input path. If it matches all the filters in a pool,
|
|
|
// add it to the output set
|
|
|
- for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
|
|
|
- Path p = iter.next();
|
|
|
- if (onepool.accept(p)) {
|
|
|
+ for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
|
|
|
+ FileStatus p = iter.next();
|
|
|
+ if (onepool.accept(p.getPath())) {
|
|
|
myPaths.add(p); // add it to my output set
|
|
|
iter.remove();
|
|
|
}
|
|
|
}
|
|
|
// create splits for all files in this pool.
|
|
|
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
|
|
|
- maxSize, minSizeNode, minSizeRack, splits);
|
|
|
+ getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
|
|
|
}
|
|
|
|
|
|
// create splits for all files that are not in any pool.
|
|
|
- getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
|
|
|
- maxSize, minSizeNode, minSizeRack, splits);
|
|
|
+ getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
|
|
|
|
|
|
// free up rackToNodes map
|
|
|
rackToNodes.clear();
|
|
@@ -264,7 +250,7 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
/**
|
|
|
* Return all the splits in the specified set of paths
|
|
|
*/
|
|
|
- private void getMoreSplits(JobContext job, Path[] paths,
|
|
|
+ private void getMoreSplits(JobContext job, List<FileStatus> stats,
|
|
|
long maxSize, long minSizeNode, long minSizeRack,
|
|
|
List<InputSplit> splits)
|
|
|
throws IOException {
|
|
@@ -285,15 +271,16 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
|
|
|
new HashMap<String, Set<OneBlockInfo>>();
|
|
|
|
|
|
- files = new OneFileInfo[paths.length];
|
|
|
- if (paths.length == 0) {
|
|
|
+ files = new OneFileInfo[stats.size()];
|
|
|
+ if (stats.size() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// populate all the blocks for all files
|
|
|
long totLength = 0;
|
|
|
- for (int i = 0; i < paths.length; i++) {
|
|
|
- files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
|
|
|
+ int i = 0;
|
|
|
+ for (FileStatus stat : stats) {
|
|
|
+ files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
|
|
|
rackToBlocks, blockToNodes, nodeToBlocks,
|
|
|
rackToNodes, maxSize);
|
|
|
totLength += files[i].getLength();
|
|
@@ -569,7 +556,7 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
private long fileSize; // size of the file
|
|
|
private OneBlockInfo[] blocks; // all blocks in this file
|
|
|
|
|
|
- OneFileInfo(Path path, Configuration conf,
|
|
|
+ OneFileInfo(FileStatus stat, Configuration conf,
|
|
|
boolean isSplitable,
|
|
|
HashMap<String, List<OneBlockInfo>> rackToBlocks,
|
|
|
HashMap<OneBlockInfo, String[]> blockToNodes,
|
|
@@ -580,10 +567,13 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
this.fileSize = 0;
|
|
|
|
|
|
// get block locations from file system
|
|
|
- FileSystem fs = path.getFileSystem(conf);
|
|
|
- FileStatus stat = fs.getFileStatus(path);
|
|
|
- BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
|
|
|
- stat.getLen());
|
|
|
+ BlockLocation[] locations;
|
|
|
+ if (stat instanceof LocatedFileStatus) {
|
|
|
+ locations = ((LocatedFileStatus) stat).getBlockLocations();
|
|
|
+ } else {
|
|
|
+ FileSystem fs = stat.getPath().getFileSystem(conf);
|
|
|
+ locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
|
|
|
+ }
|
|
|
// create a list of all block and their locations
|
|
|
if (locations == null) {
|
|
|
blocks = new OneBlockInfo[0];
|
|
@@ -598,8 +588,8 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
// full file length
|
|
|
blocks = new OneBlockInfo[1];
|
|
|
fileSize = stat.getLen();
|
|
|
- blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
|
|
|
- .getHosts(), locations[0].getTopologyPaths());
|
|
|
+ blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
|
|
|
+ locations[0].getHosts(), locations[0].getTopologyPaths());
|
|
|
} else {
|
|
|
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
|
|
|
locations.length);
|
|
@@ -625,9 +615,9 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
myLength = Math.min(maxSize, left);
|
|
|
}
|
|
|
}
|
|
|
- OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
|
|
|
- myLength, locations[i].getHosts(), locations[i]
|
|
|
- .getTopologyPaths());
|
|
|
+ OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
|
|
|
+ myOffset, myLength, locations[i].getHosts(),
|
|
|
+ locations[i].getTopologyPaths());
|
|
|
left -= myLength;
|
|
|
myOffset += myLength;
|
|
|
|
|
@@ -739,6 +729,9 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
|
|
|
protected BlockLocation[] getFileBlockLocations(
|
|
|
FileSystem fs, FileStatus stat) throws IOException {
|
|
|
+ if (stat instanceof LocatedFileStatus) {
|
|
|
+ return ((LocatedFileStatus) stat).getBlockLocations();
|
|
|
+ }
|
|
|
return fs.getFileBlockLocations(stat, 0, stat.getLen());
|
|
|
}
|
|
|
|