|
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
+import org.apache.hadoop.util.ReflectionUtils;
|
|
|
|
|
|
/**
|
|
|
* A base class for file-based {@link InputFormat}.
|
|
@@ -58,6 +59,28 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
this.minSplitSize = minSplitSize;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Proxy PathFilter that accepts a path only if all filters given in the
|
|
|
+ * constructor do. Used by listPaths() to apply the built-in
|
|
|
+ * hiddenFileFilter together with a user-provided one (if any).
|
|
|
+ */
|
|
|
+ private static class MultiPathFilter implements PathFilter {
|
|
|
+ private final List<PathFilter> filters;
|
|
|
+
|
|
|
+ public MultiPathFilter(List<PathFilter> filters) {
|
|
|
+ this.filters = filters; // keeps the caller's list as-is; no copy is made
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean accept(Path path) {
|
|
|
+ for (PathFilter filter : filters) {
|
|
|
+ if (!filter.accept(path)) {
|
|
|
+ return false; // stop at the first filter that rejects the path
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true; // no filter rejected the path (also true for an empty list)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Is the given filename splitable? Usually, true, but if the file is
|
|
|
* stream compressed, it will not be.
|
|
@@ -79,6 +102,28 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
Reporter reporter)
|
|
|
throws IOException;
|
|
|
|
|
|
+ /**
|
|
|
+ * Set a PathFilter to be applied to the input paths for the map-reduce job.
|
|
|
+ * @param conf the job configuration in which the filter class is recorded.
|
|
|
+ * @param filter the PathFilter class to use for filtering the input paths.
|
|
|
+ */
|
|
|
+ public static void setInputPathFilter(JobConf conf,
|
|
|
+ Class<? extends PathFilter> filter) {
|
|
|
+ conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a PathFilter instance of the filter set for the input paths.
|
|
|
+ * @param conf the job configuration to read the filter class from.
|
|
|
+ * @return the PathFilter instance set for the job, <code>null</code> if none has been set.
|
|
|
+ */
|
|
|
+ public static PathFilter getInputPathFilter(JobConf conf) {
|
|
|
+ Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
|
|
|
+ PathFilter.class);
|
|
|
+ return (filterClass != null) ?
|
|
|
+ (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
|
|
|
+ }
|
|
|
+
|
|
|
/** List input directories.
|
|
|
* Subclasses may override to, e.g., select only files matching a regular
|
|
|
* expression.
|
|
@@ -93,11 +138,23 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
if (dirs.length == 0) {
|
|
|
throw new IOException("No input paths specified in job");
|
|
|
}
|
|
|
- List<Path> result = new ArrayList<Path>();
|
|
|
+
|
|
|
+ List<Path> result = new ArrayList<Path>();
|
|
|
+
|
|
|
+ // creates a MultiPathFilter with the hiddenFileFilter and the
|
|
|
+ // user provided one (if any).
|
|
|
+ List<PathFilter> filters = new ArrayList<PathFilter>();
|
|
|
+ filters.add(hiddenFileFilter);
|
|
|
+ PathFilter jobFilter = getInputPathFilter(job);
|
|
|
+ if (jobFilter != null) {
|
|
|
+ filters.add(jobFilter);
|
|
|
+ }
|
|
|
+ PathFilter inputFilter = new MultiPathFilter(filters);
|
|
|
+
|
|
|
for (Path p: dirs) {
|
|
|
FileSystem fs = p.getFileSystem(job);
|
|
|
Path[] matches =
|
|
|
- fs.listPaths(fs.globPaths(p, hiddenFileFilter), hiddenFileFilter);
|
|
|
+ fs.listPaths(fs.globPaths(p, inputFilter), inputFilter);
|
|
|
for (Path match: matches) {
|
|
|
result.add(fs.makeQualified(match));
|
|
|
}
|