
HADOOP-451. Add a Split interface. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@488438 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting committed 18 years ago
commit e93cfef777
34 changed files with 321 additions and 169 deletions
  1. CHANGES.txt (+6 -0)
  2. build.xml (+1 -1)
  3. src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (+15 -7)
  4. src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (+2 -1)
  5. src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (+9 -7)
  6. src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (+15 -16)
  7. src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (+1 -1)
  8. src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java (+5 -1)
  9. src/java/org/apache/hadoop/fs/LocalFileSystem.java (+1 -1)
  10. src/java/org/apache/hadoop/fs/s3/package.html (+4 -2)
  11. src/java/org/apache/hadoop/mapred/FileSplit.java (+15 -6)
  12. src/java/org/apache/hadoop/mapred/InputFormat.java (+8 -12)
  13. src/java/org/apache/hadoop/mapred/InputFormatBase.java (+34 -25)
  14. src/java/org/apache/hadoop/mapred/InputSplit.java (+36 -0)
  15. src/java/org/apache/hadoop/mapred/InvalidInputException.java (+64 -0)
  16. src/java/org/apache/hadoop/mapred/IsolationRunner.java (+2 -1)
  17. src/java/org/apache/hadoop/mapred/JobClient.java (+1 -24)
  18. src/java/org/apache/hadoop/mapred/JobInProgress.java (+10 -16)
  19. src/java/org/apache/hadoop/mapred/LocalJobRunner.java (+2 -2)
  20. src/java/org/apache/hadoop/mapred/MapTask.java (+19 -13)
  21. src/java/org/apache/hadoop/mapred/RecordReader.java (+6 -0)
  22. src/java/org/apache/hadoop/mapred/ReduceTask.java (+1 -1)
  23. src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (+2 -3)
  24. src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (+5 -5)
  25. src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (+13 -0)
  26. src/java/org/apache/hadoop/mapred/Task.java (+1 -1)
  27. src/java/org/apache/hadoop/mapred/TaskInProgress.java (+2 -2)
  28. src/java/org/apache/hadoop/mapred/TextInputFormat.java (+17 -3)
  29. src/test/org/apache/hadoop/mapred/EmptyInputFormat.java (+2 -2)
  30. src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (+6 -0)
  31. src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (+3 -2)
  32. src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (+2 -2)
  33. src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (+8 -9)
  34. src/test/org/apache/hadoop/record/test/TestWritable.java (+3 -3)

+ 6 - 0
CHANGES.txt

@@ -122,6 +122,12 @@ Trunk (unreleased changes)
 34. HADOOP-823.  Fix problem starting datanode when not all configured
     data directories exist.  (Bryan Pendleton via cutting)
 
+35. HADOOP-451.  Add a Split interface.  CAUTION: This incompatibly
+    changes the InputFormat and RecordReader interfaces.  Not only is
+    FileSplit replaced with Split, but a FileSystem parameter is no
+    longer passed in several methods, input validation has changed,
+    etc.  (omalley via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

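For implementers of InputFormat, the incompatible change described above amounts to the signature change sketched below. This is only an illustrative before/after comparison of the signatures that appear in the diffs further down; the names OldInputFormat and NewInputFormat are hypothetical and not part of the commit.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

// Shape of the interface before this commit: a FileSystem is threaded through
// every call and splits are always concrete FileSplits.
interface OldInputFormat {
  boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException;
  FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits) throws IOException;
  RecordReader getRecordReader(FileSystem ignored, FileSplit split,
                               JobConf job, Reporter reporter) throws IOException;
}

// Shape after this commit: splits are an interface, the FileSystem is obtained
// from the JobConf inside the implementation, and validation throws an
// exception instead of returning one boolean per input directory.
interface NewInputFormat {
  void validateInput(JobConf job) throws IOException;
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
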
+ 1 - 1
build.xml

@@ -366,7 +366,7 @@
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
     <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" 
-           fork="yes" maxmemory="128m" dir="${basedir}"
+           fork="yes" maxmemory="256m" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>

+ 15 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java

@@ -84,20 +84,20 @@ public class MergerInputFormat extends InputFormatBase {
    (and if there was, this index may need to be created for the first time
    full file at a time...    )
    */
-  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
-    checkReady(fs, job);
-    return ((StreamInputFormat) primary_).getFullFileSplits(fs, job);
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return ((StreamInputFormat) primary_).getFullFileSplits(job);
   }
 
   /**
    */
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    FileSystem fs = ((FileSplit) split).getPath().getFileSystem(job);
     checkReady(fs, job);
 
     reporter.setStatus(split.toString());
 
     ArrayList readers = new ArrayList();
-    String primary = split.getPath().toString();
+    String primary = ((FileSplit) split).getPath().toString();
     CompoundDirSpec spec = CompoundDirSpec.findInputSpecForPrimary(primary, job);
     if (spec == null) {
       throw new IOException("Did not find -input spec in JobConf for primary:" + primary);
@@ -106,7 +106,7 @@ public class MergerInputFormat extends InputFormatBase {
       InputFormat f = (InputFormat) fmts_.get(i);
       Path path = new Path(spec.getPaths()[i][0]);
       FileSplit fsplit = makeFullFileSplit(path);
-      RecordReader r = f.getRecordReader(fs, fsplit, job, reporter);
+      RecordReader r = f.getRecordReader(fsplit, job, reporter);
       readers.add(r);
     }
 
@@ -115,7 +115,7 @@ public class MergerInputFormat extends InputFormatBase {
 
   private FileSplit makeFullFileSplit(Path path) throws IOException {
     long len = fs_.getLength(path);
-    return new FileSplit(path, 0, len);
+    return new FileSplit(path, 0, len, job_);
   }
 
   /*
@@ -188,6 +188,14 @@ public class MergerInputFormat extends InputFormatBase {
       }
     }
 
+    public float getProgress() throws IOException {
+      if (primaryClosed_) {
+        return 1.0f;
+      } else {
+        return primaryReader_.getProgress();
+      }
+    }
+    
     public void close() throws IOException {
       IOException firstErr = null;
 

+ 2 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -189,7 +189,8 @@ public abstract class PipeMapRed {
     if (split.getStart() == 0) {
       return leaf;
     } else {
-      return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString();
+      return new FileSplit(new Path(leaf), split.getStart(), 
+                           split.getLength(), job_).toString();
     }
   }
 

+ 9 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java

@@ -69,13 +69,7 @@ public abstract class StreamBaseRecordReader implements RecordReader {
   public abstract boolean next(Writable key, Writable value) throws IOException;
 
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
-    int n = inputDirs.length;
-    boolean[] b = new boolean[n];
-    for (int i = 0; i < n; i++) {
-      b[i] = true;
-    }
-    return b;
+  public void validateInput(JobConf job) throws IOException {
   }
 
   /** Returns the current position in the input. */
@@ -88,6 +82,14 @@ public abstract class StreamBaseRecordReader implements RecordReader {
     in_.close();
   }
 
+  public float getProgress() throws IOException {
+    if (end_ == start_) {
+      return 1.0f;
+    } else {
+      return (in_.getPos() - start_) / (end_ - start_);
+    }
+  }
+  
   public WritableComparable createKey() {
     return new Text();
   }

+ 15 - 16
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java

@@ -49,12 +49,7 @@ public class StreamInputFormat extends InputFormatBase {
   protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
 
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
-    boolean[] b = new boolean[inputDirs.length];
-    for (int i = 0; i < inputDirs.length; ++i) {
-      b[i] = true;
-    }
-    return b;
+  public void validateInput(JobConf job) throws IOException {
   }
 
   static boolean isGzippedInput(JobConf job) {
@@ -62,29 +57,29 @@ public class StreamInputFormat extends InputFormatBase {
     return "gzip".equals(val);
     return "gzip".equals(val);
   }
   }
 
 
-  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
     if (isGzippedInput(job)) {
-      return getFullFileSplits(fs, job);
+      return getFullFileSplits(job);
     } else {
-      return super.getSplits(fs, job, numSplits);
+      return super.getSplits(job, numSplits);
     }
   }
 
   /** For the compressed-files case: override InputFormatBase to produce one split. */
-  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job) throws IOException {
-    Path[] files = listPaths(fs, job);
+  FileSplit[] getFullFileSplits(JobConf job) throws IOException {
+    Path[] files = listPaths(job);
     int numSplits = files.length;
     ArrayList splits = new ArrayList(numSplits);
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
-      long splitSize = fs.getLength(file);
-      splits.add(new FileSplit(file, 0, splitSize));
+      long splitSize = file.getFileSystem(job).getLength(file);
+      splits.add(new FileSplit(file, 0, splitSize, job));
     }
     return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
   }
 
-  protected Path[] listPaths(FileSystem fs, JobConf job) throws IOException {
+  protected Path[] listPaths(JobConf job) throws IOException {
     Path[] globs = job.getInputPaths();
     ArrayList list = new ArrayList();
     int dsup = globs.length;
@@ -93,6 +88,7 @@ public class StreamInputFormat extends InputFormatBase {
       LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
       LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
       Path[] paths;
       Path[] paths;
       Path dir;
       Path dir;
+      FileSystem fs = globs[d].getFileSystem(job);
       PathFilter filter = new GlobFilter(fs, leafName);
       dir = new Path(globs[d].getParent().toString());
       if (dir == null) dir = new Path(".");
@@ -132,8 +128,11 @@ public class StreamInputFormat extends InputFormatBase {
     FileSystem fs_;
   }
 
-  public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job,
-      Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(final InputSplit genericSplit, 
+                                      JobConf job,
+                                      Reporter reporter) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    FileSystem fs = split.getPath().getFileSystem(job);
     LOG.info("getRecordReader start.....split=" + split);
     LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
     reporter.setStatus(split.toString());
 
 

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java

@@ -418,7 +418,7 @@ public class StreamUtil {
     Path p = new Path(path);
     long start = Long.parseLong(job.get("map.input.start"));
     long length = Long.parseLong(job.get("map.input.length"));
-    return new FileSplit(p, start, length);
+    return new FileSplit(p, start, length, job);
   }
 
   static class TaskId {

+ 5 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java

@@ -51,7 +51,8 @@ public class TupleInputFormat extends InputFormatBase {
 
   /**
    */
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+  public RecordReader getRecordReader(InputSplit split, JobConf job, 
+                                      Reporter reporter) throws IOException {
 
     reporter.setStatus(split.toString());
 
@@ -82,6 +83,9 @@ public class TupleInputFormat extends InputFormatBase {
       return new UTF8();
     }
 
+    public float getProgress() {
+      return 1.0f;
+    }
   }
 
   ArrayList/*<InputFormat>*/fmts_;

+ 1 - 1
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -44,7 +44,7 @@ public class LocalFileSystem extends FileSystem {
     
     public LocalFileSystem() {}
 
-    /** @deprecated. */
+    /** @deprecated */
     public LocalFileSystem(Configuration conf) throws IOException {
       initialize(NAME, conf);
     }

+ 4 - 2
src/java/org/apache/hadoop/fs/s3/package.html

@@ -5,8 +5,10 @@
 org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
 
 <p>
-Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length.
-Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded
+Files are stored in S3 as blocks (represented by 
+{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by 
+{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
 path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
 This design makes it easy to seek to any given position in a file by reading the inode data to compute
 which block to access, then using S3's support for 

+ 15 - 6
src/java/org/apache/hadoop/mapred/FileSplit.java

@@ -23,18 +23,18 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;                              // deprecated
 
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
- * InputFormat#getSplits(FileSystem, JobConf, int)} and passed to
- * {@link InputFormat#getRecordReader(FileSystem,FileSplit,JobConf,Reporter)}. */
-public class FileSplit implements Writable {
+ * InputFormat#getSplits(JobConf, int)} and passed to
+ * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}. */
+public class FileSplit implements InputSplit {
   private Path file;
   private long start;
   private long length;
+  private JobConf conf;
   
   FileSplit() {}
 
@@ -44,10 +44,11 @@ public class FileSplit implements Writable {
    * @param start the position of the first byte in the file to process
    * @param length the number of bytes in the file to process
    */
-  public FileSplit(Path file, long start, long length) {
+  public FileSplit(Path file, long start, long length, JobConf conf) {
     this.file = file;
     this.start = start;
     this.length = length;
+    this.conf = conf;
   }
   
   /** @deprecated Call {@link #getPath()} instead. */
@@ -79,5 +80,13 @@ public class FileSplit implements Writable {
     length = in.readLong();
   }
 
-
+  public String[] getLocations() throws IOException {
+    String[][] hints = file.getFileSystem(conf).
+                            getFileCacheHints(file, start, length);
+    if (hints != null && hints.length > 0) {
+      return hints[0];
+    }
+    return new String[]{};
+  }
+  
 }

+ 8 - 12
src/java/org/apache/hadoop/mapred/InputFormat.java

@@ -33,13 +33,10 @@ public interface InputFormat {
    * Are the input directories valid? This method is used to test the input
    * directories when a job is submitted so that the framework can fail early
    * with a useful error message when the input directory does not exist.
-   * @param fileSys the file system to check for the directories
-   * @param inputDirs the list of input directories
-   * @return is each inputDir valid?
-   * @throws IOException
+   * @param job the job to check
+   * @throws InvalidInputException if the job does not have valid input
    */
-  boolean[] areValidInputDirectories(FileSystem fileSys,
-                                     Path[] inputDirs) throws IOException;
+  void validateInput(JobConf job) throws IOException;
   
   /** Splits a set of input files.  One split is created per map task.
    *
@@ -47,17 +44,16 @@ public interface InputFormat {
    * @param numSplits the desired number of splits
    * @return the splits
    */
-  FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
-    throws IOException;
+  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
   /** Construct a {@link RecordReader} for a {@link FileSplit}.
    *
-   * @param split the {@link FileSplit}
+   * @param split the {@link InputSplit}
    * @param job the job that this split belongs to
    * @return a {@link RecordReader}
    */
-  RecordReader getRecordReader(FileSystem ignored, FileSplit split,
-                               JobConf job, Reporter reporter)
-    throws IOException;
+  RecordReader getRecordReader(InputSplit split,
+                               JobConf job, 
+                               Reporter reporter) throws IOException;
 }
 

+ 34 - 25
src/java/org/apache/hadoop/mapred/InputFormatBase.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.*;
 
@@ -52,8 +54,7 @@ public abstract class InputFormatBase implements InputFormat {
     return true;
   }
   
-  public abstract RecordReader getRecordReader(FileSystem fs,
-                                               FileSplit split,
+  public abstract RecordReader getRecordReader(InputSplit split,
                                                JobConf job,
                                                Reporter reporter)
     throws IOException;
@@ -66,15 +67,17 @@ public abstract class InputFormatBase implements InputFormat {
    * that is appended to all input dirs specified by job, and if the given fs
    * lists those too, each is added to the returned array of Path.
    *
-   * @param fs
-   * @param job
-   * @return array of Path objects, never zero length.
+   * @param job the job to list input paths for
+   * @return array of Path objects
    * @throws IOException if zero items.
    */
-  protected Path[] listPaths(FileSystem ignored, JobConf job)
+  protected Path[] listPaths(JobConf job)
     throws IOException {
-    Path[] dirs = job.getInputPaths();
     String subdir = job.get("mapred.input.subdir");
+    Path[] dirs = job.getInputPaths();
+    if (dirs.length == 0) {
+      throw new IOException("No input directories specified in job");
+    }
     ArrayList result = new ArrayList();
     for (int i = 0; i < dirs.length; i++) {
       FileSystem fs = dirs[i].getFileSystem(job);
@@ -96,30 +99,34 @@ public abstract class InputFormatBase implements InputFormat {
       }
     }
 
-    if (result.size() == 0) {
-      throw new IOException("No input directories specified in: "+job);
-    }
     return (Path[])result.toArray(new Path[result.size()]);
   }
 
-  // NOTE: should really pass a Configuration here, not a FileSystem
-  public boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs)
-    throws IOException {
-    boolean[] result = new boolean[inputDirs.length];
+  public void validateInput(JobConf job) throws IOException {
+    Path[] inputDirs = job.getInputPaths();
+    List<IOException> result = new ArrayList();
     for(int i=0; i < inputDirs.length; ++i) {
-      result[i] =
-        inputDirs[i].getFileSystem(fs.getConf()).isDirectory(inputDirs[i]);
+      FileSystem fs = inputDirs[i].getFileSystem(job);
+      if (!fs.exists(inputDirs[i])) {
+        result.add(new FileNotFoundException("Input directory " + 
+                                             inputDirs[i] + 
+                                             " doesn't exist."));
+      } else if (!fs.isDirectory(inputDirs[i])) {
+        result.add(new InvalidFileTypeException
+                     ("Invalid input path, expecting directory : " + 
+                      inputDirs[i]));
+      }
+    }
+    if (!result.isEmpty()) {
+      throw new InvalidInputException(result);
     }
-    return result;
   }
 
-  /** Splits files returned by {@link #listPaths(FileSystem,JobConf)} when
+  /** Splits files returned by {@link #listPaths(JobConf)} when
    * they're too big.*/ 
-  public FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
+  public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
-
-    Path[] files = listPaths(ignored, job);
-
+    Path[] files = listPaths(job);
     long totalSize = 0;                           // compute total size
     for (int i = 0; i < files.length; i++) {      // check we have valid files
       Path file = files[i];
@@ -145,16 +152,18 @@ public abstract class InputFormatBase implements InputFormat {
 
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
+                                   job));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+          splits.add(new FileSplit(file, length-bytesRemaining, 
+                                   bytesRemaining, job));
         }
       } else {
         if (length != 0) {
-          splits.add(new FileSplit(file, 0, length));
+          splits.add(new FileSplit(file, 0, length, job));
         }
       }
     }

+ 36 - 0
src/java/org/apache/hadoop/mapred/InputSplit.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The description of the data for a single map task.
+ * @author Owen O'Malley
+ */
+public interface InputSplit extends Writable {
+
+  /**
+   * Get the list of hostnames where the input split is located.
+   * @return A list of prefered hostnames
+   * @throws IOException
+   */
+  String[] getLocations() throws IOException;
+}

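For reference, a minimal sketch of what a custom split could look like against this new interface. The class and its fields are hypothetical illustration only, not part of the commit; InputSplit extends Writable, so write and readFields must also be provided.

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** Hypothetical example: a split described by a simple numeric range. */
public class RangeSplit implements InputSplit {
  private long start;
  private long length;

  public RangeSplit() {}                        // no-arg constructor for Writable deserialization

  public RangeSplit(long start, long length) {
    this.start = start;
    this.length = length;
  }

  public String[] getLocations() throws IOException {
    return new String[]{};                      // no placement preference
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(start);
    out.writeLong(length);
  }

  public void readFields(DataInput in) throws IOException {
    start = in.readLong();
    length = in.readLong();
  }
}
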
+ 64 - 0
src/java/org/apache/hadoop/mapred/InvalidInputException.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Iterator;
+
+/**
+ * This class wraps a list of problems with the input, so that the user
+ * can get a list of problems together instead of finding and fixing them one 
+ * by one.
+ * @author Owen O'Malley
+ */
+public class InvalidInputException extends IOException {
+  private List<IOException> problems;
+  
+  /**
+   * Create the exception with the given list.
+   * @param probs the list of problems to report. this list is not copied.
+   */
+  public InvalidInputException(List<IOException> probs) {
+    problems = probs;
+  }
+  
+  /**
+   * Get the complete list of the problems reported.
+   * @return the list of problems, which must not be modified
+   */
+  public List<IOException> getProblems() {
+    return problems;
+  }
+  
+  /**
+   * Get a summary message of the problems found.
+   * @return the concatenated messages from all of the problems.
+   */
+  public String getMessage() {
+    StringBuffer result = new StringBuffer();
+    Iterator<IOException> itr = problems.iterator();
+    while(itr.hasNext()) {
+      result.append(itr.next().getMessage());
+      if (itr.hasNext()) {
+        result.append("\n");
+      }
+    }
+    return result.toString();
+  }
+}

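A sketch of how client code might consume this exception at submission time, assuming job is an existing JobConf; the surrounding setup is not shown in this commit, only the validateInput call and getProblems accessor are.

try {
  job.getInputFormat().validateInput(job);
} catch (InvalidInputException e) {
  // All input problems are reported together rather than one at a time.
  for (IOException problem : e.getProblems()) {
    System.err.println("Bad input: " + problem.getMessage());
  }
  throw e;
}
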
+ 2 - 1
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -153,7 +153,8 @@ public class IsolationRunner {
     if (isMap) {
       FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
                                       conf.getLong("map.input.start", 0),
-                                      conf.getLong("map.input.length", 0));
+                                      conf.getLong("map.input.length", 0),
+                                      conf);
       task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), 
           taskId, partition, split);
     } else {

+ 1 - 24
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -313,30 +313,7 @@ public class JobClient extends ToolBase implements MRConstants  {
         }
 
         // input paths should exist. 
-        boolean[] validDirs = 
-          job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);
-        for(int i=0; i < validDirs.length; ++i) {
-          if (!validDirs[i]) {
-            String msg = null ; 
-            if( !userFileSys.exists(inputDirs[i]) ){
-              msg = "Input directory " + inputDirs[i] + 
-                         " doesn't exist in " + userFileSys.getName();
-              LOG.error(msg);
-              throw new FileNotFoundException(msg);
-            }else if( !userFileSys.isDirectory(inputDirs[i])){
-              msg = "Invalid input path, expecting directory : " + inputDirs[i] ;
-              LOG.error(msg); 
-              throw new InvalidFileTypeException(msg);  
-            }else{
-              // some other error
-              msg = "Input directory " + inputDirs[i] + 
-                           " in " + userFileSys.getName() + " is invalid.";
-              LOG.error(msg);
-              throw new IOException(msg);
-            }
-          }
-        }
-
+        job.getInputFormat().validateInput(job);
 
         // Check the output specification
         job.getOutputFormat().checkOutputSpecs(fs, job);

+ 10 - 16
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -122,7 +122,7 @@ class JobInProgress {
         }
         InputFormat inputFormat = conf.getInputFormat();
 
-        FileSplit[] splits = inputFormat.getSplits(fs, conf, numMapTasks);
+        InputSplit[] splits = inputFormat.getSplits(conf, numMapTasks);
 
         //
         // sort splits by decreasing length, to reduce job's tail
@@ -168,22 +168,16 @@ class JobInProgress {
         // Obtain some tasktracker-cache information for the map task splits.
         //
         for (int i = 0; i < maps.length; i++) {
-            String hints[][] =
-              fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
-                                   splits[i].getLength());
-
-            if (hints != null) {
-              for (int k = 0; k < hints.length; k++) {
-                for (int j = 0; j < hints[k].length; j++) {
-                  ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
-                  if (hostMaps == null) {
-                    hostMaps = new ArrayList();
-                    hostToMaps.put(hints[k][j], hostMaps);
-                  }
-                  hostMaps.add(maps[i]);
-                }
-              }
+          String hints[] = splits[i].getLocations();
+          for (int k = 0; k < hints.length; k++) {
+            ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k]);
+            if (hostMaps == null) {
+              hostMaps = new ArrayList();
+              hostToMaps.put(hints[k], hostMaps);
             }
+            hostMaps.add(maps[i]);
+            
+          }
         }
 
         this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);

+ 2 - 2
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -89,8 +89,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
     public void run() {
       try {
         // split input into minimum number of splits
-        FileSplit[] splits;
-        splits = job.getInputFormat().getSplits(fs, job, 1);
+        InputSplit[] splits;
+        splits = job.getInputFormat().getSplits(job, 1);
         String jobId = profile.getJobId();
         
         // run a map task for each split

+ 19 - 13
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -86,14 +86,14 @@ class MapTask extends Task {
   
   private MapTaskMetrics myMetrics = null;
 
-  private FileSplit split;
+  private InputSplit split;
   private MapOutputFile mapOutputFile = new MapOutputFile();
   private JobConf conf;
 
   public MapTask() {}
 
   public MapTask(String jobId, String jobFile, String tipId, String taskId, 
-                 int partition, FileSplit split) {
+                 int partition, InputSplit split) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.split = split;
     myMetrics = new MapTaskMetrics(taskId);
@@ -103,18 +103,25 @@ class MapTask extends Task {
       return true;
   }
 
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    conf.set("map.input.file", split.getPath().toString());
-    conf.setLong("map.input.start", split.getStart());
-    conf.setLong("map.input.length", split.getLength());
+    Path localSplit = new Path(new Path(getJobFile()).getParent(), 
+                               "split.dta");
+    DataOutputStream out = LocalFileSystem.get(conf).create(localSplit);
+    split.write(out);
+    out.close();
+    if (split instanceof FileSplit) {
+      conf.set("map.input.file", ((FileSplit) split).getPath().toString());
+      conf.setLong("map.input.start", ((FileSplit) split).getStart());
+      conf.setLong("map.input.length", ((FileSplit) split).getLength());
+    }
   }
   
   public TaskRunner createRunner(TaskTracker tracker) {
     return new MapTaskRunner(this, tracker, this.conf);
   }
 
-  public FileSplit getSplit() { return split; }
+  public InputSplit getSplit() { return split; }
 
   public void write(DataOutput out) throws IOException {
     super.write(out);
@@ -139,11 +146,9 @@ class MapTask extends Task {
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
       
     final RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader
-      (FileSystem.get(job), split, job, reporter);
+      job.getInputFormat().getRecordReader(split, job, reporter);
 
     RecordReader in = new RecordReader() {      // wrap in progress reporter
-        private float perByte = 1.0f /(float)split.getLength();
 
         public WritableComparable createKey() {
           return rawIn.createKey();
@@ -156,9 +161,7 @@ class MapTask extends Task {
         public synchronized boolean next(Writable key, Writable value)
           throws IOException {
 
-          float progress =                        // compute progress
-            (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
-          reportProgress(umbilical, progress);
+          reportProgress(umbilical, getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           myMetrics.mapInput(getPos() - beforePos);
@@ -166,6 +169,9 @@ class MapTask extends Task {
         }
         public long getPos() throws IOException { return rawIn.getPos(); }
         public void close() throws IOException { rawIn.close(); }
+        public float getProgress() throws IOException {
+          return rawIn.getProgress();
+        }
       };
 
     MapRunnable runner =

+ 6 - 0
src/java/org/apache/hadoop/mapred/RecordReader.java

@@ -55,4 +55,10 @@ public interface RecordReader {
   /** Close this to future operations.*/ 
   public void close() throws IOException;
 
+  /**
+   * How far has the reader gone through the input.
+   * @return progress from 0.0 to 1.0
+   * @throws IOException
+   */
+  float getProgress() throws IOException;
 }

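A typical byte-offset implementation of the new method, sketched with an explicit cast to float so that dividing two long offsets does not truncate the result; start, end, and getPos() are assumed fields/methods of the reader, not names from this commit.

public float getProgress() throws IOException {
  if (end == start) {
    return 0.0f;                                  // empty split: nothing to read
  }
  // Cast before dividing, otherwise long/long integer division loses the fraction.
  return Math.min(1.0f, (getPos() - start) / (float) (end - start));
}
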
+ 1 - 1
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -100,7 +100,7 @@ class ReduceTask extends Task {
   /**
    * Localize the given JobConf to be specific for this task.
    */
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
     conf.setNumMapTasks(numMaps);
   }

+ 2 - 3
src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java

@@ -53,19 +53,18 @@ public class SequenceFileInputFilter extends SequenceFileInputFormat {
     }
     
     /** Create a record reader for the given split
-     * @param fs file system where the file split is stored
      * @param split file split
      * @param job job configuration
      * @param reporter reporter who sends report to task tracker
      * @return RecordReader
      */
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+    public RecordReader getRecordReader(InputSplit split,
             JobConf job, Reporter reporter)
     throws IOException {
         
         reporter.setStatus(split.toString());
         
-        return new FilterRecordReader(job, split);
+        return new FilterRecordReader(job, (FileSplit) split);
     }
 
 

+ 5 - 5
src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java

@@ -33,26 +33,26 @@ public class SequenceFileInputFormat extends InputFormatBase {
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);
   }
 
-  protected Path[] listPaths(FileSystem fs, JobConf job)
+  protected Path[] listPaths(JobConf job)
     throws IOException {
 
-    Path[] files = super.listPaths(fs, job);
+    Path[] files = super.listPaths(job);
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
-      if (fs.isDirectory(file)) {                 // it's a MapFile
+      if (file.getFileSystem(job).isDirectory(file)) {     // it's a MapFile
         files[i] = new Path(file, MapFile.DATA_FILE_NAME); // use the data file
       }
     }
     return files;
   }
 
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+  public RecordReader getRecordReader(InputSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
     reporter.setStatus(split.toString());
 
-    return new SequenceFileRecordReader(job, split);
+    return new SequenceFileRecordReader(job, (FileSplit) split);
   }
 
 }

+ 13 - 0
src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 /** An {@link RecordReader} for {@link SequenceFile}s. */
 public class SequenceFileRecordReader implements RecordReader {
   private SequenceFile.Reader in;
+  private long start;
   private long end;
   private boolean more = true;
   private Configuration conf;
@@ -95,6 +96,18 @@ public class SequenceFileRecordReader implements RecordReader {
       in.getCurrentValue(value);
   }
   
+  /**
+   * Return the progress within the input split
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return (in.getPosition() - start) / (end - start);
+    }
+  }
+  
   public synchronized long getPos() throws IOException {
     return in.getPosition();
   }

+ 1 - 1
src/java/org/apache/hadoop/mapred/Task.java

@@ -119,7 +119,7 @@ abstract class Task implements Writable, Configurable {
   /**
    * Localize the given JobConf to be specific for this task.
    */
-  public void localizeConfiguration(JobConf conf) {
+  public void localizeConfiguration(JobConf conf) throws IOException {
     conf.set("mapred.tip.id", tipId); 
     conf.set("mapred.tip.id", tipId); 
     conf.set("mapred.task.id", taskId);
     conf.set("mapred.task.id", taskId);
     conf.setBoolean("mapred.task.is.map",isMapTask());
     conf.setBoolean("mapred.task.is.map",isMapTask());

+ 2 - 2
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -52,7 +52,7 @@ class TaskInProgress {
 
     // Defines the TIP
     private String jobFile = null;
-    private FileSplit split = null;
+    private InputSplit split = null;
     private int numMaps;
     private int partition;
     private JobTracker jobtracker;
@@ -88,7 +88,7 @@ class TaskInProgress {
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String uniqueString, String jobFile, FileSplit split, 
+    public TaskInProgress(String uniqueString, String jobFile, InputSplit split, 
                           JobTracker jobtracker, JobConf conf, 
                           JobInProgress job, int partition) {
         this.jobFile = jobFile;

+ 17 - 3
src/java/org/apache/hadoop/mapred/TextInputFormat.java

@@ -40,6 +40,7 @@ public class TextInputFormat extends InputFormatBase implements JobConfigurable
   }
   
   protected static class LineRecordReader implements RecordReader {
+    private long start;
     private long pos;
     private long end;
     private BufferedInputStream in;
@@ -61,6 +62,7 @@ public class TextInputFormat extends InputFormatBase implements JobConfigurable
 
     public LineRecordReader(InputStream in, long offset, long endOffset) {
       this.in = new BufferedInputStream(in);
+      this.start = offset;
       this.pos = offset;
       this.end = endOffset;
     }
@@ -73,6 +75,17 @@ public class TextInputFormat extends InputFormatBase implements JobConfigurable
       return new Text();
     }
     
+    /**
+     * Get the progress within the split
+     */
+    public float getProgress() {
+      if (start == end) {
+        return 0.0f;
+      } else {
+        return (pos - start) / (end - start);
+      }
+    }
+    
     /** Read a line. */
     public synchronized boolean next(Writable key, Writable value)
       throws IOException {
@@ -101,18 +114,19 @@ public class TextInputFormat extends InputFormatBase implements JobConfigurable
 
   }
   
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+  public RecordReader getRecordReader(InputSplit genericSplit,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
-    reporter.setStatus(split.toString());
-
+    reporter.setStatus(genericSplit.toString());
+    FileSplit split = (FileSplit) genericSplit;
     long start = split.getStart();
     long end = start + split.getLength();
     final Path file = split.getPath();
     final CompressionCodec codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
+    FileSystem fs = FileSystem.get(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
     

+ 2 - 2
src/test/org/apache/hadoop/mapred/EmptyInputFormat.java

@@ -31,7 +31,7 @@ public class EmptyInputFormat extends InputFormatBase {
     return new FileSplit[0];
   }
 
-  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
-    return new SequenceFileRecordReader(job, split);
+  public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return new SequenceFileRecordReader(job, (FileSplit) split);
   }
 }

+ 6 - 0
src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java

@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +37,8 @@ import org.apache.hadoop.mapred.lib.IdentityReducer;
  * A JUnit test to test Map-Reduce empty jobs Mini-DFS.
  */
 public class TestEmptyJobWithDFS extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
   
   /**
    * Simple method running a MapReduce job with no input data. Used
@@ -58,6 +62,7 @@ public class TestEmptyJobWithDFS extends TestCase {
       FileSystem fs = FileSystem.getNamed(fileSys, conf);
       fs.delete(outDir);
       if (!fs.mkdirs(inDir)) {
+          LOG.warn("Can't create " + inDir);
           return false;
       }
 
@@ -88,6 +93,7 @@ public class TestEmptyJobWithDFS extends TestCase {
           }
       }
       // return job result
+      LOG.info("job is complete: " + runningJob.isSuccessful());
       return (runningJob.isSuccessful());
   }
   

+ 3 - 2
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java

@@ -80,13 +80,14 @@ public class TestSequenceFileInputFilter extends TestCase {
         numSplits =
             random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
       }
-      FileSplit[] splits = format.getSplits(fs, job, numSplits);
+      InputSplit[] splits = format.getSplits(job, numSplits);
       
       // check each split
       int count = 0;
+      LOG.info("Generated " + splits.length + " splits.");
       for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
-              format.getRecordReader(fs, splits[j], job, reporter);
+              format.getRecordReader(splits[j], job, reporter);
           try {
               while (reader.next(key, value)) {
                   LOG.info("Accept record "+key.toString());

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java

@@ -83,14 +83,14 @@ public class TestSequenceFileInputFormat extends TestCase {
         int numSplits =
           random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
         //LOG.info("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         //LOG.info("splitting: got =        " + splits.length);
         //LOG.info("splitting: got =        " + splits.length);
 
 
         // check each split
         // check each split
         BitSet bits = new BitSet(length);
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
           RecordReader reader =
-            format.getRecordReader(fs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {

+ 8 - 9
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -88,16 +88,15 @@ public class TestTextInputFormat extends TestCase {
       for (int i = 0; i < 3; i++) {
         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
         LOG.debug("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(localFs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         LOG.debug("splitting: got =        " + splits.length);
         LOG.debug("splitting: got =        " + splits.length);
 
 
         // check each split
         // check each split
         BitSet bits = new BitSet(length);
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
         for (int j = 0; j < splits.length; j++) {
-          LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +
-                   splits[j].getLength());
+          LOG.debug("split["+j+"]= " + splits[j]);
           RecordReader reader =
-            format.getRecordReader(localFs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {
@@ -186,10 +185,10 @@ public class TestTextInputFormat extends TestCase {
   private static final Reporter voidReporter = new VoidReporter();
   
   private static List<Text> readSplit(InputFormat format, 
-                                      FileSplit split, 
+                                      InputSplit split, 
                                       JobConf job) throws IOException {
     List<Text> result = new ArrayList<Text>();
-    RecordReader reader = format.getRecordReader(localFs, split, job,
+    RecordReader reader = format.getRecordReader(split, job,
                                                  voidReporter);
     LongWritable key = (LongWritable) reader.createKey();
     Text value = (Text) reader.createValue();
@@ -215,10 +214,10 @@ public class TestTextInputFormat extends TestCase {
     job.setInputPath(workDir);
     TextInputFormat format = new TextInputFormat();
     format.configure(job);
-    FileSplit[] splits = format.getSplits(localFs, job, 100);
+    InputSplit[] splits = format.getSplits(job, 100);
     assertEquals("compressed splits == 2", 2, splits.length);
     assertEquals("compressed splits == 2", 2, splits.length);
-    if (splits[0].getPath().getName().equals("part2.txt.gz")) {
-      FileSplit tmp = splits[0];
+    FileSplit tmp = (FileSplit) splits[0];
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
       splits[0] = splits[1];
       splits[1] = tmp;
     }

+ 3 - 3
src/test/org/apache/hadoop/record/test/TestWritable.java

@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.commons.logging.*;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputFormatBase;
 import org.apache.hadoop.mapred.JobConf;
@@ -93,14 +93,14 @@ public class TestWritable extends TestCase {
         int numSplits =
           random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
         //LOG.info("splitting: requesting = " + numSplits);
-        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
         //LOG.info("splitting: got =        " + splits.length);
         //LOG.info("splitting: got =        " + splits.length);
 
 
         // check each split
         // check each split
         BitSet bits = new BitSet(length);
         BitSet bits = new BitSet(length);
         for (int j = 0; j < splits.length; j++) {
         for (int j = 0; j < splits.length; j++) {
           RecordReader reader =
           RecordReader reader =
-            format.getRecordReader(fs, splits[j], job, reporter);
+            format.getRecordReader(splits[j], job, reporter);
           try {
             int count = 0;
             while (reader.next(key, value)) {