1
0
فهرست منبع

HADOOP-1400. Make JobClient retry requests, so that clients survive jobtracker problems. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@556746 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 سال پیش
والد
کامیت
25f0358b03

+ 3 - 0
CHANGES.txt

@@ -367,6 +367,9 @@ Trunk (unreleased changes)
      This reduces the namenode's memory requirements and increases
      This reduces the namenode's memory requirements and increases
      data integrity.  (Raghu Angadi via cutting)
      data integrity.  (Raghu Angadi via cutting)
 
 
+115. HADOOP-1400.  Make JobClient retry requests, so that clients can
+     survive jobtracker problems.  (omalley via cutting)
+
 
 
 Release 0.13.0 - 2007-06-08
 Release 0.13.0 - 2007-06-08
 
 

+ 49 - 22
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -21,6 +21,7 @@ import org.apache.commons.logging.*;
 
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.*;
@@ -201,32 +202,52 @@ public class JobClient extends ToolBase implements MRConstants  {
   public JobClient() {
   public JobClient() {
   }
   }
     
     
-  public JobClient(Configuration conf) throws IOException {
+  public JobClient(JobConf conf) throws IOException {
     setConf(conf);
     setConf(conf);
-    init();
+    init(conf);
   }
   }
     
     
-  public void init() throws IOException {
+  public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
     } else {
-      this.jobSubmitClient = (JobSubmissionProtocol) 
-        RPC.getProxy(JobSubmissionProtocol.class,
-                     JobSubmissionProtocol.versionID,
-                     JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
     }        
     }        
   }
   }
-  
+
   /**
   /**
-   * Build a job client, connect to the indicated job tracker.
+   * Create a proxy JobSubmissionProtocol that retries timeouts.
+   * @param addr the address to connect to
+   * @param conf the server's configuration
+   * @return a proxy object that will retry timeouts
+   * @throws IOException
    */
    */
-  public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
-    this.jobSubmitClient = (JobSubmissionProtocol) 
+  private JobSubmissionProtocol createProxy(InetSocketAddress addr,
+                                            Configuration conf
+                                            ) throws IOException {
+    JobSubmissionProtocol raw = (JobSubmissionProtocol) 
       RPC.getProxy(JobSubmissionProtocol.class,
       RPC.getProxy(JobSubmissionProtocol.class,
-                   JobSubmissionProtocol.versionID, jobTrackAddr, conf);
+                   JobSubmissionProtocol.versionID, addr, conf);
+    RetryPolicy backoffPolicy =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep
+      (5, 10, java.util.concurrent.TimeUnit.SECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> handlers = 
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    handlers.put(SocketTimeoutException.class, backoffPolicy);
+    RetryPolicy backoffTimeOuts = 
+      RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
+    return (JobSubmissionProtocol)
+      RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
   }
   }
 
 
+  /**
+   * Build a job client, connect to the indicated job tracker.
+   */
+  public JobClient(InetSocketAddress jobTrackAddr, 
+                   Configuration conf) throws IOException {
+    jobSubmitClient = createProxy(jobTrackAddr, conf);
+  }
 
 
   /**
   /**
    */
    */
@@ -270,15 +291,15 @@ public class JobClient extends ToolBase implements MRConstants  {
     //
     //
 
 
     // Create a number of filenames in the JobTracker's fs namespace
     // Create a number of filenames in the JobTracker's fs namespace
-    Path submitJobDir = new Path(job.getSystemDir(), "submit_" + 
-                                 Integer.toString(r.nextInt(Integer.MAX_VALUE), 
-                                                  36));
+    String jobId = jobSubmitClient.getNewJobId();
+    Path submitJobDir = new Path(job.getSystemDir(), jobId);
+    FileSystem fs = getFs();
+    LOG.debug("default FileSystem: " + fs.getUri());
+    fs.delete(submitJobDir);    
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
         
         
-    FileSystem fs = getFs();
-    LOG.debug("default FileSystem: " + fs.getUri());
     // try getting the md5 of the archives
     // try getting the md5 of the archives
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
@@ -379,7 +400,7 @@ public class JobClient extends ToolBase implements MRConstants  {
     //
     //
     // Now, actually submit the job (using the submit name)
     // Now, actually submit the job (using the submit name)
     //
     //
-    JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId);
     if (status != null) {
     if (status != null) {
       return new NetworkedJob(status);
       return new NetworkedJob(status);
     } else {
     } else {
@@ -723,9 +744,6 @@ public class JobClient extends ToolBase implements MRConstants  {
       throw new RuntimeException("JobClient:" + cmd);
       throw new RuntimeException("JobClient:" + cmd);
     }
     }
 
 
-    // initialize JobClient
-    init();
-        
     // Process args
     // Process args
     String submitJobFile = null;
     String submitJobFile = null;
     String jobid = null;
     String jobid = null;
@@ -751,11 +769,20 @@ public class JobClient extends ToolBase implements MRConstants  {
       }
       }
     }
     }
 
 
