|
@@ -181,26 +181,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
LOG.info("Total input paths to process : " + result.size());
|
|
|
return result.toArray(new FileStatus[result.size()]);
|
|
|
}
|
|
|
-
|
|
|
- /** List input directories.
|
|
|
- * Subclasses may override to, e.g., select only files matching a regular
|
|
|
- * expression.
|
|
|
- *
|
|
|
- * @param job the job to list input paths for
|
|
|
- * @return array of Path objects
|
|
|
- * @throws IOException if zero items.
|
|
|
- * @deprecated Use {@link #listStatus(JobConf)} instead.
|
|
|
- */
|
|
|
- @Deprecated
|
|
|
- protected Path[] listPaths(JobConf job)
|
|
|
- throws IOException {
|
|
|
- return FileUtil.stat2Paths(listStatus(job));
|
|
|
- }
|
|
|
-
|
|
|
- @Deprecated
|
|
|
- public void validateInput(JobConf job) throws IOException {
|
|
|
- // handled by getSplits
|
|
|
- }
|
|
|
|
|
|
/** Splits files returned by {@link #listStatus(JobConf)} when
|
|
|
* they're too big.*/
|
|
@@ -209,17 +189,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
throws IOException {
|
|
|
FileStatus[] files = listStatus(job);
|
|
|
|
|
|
- // Applications may have overridden listPaths so we need to check if
|
|
|
- // it returns a different set of paths to listStatus.
|
|
|
- // If it does we revert to the old behavior using Paths not FileStatus
|
|
|
- // objects.
|
|
|
- // When listPaths is removed, this check can be removed too.
|
|
|
- Path[] paths = listPaths(job);
|
|
|
- if (!Arrays.equals(paths, FileUtil.stat2Paths(files))) {
|
|
|
- LOG.warn("FileInputFormat#listPaths is deprecated, override listStatus " +
|
|
|
- "instead.");
|
|
|
- return getSplitsForPaths(job, numSplits, paths);
|
|
|
- }
|
|
|
long totalSize = 0; // compute total size
|
|
|
for (FileStatus file: files) { // check we have valid files
|
|
|
if (file.isDir()) {
|
|
@@ -265,57 +234,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
LOG.debug("Total # of splits: " + splits.size());
|
|
|
return splits.toArray(new FileSplit[splits.size()]);
|
|
|
}
|
|
|
-
|
|
|
- @Deprecated
|
|
|
- private InputSplit[] getSplitsForPaths(JobConf job, int numSplits,
|
|
|
- Path[] files) throws IOException {
|
|
|
- long totalSize = 0; // compute total size
|
|
|
- for (int i = 0; i < files.length; i++) { // check we have valid files
|
|
|
- Path file = files[i];
|
|
|
- FileSystem fs = file.getFileSystem(job);
|
|
|
- if (fs.isDirectory(file) || !fs.exists(file)) {
|
|
|
- throw new IOException("Not a file: "+files[i]);
|
|
|
- }
|
|
|
- totalSize += fs.getLength(files[i]);
|
|
|
- }
|
|
|
-
|
|
|
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
|
|
|
- long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
|
|
|
- minSplitSize);
|
|
|
-
|
|
|
- // generate splits
|
|
|
- ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
|
|
|
- for (int i = 0; i < files.length; i++) {
|
|
|
- Path file = files[i];
|
|
|
- FileSystem fs = file.getFileSystem(job);
|
|
|
- long length = fs.getLength(file);
|
|
|
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
|
|
|
- if ((length != 0) && isSplitable(fs, file)) {
|
|
|
- long blockSize = fs.getBlockSize(file);
|
|
|
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
|
|
|
-
|
|
|
- long bytesRemaining = length;
|
|
|
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
|
|
|
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
|
|
|
- splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
|
|
|
- blkLocations[blkIndex].getHosts()));
|
|
|
- bytesRemaining -= splitSize;
|
|
|
- }
|
|
|
-
|
|
|
- if (bytesRemaining != 0) {
|
|
|
- splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining,
|
|
|
- blkLocations[blkLocations.length-1].getHosts()));
|
|
|
- }
|
|
|
- } else if (length != 0) {
|
|
|
- splits.add(new FileSplit(file, 0, length, blkLocations[0].getHosts()));
|
|
|
- } else {
|
|
|
- //Create empty hosts array for zero length files
|
|
|
- splits.add(new FileSplit(file, 0, length, new String[0]));
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.debug("Total # of splits: " + splits.size());
|
|
|
- return splits.toArray(new FileSplit[splits.size()]);
|
|
|
- }
|
|
|
|
|
|
protected long computeSplitSize(long goalSize, long minSize,
|
|
|
long blockSize) {
|