
Merge -r 648753:648754 from trunk to branch-0.17 to fix HADOOP-3162

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@648765 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 17 years ago
parent
commit
b2da46e307
67 changed files with 372 additions and 168 deletions
  1. +3 -0    CHANGES.txt
  2. +2 -5    src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
  3. +2 -1    src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java
  4. +3 -6    src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java
  5. +3 -1    src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
  6. +6 -3    src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
  7. +6 -3    src/docs/src/documentation/content/xdocs/site.xml
  8. +2 -2    src/examples/org/apache/hadoop/examples/Grep.java
  9. +2 -1    src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
  10. +2 -1   src/examples/org/apache/hadoop/examples/PiEstimator.java
  11. +2 -1   src/examples/org/apache/hadoop/examples/SleepJob.java
  12. +2 -2   src/examples/org/apache/hadoop/examples/Sort.java
  13. +2 -1   src/examples/org/apache/hadoop/examples/WordCount.java
  14. +1 -1   src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
  15. +120 -2 src/java/org/apache/hadoop/mapred/FileInputFormat.java
  16. +17 -15 src/java/org/apache/hadoop/mapred/JobConf.java
  17. +2 -1   src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
  18. +2 -1   src/java/org/apache/hadoop/mapred/join/Parser.java
  19. +2 -4   src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java
  20. +3 -1   src/java/org/apache/hadoop/mapred/pipes/Submitter.java
  21. +2 -1   src/java/org/apache/hadoop/tools/Logalyzer.java
  22. +0 -38  src/test/org/apache/hadoop/conf/TestJobConf.java
  23. +2 -1   src/test/org/apache/hadoop/dfs/NNBench.java
  24. +1 -1   src/test/org/apache/hadoop/fs/DFSCIOTest.java
  25. +1 -1   src/test/org/apache/hadoop/fs/DistributedFSCheck.java
  26. +1 -1   src/test/org/apache/hadoop/fs/TestDFSIO.java
  27. +4 -3   src/test/org/apache/hadoop/fs/TestFileSystem.java
  28. +2 -2   src/test/org/apache/hadoop/io/FileBench.java
  29. +1 -1   src/test/org/apache/hadoop/mapred/BigMapOutput.java
  30. +3 -3   src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
  31. +2 -2   src/test/org/apache/hadoop/mapred/MRBench.java
  32. +1 -1   src/test/org/apache/hadoop/mapred/MRCaching.java
  33. +1 -1   src/test/org/apache/hadoop/mapred/NotificationTestCase.java
  34. +1 -1   src/test/org/apache/hadoop/mapred/PiEstimator.java
  35. +12 -12 src/test/org/apache/hadoop/mapred/SortValidator.java
  36. +1 -1   src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
  37. +1 -1   src/test/org/apache/hadoop/mapred/TestComparators.java
  38. +1 -1   src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
  39. +1 -1   src/test/org/apache/hadoop/mapred/TestFieldSelection.java
  40. +1 -1   src/test/org/apache/hadoop/mapred/TestFileInputFormatPathFilter.java
  41. +109 -0 src/test/org/apache/hadoop/mapred/TestInputPath.java
  42. +1 -1   src/test/org/apache/hadoop/mapred/TestJavaSerialization.java
  43. +1 -1   src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
  44. +2 -2   src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
  45. +1 -1   src/test/org/apache/hadoop/mapred/TestMapOutputType.java
  46. +5 -5   src/test/org/apache/hadoop/mapred/TestMapRed.java
  47. +2 -2   src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
  48. +1 -1   src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
  49. +1 -1   src/test/org/apache/hadoop/mapred/TestMiniMRTaskTempDir.java
  50. +1 -1   src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
  51. +1 -1   src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
  52. +1 -1   src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
  53. +1 -1   src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
  54. +1 -1   src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
  55. +1 -1   src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
  56. +1 -1   src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
  57. +1 -1   src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
  58. +3 -3   src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
  59. +1 -1   src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
  60. +2 -6   src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java
  61. +1 -1   src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java
  62. +1 -1   src/test/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java
  63. +2 -1   src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
  64. +2 -2   src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java
  65. +3 -3   src/test/org/apache/hadoop/record/TestRecordMR.java
  66. +1 -1   src/test/org/apache/hadoop/record/TestRecordWritable.java
  67. +2 -1   src/test/testshell/ExternalMapReduce.java

+ 3 - 0
CHANGES.txt

@@ -579,6 +579,9 @@ Release 0.17.0 - Unreleased
     HADOOP-3256. Encodes the job name used in the filename for history files.
     (Arun Murthy via ddas)
 
+    HADOOP-3162. Ensure that comma-separated input paths are treated correctly
+    as multiple input paths. (Amareshwari Sri Ramadasu via acmurthy)
+
 Release 0.16.3 - 2008-04-16
 
   BUG FIXES
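
The pattern repeated across the files below is the heart of this change: ad-hoc comma splitting (and the deprecated JobConf path methods) is replaced by static FileInputFormat helpers that split a comma-separated spec into multiple input paths while leaving commas inside {...} globs alone. A hedged before/after sketch, with illustrative variable names:

    // Before: naive split, breaks a glob such as "x{y,z}" at its inner comma.
    for (String spec : inputDir.split(",")) {
      job.addInputPath(new Path(spec));
    }

    // After: one call; glob-aware splitting and escaping handled centrally.
    FileInputFormat.setInputPaths(job, inputDir);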

+ 2 - 5
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -92,11 +93,7 @@ public class DataJoinJob {
 
     FileSystem fs = FileSystem.get(defaults);
     fs.delete(new Path(outputDir));
-    String[] inputDirsSpecs = inputDir.split(",");
-    for (int i = 0; i < inputDirsSpecs.length; i++) {
-      String spec = inputDirsSpecs[i];
-      job.addInputPath(new Path(spec));
-    }
+    FileInputFormat.setInputPaths(job, inputDir);
 
     job.setInputFormat(inputFormat);
 

+ 2 - 1
src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -205,7 +206,7 @@ public class UpdateIndex {
     if (inputPathsString != null) {
       jobConf.set("mapred.input.dir", inputPathsString);
     }
-    inputPaths = jobConf.getInputPaths();
+    inputPaths = FileInputFormat.getInputPaths(jobConf);
     if (inputPaths.length == 0) {
       inputPaths = null;
     }

+ 3 - 6
src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdater.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
 import org.apache.hadoop.contrib.index.lucene.LuceneUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -75,11 +76,7 @@ public class IndexUpdater implements IIndexUpdater {
         + System.currentTimeMillis());
 
     // provided by application
-    jobConf.setInputPath(inputPaths[0]);
-    for (int i = 1; i < inputPaths.length; i++) {
-      jobConf.addInputPath(inputPaths[i]);
-    }
-
+    FileInputFormat.setInputPaths(jobConf, inputPaths);
     FileOutputFormat.setOutputPath(jobConf, outputPath);
 
     jobConf.setNumMapTasks(numMapTasks);
@@ -89,7 +86,7 @@ public class IndexUpdater implements IIndexUpdater {
 
     jobConf.setInputFormat(iconf.getIndexInputFormatClass());
 
-    Path[] inputs = jobConf.getInputPaths();
+    Path[] inputs = FileInputFormat.getInputPaths(jobConf);
     StringBuilder buffer = new StringBuilder(inputs[0].toString());
     for (int i = 1; i < inputs.length; i++) {
       buffer.append(",");

+ 3 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapred.JobConf;
@@ -687,7 +688,8 @@ public class StreamJob {
     // (to resolve local vs. dfs drive letter differences) 
     // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
     for (int i = 0; i < inputSpecs_.size(); i++) {
-      jobConf_.addInputPath(new Path(((String) inputSpecs_.get(i))));
+      FileInputFormat.addInputPaths(jobConf_, 
+                        (String) inputSpecs_.get(i));
     }
     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
 

+ 6 - 3
src/docs/src/documentation/content/xdocs/mapred_tutorial.xml

@@ -503,7 +503,7 @@
             <td>52.</td>
             <td>
               &nbsp;&nbsp;&nbsp;&nbsp;
-              <code>conf.setInputPath(new Path(args[0]));</code>
+              <code>FileInputFormat.setInputPaths(conf, new Path(args[0]));</code>
             </td>
           </tr>
           <tr>
@@ -1000,7 +1000,10 @@
         <code>Reducer</code>, <code>InputFormat</code> and 
         <code>OutputFormat</code> implementations. <code>JobConf</code> also 
         indicates the set of input files 
-        (<a href="ext:api/org/apache/hadoop/mapred/jobconf/setinputpath">setInputPath(Path)</a>/<a href="ext:api/org/apache/hadoop/mapred/jobconf/addinputpath">addInputPath(Path)</a>)
+        (<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/setinputpaths">setInputPaths(JobConf, Path...)</a>
+        /<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/addinputpath">addInputPath(JobConf, Path)</a>)
+        and (<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/setinputpathstring">setInputPaths(JobConf, String)</a>
+        /<a href="ext:api/org/apache/hadoop/mapred/fileinputformat/addinputpathstring">addInputPaths(JobConf, String)</a>)
         and where the output files should be written
         (<a href="ext:api/org/apache/hadoop/mapred/fileoutputformat/setoutputpath">setOutputPath(Path)</a>).</p>
 
@@ -2388,7 +2391,7 @@
             <td>111.</td>
             <td>
               &nbsp;&nbsp;&nbsp;&nbsp;
-              <code>conf.setInputPath(new Path(other_args.get(0)));</code>
+              <code>FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));</code>
             </td>
           </tr>
           <tr>

+ 6 - 3
src/docs/src/documentation/content/xdocs/site.xml

@@ -125,7 +125,12 @@ See http://forrest.apache.org/docs/linking.html for more info.
             <mapred href="mapred/">
               <clusterstatus href="ClusterStatus.html" />
               <counters href="Counters.html" />
-              <fileinputformat href="FileInputFormat.html" />
+              <fileinputformat href="FileInputFormat.html">
+                 <setinputpaths href="#setInputPaths(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path[])" />
+                 <addinputpath href="#addInputPath(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.fs.Path)" />
+                 <setinputpathstring href="#setInputPaths(org.apache.hadoop.mapred.JobConf,%20java.lang.String)" />
+                 <addinputpathstring href="#addInputPath(org.apache.hadoop.mapred.JobConf,%20java.lang.String)" />
+              </fileinputformat>
               <fileoutputformat href="FileInputFormat.html">
                 <getoutputpath href="#getOutputPath(org.apache.hadoop.mapred.JobConf)" />
                 <getworkoutputpath href="#getWorkOutputPath(org.apache.hadoop.mapred.JobConf)" />
@@ -144,8 +149,6 @@ See http://forrest.apache.org/docs/linking.html for more info.
                 <setnumreducetasks href="#setNumReduceTasks(int)" />
                 <setoutputkeycomparatorclass href="#setOutputKeyComparatorClass(java.lang.Class)" />
                 <setoutputvaluegroupingcomparator href="#setOutputValueGroupingComparator(java.lang.Class)" />
-                <setinputpath href="#setInputPath(org.apache.hadoop.fs.Path)" />
-                <addinputpath href="#addInputPath(org.apache.hadoop.fs.Path)" />
                 <setcombinerclass href="#setCombinerClass(java.lang.Class)" />
                 <setmapdebugscript href="#setMapDebugScript(java.lang.String)" />
                 <setreducedebugscript href="#setReduceDebugScript(java.lang.String)" />

+ 2 - 2
src/examples/org/apache/hadoop/examples/Grep.java

@@ -51,7 +51,7 @@ public class Grep extends Configured implements Tool {
       
       grepJob.setJobName("grep-search");
 
-      grepJob.setInputPath(new Path(args[0]));
+      FileInputFormat.setInputPaths(grepJob, args[0]);
 
       grepJob.setMapperClass(RegexMapper.class);
       grepJob.set("mapred.mapper.regex", args[2]);
@@ -71,7 +71,7 @@ public class Grep extends Configured implements Tool {
       JobConf sortJob = new JobConf(Grep.class);
       sortJob.setJobName("grep-sort");
 
-      sortJob.setInputPath(tempDir);
+      FileInputFormat.setInputPaths(sortJob, tempDir);
       sortJob.setInputFormat(SequenceFileInputFormat.class);
 
       sortJob.setMapperClass(InverseMapper.class);

+ 2 - 1
src/examples/org/apache/hadoop/examples/MultiFileWordCount.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobClient;
@@ -250,7 +251,7 @@ public class MultiFileWordCount extends Configured implements Tool {
     job.setCombinerClass(WordCount.Reduce.class);
     job.setReducerClass(WordCount.Reduce.class);
 
-    job.addInputPath(new Path(args[0]));
+    FileInputFormat.addInputPaths(job, args[0]);
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
     JobClient.runJob(job);

+ 2 - 1
src/examples/org/apache/hadoop/examples/PiEstimator.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -189,7 +190,7 @@ public class PiEstimator extends Configured implements Tool {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
     }
     
-    jobConf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(jobConf, inDir);
     FileOutputFormat.setOutputPath(jobConf, outDir);
     
     jobConf.setNumMapTasks(numMaps);

+ 2 - 1
src/examples/org/apache/hadoop/examples/SleepJob.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -146,7 +147,7 @@ public class SleepJob extends Configured implements Tool,
       job.setInputFormat(SequenceFileInputFormat.class);
       job.setSpeculativeExecution(false);
       job.setJobName("Sleep job");
-      job.addInputPath(tempPath);
+      FileInputFormat.addInputPath(job, tempPath);
       job.setLong("sleep.job.map.sleep.time", mapSleepTime);
       job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
       job.setLong("sleep.job.map.sleep.count", mapSleepCount);

+ 2 - 2
src/examples/org/apache/hadoop/examples/Sort.java

@@ -133,13 +133,13 @@ public class Sort extends Configured implements Tool {
           otherArgs.size() + " instead of 2.");
       return printUsage();
     }
-    jobConf.setInputPath(new Path(otherArgs.get(0)));
+    FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
     FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
 
     System.out.println("Running on " +
         cluster.getTaskTrackers() +
         " nodes to sort from " + 
-        jobConf.getInputPaths()[0] + " into " +
+        FileInputFormat.getInputPaths(jobConf)[0] + " into " +
         FileOutputFormat.getOutputPath(jobConf) +
         " with " + num_reduces + " reduces.");
     Date startTime = new Date();

+ 2 - 1
src/examples/org/apache/hadoop/examples/WordCount.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -142,7 +143,7 @@ public class WordCount extends Configured implements Tool {
                          other_args.size() + " instead of 2.");
       return printUsage();
     }
-    conf.setInputPath(new Path(other_args.get(0)));
+    FileInputFormat.setInputPaths(conf, other_args.get(0));
     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
         
     JobClient.runJob(conf);

+ 1 - 1
src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java

@@ -177,7 +177,7 @@ public class DistributedPentomino extends Configured implements Tool {
     Path input = new Path(output + "_input");
     FileSystem fileSys = FileSystem.get(conf);
     try {
-      conf.setInputPath(input);
+      FileInputFormat.setInputPaths(conf, input);
       FileOutputFormat.setOutputPath(conf, output);
       conf.setJarByClass(PentMap.class);
       

+ 120 - 2
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /** 
  * A base class for file-based {@link InputFormat}.
@@ -136,7 +137,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
    */
   protected Path[] listPaths(JobConf job)
     throws IOException {
-    Path[] dirs = job.getInputPaths();
+    Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
@@ -167,7 +168,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   }
 
   public void validateInput(JobConf job) throws IOException {
-    Path[] inputDirs = job.getInputPaths();
+    Path[] inputDirs = getInputPaths(job);
     if (inputDirs.length == 0) {
       throw new IOException("No input paths specified in input"); 
     }
@@ -285,4 +286,121 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     }
     return 0;
   }
+
+  /**
+   * Sets the given comma separated paths as the list of inputs 
+   * for the map-reduce job.
+   * 
+   * @param conf Configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be set as 
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
+    setInputPaths(conf, StringUtils.stringToPath(
+                        getPathStrings(commaSeparatedPaths)));
+  }
+
+  /**
+   * Add the given comma separated paths to the list of inputs for
+   *  the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @param commaSeparatedPaths Comma separated paths to be added to
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
+    for (String str : getPathStrings(commaSeparatedPaths)) {
+      addInputPath(conf, new Path(str));
+    }
+  }
+
+  /**
+   * Set the array of {@link Path}s as the list of inputs
+   * for the map-reduce job.
+   * 
+   * @param conf Configuration of the job. 
+   * @param inputPaths the {@link Path}s of the input directories/files 
+   * for the map-reduce job.
+   */ 
+  public static void setInputPaths(JobConf conf, Path... inputPaths) {
+    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
+    for (int i = 1; i < inputPaths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+    conf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Add a {@link Path} to the list of inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @param path {@link Path} to be added to the list of inputs for 
+   *            the map-reduce job.
+   */
+  public static void addInputPath(JobConf conf, Path path) {
+    path = new Path(conf.getWorkingDirectory(), path);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get("mapred.input.dir");
+    conf.set("mapred.input.dir", dirs == null ? dirStr :
+      dirs + StringUtils.COMMA_STR + dirStr);
+  }
+
+  // Splits the comma-separated path spec, treating commas inside {} glob
+  // groups as part of the path rather than as separators.
+  private static String[] getPathStrings(String commaSeparatedPaths) {
+    int length = commaSeparatedPaths.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+    
+    for (int i=0; i<length; i++) {
+      char ch = commaSeparatedPaths.charAt(i);
+      switch(ch) {
+        case '{' : {
+          curlyOpen++;
+          if (!globPattern) {
+            globPattern = true;
+          }
+          break;
+        }
+        case '}' : {
+          curlyOpen--;
+          if (curlyOpen == 0 && globPattern) {
+            globPattern = false;
+          }
+          break;
+        }
+        case ',' : {
+          if (!globPattern) {
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+            pathStart = i + 1;
+          }
+          break;
+        }
+      }
+    }
+    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+    
+    return pathStrings.toArray(new String[0]);
+  }
+  
+  /**
+   * Get the list of input {@link Path}s for the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @return the list of input {@link Path}s for the map-reduce job.
+   */
+  public static Path[] getInputPaths(JobConf conf) {
+    String dirs = conf.get("mapred.input.dir", "");
+    String [] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
 }
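
Note how the new helpers divide the work: getPathStrings splits the caller's string only on commas outside {...} groups, setInputPaths(JobConf, Path...) escapes any comma still embedded in a path before writing mapred.input.dir, and getInputPaths splits on unescaped commas and unescapes each entry. A minimal standalone sketch of the round trip (not part of the patch; the stored form shown in the comment is indicative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class InputPathRoundTrip {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Three inputs; the comma inside the glob is not a separator.
        FileInputFormat.setInputPaths(conf, "a,b,x{y,z}");
        // mapred.input.dir now holds something like ".../a,.../b,.../x{y\,z}":
        // each path resolved against the working directory, inner comma escaped.
        Path[] paths = FileInputFormat.getInputPaths(conf);
        System.out.println(paths.length);  // 3 -- the glob survives intact
      }
    }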

+ 17 - 15
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -21,10 +21,10 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
-import java.util.StringTokenizer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.StringTokenizer;
 
 import java.net.URL;
 import java.net.URLDecoder;
@@ -45,7 +45,6 @@ import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 
 /** 
@@ -70,10 +69,7 @@ import org.apache.hadoop.util.Tool;
  * 
  * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
  * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
- * {@link OutputFormat} implementations to be used etc. It also indicates the 
- * set of input files ({@link #setInputPath(Path)}/{@link #addInputPath(Path)}), 
- * and where the output files should be written via
- * {@link FileOutputFormat#setOutputPath(JobConf, Path)}.
+ * {@link OutputFormat} implementations to be used etc.
  *
  * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
  * of the job such as <code>Comparator</code>s to be used, files to be put in  
@@ -91,7 +87,7 @@ import org.apache.hadoop.util.Tool;
  *     // Specify various job-specific parameters     
  *     job.setJobName("myjob");
  *     
- *     job.setInputPath(new Path("in"));
+ *     FileInputFormat.setInputPaths(job, new Path("in"));
  *     FileOutputFormat.setOutputPath(job, new Path("out"));
  *     
  *     job.setMapperClass(MyJob.MyMapper.class);
@@ -228,10 +224,13 @@ public class JobConf extends Configuration {
    * Set the {@link Path} of the input directory for the map-reduce job.
    * 
    * @param dir the {@link Path} of the input directory for the map-reduce job.
+   * @deprecated Use {@link FileInputFormat#setInputPaths(JobConf, Path...)} or
+   *                 {@link FileInputFormat#setInputPaths(JobConf, String)}
    */
+  @Deprecated
   public void setInputPath(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
-    set("mapred.input.dir", StringUtils.escapeString(dir.toString()));
+    set("mapred.input.dir", dir.toString());
   }
 
   /**
@@ -239,26 +238,29 @@ public class JobConf extends Configuration {
    * 
    * @param dir {@link Path} to be added to the list of inputs for 
    *            the map-reduce job.
+   * @deprecated Use {@link FileInputFormat#addInputPath(JobConf, Path)} or
+   *                 {@link FileInputFormat#addInputPaths(JobConf, String)}
    */
+  @Deprecated
   public void addInputPath(Path dir) {
     dir = new Path(getWorkingDirectory(), dir);
-    String dirStr = StringUtils.escapeString(dir.toString());
     String dirs = get("mapred.input.dir");
-    set("mapred.input.dir", dirs == null ? dirStr :
-      dirs + StringUtils.COMMA_STR + dirStr);
+    set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
   }
 
   /**
    * Get the list of input {@link Path}s for the map-reduce job.
    * 
    * @return the list of input {@link Path}s for the map-reduce job.
+   * @deprecated Use {@link FileInputFormat#getInputPaths(JobConf)}
    */
+  @Deprecated
   public Path[] getInputPaths() {
     String dirs = get("mapred.input.dir", "");
-    String [] list = StringUtils.split(dirs);
-    Path[] result = new Path[list.length];
-    for (int i = 0; i < list.length; i++) {
-      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    ArrayList<Object> list = Collections.list(new StringTokenizer(dirs, ","));
+    Path[] result = new Path[list.size()];
+    for (int i = 0; i < list.size(); i++) {
+      result[i] = new Path((String)list.get(i));
     }
     return result;
   }
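
For existing drivers the migration is mechanical: each deprecated JobConf instance method has a static FileInputFormat counterpart that takes the JobConf as its first argument. Before/after, shown as alternatives rather than sequential code:

    // Deprecated (and, after this patch, no longer escape-aware):
    job.setInputPath(new Path("in"));
    job.addInputPath(new Path("in2"));
    Path[] inputs = job.getInputPaths();

    // Replacement:
    FileInputFormat.setInputPaths(job, new Path("in"));
    FileInputFormat.addInputPath(job, new Path("in2"));
    Path[] inputs = FileInputFormat.getInputPaths(job);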

+ 2 - 1
src/java/org/apache/hadoop/mapred/jobcontrol/Job.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred.jobcontrol;
 
 
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
@@ -332,7 +333,7 @@ public class Job {
     try {
       if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
         FileSystem fs = FileSystem.get(theJobConf);
-        Path inputPaths[] = theJobConf.getInputPaths();
+        Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
         for (int i = 0; i < inputPaths.length; i++) {
           if (!fs.exists(inputPaths[i])) {
             try {

+ 2 - 1
src/java/org/apache/hadoop/mapred/join/Parser.java

@@ -34,6 +34,7 @@ import java.util.Stack;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -291,7 +292,7 @@ public class Parser {
 
     private JobConf getConf(JobConf job) {
       JobConf conf = new JobConf(job);
-      conf.setInputPath(new Path(indir));
+      FileInputFormat.setInputPaths(conf, indir);
       return conf;
     }
 

+ 2 - 4
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJob.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -161,10 +162,7 @@ public class ValueAggregatorJob {
     }
     theJob.setJobName("ValueAggregatorJob: " + jobName);
 
-    String[] inputDirsSpecs = inputDir.split(",");
-    for (int i = 0; i < inputDirsSpecs.length; i++) {
-      theJob.addInputPath(new Path(inputDirsSpecs[i]));
-    }
+    FileInputFormat.addInputPaths(theJob, inputDir);
 
     theJob.setInputFormat(theInputFormat);
     

+ 3 - 1
src/java/org/apache/hadoop/mapred/pipes/Submitter.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -354,7 +355,8 @@ public class Submitter {
         conf.addResource(new Path((String) results.getValue("-conf")));
       }
       if (results.hasOption("-input")) {
-        conf.setInputPath(new Path((String) results.getValue("-input")));
+        FileInputFormat.setInputPaths(conf, 
+                          (String) results.getValue("-input"));
       }
       if (results.hasOption("-output")) {
         FileOutputFormat.setOutputPath(conf, 

+ 2 - 1
src/java/org/apache/hadoop/tools/Logalyzer.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -213,7 +214,7 @@ public class Logalyzer {
     JobConf grepJob = new JobConf(fsConfig);
     grepJob.setJobName("logalyzer-grep-sort");
     
-    grepJob.setInputPath(grepInput);
+    FileInputFormat.setInputPaths(grepJob, grepInput);
     grepJob.setInputFormat(TextInputFormat.class);
     
     grepJob.setMapperClass(LogRegexMapper.class);

+ 0 - 38
src/test/org/apache/hadoop/conf/TestJobConf.java

@@ -20,9 +20,7 @@ package org.apache.hadoop.conf;
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
 
 public class TestJobConf extends TestCase {
 
@@ -51,40 +49,4 @@ public class TestJobConf extends TestCase {
     configuration.set("mapred.task.profile.params", "test");
     Assert.assertEquals("test", configuration.getProfileParams());
   }
-
-  public void testInputPath() throws Exception {
-    JobConf jobConf = new JobConf();
-    Path path = new Path(jobConf.getWorkingDirectory(), 
-        "xx{y"+StringUtils.COMMA_STR+"z}");
-    jobConf.setInputPath(path);
-    Path[] paths = jobConf.getInputPaths();
-    assertEquals(1, paths.length);
-    assertEquals(path.toString(), paths[0].toString());
-    
-    StringBuilder pathStr = new StringBuilder();
-    pathStr.append(StringUtils.ESCAPE_CHAR);
-    pathStr.append(StringUtils.ESCAPE_CHAR);
-    pathStr.append(StringUtils.COMMA);
-    pathStr.append(StringUtils.COMMA);
-    pathStr.append('a');
-    path = new Path(jobConf.getWorkingDirectory(), pathStr.toString());
-    jobConf.setInputPath(path);
-    paths = jobConf.getInputPaths();
-    assertEquals(1, paths.length);
-    assertEquals(path.toString(), paths[0].toString());
-    
-    pathStr.setLength(0);
-    pathStr.append(StringUtils.ESCAPE_CHAR);
-    pathStr.append("xx");
-    pathStr.append(StringUtils.ESCAPE_CHAR);
-    path = new Path(jobConf.getWorkingDirectory(), pathStr.toString());
-    Path path1 = new Path(jobConf.getWorkingDirectory(),
-        "yy"+StringUtils.COMMA_STR+"zz");
-    jobConf.setInputPath(path);
-    jobConf.addInputPath(path1);
-    paths = jobConf.getInputPaths();
-    assertEquals(2, paths.length);
-    assertEquals(path.toString(), paths[0].toString());
-    assertEquals(path1.toString(), paths[1].toString());
-  }
 }

+ 2 - 1
src/test/org/apache/hadoop/dfs/NNBench.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile;
 
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
@@ -464,7 +465,7 @@ public class NNBench {
     JobConf job = new JobConf(config, NNBench.class);
 
     job.setJobName("NNBench-" + operation);
-    job.setInputPath(new Path(baseDir, CONTROL_DIR_NAME));
+    FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     
     // Explicitly set number of max map attempts to 1.

+ 1 - 1
src/test/org/apache/hadoop/fs/DFSCIOTest.java

@@ -267,7 +267,7 @@ public class DFSCIOTest extends TestCase {
                                  ) throws IOException {
     JobConf job = new JobConf(fsConfig, DFSCIOTest.class);
 
-    job.setInputPath(CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, CONTROL_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(mapperClass);

+ 1 - 1
src/test/org/apache/hadoop/fs/DistributedFSCheck.java

@@ -188,7 +188,7 @@ public class DistributedFSCheck extends TestCase {
   private void runDistributedFSCheck() throws Exception {
     JobConf job = new JobConf(fs.getConf(), DistributedFSCheck.class);
 
-    job.setInputPath(MAP_INPUT_DIR);
+    FileInputFormat.setInputPaths(job, MAP_INPUT_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(DistributedFSCheckMapper.class);

+ 1 - 1
src/test/org/apache/hadoop/fs/TestDFSIO.java

@@ -223,7 +223,7 @@ public class TestDFSIO extends TestCase {
                                  ) throws IOException {
     JobConf job = new JobConf(fsConfig, TestDFSIO.class);
 
-    job.setInputPath(CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, CONTROL_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(mapperClass);

+ 4 - 3
src/test/org/apache/hadoop/fs/TestFileSystem.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -222,7 +223,7 @@ public class TestFileSystem extends TestCase {
     JobConf job = new JobConf(conf, TestFileSystem.class);
     job.setBoolean("fs.test.fastCheck", fastCheck);
 
-    job.setInputPath(CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, CONTROL_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(WriteMapper.class);
@@ -320,7 +321,7 @@ public class TestFileSystem extends TestCase {
     job.setBoolean("fs.test.fastCheck", fastCheck);
 
 
-    job.setInputPath(CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, CONTROL_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(ReadMapper.class);
@@ -416,7 +417,7 @@ public class TestFileSystem extends TestCase {
     JobConf job = new JobConf(conf, TestFileSystem.class);
     job.setBoolean("fs.test.fastCheck", fastCheck);
 
-    job.setInputPath(CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, CONTROL_DIR);
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(SeekMapper.class);

+ 2 - 2
src/test/org/apache/hadoop/io/FileBench.java

@@ -134,7 +134,7 @@ public class FileBench extends Configured implements Tool {
   static long readBench(JobConf conf) throws IOException {
     InputFormat inf = conf.getInputFormat();
     final String fn = conf.get("test.filebench.name", "");
-    Path pin = new Path(conf.getInputPaths()[0], fn);
+    Path pin = new Path(FileInputFormat.getInputPaths(conf)[0], fn);
     FileStatus in = pin.getFileSystem(conf).getFileStatus(pin);
     RecordReader rr = inf.getRecordReader(new FileSplit(pin, 0, in.getLen(), 
                                           (String[])null), conf, Reporter.NULL);
@@ -197,7 +197,7 @@ public class FileBench extends Configured implements Tool {
     fillBlocks(job);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setInputPath(root);
+    FileInputFormat.setInputPaths(job, root);
     FileOutputFormat.setOutputPath(job, root);
 
     if (null == cc) cc = EnumSet.allOf(CCodec.class);

+ 1 - 1
src/test/org/apache/hadoop/mapred/BigMapOutput.java

@@ -131,7 +131,7 @@ public class BigMapOutput extends Configured implements Tool {
     jobConf.setJobName("BigMapOutput");
     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    jobConf.setInputPath(bigMapInput);
+    FileInputFormat.setInputPaths(jobConf, bigMapInput);
     if (fs.exists(outputPath)) {
       fs.delete(outputPath, true);
     }

+ 3 - 3
src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java

@@ -111,7 +111,7 @@ public class GenericMRLoadGenerator extends Configured implements Tool {
         } else if ("-outdir".equals(argv[i])) {
           FileOutputFormat.setOutputPath(job, new Path(argv[++i]));
         } else if ("-indir".equals(argv[i])) {
-          job.addInputPath(new Path(argv[++i]));
+          FileInputFormat.addInputPaths(job, argv[++i]);
         } else if ("-inFormatIndirect".equals(argv[i])) {
           job.setClass("mapred.indirect.input.format",
               Class.forName(argv[++i]).asSubclass(InputFormat.class),
@@ -145,7 +145,7 @@ public class GenericMRLoadGenerator extends Configured implements Tool {
       job.setOutputFormat(NullOutputFormat.class);
     }
 
-    if (0 == job.getInputPaths().length) {
+    if (0 == FileInputFormat.getInputPaths(job).length) {
       // No input dir? Generate random data
       System.err.println("No input path; ignoring InputFormat");
       confRandom(job);
@@ -161,7 +161,7 @@ public class GenericMRLoadGenerator extends Configured implements Tool {
           LongWritable.class, Text.class,
           SequenceFile.CompressionType.NONE);
       try {
-        for (Path p : job.getInputPaths()) {
+        for (Path p : FileInputFormat.getInputPaths(job)) {
           FileSystem fs = p.getFileSystem(job);
           Stack<Path> pathstack = new Stack<Path>();
           pathstack.push(p);

+ 2 - 2
src/test/org/apache/hadoop/mapred/MRBench.java

@@ -139,7 +139,7 @@ public class MRBench {
    */
   private static JobConf setupJob(int numMaps, int numReduces, String jarFile) {
     JobConf jobConf = new JobConf(MRBench.class);
-    jobConf.addInputPath(INPUT_DIR);
+    FileInputFormat.addInputPath(jobConf, INPUT_DIR);
     
     jobConf.setInputFormat(TextInputFormat.class);
     jobConf.setOutputFormat(TextOutputFormat.class);
@@ -180,7 +180,7 @@ public class MRBench {
                          new Path(OUTPUT_DIR, "output_" + rand.nextInt()));
 
       LOG.info("Running job " + i + ":" +
-               " input=" + jobConf.getInputPaths()[0] + 
+               " input=" + FileInputFormat.getInputPaths(jobConf)[0] + 
                " output=" + FileOutputFormat.getOutputPath(jobConf));
       
       // run the mapred task now 

+ 1 - 1
src/test/org/apache/hadoop/mapred/MRCaching.java

@@ -166,7 +166,7 @@ public class MRCaching {
     conf.setMapperClass(MRCaching.MapClass.class);
     conf.setCombinerClass(MRCaching.ReduceClass.class);
     conf.setReducerClass(MRCaching.ReduceClass.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(1);

+ 1 - 1
src/test/org/apache/hadoop/mapred/NotificationTestCase.java

@@ -202,7 +202,7 @@ public abstract class NotificationTestCase extends HadoopTestCase {
     conf.setCombinerClass(WordCount.Reduce.class);
     conf.setReducerClass(WordCount.Reduce.class);
 
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);

+ 1 - 1
src/test/org/apache/hadoop/mapred/PiEstimator.java

@@ -157,7 +157,7 @@ public class PiEstimator {
     if (!fileSys.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
     }
-    jobConf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(jobConf, inDir);
     FileOutputFormat.setOutputPath(jobConf, outDir);
     
     jobConf.setNumMapTasks(numMaps);

+ 12 - 12
src/test/org/apache/hadoop/mapred/SortValidator.java

@@ -63,7 +63,7 @@ public class SortValidator extends Configured implements Tool {
   }
 
   static private IntWritable deduceInputFile(JobConf job) {
-    Path[] inputPaths = job.getInputPaths();
+    Path[] inputPaths = FileInputFormat.getInputPaths(job);
     Path inputFile = new Path(job.get("map.input.file"));
 
     // value == one for sort-input; value == two for sort-output
@@ -336,8 +336,8 @@ public class SortValidator extends Configured implements Tool {
       jobConf.setNumMapTasks(noSortReduceTasks);
       jobConf.setNumReduceTasks(1);
 
-      jobConf.setInputPath(sortInput);
-      jobConf.addInputPath(sortOutput);
+      FileInputFormat.setInputPaths(jobConf, sortInput);
+      FileInputFormat.addInputPath(jobConf, sortOutput);
       Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
       if (fs.exists(outputPath)) {
         fs.delete(outputPath, true);
@@ -346,11 +346,11 @@ public class SortValidator extends Configured implements Tool {
       
       // Uncomment to run locally in a single process
       //job_conf.set("mapred.job.tracker", "local");
-      
+      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
       System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
-                         "from " + jobConf.getInputPaths()[0] + " (" + 
+                         "from " + inputPaths[0] + " (" + 
                          noSortInputpaths + " files), " + 
-                         jobConf.getInputPaths()[1] + " (" + 
+                         inputPaths[1] + " (" + 
                          noSortReduceTasks + 
                          " files) into " + 
                          FileOutputFormat.getOutputPath(jobConf) + 
@@ -479,8 +479,8 @@ public class SortValidator extends Configured implements Tool {
       jobConf.setNumMapTasks(noMaps);
       jobConf.setNumReduceTasks(noReduces);
       
-      jobConf.setInputPath(sortInput);
-      jobConf.addInputPath(sortOutput);
+      FileInputFormat.setInputPaths(jobConf, sortInput);
+      FileInputFormat.addInputPath(jobConf, sortOutput);
       Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
       FileSystem fs = FileSystem.get(defaults);
       if (fs.exists(outputPath)) {
@@ -490,12 +490,12 @@ public class SortValidator extends Configured implements Tool {
       
       // Uncomment to run locally in a single process
       //job_conf.set("mapred.job.tracker", "local");
-      
+      Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
       System.out.println("\nSortValidator.RecordChecker: Running on " +
                          cluster.getTaskTrackers() +
-                         " nodes to validate sort from " +
-                         jobConf.getInputPaths()[0] + ", " + 
-                         jobConf.getInputPaths()[1] + " into " +
+                        " nodes to validate sort from " + 
+                         inputPaths[0] + ", " + 
+                         inputPaths[1] + " into " + 
                          FileOutputFormat.getOutputPath(jobConf) + 
                          " with " + noReduces + " reduces.");
       Date startTime = new Date();

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java

@@ -55,7 +55,7 @@ public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
 
-    conf.setInputPath(getInputDir());
+    FileInputFormat.setInputPaths(conf, getInputDir());
 
     FileOutputFormat.setOutputPath(conf, getOutputDir());
 

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestComparators.java

@@ -296,7 +296,7 @@ public class TestComparators extends TestCase
     FileSystem fs = FileSystem.get(conf);
     fs.delete(testdir, true);
     conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setOutputKeyClass(IntWritable.class);
     conf.setOutputValueClass(Text.class);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java

@@ -76,7 +76,7 @@ public class TestEmptyJobWithDFS extends TestCase {
     conf.setOutputValueClass(IntWritable.class);
     conf.setMapperClass(IdentityMapper.class);        
     conf.setReducerClass(IdentityReducer.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestFieldSelection.java

@@ -89,7 +89,7 @@ private static NumberFormat idFormat = NumberFormat.getInstance();
     System.out.println("inputData:");
     System.out.println(inputData.toString());
     JobConf job = new JobConf(conf, TestFieldSelection.class);
-    job.setInputPath(INPUT_DIR);
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
     job.setInputFormat(TextInputFormat.class);
     job.setMapperClass(FieldSelectionMapReduce.class);
     job.setReducerClass(FieldSelectionMapReduce.class);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestFileInputFormatPathFilter.java

@@ -101,7 +101,7 @@ public class TestFileInputFormatPathFilter extends TestCase {
     JobConf conf = new JobConf();
 
     Path inputDir = (withGlob) ? new Path(workDir, "a*") : workDir;
-    conf.setInputPath(inputDir);
+    FileInputFormat.setInputPaths(conf, inputDir);
     conf.setInputFormat(DummyFileInputFormat.class);
 
     if (withFilter) {

+ 109 - 0
src/test/org/apache/hadoop/mapred/TestInputPath.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+public class TestInputPath extends TestCase {
+  public void testInputPath() throws Exception {
+    JobConf jobConf = new JobConf();
+    Path workingDir = jobConf.getWorkingDirectory();
+    
+    Path path = new Path(workingDir, 
+        "xx{y"+StringUtils.COMMA_STR+"z}");
+    FileInputFormat.setInputPaths(jobConf, path);
+    Path[] paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(1, paths.length);
+    assertEquals(path.toString(), paths[0].toString());
+
+    StringBuilder pathStr = new StringBuilder();
+    pathStr.append(StringUtils.ESCAPE_CHAR);
+    pathStr.append(StringUtils.ESCAPE_CHAR);
+    pathStr.append(StringUtils.COMMA);
+    pathStr.append(StringUtils.COMMA);
+    pathStr.append('a');
+    path = new Path(workingDir, pathStr.toString());
+    FileInputFormat.setInputPaths(jobConf, path);
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(1, paths.length);
+    assertEquals(path.toString(), paths[0].toString());
+
+    pathStr.setLength(0);
+    pathStr.append(StringUtils.ESCAPE_CHAR);
+    pathStr.append("xx");
+    pathStr.append(StringUtils.ESCAPE_CHAR);
+    path = new Path(workingDir, pathStr.toString());
+    Path path1 = new Path(workingDir,
+        "yy"+StringUtils.COMMA_STR+"zz");
+    FileInputFormat.setInputPaths(jobConf, path);
+    FileInputFormat.addInputPath(jobConf, path1);
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(2, paths.length);
+    assertEquals(path.toString(), paths[0].toString());
+    assertEquals(path1.toString(), paths[1].toString());
+
+    FileInputFormat.setInputPaths(jobConf, path, path1);
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(2, paths.length);
+    assertEquals(path.toString(), paths[0].toString());
+    assertEquals(path1.toString(), paths[1].toString());
+
+    Path[] input = new Path[] {path, path1};
+    FileInputFormat.setInputPaths(jobConf, input);
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(2, paths.length);
+    assertEquals(path.toString(), paths[0].toString());
+    assertEquals(path1.toString(), paths[1].toString());
+    
+    pathStr.setLength(0);
+    String str1 = "{a{b,c},de}";
+    String str2 = "xyz";
+    String str3 = "x{y,z}";
+    pathStr.append(str1);
+    pathStr.append(StringUtils.COMMA);
+    pathStr.append(str2);
+    pathStr.append(StringUtils.COMMA);
+    pathStr.append(str3);
+    FileInputFormat.setInputPaths(jobConf, pathStr.toString());
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(3, paths.length);
+    assertEquals(new Path(workingDir, str1).toString(), paths[0].toString());
+    assertEquals(new Path(workingDir, str2).toString(), paths[1].toString());
+    assertEquals(new Path(workingDir, str3).toString(), paths[2].toString());
+
+    pathStr.setLength(0);
+    String str4 = "abc";
+    String str5 = "pq{r,s}";
+    pathStr.append(str4);
+    pathStr.append(StringUtils.COMMA);
+    pathStr.append(str5);
+    FileInputFormat.addInputPaths(jobConf, pathStr.toString());
+    paths = FileInputFormat.getInputPaths(jobConf);
+    assertEquals(5, paths.length);
+    assertEquals(new Path(workingDir, str1).toString(), paths[0].toString());
+    assertEquals(new Path(workingDir, str2).toString(), paths[1].toString());
+    assertEquals(new Path(workingDir, str3).toString(), paths[2].toString());
+    assertEquals(new Path(workingDir, str4).toString(), paths[3].toString());
+    assertEquals(new Path(workingDir, str5).toString(), paths[4].toString());
+  }
+}

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

@@ -117,7 +117,7 @@ public class TestJavaSerialization extends ClusterMapReduceTestCase {
     conf.setMapperClass(TypeConverterMapper.class);
     conf.setReducerClass(IdentityReducer.class);
 
-    conf.setInputPath(getInputDir());
+    FileInputFormat.setInputPaths(conf, getInputDir());
 
     FileOutputFormat.setOutputPath(conf, getOutputDir());
 

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java

@@ -49,7 +49,7 @@ public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
 
-    conf.setInputPath(getInputDir());
+    FileInputFormat.setInputPaths(conf, getInputDir());
 
     FileOutputFormat.setOutputPath(conf, getOutputDir());
 

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java

@@ -60,7 +60,7 @@ public class TestKeyValueTextInputFormat extends TestCase {
     Random random = new Random(seed);
 
     localFs.delete(workDir, true);
-    job.setInputPath(workDir);
+    FileInputFormat.setInputPaths(job, workDir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
@@ -206,7 +206,7 @@ public class TestKeyValueTextInputFormat extends TestCase {
               "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
     writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
               "line-1\tthis is a test\nline-1\tof gzip\n");
-    job.setInputPath(workDir);
+    FileInputFormat.setInputPaths(job, workDir);
     KeyValueTextInputFormat format = new KeyValueTextInputFormat();
     format.configure(job);
     InputSplit[] splits = format.getSplits(job, 100);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMapOutputType.java

@@ -83,7 +83,7 @@ public class TestMapOutputType extends TestCase
     fs.delete(testdir, true);
     conf.setInt("io.sort.mb", 1);
     conf.setInputFormat(SequenceFileInputFormat.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setMapperClass(TextGen.class);
     conf.setReducerClass(TextReduce.class);

+ 5 - 5
src/test/org/apache/hadoop/mapred/TestMapRed.java

@@ -297,7 +297,7 @@ public class TestMapRed extends TestCase {
     Path outDir = new Path(testdir, "out");
     FileSystem fs = FileSystem.get(conf);
     fs.delete(testdir, true);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setMapperClass(MyMap.class);
     conf.setReducerClass(MyReduce.class);
@@ -423,7 +423,7 @@ public class TestMapRed extends TestCase {
 
 
     JobConf genJob = new JobConf(conf, TestMapRed.class);
-    genJob.setInputPath(randomIns);
+    FileInputFormat.setInputPaths(genJob, randomIns);
     genJob.setInputFormat(SequenceFileInputFormat.class);
     genJob.setMapperClass(RandomGenMapper.class);
 
@@ -468,7 +468,7 @@ public class TestMapRed extends TestCase {
     Path intermediateOuts = new Path(testdir, "intermediateouts");
     fs.delete(intermediateOuts, true);
     JobConf checkJob = new JobConf(conf, TestMapRed.class);
-    checkJob.setInputPath(randomOuts);
+    FileInputFormat.setInputPaths(checkJob, randomOuts);
     checkJob.setInputFormat(TextInputFormat.class);
     checkJob.setMapperClass(RandomCheckMapper.class);
 
@@ -491,7 +491,7 @@ public class TestMapRed extends TestCase {
     Path finalOuts = new Path(testdir, "finalouts");        
     fs.delete(finalOuts, true);
     JobConf mergeJob = new JobConf(conf, TestMapRed.class);
-    mergeJob.setInputPath(intermediateOuts);
+    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
     mergeJob.setInputFormat(SequenceFileInputFormat.class);
     mergeJob.setMapperClass(MergeMapper.class);
         
@@ -598,7 +598,7 @@ public class TestMapRed extends TestCase {
       fs.delete(testdir, true);
       conf.setInt("io.sort.mb", 1);
       conf.setInputFormat(SequenceFileInputFormat.class);
-      conf.setInputPath(inDir);
+      FileInputFormat.setInputPaths(conf, inDir);
       FileOutputFormat.setOutputPath(conf, outDir);
       conf.setMapperClass(IdentityMapper.class);
       conf.setReducerClass(IdentityReducer.class);

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java

@@ -66,7 +66,7 @@ public class TestMiniMRClasspath extends TestCase {
     conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");        
     conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
     conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
@@ -119,7 +119,7 @@ public class TestMiniMRClasspath extends TestCase {
     // the values are the messages
     conf.set("mapred.output.key.class", "testjar.ExternalWritable");
 
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java

@@ -133,7 +133,7 @@ public class TestMiniMRMapRedDebugScript extends TestCase {
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(0);
     conf.setMapDebugScript(debugScript);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                                       "/tmp")).toString().replace(' ', '+');

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRTaskTempDir.java

@@ -113,7 +113,7 @@ public class TestMiniMRTaskTempDir extends TestCase {
     conf.setReducerClass(IdentityReducer.class);
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(0);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
                                       "/tmp")).toString().replace(' ', '+');

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

@@ -79,7 +79,7 @@ public class TestMiniMRWithDFS extends TestCase {
     conf.setMapperClass(WordCount.MapClass.class);        
     conf.setCombinerClass(WordCount.Reduce.class);
     conf.setReducerClass(WordCount.Reduce.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java

@@ -76,7 +76,7 @@ public class TestMultiFileInputFormat extends TestCase{
        }
        lengths.put(path.getName(), new Long(numBytes));
     }
-    job.setInputPath(multiFileDir);
+    FileInputFormat.setInputPaths(job, multiFileDir);
     return multiFileDir;
   }
   

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java

@@ -177,7 +177,7 @@ public class TestRackAwareTaskPlacement extends TestCase {
     jobConf.setJobName("TestForRackAwareness");
     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    jobConf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(jobConf, inDir);
     FileOutputFormat.setOutputPath(jobConf, outputPath);
     jobConf.setMapperClass(IdentityMapper.class);
     jobConf.setReducerClass(IdentityReducer.class);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java

@@ -42,7 +42,7 @@ public class TestSequenceFileAsBinaryInputFormat extends TestCase {
     r.setSeed(seed);
 
     fs.delete(dir, true);
-    job.setInputPath(dir);
+    FileInputFormat.setInputPaths(job, dir);
 
     Text tkey = new Text();
     Text tval = new Text();

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java

@@ -48,7 +48,7 @@ public class TestSequenceFileAsTextInputFormat extends TestCase {
 
     fs.delete(dir, true);
 
-    job.setInputPath(dir);
+    FileInputFormat.setInputPaths(job, dir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java

@@ -41,7 +41,7 @@ public class TestSequenceFileInputFilter extends TestCase {
   private static final Reporter reporter = Reporter.NULL;
   
   static {
-    job.setInputPath(inDir);
+    FileInputFormat.setInputPaths(job, inDir);
     try {
       fs = FileSystem.getLocal(conf);
     } catch (IOException e) {

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java

@@ -48,7 +48,7 @@ public class TestSequenceFileInputFormat extends TestCase {
 
     fs.delete(dir, true);
 
-    job.setInputPath(dir);
+    FileInputFormat.setInputPaths(job, dir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java

@@ -74,7 +74,7 @@ public class TestSpecialCharactersInOutputPath extends TestCase {
     conf.setOutputValueClass(Text.class);
     conf.setMapperClass(IdentityMapper.class);        
     conf.setReducerClass(IdentityReducer.class);
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);

+ 3 - 3
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -60,7 +60,7 @@ public class TestTextInputFormat extends TestCase {
     Random random = new Random(seed);
 
     localFs.delete(workDir, true);
-    job.setInputPath(workDir);
+    FileInputFormat.setInputPaths(job, workDir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
@@ -205,7 +205,7 @@ public class TestTextInputFormat extends TestCase {
               "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");
     writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
               "this is a test\nof gzip\n");
-    job.setInputPath(workDir);
+    FileInputFormat.setInputPaths(job, workDir);
     TextInputFormat format = new TextInputFormat();
     format.configure(job);
     InputSplit[] splits = format.getSplits(job, 100);
@@ -235,7 +235,7 @@ public class TestTextInputFormat extends TestCase {
     ReflectionUtils.setConf(gzip, job);
     localFs.delete(workDir, true);
     writeFile(localFs, new Path(workDir, "empty.gz"), gzip, "");
-    job.setInputPath(workDir);
+    FileInputFormat.setInputPaths(job, workDir);
     TextInputFormat format = new TextInputFormat();
     format.configure(job);
     InputSplit[] splits = format.getSplits(job, 100);

+ 1 - 1
src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java

@@ -295,7 +295,7 @@ public class ThreadedMapBenchmark extends Configured implements Tool {
       job.setMapperClass(IdentityMapper.class);        
       job.setReducerClass(IdentityReducer.class);
       
-      job.addInputPath(INPUT_DIR);
+      FileInputFormat.addInputPath(job, INPUT_DIR);
       FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
       
       JobClient client = new JobClient(job);
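
ThreadedMapBenchmark is the one test in this batch that uses the additive form, and the old semantics carry over: setInputPaths replaces the configured inputs, while addInputPath appends to them. A small sketch (directory names invented):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class AddVersusSet {
      public static void main(String[] args) {
        JobConf job = new JobConf(AddVersusSet.class);
        // Replaces whatever inputs were configured before this call...
        FileInputFormat.setInputPaths(job, new Path("logs/day1"));
        // ...while addInputPath appends, so the job reads both directories.
        FileInputFormat.addInputPath(job, new Path("logs/day2"));
      }
    }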

+ 2 - 6
src/test/org/apache/hadoop/mapred/jobcontrol/JobControlTestUtils.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
@@ -118,12 +119,7 @@ public class JobControlTestUtils {
     JobConf theJob = new JobConf(defaults, TestJobControl.class);
     theJob.setJobName("DataMoveJob");
 
-    theJob.setInputPath((Path) indirs.get(0));
-    if (indirs.size() > 1) {
-      for (int i = 1; i < indirs.size(); i++) {
-        theJob.addInputPath((Path) indirs.get(i));
-      }
-    }
+    FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
     theJob.setMapperClass(DataCopy.class);
     FileOutputFormat.setOutputPath(theJob, outdir);
     theJob.setOutputKeyClass(Text.class);
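
This is the one hunk where the new API genuinely shrinks code: setInputPaths takes Path varargs, so the old set-the-first-then-loop-and-add sequence collapses into a single call. A sketch assuming a List<Path> of input directories like the util's indirs:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class MultiplePaths {
      public static void main(String[] args) {
        JobConf theJob = new JobConf(MultiplePaths.class);
        List<Path> indirs = Arrays.asList(new Path("in1"), new Path("in2"));
        // toArray feeds the whole list to the varargs parameter at once.
        FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
      }
    }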

+ 1 - 1
src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java

@@ -87,7 +87,7 @@ public class TestMultithreadedMapRunner extends HadoopTestCase {
     conf.setMapperClass(IDMap.class);
     conf.setReducerClass(IDReduce.class);
 
-    conf.setInputPath(inDir);
+    FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
 
     conf.setMapRunnerClass(MultithreadedMapRunner.class);

+ 1 - 1
src/test/org/apache/hadoop/mapred/lib/aggregate/TestAggregates.java

@@ -78,7 +78,7 @@ public class TestAggregates extends TestCase {
     System.out.println("inputData:");
     System.out.println(inputData.toString());
     JobConf job = new JobConf(conf, TestAggregates.class);
-    job.setInputPath(INPUT_DIR);
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
     job.setInputFormat(TextInputFormat.class);
 
     FileOutputFormat.setOutputPath(job, OUTPUT_DIR);

+ 2 - 1
src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
@@ -146,7 +147,7 @@ public class TestPipes extends TestCase {
       Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
       Submitter.setIsJavaRecordReader(job, true);
       Submitter.setIsJavaRecordWriter(job, true);
-      job.setInputPath(inputPath);
+      FileInputFormat.setInputPaths(job, inputPath);
       FileOutputFormat.setOutputPath(job, outputPath);
       RunningJob result = Submitter.submitJob(job);
       assertTrue("pipes job failed", result.isSuccessful());

+ 2 - 2
src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.*;
  * RecordReaders are not implemented in Java, naturally...
  */
 public class WordCountInputFormat
-  implements InputFormat<IntWritable, Text> {
+  extends FileInputFormat<IntWritable, Text> {
   
   static class WordCountInputSplit implements InputSplit  {
     private String filename;
@@ -53,7 +53,7 @@ public class WordCountInputFormat
                                 int numSplits) throws IOException {
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
     FileSystem local = FileSystem.getLocal(conf);
-    for(Path dir: conf.getInputPaths()) {
+    for(Path dir: getInputPaths(conf)) {
       for(FileStatus file: local.listStatus(dir)) {
         result.add(new WordCountInputSplit(file.getPath()));
       }
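
WordCountInputFormat is the only non-mechanical change here: getInputPaths now lives on FileInputFormat rather than JobConf, so a custom format that reads the configured paths extends FileInputFormat instead of implementing InputFormat directly. A stripped-down sketch of that pattern (record reader and split type elided, since the hunk does not show them):

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public abstract class PathListingFormat
        extends FileInputFormat<IntWritable, Text> {
      public InputSplit[] getSplits(JobConf conf, int numSplits)
          throws IOException {
        ArrayList<InputSplit> result = new ArrayList<InputSplit>();
        FileSystem local = FileSystem.getLocal(conf);
        // getInputPaths(conf) is the static accessor gained by extending
        // FileInputFormat; the real test wraps each file in its own split type.
        for (Path dir : getInputPaths(conf)) {
          for (FileStatus file : local.listStatus(dir)) {
            // result.add(new MySplit(file.getPath()));  // split type elided
          }
        }
        return result.toArray(new InputSplit[result.size()]);
      }
    }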

+ 3 - 3
src/test/org/apache/hadoop/record/TestRecordMR.java

@@ -308,7 +308,7 @@ public class TestRecordMR extends TestCase {
 
 
     JobConf genJob = new JobConf(conf, TestRecordMR.class);
-    genJob.setInputPath(randomIns);
+    FileInputFormat.setInputPaths(genJob, randomIns);
     genJob.setInputFormat(SequenceFileInputFormat.class);
     genJob.setMapperClass(RandomGenMapper.class);
 
@@ -353,7 +353,7 @@ public class TestRecordMR extends TestCase {
     Path intermediateOuts = new Path(testdir, "intermediateouts");
     fs.delete(intermediateOuts, true);
     JobConf checkJob = new JobConf(conf, TestRecordMR.class);
-    checkJob.setInputPath(randomOuts);
+    FileInputFormat.setInputPaths(checkJob, randomOuts);
     checkJob.setInputFormat(SequenceFileInputFormat.class);
     checkJob.setMapperClass(RandomCheckMapper.class);
 
@@ -376,7 +376,7 @@ public class TestRecordMR extends TestCase {
     Path finalOuts = new Path(testdir, "finalouts");        
     fs.delete(finalOuts, true);
     JobConf mergeJob = new JobConf(conf, TestRecordMR.class);
-    mergeJob.setInputPath(intermediateOuts);
+    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
     mergeJob.setInputFormat(SequenceFileInputFormat.class);
     mergeJob.setMapperClass(MergeMapper.class);
         

+ 1 - 1
src/test/org/apache/hadoop/record/TestRecordWritable.java

@@ -52,7 +52,7 @@ public class TestRecordWritable extends TestCase {
 
     fs.delete(dir, true);
 
-    job.setInputPath(dir);
+    FileInputFormat.setInputPaths(job, dir);
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;

+ 2 - 1
src/test/testshell/ExternalMapReduce.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -100,7 +101,7 @@ public class ExternalMapReduce
     Configuration commandConf = JobClient.getCommandLineConfig();
     JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
     testConf.setJobName("external job");
-    testConf.setInputPath(input);
+    FileInputFormat.setInputPaths(testConf, input);
     FileOutputFormat.setOutputPath(testConf, outDir);
     testConf.setMapperClass(ExternalMapReduce.class);
     testConf.setReducerClass(ExternalMapReduce.class);