
HADOOP-867. Move split creation out of JobTracker. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@510190 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
Commit 0534028c00

+ 5 - 0
CHANGES.txt

@@ -81,6 +81,11 @@ Trunk (unreleased changes)
 24. HADOOP-1017.  Cache constructors, for improved performance.
     (Ron Bodkin via cutting)
 
+25. HADOOP-867.  Move split creation out of JobTracker to client.
+    Splits are now saved in a separate file, read by task processes
+    directly, so that user code is no longer required in the
+    JobTracker.  (omalley via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

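The core of the change, per the entry above, is that a split now travels as an opaque pair of (implementation class name, serialized bytes), so the JobTracker never needs user classes on its classpath; only the task JVM deserializes the split. A minimal sketch of that task-side reconstruction, assuming the user's split class is on the task classpath (it mirrors the MapTask.run() hunk further down; the SplitDeserializer helper name is illustrative, not part of this commit):

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

/** Sketch: rebuild an InputSplit from its class name and raw bytes. */
public class SplitDeserializer {
  public static InputSplit deserialize(String splitClass, BytesWritable raw,
                                       JobConf job) throws IOException {
    InputSplit split;
    try {
      // user code is loaded here, in the task process, never in the JobTracker
      split = (InputSplit)
        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
    } catch (ClassNotFoundException e) {
      IOException wrap = new IOException("Split class " + splitClass + " not found");
      wrap.initCause(e);
      throw wrap;
    }
    DataInputBuffer buffer = new DataInputBuffer();
    buffer.reset(raw.get(), 0, raw.getSize());
    split.readFields(buffer);
    return split;
  }
}
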
+ 7 - 0
src/java/org/apache/hadoop/mapred/InputSplit.java

@@ -27,6 +27,13 @@ import org.apache.hadoop.io.Writable;
  */
 public interface InputSplit extends Writable {
 
+  /**
+   * Get the number of input bytes in the split.
+   * @return the number of bytes in the input split
+   * @throws IOException
+   */
+  long getLength() throws IOException;
+  
   /**
    * Get the list of hostnames where the input split is located.
    * @return A list of prefered hostnames

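Because getLength() is now part of the InputSplit contract (JobClient uses it below to sort splits largest-first), every custom split must report its size. A minimal sketch of a split satisfying the extended interface; the RangeSplit name and its record-range semantics are illustrative, much like the MySplit used in TestMiniMRLocalFS at the end of this change:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.mapred.InputSplit;

/** Sketch: a trivial split covering records [first, first + length). */
public class RangeSplit implements InputSplit {
  private long first;
  private long length;

  public RangeSplit() { }                         // no-arg ctor for Writable

  public RangeSplit(long first, long length) {
    this.first = first;
    this.length = length;
  }

  public long getLength() throws IOException {    // the new interface method
    return length;
  }

  public String[] getLocations() throws IOException {
    return new String[0];                         // no locality hints
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(first);
    out.writeLong(length);
  }

  public void readFields(DataInput in) throws IOException {
    first = in.readLong();
    length = in.readLong();
  }
}
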
+ 11 - 5
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
@@ -26,7 +27,9 @@ import java.util.List;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 
 public class IsolationRunner {
   private static final Log LOG = 
@@ -152,12 +155,15 @@ public class IsolationRunner {
     
     Task task;
     if (isMap) {
-      FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
-                                      conf.getLong("map.input.start", 0),
-                                      conf.getLong("map.input.length", 0),
-                                      conf);
+      Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), 
+                                 "split.dta");
+      DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
+      String splitClass = Text.readString(splitFile);
+      BytesWritable split = new BytesWritable();
+      split.readFields(splitFile);
+      splitFile.close();
       task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), 
-          taskId, partition, split);
+                         taskId, partition, splitClass, split);
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);

+ 174 - 13
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
@@ -38,7 +39,7 @@ import java.util.*;
  *******************************************************/
 public class JobClient extends ToolBase implements MRConstants  {
     private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
-    public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL };
+    public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }
     private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
 
     static long MAX_JOBPROFILE_AGE = 1000 * 2;
@@ -259,7 +260,10 @@ public class JobClient extends ToolBase implements MRConstants  {
         Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
         Path submitJobFile = new Path(submitJobDir, "job.xml");
         Path submitJarFile = new Path(submitJobDir, "job.jar");
+        Path submitSplitFile = new Path(submitJobDir, "job.split");
+        
         FileSystem fs = getFs();
+        LOG.debug("default FileSystem: " + fs.getUri());
         // try getting the md5 of the archives
         URI[] tarchives = DistributedCache.getCacheArchives(job);
         URI[] tfiles = DistributedCache.getCacheFiles(job);
@@ -317,8 +321,42 @@ public class JobClient extends ToolBase implements MRConstants  {
         // Check the output specification
         job.getOutputFormat().checkOutputSpecs(fs, job);
 
+        // Create the splits for the job
+        LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+        InputSplit[] splits = 
+          job.getInputFormat().getSplits(job, job.getNumMapTasks());
+        // sort the splits into order based on size, so that the biggest
+        // go first
+        Arrays.sort(splits, new Comparator() {
+          public int compare(Object a, Object b) {
+            try {
+              long left = ((InputSplit) a).getLength();
+              long right = ((InputSplit) b).getLength();
+              if (left == right) {
+                return 0;
+              } else if (left < right) {
+                return 1;
+              } else {
+                return -1;
+              }
+            } catch (IOException ie) {
+              throw new RuntimeException("Problem getting input split size",
+                                         ie);
+            }
+          }
+        });
+        // write the splits to a file for the job tracker
+        FSDataOutputStream out = fs.create(submitSplitFile);
+        try {
+          writeSplitsFile(splits, out);
+        } finally {
+          out.close();
+        }
+        job.set("mapred.job.split.file", submitSplitFile.toString());
+        job.setNumMapTasks(splits.length);
+        
         // Write job file to JobTracker's fs        
-        FSDataOutputStream out = fs.create(submitJobFile, replication);
+        out = fs.create(submitJobFile, replication);
         try {
           job.write(out);
         } finally {
@@ -336,6 +374,108 @@ public class JobClient extends ToolBase implements MRConstants  {
         }
     }
 
+    static class RawSplit implements Writable {
+      private String splitClass;
+      private BytesWritable bytes = new BytesWritable();
+      private String[] locations;
+      
+      public void setBytes(byte[] data, int offset, int length) {
+        bytes.set(data, offset, length);
+      }
+
+      public void setClassName(String className) {
+        splitClass = className;
+      }
+      
+      public String getClassName() {
+        return splitClass;
+      }
+      
+      public BytesWritable getBytes() {
+        return bytes;
+      }
+      
+      public void setLocations(String[] locations) {
+        this.locations = locations;
+      }
+      
+      public String[] getLocations() {
+        return locations;
+      }
+      
+      public void readFields(DataInput in) throws IOException {
+        splitClass = Text.readString(in);
+        bytes.readFields(in);
+        int len = WritableUtils.readVInt(in);
+        locations = new String[len];
+        for(int i=0; i < len; ++i) {
+          locations[i] = Text.readString(in);
+        }
+      }
+      
+      public void write(DataOutput out) throws IOException {
+        Text.writeString(out, splitClass);
+        bytes.write(out);
+        WritableUtils.writeVInt(out, locations.length);
+        for(int i = 0; i < locations.length; i++) {
+          Text.writeString(out, locations[i]);
+        }        
+      }
+    }
+    
+    private static final int CURRENT_SPLIT_FILE_VERSION = 0;
+    private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
+    
+    /** Create the list of input splits and write them out in a file for
+     *the JobTracker. The format is:
+     * <format version>
+     * <numSplits>
+     * for each split:
+     *    <RawSplit>
+     * @param splits the input splits to write out
+     * @param out the stream to write to
+     */
+    private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
+      out.write(SPLIT_FILE_HEADER);
+      WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+      WritableUtils.writeVInt(out, splits.length);
+      DataOutputBuffer buffer = new DataOutputBuffer();
+      RawSplit rawSplit = new RawSplit();
+      for(InputSplit split: splits) {
+        rawSplit.setClassName(split.getClass().getName());
+        buffer.reset();
+        split.write(buffer);
+        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+        rawSplit.setLocations(split.getLocations());
+        rawSplit.write(out);
+      }
+    }
+
+    /**
+     * Read a splits file into a list of raw splits
+     * @param in the stream to read from
+     * @return the complete list of splits
+     * @throws IOException
+     */
+    static RawSplit[] readSplitFile(DataInput in) throws IOException {
+      byte[] header = new byte[SPLIT_FILE_HEADER.length];
+      in.readFully(header);
+      if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
+        throw new IOException("Invalid header on split file");
+      }
+      int vers = WritableUtils.readVInt(in);
+      if (vers != CURRENT_SPLIT_FILE_VERSION) {
+        throw new IOException("Unsupported split version " + vers);
+      }
+      int len = WritableUtils.readVInt(in);
+      RawSplit[] result = new RawSplit[len];
+      for(int i=0; i < len; ++i) {
+        result[i] = new RawSplit();
+        result[i].readFields(in);
+      }
+      return result;
+    }
+    
     /**
      * Get an RunningJob object to track an ongoing job.  Returns
      * null if the id does not correspond to any known job.
@@ -384,15 +524,13 @@ public class JobClient extends ToolBase implements MRConstants  {
       String lastReport = null;
       final int MAX_RETRIES = 5;
       int retries = MAX_RETRIES;
-      String outputFilterName = job.get("jobclient.output.filter", "FAILED");
-
-      if (null != outputFilterName) {
-        try {
-          jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName));
-        } catch(IllegalArgumentException e) {
-          LOG.warn("Invalid Output filter : " + outputFilterName + 
-              " Valid values are : NONE, FAILED, SUCCEEDED, ALL"); 
-        }
+      TaskStatusFilter filter;
+      try {
+        filter = getTaskOutputFilter(job);
+      } catch(IllegalArgumentException e) {
+        LOG.warn("Invalid Output filter : " + e.getMessage() + 
+        " Valid values are : NONE, FAILED, SUCCEEDED, ALL");
+        throw e;
       }
       try {
         running = jc.submitJob(job);
@@ -418,12 +556,12 @@ public class JobClient extends ToolBase implements MRConstants  {
               lastReport = report;
             }
             
-            if( jc.getTaskOutputFilter()  != TaskStatusFilter.NONE){
+            if( filter  != TaskStatusFilter.NONE){
               TaskCompletionEvent[] events = 
                 running.getTaskCompletionEvents(eventCounter); 
               eventCounter += events.length ;
               for(TaskCompletionEvent event : events ){
-                switch( jc.getTaskOutputFilter() ){
+                switch( filter ){
                 case SUCCEEDED:
                   if( event.getTaskStatus() == 
                     TaskCompletionEvent.Status.SUCCEEDED){
@@ -524,13 +662,36 @@ public class JobClient extends ToolBase implements MRConstants  {
      * output matches the filter. 
      * @param newValue task filter.
      */
+    @Deprecated
     public void setTaskOutputFilter(TaskStatusFilter newValue){
       this.taskOutputFilter = newValue ;
     }
+    
+    /**
+     * Get the task output filter out of the JobConf
+     * @param job the JobConf to examine
+     * @return the filter level
+     */
+    public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
+      return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
+                                              "FAILED"));
+    }
+    
+    /**
+     * Modify the JobConf to set the task output filter
+     * @param job the JobConf to modify
+     * @param newValue the value to set
+     */
+    public static void setTaskOutputFilter(JobConf job, 
+                                           TaskStatusFilter newValue) {
+      job.set("jobclient.output.filter", newValue.toString());
+    }
+    
     /**
      * Returns task output filter.
      * @return task filter. 
      */
+    @Deprecated
     public TaskStatusFilter getTaskOutputFilter(){
       return this.taskOutputFilter; 
     }

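Note that the per-instance setTaskOutputFilter/getTaskOutputFilter accessors are deprecated in favour of the static, JobConf-backed pair, so the filter setting now travels with the job configuration rather than with a particular JobClient instance. A short usage sketch (the same call the MiniMRCluster change below makes):

JobConf conf = new JobConf();
// record the desired task-output verbosity in the job configuration itself
JobClient.setTaskOutputFilter(conf, JobClient.TaskStatusFilter.ALL);
// anything that later holds the JobConf can read the setting back
JobClient.TaskStatusFilter filter = JobClient.getTaskOutputFilter(conf);
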
+ 30 - 49
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -21,6 +21,9 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 import org.apache.hadoop.mapred.JobHistory.Values ; 
 import java.io.*;
@@ -52,7 +55,7 @@ class JobInProgress {
     int failedMapTasks = 0 ; 
     int failedReduceTasks = 0 ; 
     JobTracker jobtracker = null;
-    HashMap hostToMaps = new HashMap();
+    Map<String,List<TaskInProgress>> hostToMaps = new HashMap();
     private int taskCompletionEventTracker = 0 ; 
     List<TaskCompletionEvent> taskCompletionEvents ; 
 
@@ -114,35 +117,35 @@ class JobInProgress {
         }
 
         //
-        // construct input splits
+        // read input splits and create a map per a split
         //
         String jobFile = profile.getJobFile();
 
         FileSystem fs = FileSystem.get(conf);
-        if (localJarFile != null) {
-            ClassLoader loader =
-              new URLClassLoader(new URL[]{ localFs.pathToFile(localJarFile).toURL() });
-            conf.setClassLoader(loader);
+        DataInputStream splitFile =
+          fs.open(new Path(conf.get("mapred.job.split.file")));
+        JobClient.RawSplit[] splits;
+        try {
+          splits = JobClient.readSplitFile(splitFile);
+        } finally {
+          splitFile.close();
         }
-        InputFormat inputFormat = conf.getInputFormat();
-
-        InputSplit[] splits = inputFormat.getSplits(conf, numMapTasks);
-
-        //
-        // sort splits by decreasing length, to reduce job's tail
-        //
-        Arrays.sort(splits, new Comparator() {
-            public int compare(Object a, Object b) {
-                long diff =
-                    ((FileSplit)b).getLength() - ((FileSplit)a).getLength();
-                return diff==0 ? 0 : (diff > 0 ? 1 : -1);
+        numMapTasks = splits.length;
+        maps = new TaskInProgress[numMapTasks];
+        for(int i=0; i < numMapTasks; ++i) {
+          maps[i] = new TaskInProgress(uniqueString, jobFile, 
+                                       splits[i].getClassName(),
+                                       splits[i].getBytes(), 
+                                       jobtracker, conf, this, i);
+          for(String host: splits[i].getLocations()) {
+            List<TaskInProgress> hostMaps = hostToMaps.get(host);
+            if (hostMaps == null) {
+              hostMaps = new ArrayList();
+              hostToMaps.put(host, hostMaps);
             }
-        });
-
-        //
-        // adjust number of map tasks to actual number of splits
-        //
-        this.numMapTasks = splits.length;
+            hostMaps.add(maps[i]);              
+          }
+        }
         
         // if no split is returned, job is considered completed and successful
         if (numMapTasks == 0) {
@@ -154,13 +157,6 @@ class JobInProgress {
             return;
         }
         
-        // create a map task for each split
-        this.maps = new TaskInProgress[numMapTasks];
-        for (int i = 0; i < numMapTasks; i++) {
-            maps[i] = new TaskInProgress(uniqueString, jobFile, splits[i], 
-                                         jobtracker, conf, this, i);
-        }
-
         //
         // Create reduce tasks
         //
@@ -171,22 +167,6 @@ class JobInProgress {
                                             jobtracker, conf, this);
         }
 
-        //
-        // Obtain some tasktracker-cache information for the map task splits.
-        //
-        for (int i = 0; i < maps.length; i++) {
-          String hints[] = splits[i].getLocations();
-          for (int k = 0; k < hints.length; k++) {
-            ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k]);
-            if (hostMaps == null) {
-              hostMaps = new ArrayList();
-              hostToMaps.put(hints[k], hostMaps);
-            }
-            hostMaps.add(maps[i]);
-            
-          }
-        }
-
         this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
         tasksInited = true;
         
@@ -342,7 +322,8 @@ class JobInProgress {
     /**
      * Return a MapTask, if appropriate, to run on the given tasktracker
      */
-    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
+    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize
+                                 ) throws IOException {
       if (! tasksInited) {
         LOG.info("Cannot create task split for " + profile.getJobId());
         return null;
@@ -370,7 +351,7 @@ class JobInProgress {
      *  work on temporary MapRed files.  
      */
     public Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                    int clusterSize) {
+                                    int clusterSize) throws IOException {
         if (! tasksInited) {
             LOG.info("Cannot create task split for " + profile.getJobId());
             return null;

+ 2 - 1
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1143,7 +1143,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * and incorporate knowledge of DFS file placement.  But for right now, it
      * just grabs a single item out of the pending task list and hands it back.
      */
-    private synchronized Task getNewTaskForTaskTracker(String taskTracker) {
+    private synchronized Task getNewTaskForTaskTracker(String taskTracker
+                                                       ) throws IOException {
         //
         // Compute average map and reduce task numbers across pool
         //

+ 8 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -25,6 +25,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -95,12 +96,18 @@ class LocalJobRunner implements JobSubmissionProtocol {
         
         // run a map task for each split
         job.setNumReduceTasks(1);                 // force a single reduce task
+        DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
           String mapId = "map_" + newId() ; 
           mapIds.add(mapId);
+          buffer.reset();
+          splits[i].write(buffer);
+          BytesWritable split = new BytesWritable();
+          split.set(buffer.getData(), 0, buffer.getLength());
           MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, 
                                     mapId, i,
-                                    splits[i]);
+                                    splits[i].getClass().getName(),
+                                    split);
           JobConf localConf = new JobConf(job);
           map.localizeConfiguration(localConf);
           map.setConf(localConf);

+ 37 - 26
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -44,17 +44,15 @@ import org.apache.hadoop.metrics.Updater;
 
 /** A Map task. */
 class MapTask extends Task {
+  
+  private MapTaskMetrics myMetrics = null;
 
-    public static final Log LOG =
-        LogFactory.getLog("org.apache.hadoop.mapred.MapTask");
+  private BytesWritable split = new BytesWritable();
+  private String splitClass;
+  private MapOutputFile mapOutputFile = new MapOutputFile();
+  private JobConf conf;
 
-  static {                                        // register a ctor
-    WritableFactories.setFactory
-      (MapTask.class,
-       new WritableFactory() {
-         public Writable newInstance() { return new MapTask(); }
-       });
-  }
+  private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
   {   // set phase for this task
     setPhase(TaskStatus.Phase.MAP); 
@@ -83,19 +81,15 @@ class MapTask extends Task {
     }
     
   }
-  
-  private MapTaskMetrics myMetrics = null;
-
-  private InputSplit split;
-  private MapOutputFile mapOutputFile = new MapOutputFile();
-  private JobConf conf;
 
   public MapTask() {}
 
   public MapTask(String jobId, String jobFile, String tipId, String taskId, 
-                 int partition, InputSplit split) {
+                 int partition, String splitClass, BytesWritable split
+                 ) throws IOException {
     super(jobId, jobFile, tipId, taskId, partition);
-    this.split = split;
+    this.splitClass = splitClass;
+    this.split.set(split);
   }
 
   public boolean isMapTask() {
@@ -107,30 +101,25 @@ class MapTask extends Task {
     Path localSplit = new Path(new Path(getJobFile()).getParent(), 
                                "split.dta");
     DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
+    Text.writeString(out, splitClass);
     split.write(out);
     out.close();
-    if (split instanceof FileSplit) {
-      conf.set("map.input.file", ((FileSplit) split).getPath().toString());
-      conf.setLong("map.input.start", ((FileSplit) split).getStart());
-      conf.setLong("map.input.length", ((FileSplit) split).getLength());
-    }
   }
   
   public TaskRunner createRunner(TaskTracker tracker) {
     return new MapTaskRunner(this, tracker, this.conf);
   }
 
-  public InputSplit getSplit() { return split; }
-
   public void write(DataOutput out) throws IOException {
     super.write(out);
+    Text.writeString(out, splitClass);
     split.write(out);
     
   }
+  
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-
-    split = new FileSplit();
+    splitClass = Text.readString(in);
     split.readFields(in);
     if (myMetrics == null) {
         myMetrics = new MapTaskMetrics("unknown");
@@ -144,6 +133,28 @@ class MapTask extends Task {
     Reporter reporter = getReporter(umbilical, getProgress());
 
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
+    
+    // reinstantiate the split
+    InputSplit split;
+    try {
+      split = (InputSplit) 
+         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
+    } catch (ClassNotFoundException exp) {
+      IOException wrap = new IOException("Split class " + splitClass + 
+                                         " not found");
+      wrap.initCause(exp);
+      throw wrap;
+    }
+    DataInputBuffer splitBuffer = new DataInputBuffer();
+    splitBuffer.reset(this.split.get(), 0, this.split.getSize());
+    split.readFields(splitBuffer);
+    
+    // if it is a file split, we can give more details
+    if (split instanceof FileSplit) {
+      job.set("map.input.file", ((FileSplit) split).getPath().toString());
+      job.setLong("map.input.start", ((FileSplit) split).getStart());
+      job.setLong("map.input.length", ((FileSplit) split).getLength());
+    }
       
     final RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(split, job, reporter);

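With the split only deserialized inside the task, the map.input.file, map.input.start and map.input.length properties are now set in run() (and only when the split turns out to be a FileSplit) rather than in localizeConfiguration(). Mappers that read them keep working; a minimal sketch of such a mapper, with an illustrative class name, picking them up in configure():

import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

public class InputAwareMapper extends MapReduceBase implements Mapper {
  private String inputFile;

  public void configure(JobConf job) {
    // set by MapTask.run() after the split is deserialized; only present
    // when the split is a FileSplit
    inputFile = job.get("map.input.file");
    long start = job.getLong("map.input.start", 0);
    long length = job.getLong("map.input.length", 0);
    System.out.println("processing " + inputFile +
                       " [" + start + ", " + (start + length) + ")");
  }

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    output.collect(key, value);                   // identity map
  }
}
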
+ 1 - 1
src/java/org/apache/hadoop/mapred/Task.java

@@ -150,7 +150,7 @@ abstract class Task implements Writable, Configurable {
 
   public Progress getProgress() { return taskProgress; }
 
-  public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
+  protected Reporter getReporter(final TaskUmbilicalProtocol umbilical,
                               final Progress progress) throws IOException {
     return new Reporter() {
         public void setStatus(String status) throws IOException {

+ 10 - 4
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -18,8 +18,10 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.*;
+import org.apache.hadoop.io.BytesWritable;
 
 import java.text.NumberFormat;
+import java.io.IOException;
 import java.util.*;
 
 
@@ -52,7 +54,8 @@ class TaskInProgress {
 
     // Defines the TIP
     private String jobFile = null;
-    private InputSplit split = null;
+    private String splitClass = null;
+    private BytesWritable split = null;
     private int numMaps;
     private int partition;
     private JobTracker jobtracker;
@@ -93,10 +96,12 @@ class TaskInProgress {
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String uniqueString, String jobFile, InputSplit split, 
+    public TaskInProgress(String uniqueString, String jobFile, 
+                          String splitClass, BytesWritable split, 
                           JobTracker jobtracker, JobConf conf, 
                           JobInProgress job, int partition) {
         this.jobFile = jobFile;
+        this.splitClass = splitClass;
         this.split = split;
         this.jobtracker = jobtracker;
         this.job = job;
@@ -501,7 +506,7 @@ class TaskInProgress {
     /**
      * Return a Task that can be sent to a TaskTracker for execution.
      */
-    public Task getTaskToRun(String taskTracker) {
+    public Task getTaskToRun(String taskTracker) throws IOException {
         Task t = null;
         if( 0 == execStartTime ){
           // assume task starts running now
@@ -522,7 +527,8 @@ class TaskInProgress {
         String jobId = job.getProfile().getJobId();
 
         if (isMapTask()) {
-          t = new MapTask(jobId, jobFile, this.id, taskid, partition, split);
+          t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
+                          splitClass, split);
         } else {
           t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
         }

+ 24 - 9
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -913,8 +913,8 @@ public class TaskTracker
         volatile TaskStatus.State runstate;
         long lastProgressReport;
         StringBuffer diagnosticInfo = new StringBuffer();
-        TaskRunner runner;
-        boolean done = false;
+        private TaskRunner runner;
+        volatile boolean done = false;
         boolean wasKilled = false;
         private JobConf defaultJobConf;
         private JobConf localJobConf;
@@ -1226,7 +1226,9 @@ public class TaskTracker
             }
             synchronized (this) {
               try {
-                runner.close();
+                if (runner != null) {
+                  runner.close();
+                }
                 defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
                                                 JOBCACHE + Path.SEPARATOR + 
                                                 task.getJobId() + 
@@ -1398,6 +1400,7 @@ public class TaskTracker
             
           Task task = umbilical.getTask(taskid);
           JobConf job = new JobConf(task.getJobFile());
+          task.setConf(job);
           
           defaultConf.addFinalResource(new Path(task.getJobFile()));
 
@@ -1468,16 +1471,28 @@ public class TaskTracker
      * job tracker in the next heartbeat cycle.
      * @return a copy of the list of TaskStatus objects
      */
-    synchronized List getRunningTaskStatuses() {
-      List result = new ArrayList(runningTasks.size());
-      Iterator itr = runningTasks.values().iterator();
-      while (itr.hasNext()) {
-        TaskInProgress tip = (TaskInProgress) itr.next();
+    synchronized List<TaskStatus> getRunningTaskStatuses() {
+      List<TaskStatus> result = new ArrayList(runningTasks.size());
+      for(TaskInProgress tip: runningTasks.values()) {
         result.add(tip.createStatus());
       }
       return result;
     }
-    
+
+    /**
+     * Get the list of stored tasks on this task tracker.
+     * @return
+     */
+    synchronized List<TaskStatus> getNonRunningTasks() {
+      List<TaskStatus> result = new ArrayList(tasks.size());
+      for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) {
+        if (!runningTasks.containsKey(task.getKey())) {
+          result.add(task.getValue().createStatus());
+        }
+      }
+      return result;
+    }
+
     /**
      * Get the default job conf for this tracker.
      */

+ 2 - 0
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -216,6 +216,8 @@ public class MiniMRCluster {
       // this timeout controls the minimum time for the test, so
       // set it down at 1 seconds.
       result.setInt("ipc.client.timeout", 1000);
+      // for debugging have all task output sent to the test output
+      JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
       return result;
     }
     

+ 0 - 7
src/test/org/apache/hadoop/mapred/PiEstimator.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -44,12 +43,6 @@ public class PiEstimator {
   
   public static class PiMapper extends MapReduceBase implements Mapper {
     
-    /** Mapper configuration.
-     *
-     */
-    public void configure(JobConf job) {
-    }
-    
     static Random r = new Random();
     
     /** Map method.

+ 198 - 2
src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

@@ -18,8 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.File;
+import java.util.Iterator;
 import junit.framework.TestCase;
 
 /**
@@ -49,8 +59,9 @@ public class TestMiniMRLocalFS extends TestCase {
                                                 TEST_ROOT_DIR + "/wc/output", 
                                                 TEST_ROOT_DIR + "/cachedir",
                                                 job,
-                                                "The quick brown fox\nhas many silly\n"
-                                                    + "red fox sox\n");
+                                                "The quick brown fox\n" 
+                                                + "has many silly\n"
+                                                + "red fox sox\n");
           // assert the number of lines read during caching
           assertTrue("Failed test archives not matching", ret);
           // test the task report fetchers
@@ -59,8 +70,193 @@ public class TestMiniMRLocalFS extends TestCase {
           assertEquals("number of maps", 10, reports.length);
           reports = client.getReduceTaskReports("job_0001");
           assertEquals("number of reduces", 1, reports.length);
+          runCustomFormats(mr);
       } finally {
           if (mr != null) { mr.shutdown(); }
       }
   }
+  
+  private void runCustomFormats(MiniMRCluster mr) throws IOException {
+    JobConf job = mr.createJobConf();
+    FileSystem fileSys = FileSystem.get(job);
+    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+    Path outDir = new Path(testDir, "out");
+    System.out.println("testDir= " + testDir);
+    fileSys.delete(testDir);
+    
+    job.setInputFormat(MyInputFormat.class);
+    job.setOutputFormat(MyOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    
+    job.setMapperClass(MyMapper.class);        
+    job.setReducerClass(MyReducer.class);
+    job.setNumMapTasks(100);
+    job.setNumReduceTasks(1);
+    // explicitly do not use "normal" job.setOutputPath to make sure
+    // that it is not hardcoded anywhere in the framework.
+    job.set("non.std.out", outDir.toString());
+    try {
+      JobClient.runJob(job);
+      String result = 
+        TestMiniMRWithDFS.readOutput(outDir, job);
+      assertEquals("output", ("aunt annie\t1\n" +
+                              "bumble boat\t4\n" +
+                              "crocodile pants\t0\n" +
+                              "duck-dog\t5\n"+
+                              "eggs\t2\n" + 
+                              "finagle the agent\t3\n"), result);
+    } finally {
+      fileSys.delete(testDir);
+    }
+    
+  }
+  
+  private static class MyInputFormat implements InputFormat {
+    static final String[] data = new String[]{
+                                              "crocodile pants", 
+                                              "aunt annie", 
+                                              "eggs",
+                                              "finagle the agent",
+                                              "bumble boat", 
+                                              "duck-dog",
+                                              };
+
+    private static class MySplit implements InputSplit {
+      int first;
+      int length;
+
+      public MySplit() { }
+
+      public MySplit(int first, int length) {
+        this.first = first;
+        this.length = length;
+      }
+
+      public String[] getLocations() {
+        return new String[0];
+      }
+
+      public long getLength() {
+        return length;
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, first);
+        WritableUtils.writeVInt(out, length);
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        first = WritableUtils.readVInt(in);
+        length = WritableUtils.readVInt(in);
+      }
+    }
+
+    static class MyRecordReader implements RecordReader {
+      int index;
+      int past;
+      int length;
+      
+      MyRecordReader(int index, int length) {
+        this.index = index;
+        this.past = index + length;
+        this.length = length;
+      }
+
+      public boolean next(Writable key, Writable value) throws IOException {
+        if (index < past) {
+          ((IntWritable) key).set(index);
+          ((Text) value).set(data[index]);
+          index += 1;
+          return true;
+        }
+        return false;
+      }
+      
+      public WritableComparable createKey() {
+        return new IntWritable();
+      }
+      
+      public Writable createValue() {
+        return new Text();
+      }
+
+      public long getPos() throws IOException {
+        return index;
+      }
+
+      public void close() throws IOException {}
+
+      public float getProgress() throws IOException {
+        return 1.0f - (past-index)/length;
+      }
+    }
+    
+    public void validateInput(JobConf job) throws IOException {
+    }
+    
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      return new MySplit[]{new MySplit(0,1), new MySplit(1,3),
+                           new MySplit(4,2)};
+    }
+
+    public RecordReader getRecordReader(InputSplit split,
+                                        JobConf job, 
+                                        Reporter reporter) throws IOException {
+      MySplit sp = (MySplit) split;
+      return new MyRecordReader(sp.first, sp.length);
+    }
+    
+  }
+  
+  static class MyMapper extends MapReduceBase implements Mapper {
+    public void map(WritableComparable key, Writable value, 
+                    OutputCollector out, Reporter reporter) throws IOException {
+      System.out.println("map: " + key + ", " + value);
+      out.collect((WritableComparable) value, key);
+    }
+  }
+
+  static class MyReducer extends MapReduceBase implements Reducer {
+    public void reduce(WritableComparable key, Iterator values, 
+                       OutputCollector output, Reporter reporter
+                       ) throws IOException {
+      while (values.hasNext()) {
+        Writable value = (Writable) values.next();
+        System.out.println("reduce: " + key + ", " + value);
+        output.collect(key, value);
+      }
+    }
+  }
+
+  static class MyOutputFormat implements OutputFormat {
+    static class MyRecordWriter implements RecordWriter {
+      private DataOutputStream out;
+      
+      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
+        out = outputFile.getFileSystem(job).create(outputFile);
+      }
+      
+      public void write(WritableComparable key, 
+                        Writable value) throws IOException {
+        out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
+      }
+
+      public void close(Reporter reporter) throws IOException { 
+        out.close();
+      }
+    }
+    
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
+                                        String name,
+                                        Progressable progress
+                                        ) throws IOException {
+      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
+    }
+
+    public void checkOutputSpecs(FileSystem ignored, 
+                                 JobConf job) throws IOException {
+    }
+  }
 }

+ 11 - 0
src/webapps/task/tasktracker.jsp

@@ -46,6 +46,17 @@
 </table>
 </center>
 
+<h2>Non-Running Tasks</h2>
+<table border=2 cellpadding="5" cellspacing="2">
+<tr><td align="center">Task Attempts</td><td>Status</td>
+  <%
+    for(TaskStatus status: tracker.getNonRunningTasks()) {
+      out.print("<tr><td>" + status.getTaskId() + "</td>");
+      out.print("<td>" + status.getRunState() + "</td></tr>\n");
+    }
+  %>
+</table>
+
 <h2>Local Logs</h2>
 <a href="/logs/">Log</a> directory