Parcourir la source

HADOOP-339. Add a method to the JobClient API listing jobs that are not yet complete. Contributed by Mahadev.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@420764 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting il y a 19 ans
Parent
commit
3c253cc2b5

+ 4 - 0
CHANGES.txt

@@ -10,6 +10,10 @@ Trunk (unreleased changes)
  2. HADOOP-313.  Permit task state to be saved so that single tasks
     may be manually re-executed when debugging.  (omalley via cutting)
 
+ 3. HADOOP-339.  Add method to JobClient API listing jobs that are
+    not yet complete, i.e., that are queued or running.
+    (Mahadev Konar via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

+ 4 - 0
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -314,6 +314,10 @@ public class JobClient extends ToolBase implements MRConstants  {
       return jobSubmitClient.getClusterStatus();
     }
     
+    public JobStatus[] jobsToComplete() throws IOException {
+	return jobSubmitClient.jobsToComplete();
+    }
+    
     /** Utility that submits a job, then polls for progress until the job is
      * complete. */
     public static void runJob(JobConf job) throws IOException {

+ 45 - 5
src/java/org/apache/hadoop/mapred/JobStatus.java

@@ -26,7 +26,7 @@ import java.io.*;
  *
  * @author Mike Cafarella
  **************************************************/
-class JobStatus implements Writable {
+public class JobStatus implements Writable {
 
     static {                                      // register a ctor
       WritableFactories.setFactory
@@ -45,13 +45,18 @@ class JobStatus implements Writable {
     float mapProgress;
     float reduceProgress;
     int runState;
-
+    long startTime;
     /**
      */
     public JobStatus() {
     }
 
     /**
+     * Create a job status object for a given jobid.
+     * @param jobid The jobid of the job
+     * @param mapProgress The progress made on the maps
+     * @param reduceProgress The progress made on the reduces
+     * @param runState The current state of the job
      */
     public JobStatus(String jobid, float mapProgress, float reduceProgress, int runState) {
         this.jobid = jobid;
@@ -61,14 +66,47 @@ class JobStatus implements Writable {
     }
 
     /**
+     * @return The jobid of the Job
      */
     public String getJobId() { return jobid; }
+    
+    /**
+     * @return Percentage of progress in maps 
+     */
     public float mapProgress() { return mapProgress; }
-    public void setMapProgress(float p) { this.mapProgress = p; }
+    
+    /**
+     * Sets the map progress of this job
+     * @param p The value of map progress to set to
+     */
+    void setMapProgress(float p) { this.mapProgress = p; }
+    
+    /**
+     * @return Percentage of progress in reduce 
+     */
     public float reduceProgress() { return reduceProgress; }
-    public void setReduceProgress(float p) { this.reduceProgress = p; }
+    
+    /**
+     * Sets the reduce progress of this Job
+     * @param p The value of reduce progress to set to
+     */
+    void setReduceProgress(float p) { this.reduceProgress = p; }
+    
+    /**
+     * @return running state of the job
+     */
     public int getRunState() { return runState; }
-
+    
+    /** 
+     * Set the start time of the job
+     * @param startTime The startTime of the job
+     */
+    void setStartTime(long startTime) { this.startTime = startTime;};
+    
+    /**
+     * @return start time of the job
+     */
+    public long getStartTime() { return startTime;};
     ///////////////////////////////////////
     // Writable
     ///////////////////////////////////////
@@ -77,11 +115,13 @@ class JobStatus implements Writable {
         out.writeFloat(mapProgress);
         out.writeFloat(reduceProgress);
         out.writeInt(runState);
+        out.writeLong(startTime);
     }
     public void readFields(DataInput in) throws IOException {
         this.jobid = UTF8.readString(in);
         this.mapProgress = in.readFloat();
         this.reduceProgress = in.readFloat();
         this.runState = in.readInt();
+        this.startTime = in.readLong();
     }
 }

+ 8 - 0
src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
+import java.util.*;
 
 /** 
  * Protocol that a JobClient and the central JobTracker use to communicate.  The
@@ -64,4 +65,11 @@ interface JobSubmissionProtocol {
      * prior to submitting the job.
      */
     public String getFilesystemName() throws IOException;
+
+    /** 
+     * Get the jobs that are not completed and not failed
+     * @return array of JobStatus for the running/to-be-run
+     * jobs.
+     */
+    public JobStatus[] jobsToComplete() throws IOException;
 }

+ 14 - 0
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -1038,6 +1038,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       return (String) taskidToTrackerMap.get(taskId);
     }
     
+    public JobStatus[] jobsToComplete() {
+        Vector v = new Vector();
+        for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
+            JobInProgress jip = (JobInProgress) it.next();
+            JobStatus status = jip.getStatus();
+            if (status.getRunState() == JobStatus.RUNNING 
+		|| status.getRunState() == JobStatus.PREP) {
+		status.setStartTime(jip.getStartTime());
+                v.add(status);
+            }
+        }
+        return (JobStatus[]) v.toArray(new JobStatus[v.size()]);
+    } 
+    
     ///////////////////////////////////////////////////////////////
     // JobTracker methods
     ///////////////////////////////////////////////////////////////

+ 2 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -210,4 +210,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public ClusterStatus getClusterStatus() {
     return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
   }
+
+  public JobStatus[] jobsToComplete() {return null;};
 }