+    // initialize JobClient
+    JobConf conf = null;
+    if (submitJobFile != null) {
+      conf = new JobConf(submitJobFile);
+    } else {
+      conf = new JobConf();
+    }
+    init(conf);
+        
     // Submit the request
     // Submit the request
     int exitCode = -1;
     int exitCode = -1;
     try {
     try {
       if (submitJobFile != null) {
       if (submitJobFile != null) {
-        RunningJob job = submitJob(submitJobFile);
+        RunningJob job = submitJob(conf);
         System.out.println("Created job " + job.getJobID());
         System.out.println("Created job " + job.getJobID());
       } else if (getStatus) {
       } else if (getStatus) {
         RunningJob job = getJob(jobid);
         RunningJob job = getJob(jobid);

+ 17 - 17
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -29,7 +29,6 @@ import java.util.Vector;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
@@ -115,28 +114,28 @@ class JobInProgress {
    * Create a JobInProgress with the given job file, plus a handle
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    * to the tracker.
    */
    */
-  public JobInProgress(String jobFile, JobTracker jobtracker, 
-                       Configuration default_conf) throws IOException {
-    jobId = jobtracker.getTrackerIdentifier() + "_" +jobtracker.createJobId();
-    String fullJobId = "job_" + jobId;
+  public JobInProgress(String jobid, JobTracker jobtracker, 
+                       JobConf default_conf) throws IOException {
+    this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + fullJobId;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
     this.jobtracker = jobtracker;
     this.jobtracker = jobtracker;
-    this.status = new JobStatus(fullJobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     this.startTime = System.currentTimeMillis();
     this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
     this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
 
 
     JobConf default_job_conf = new JobConf(default_conf);
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+fullJobId + ".xml");
+                                                      +"/"+jobid + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ fullJobId + ".jar");
+                                                      +"/"+ jobid + ".jar");
     FileSystem fs = FileSystem.get(default_conf);
     FileSystem fs = FileSystem.get(default_conf);
-    fs.copyToLocalFile(new Path(jobFile), localJobFile);
+    Path jobFile = new Path(default_conf.getSystemDir(), jobid + "/job.xml");
+    fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
     this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url,
-                                  conf.getJobName());
+    this.profile = new JobProfile(conf.getUser(), jobid, 
+                                  jobFile.toString(), url, jobid);
     String jarFile = conf.getJar();
     String jarFile = conf.getJar();
     if (jarFile != null) {
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
@@ -151,15 +150,16 @@ class JobInProgress {
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
         
-    JobHistory.JobInfo.logSubmitted(fullJobId, conf.getJobName(), conf.getUser(), 
-                                    System.currentTimeMillis(), jobFile); 
+    JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
+                                    System.currentTimeMillis(), 
+                                    jobFile.toString()); 
         
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", fullJobId);
+    this.jobMetrics.setTag("jobId", jobid);
   }
   }
 
 
   /**
   /**
@@ -1076,8 +1076,8 @@ class JobInProgress {
         
         
       // Delete temp dfs dirs created if any, like in case of 
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       // speculative exn of reduces.  
-      String tempDir = conf.get("mapred.system.dir") + "/job_" + jobId; 
-      fs.delete(new Path(tempDir)); 
+      Path tempDir = new Path(conf.getSystemDir(), jobId); 
+      fs.delete(tempDir); 
 
 
     } catch (IOException e) {
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);

+ 10 - 1
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -33,11 +33,20 @@ public interface JobSubmissionProtocol extends VersionedProtocol {
    *changed
    *changed
    */
    */
   public static final long versionID = 3L;
   public static final long versionID = 3L;
+
+  /**
+   * Allocate a name for the job.
+   * @return a unique job name for submitting jobs.
+   * @throws IOException
+   */
+  public String getNewJobId() throws IOException;
+
   /**
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
    * that job.
+   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
    */
-  public JobStatus submitJob(String jobFile) throws IOException;
+  public JobStatus submitJob(String jobName) throws IOException;
 
 
   /**
   /**
    * Get the current status of the cluster
    * Get the current status of the cluster

+ 22 - 11
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -99,7 +99,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * @param conf configuration for the JobTracker.
    * @param conf configuration for the JobTracker.
    * @throws IOException
    * @throws IOException
    */
    */
-  public static void startTracker(Configuration conf) throws IOException {
+  public static void startTracker(JobConf conf) throws IOException {
     if (tracker != null)
     if (tracker != null)
       throw new IOException("JobTracker already running.");
       throw new IOException("JobTracker already running.");
     runTracker = true;
     runTracker = true;
@@ -604,12 +604,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   static final String SUBDIR = "jobTracker";
   static final String SUBDIR = "jobTracker";
   FileSystem fs;
   FileSystem fs;
   Path systemDir;
   Path systemDir;
-  private Configuration conf;
+  private JobConf conf;
 
 
   /**
   /**
    * Start the JobTracker process, listen on the indicated port
    * Start the JobTracker process, listen on the indicated port
    */
    */
-  JobTracker(Configuration conf) throws IOException {
+  JobTracker(JobConf conf) throws IOException {
     //
     //
     // Grab some static constants
     // Grab some static constants
     //
     //
@@ -1441,9 +1441,27 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     LOG.warn("Report from " + taskTracker + ": " + errorMessage);        
     LOG.warn("Report from " + taskTracker + ": " + errorMessage);        
   }
   }
 
 
+  /**
+   * Remove the job_ from jobids to get the unique string.
+   */
+  static String getJobUniqueString(String jobid) {
+    return jobid.substring(4);
+  }
+
   ////////////////////////////////////////////////////
   ////////////////////////////////////////////////////
   // JobSubmissionProtocol
   // JobSubmissionProtocol
   ////////////////////////////////////////////////////
   ////////////////////////////////////////////////////
+
+  /**
+   * Allocates a new JobId string.
+   */
+  public String getNewJobId() {
+    synchronized (this) {
+      return "job_" + getTrackerIdentifier() + "_" + 
+             idFormat.format(nextJobId++);
+    }
+  }
+
   /**
   /**
    * JobTracker.submitJob() kicks off a new job.  
    * JobTracker.submitJob() kicks off a new job.  
    *
    *
@@ -1677,12 +1695,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public JobInProgress getJob(String jobid) {
   public JobInProgress getJob(String jobid) {
     return jobs.get(jobid);
     return jobs.get(jobid);
   }
   }
-  /**
-   * Grab random num for job id
-   */
-  String createJobId() {
-    return idFormat.format(nextJobId++);
-  }
 
 
   ////////////////////////////////////////////////////
   ////////////////////////////////////////////////////
   // Methods to track all the TaskTrackers
   // Methods to track all the TaskTrackers
@@ -1773,8 +1785,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
     }
       
       
     try {
     try {
-      Configuration conf=new Configuration();
-      startTracker(conf);
+      startTracker(new JobConf());
     } catch (Throwable e) {
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
       LOG.fatal(StringUtils.stringifyException(e));
       System.exit(-1);
       System.exit(-1);

+ 20 - 15
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -26,7 +26,6 @@ import java.util.Random;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.*;
@@ -39,7 +38,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
 
   private FileSystem fs;
   private FileSystem fs;
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
-  private Configuration conf;
+  private JobConf conf;
   private int map_tasks = 0;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
   private int reduce_tasks = 0;
 
 
@@ -51,7 +50,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
   
   
   private class Job extends Thread
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
     implements TaskUmbilicalProtocol {
-    private String file;
+    private Path file;
     private String id;
     private String id;
     private JobConf job;
     private JobConf job;
     private Random random = new Random();
     private Random random = new Random();
@@ -74,18 +73,18 @@ class LocalJobRunner implements JobSubmissionProtocol {
       return TaskUmbilicalProtocol.versionID;
       return TaskUmbilicalProtocol.versionID;
     }
     }
     
     
-    public Job(String file, Configuration conf) throws IOException {
-      this.file = file;
-      this.id = "job_" + newId();
+    public Job(String jobid, JobConf conf) throws IOException {
+      this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
+      this.id = jobid;
       this.mapoutputFile = new MapOutputFile();
       this.mapoutputFile = new MapOutputFile();
       this.mapoutputFile.setConf(conf);
       this.mapoutputFile.setConf(conf);
 
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
       this.localFs = FileSystem.getLocal(conf);
 
 
-      fs.copyToLocalFile(new Path(file), localFile);
+      fs.copyToLocalFile(file, localFile);
       this.job = new JobConf(localFile);
       this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file, 
+      profile = new JobProfile(job.getUser(), id, file.toString(), 
                                "http://localhost:8080/", job.getJobName());
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
 
 
@@ -119,7 +118,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           splits[i].write(buffer);
           splits[i].write(buffer);
           BytesWritable split = new BytesWritable();
           BytesWritable split = new BytesWritable();
           split.set(buffer.getData(), 0, buffer.getLength());
           split.set(buffer.getData(), 0, buffer.getLength());
-          MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, 
+          MapTask map = new MapTask(jobId, file.toString(), "tip_m_" + mapId, 
                                     mapId, i,
                                     mapId, i,
                                     splits[i].getClass().getName(),
                                     splits[i].getClass().getName(),
                                     split);
                                     split);
@@ -152,8 +151,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
             }
             }
 
 
             {
             {
-              ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
-                  reduceId, 0, mapIds.size());
+              ReduceTask reduce = new ReduceTask(jobId, file.toString(), 
+                                                 "tip_r_0001",
+                                                 reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
               JobConf localConf = new JobConf(job);
               reduce.localizeConfiguration(localConf);
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce.setConf(localConf);
@@ -187,7 +187,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
 
       } finally {
       } finally {
         try {
         try {
-          fs.delete(new Path(file).getParent());  // delete submit dir
+          fs.delete(file.getParent());  // delete submit dir
           localFs.delete(localFile);              // delete local copy
           localFs.delete(localFile);              // delete local copy
         } catch (IOException e) {
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
           LOG.warn("Error cleaning up "+id+": "+e);
@@ -258,7 +258,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     
     
   }
   }
 
 
-  public LocalJobRunner(Configuration conf) throws IOException {
+  public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.fs = FileSystem.get(conf);
     this.conf = conf;
     this.conf = conf;
     myMetrics = new JobTrackerMetrics(new JobConf(conf));
     myMetrics = new JobTrackerMetrics(new JobConf(conf));
@@ -266,8 +266,13 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
 
   // JobSubmissionProtocol methods
   // JobSubmissionProtocol methods
 
 
-  public JobStatus submitJob(String jobFile) throws IOException {
-    return new Job(jobFile, this.conf).status;
+  private int jobid = 0;
+  public String getNewJobId() {
+    return "job_local_" + Integer.toString(++jobid);
+  }
+
+  public JobStatus submitJob(String jobid) throws IOException {
+    return new Job(jobid, this.conf).status;
   }
   }
 
 
   public void killJob(String id) {
   public void killJob(String id) {

+ 4 - 4
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -114,7 +114,7 @@ class TaskInProgress {
   /**
   /**
    * Constructor for MapTask
    * Constructor for MapTask
    */
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         String splitClass, BytesWritable split, 
                         String splitClass, BytesWritable split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
                         JobInProgress job, int partition) {
@@ -126,13 +126,13 @@ class TaskInProgress {
     this.conf = conf;
     this.conf = conf;
     this.partition = partition;
     this.partition = partition;
     setMaxTaskAttempts();
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
   }
         
         
   /**
   /**
    * Constructor for ReduceTask
    * Constructor for ReduceTask
    */
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         int numMaps, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
                         int partition, JobTracker jobtracker, JobConf conf,
                         JobInProgress job) {
                         JobInProgress job) {
@@ -143,7 +143,7 @@ class TaskInProgress {
     this.job = job;
     this.job = job;
     this.conf = conf;
     this.conf = conf;
     setMaxTaskAttempts();
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
   }
   /**
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
    * Set the max number of attempts before we declare a TIP as "failed"