|
@@ -22,12 +22,14 @@ import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.PathFilter;
|
|
|
|
|
|
/** A base class for {@link InputFormat}. */
|
|
|
public abstract class InputFormatBase implements InputFormat {
|
|
@@ -38,7 +40,12 @@ public abstract class InputFormatBase implements InputFormat {
|
|
|
private static final double SPLIT_SLOP = 1.1; // 10% slop
|
|
|
|
|
|
private long minSplitSize = 1;
|
|
|
-
|
|
|
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
|
|
|
+ public boolean accept( Path p ){
|
|
|
+ String name = p.getName();
|
|
|
+ return !name.startsWith("_") && !name.startsWith(".");
|
|
|
+ }
|
|
|
+ };
|
|
|
protected void setMinSplitSize(long minSplitSize) {
|
|
|
this.minSplitSize = minSplitSize;
|
|
|
}
|
|
@@ -63,39 +70,23 @@ public abstract class InputFormatBase implements InputFormat {
|
|
|
* Subclasses may override to, e.g., select only files matching a regular
|
|
|
* expression.
|
|
|
*
|
|
|
- * <p>Property <code>mapred.input.subdir</code>, if set, names a subdirectory
|
|
|
- * that is appended to all input dirs specified by job, and if the given fs
|
|
|
- * lists those too, each is added to the returned array of Path.
|
|
|
- *
|
|
|
* @param job the job to list input paths for
|
|
|
* @return array of Path objects
|
|
|
* @throws IOException if zero items.
|
|
|
*/
|
|
|
protected Path[] listPaths(JobConf job)
|
|
|
throws IOException {
|
|
|
- String subdir = job.get("mapred.input.subdir");
|
|
|
Path[] dirs = job.getInputPaths();
|
|
|
if (dirs.length == 0) {
|
|
|
- throw new IOException("No input directories specified in job");
|
|
|
+ throw new IOException("No input paths specified in job");
|
|
|
}
|
|
|
- ArrayList result = new ArrayList();
|
|
|
- for (int i = 0; i < dirs.length; i++) {
|
|
|
- FileSystem fs = dirs[i].getFileSystem(job);
|
|
|
- Path[] dir = fs.listPaths(dirs[i]);
|
|
|
- if (dir != null) {
|
|
|
- for (int j = 0; j < dir.length; j++) {
|
|
|
- Path file = dir[j];
|
|
|
- if (subdir != null) {
|
|
|
- Path[] subFiles = fs.listPaths(new Path(file, subdir));
|
|
|
- if (subFiles != null) {
|
|
|
- for (int k = 0; k < subFiles.length; k++) {
|
|
|
- result.add(fs.makeQualified(subFiles[k]));
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- result.add(fs.makeQualified(file));
|
|
|
- }
|
|
|
- }
|
|
|
+ List<Path> result = new ArrayList();
|
|
|
+ for (Path p: dirs) {
|
|
|
+ FileSystem fs = p.getFileSystem(job);
|
|
|
+ Path[] matches =
|
|
|
+ fs.listPaths(fs.globPaths(p, hiddenFileFilter),hiddenFileFilter);
|
|
|
+ for (Path match: matches) {
|
|
|
+ result.add(fs.makeQualified(match));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -104,22 +95,50 @@ public abstract class InputFormatBase implements InputFormat {
|
|
|
|
|
|
public void validateInput(JobConf job) throws IOException {
|
|
|
Path[] inputDirs = job.getInputPaths();
|
|
|
+ if (inputDirs.length == 0) {
|
|
|
+ throw new IOException("No input paths specified in input");
|
|
|
+ }
|
|
|
+
|
|
|
List<IOException> result = new ArrayList();
|
|
|
- for(int i=0; i < inputDirs.length; ++i) {
|
|
|
- FileSystem fs = inputDirs[i].getFileSystem(job);
|
|
|
- if (!fs.exists(inputDirs[i])) {
|
|
|
- result.add(new FileNotFoundException("Input directory " +
|
|
|
- inputDirs[i] +
|
|
|
- " doesn't exist."));
|
|
|
- } else if (!fs.isDirectory(inputDirs[i])) {
|
|
|
- result.add(new InvalidFileTypeException
|
|
|
- ("Invalid input path, expecting directory : " +
|
|
|
- inputDirs[i]));
|
|
|
+ int totalFiles = 0;
|
|
|
+ for (Path p: inputDirs) {
|
|
|
+ FileSystem fs = p.getFileSystem(job);
|
|
|
+ if (fs.exists(p)) {
|
|
|
+ // make sure all paths are files to avoid exception
|
|
|
+ // while generating splits
|
|
|
+ for (Path subPath : fs.listPaths(p, hiddenFileFilter)) {
|
|
|
+ FileSystem subFS = subPath.getFileSystem(job);
|
|
|
+ if (!subFS.isFile(subPath)) {
|
|
|
+ result.add(new IOException(
|
|
|
+ "Input path is not a file : " + subPath));
|
|
|
+ } else {
|
|
|
+ totalFiles++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Path [] paths = fs.globPaths(p, hiddenFileFilter);
|
|
|
+ if (paths.length == 0) {
|
|
|
+ result.add(
|
|
|
+ new IOException("Input Pattern " + p + " matches 0 files"));
|
|
|
+ } else {
|
|
|
+ // validate globbed paths
|
|
|
+ for (Path gPath : paths) {
|
|
|
+ FileSystem gPathFS = gPath.getFileSystem(job);
|
|
|
+ if (!gPathFS.exists(gPath)) {
|
|
|
+ result.add(
|
|
|
+ new FileNotFoundException(
|
|
|
+ "Input path doesnt exist : " + gPath));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ totalFiles += paths.length ;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (!result.isEmpty()) {
|
|
|
throw new InvalidInputException(result);
|
|
|
}
|
|
|
+ // send output to client.
|
|
|
+ LOG.info("Total input paths to process : " + totalFiles);
|
|
|
}
|
|
|
|
|
|
/** Splits files returned by {@link #listPaths(JobConf)} when
|
|
@@ -146,7 +165,7 @@ public abstract class InputFormatBase implements InputFormat {
|
|
|
Path file = files[i];
|
|
|
FileSystem fs = file.getFileSystem(job);
|
|
|
long length = fs.getLength(file);
|
|
|
- if (isSplitable(fs, file)) {
|
|
|
+ if (isSplitable(fs, file)) {
|
|
|
long blockSize = fs.getBlockSize(file);
|
|
|
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
|
|
|
|