|
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.BlockLocation;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
|
* A base class for file-based {@link InputFormat}.
|
|
@@ -136,7 +137,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
*/
|
|
|
protected Path[] listPaths(JobConf job)
|
|
|
throws IOException {
|
|
|
- Path[] dirs = job.getInputPaths();
|
|
|
+ Path[] dirs = getInputPaths(job);
|
|
|
if (dirs.length == 0) {
|
|
|
throw new IOException("No input paths specified in job");
|
|
|
}
|
|
@@ -167,7 +168,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
}
|
|
|
|
|
|
public void validateInput(JobConf job) throws IOException {
|
|
|
- Path[] inputDirs = job.getInputPaths();
|
|
|
+ Path[] inputDirs = getInputPaths(job);
|
|
|
if (inputDirs.length == 0) {
|
|
|
throw new IOException("No input paths specified in input");
|
|
|
}
|
|
@@ -285,4 +286,121 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets the given comma separated paths as the list of inputs
|
|
|
+ * for the map-reduce job.
|
|
|
+ *
|
|
|
+ * @param conf Configuration of the job
|
|
|
+ * @param commaSeparatedPaths Comma separated paths to be set as
|
|
|
+ * the list of inputs for the map-reduce job.
|
|
|
+ */
|
|
|
+ public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
|
|
|
+ setInputPaths(conf, StringUtils.stringToPath(
|
|
|
+ getPathStrings(commaSeparatedPaths)));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add the given comma separated paths to the list of inputs for
|
|
|
+ * the map-reduce job.
|
|
|
+ *
|
|
|
+ * @param conf The configuration of the job
|
|
|
+ * @param commaSeparatedPaths Comma separated paths to be added to
|
|
|
+ * the list of inputs for the map-reduce job.
|
|
|
+ */
|
|
|
+ public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
|
|
|
+ for (String str : getPathStrings(commaSeparatedPaths)) {
|
|
|
+ addInputPath(conf, new Path(str));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set the array of {@link Path}s as the list of inputs
|
|
|
+ * for the map-reduce job.
|
|
|
+ *
|
|
|
+ * @param conf Configuration of the job.
|
|
|
+ * @param inputPaths the {@link Path}s of the input directories/files
|
|
|
+ * for the map-reduce job.
|
|
|
+ */
|
|
|
+ public static void setInputPaths(JobConf conf, Path... inputPaths) {
|
|
|
+ Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
|
|
|
+ StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
|
|
|
+ for(int i = 1; i < inputPaths.length;i++) {
|
|
|
+ str.append(StringUtils.COMMA_STR);
|
|
|
+ path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
|
|
|
+ str.append(StringUtils.escapeString(path.toString()));
|
|
|
+ }
|
|
|
+ conf.set("mapred.input.dir", str.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add a {@link Path} to the list of inputs for the map-reduce job.
|
|
|
+ *
|
|
|
+ * @param conf The configuration of the job
|
|
|
+ * @param path {@link Path} to be added to the list of inputs for
|
|
|
+ * the map-reduce job.
|
|
|
+ */
|
|
|
+ public static void addInputPath(JobConf conf, Path path ) {
|
|
|
+ path = new Path(conf.getWorkingDirectory(), path);
|
|
|
+ String dirStr = StringUtils.escapeString(path.toString());
|
|
|
+ String dirs = conf.get("mapred.input.dir");
|
|
|
+ conf.set("mapred.input.dir", dirs == null ? dirStr :
|
|
|
+ dirs + StringUtils.COMMA_STR + dirStr);
|
|
|
+ }
|
|
|
+
|
|
|
+ // This method escapes commas in the glob pattern of the given paths.
|
|
|
+ private static String[] getPathStrings(String commaSeparatedPaths) {
|
|
|
+ int length = commaSeparatedPaths.length();
|
|
|
+ int curlyOpen = 0;
|
|
|
+ int pathStart = 0;
|
|
|
+ boolean globPattern = false;
|
|
|
+ List<String> pathStrings = new ArrayList<String>();
|
|
|
+
|
|
|
+ for (int i=0; i<length; i++) {
|
|
|
+ char ch = commaSeparatedPaths.charAt(i);
|
|
|
+ switch(ch) {
|
|
|
+ case '{' : {
|
|
|
+ curlyOpen++;
|
|
|
+ if (!globPattern) {
|
|
|
+ globPattern = true;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case '}' : {
|
|
|
+ curlyOpen--;
|
|
|
+ if (curlyOpen == 0 && globPattern) {
|
|
|
+ globPattern = false;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case ',' : {
|
|
|
+ if (!globPattern) {
|
|
|
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
|
|
|
+ pathStart = i + 1 ;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
|
|
|
+
|
|
|
+ return pathStrings.toArray(new String[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the list of input {@link Path}s for the map-reduce job.
|
|
|
+ *
|
|
|
+ * @param conf The configuration of the job
|
|
|
+ * @return the list of input {@link Path}s for the map-reduce job.
|
|
|
+ */
|
|
|
+ public static Path[] getInputPaths(JobConf conf) {
|
|
|
+ String dirs = conf.get("mapred.input.dir", "");
|
|
|
+ String [] list = StringUtils.split(dirs);
|
|
|
+ Path[] result = new Path[list.length];
|
|
|
+ for (int i = 0; i < list.length; i++) {
|
|
|
+ result[i] = new Path(StringUtils.unEscapeString(list[i]));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
}
|