Jelajahi Sumber

HADOOP-1368. Fix inconsistent synchronization in JobInProgress. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@539093 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 tahun lalu
induk
melakukan
dd8996239c

+ 3 - 0
CHANGES.txt

@@ -422,6 +422,9 @@ Branch 0.13 (unreleased changes)
 118. HADOOP-1363.  Fix locking bug in JobClient#waitForCompletion().
 118. HADOOP-1363.  Fix locking bug in JobClient#waitForCompletion().
      (omalley via cutting)
      (omalley via cutting)
 
 
+119. HADOOP-1368.  Fix inconsistent synchronization in JobInProgress.
+     (omalley via cutting)
+
 
 
 Release 0.12.3 - 2007-04-06
 Release 0.12.3 - 2007-04-06
 
 

+ 11 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -220,7 +220,9 @@ class JobInProgress {
       // Finished time need to be setted here to prevent this job to be retired
       // Finished time need to be setted here to prevent this job to be retired
       // from the job tracker jobs at the next retire iteration.
       // from the job tracker jobs at the next retire iteration.
       this.finishTime = System.currentTimeMillis();
       this.finishTime = System.currentTimeMillis();
-      this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
+      status.setMapProgress(1.0f);
+      status.setReduceProgress(1.0f);
+      status.setRunState(JobStatus.SUCCEEDED);
       tasksInited = true;
       tasksInited = true;
 
 
       // Special case because the Job is not queued
       // Special case because the Job is not queued
@@ -263,7 +265,7 @@ class JobInProgress {
   public int desiredMaps() {
   public int desiredMaps() {
     return numMapTasks;
     return numMapTasks;
   }
   }
-  public int finishedMaps() {
+  public synchronized int finishedMaps() {
     return finishedMapTasks;
     return finishedMapTasks;
   }
   }
   public int desiredReduces() {
   public int desiredReduces() {
@@ -275,7 +277,7 @@ class JobInProgress {
   public synchronized int runningReduces() {
   public synchronized int runningReduces() {
     return runningReduceTasks;
     return runningReduceTasks;
   }
   }
-  public int finishedReduces() {
+  public synchronized int finishedReduces() {
     return finishedReduceTasks;
     return finishedReduceTasks;
   }
   }
  
  
@@ -485,8 +487,9 @@ class JobInProgress {
   /**
   /**
    * Return a MapTask, if appropriate, to run on the given tasktracker
    * Return a MapTask, if appropriate, to run on the given tasktracker
    */
    */
-  public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize
-                               ) throws IOException {
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+                                            int clusterSize
+                                           ) throws IOException {
     if (!tasksInited) {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobId());
       LOG.info("Cannot create task split for " + profile.getJobId());
       return null;
       return null;
@@ -513,8 +516,9 @@ class JobInProgress {
    * We don't have cache-sensitivity for reduce tasks, as they
    * We don't have cache-sensitivity for reduce tasks, as they
    *  work on temporary MapRed files.  
    *  work on temporary MapRed files.  
    */
    */
-  public Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                  int clusterSize) throws IOException {
+  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                               int clusterSize
+                                              ) throws IOException {
     if (!tasksInited) {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobId());
       LOG.info("Cannot create task split for " + profile.getJobId());
       return null;
       return null;

+ 10 - 13
src/java/org/apache/hadoop/mapred/JobStatus.java

@@ -30,8 +30,6 @@ import org.apache.hadoop.io.WritableFactory;
  * Describes the current status of a job.  This is
  * Describes the current status of a job.  This is
  * not intended to be a comprehensive piece of data.
  * not intended to be a comprehensive piece of data.
  * For that, look at JobProfile.
  * For that, look at JobProfile.
- *
- * @author Mike Cafarella
  **************************************************/
  **************************************************/
 public class JobStatus implements Writable {
 public class JobStatus implements Writable {
 
 
@@ -83,39 +81,38 @@ public class JobStatus implements Writable {
   /**
   /**
    * @return Percentage of progress in maps 
    * @return Percentage of progress in maps 
    */
    */
-  public float mapProgress() { return mapProgress; }
+  public synchronized float mapProgress() { return mapProgress; }
     
     
   /**
   /**
    * Sets the map progress of this job
    * Sets the map progress of this job
    * @param p The value of map progress to set to
    * @param p The value of map progress to set to
    */
    */
-  void setMapProgress(float p) { 
+  synchronized void setMapProgress(float p) { 
     this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
     this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
-    
   }
   }
     
     
   /**
   /**
    * @return Percentage of progress in reduce 
    * @return Percentage of progress in reduce 
    */
    */
-  public float reduceProgress() { return reduceProgress; }
+  public synchronized float reduceProgress() { return reduceProgress; }
     
     
   /**
   /**
    * Sets the reduce progress of this Job
    * Sets the reduce progress of this Job
    * @param p The value of reduce progress to set to
    * @param p The value of reduce progress to set to
    */
    */
-  void setReduceProgress(float p) { 
+  synchronized void setReduceProgress(float p) { 
     this.reduceProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
     this.reduceProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
   }
     
     
   /**
   /**
    * @return running state of the job
    * @return running state of the job
    */
    */
-  public int getRunState() { return runState; }
+  public synchronized int getRunState() { return runState; }
     
     
   /**
   /**
    * Change the current run state of the job.
    * Change the current run state of the job.
    */
    */
-  public void setRunState(int state) {
+  public synchronized void setRunState(int state) {
     this.runState = state;
     this.runState = state;
   }
   }
 
 
@@ -123,22 +120,22 @@ public class JobStatus implements Writable {
    * Set the start time of the job
    * Set the start time of the job
    * @param startTime The startTime of the job
    * @param startTime The startTime of the job
    */
    */
-  void setStartTime(long startTime) { this.startTime = startTime;};
+  synchronized void setStartTime(long startTime) { this.startTime = startTime;}
     
     
   /**
   /**
    * @return start time of the job
    * @return start time of the job
    */
    */
-  public long getStartTime() { return startTime;};
+  synchronized public long getStartTime() { return startTime;}
 
 
   /**
   /**
    * @param user The username of the job
    * @param user The username of the job
    */
    */
-  void setUsername(String userName) { this.user = userName;};
+  synchronized void setUsername(String userName) { this.user = userName;}
 
 
   /**
   /**
    * @return the username of the job
    * @return the username of the job
    */
    */
-  public String getUsername() { return this.user;};
+  public synchronized String getUsername() { return this.user;}
     
     
   ///////////////////////////////////////
   ///////////////////////////////////////
   // Writable
   // Writable