Browse Source

Merge -r 539088:539093 from trunk to 0.13 branch. Fixes: HADOOP-1356, HADOOP-1363, and HADOOP-1368.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/branches/branch-0.13@539095 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
41aea9046c

+ 8 - 0
CHANGES.txt

@@ -390,6 +390,14 @@ Branch 0.13 (unreleased changes)
 116. HADOOP-1358.  Fix a potential bug when DFSClient calls skipBytes.
      (Hairong Kuang via cutting)
 
+117. HADOOP-1356.  Fix a bug in ValueHistogram.  (Runping Qi via cutting)
+
+118. HADOOP-1363.  Fix locking bug in JobClient#waitForCompletion().
+     (omalley via cutting)
+
+119. HADOOP-1368.  Fix inconsistent synchronization in JobInProgress.
+     (omalley via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 1 - 1
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -137,7 +137,7 @@ public class JobClient extends ToolBase implements MRConstants  {
     /**
      * Blocks until the job is finished
      */
-    public synchronized void waitForCompletion() throws IOException {
+    public void waitForCompletion() throws IOException {
       while (!isComplete()) {
         try {
           Thread.sleep(5000);

+ 11 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -220,7 +220,9 @@ class JobInProgress {
       // Finished time need to be setted here to prevent this job to be retired
       // from the job tracker jobs at the next retire iteration.
       this.finishTime = System.currentTimeMillis();
-      this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
+      status.setMapProgress(1.0f);
+      status.setReduceProgress(1.0f);
+      status.setRunState(JobStatus.SUCCEEDED);
       tasksInited = true;
 
       // Special case because the Job is not queued
@@ -263,7 +265,7 @@ class JobInProgress {
   public int desiredMaps() {
     return numMapTasks;
   }
-  public int finishedMaps() {
+  public synchronized int finishedMaps() {
     return finishedMapTasks;
   }
   public int desiredReduces() {
@@ -275,7 +277,7 @@ class JobInProgress {
   public synchronized int runningReduces() {
     return runningReduceTasks;
   }
-  public int finishedReduces() {
+  public synchronized int finishedReduces() {
     return finishedReduceTasks;
   }
  
@@ -485,8 +487,9 @@ class JobInProgress {
   /**
    * Return a MapTask, if appropriate, to run on the given tasktracker
    */
-  public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize
-                               ) throws IOException {
+  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
+                                            int clusterSize
+                                           ) throws IOException {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobId());
       return null;
@@ -513,8 +516,9 @@ class JobInProgress {
    * We don't have cache-sensitivity for reduce tasks, as they
    *  work on temporary MapRed files.  
    */
-  public Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                  int clusterSize) throws IOException {
+  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                               int clusterSize
+                                              ) throws IOException {
     if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobId());
       return null;

+ 10 - 13
src/java/org/apache/hadoop/mapred/JobStatus.java

@@ -30,8 +30,6 @@ import org.apache.hadoop.io.WritableFactory;
  * Describes the current status of a job.  This is
  * not intended to be a comprehensive piece of data.
  * For that, look at JobProfile.
- *
- * @author Mike Cafarella
  **************************************************/
 public class JobStatus implements Writable {
 
@@ -83,39 +81,38 @@ public class JobStatus implements Writable {
   /**
    * @return Percentage of progress in maps 
    */
-  public float mapProgress() { return mapProgress; }
+  public synchronized float mapProgress() { return mapProgress; }
     
   /**
    * Sets the map progress of this job
    * @param p The value of map progress to set to
    */
-  void setMapProgress(float p) { 
+  synchronized void setMapProgress(float p) { 
     this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
-    
   }
     
   /**
    * @return Percentage of progress in reduce 
    */
-  public float reduceProgress() { return reduceProgress; }
+  public synchronized float reduceProgress() { return reduceProgress; }
     
   /**
    * Sets the reduce progress of this Job
    * @param p The value of reduce progress to set to
    */
-  void setReduceProgress(float p) { 
+  synchronized void setReduceProgress(float p) { 
     this.reduceProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
   }
     
   /**
    * @return running state of the job
    */
-  public int getRunState() { return runState; }
+  public synchronized int getRunState() { return runState; }
     
   /**
    * Change the current run state of the job.
    */
-  public void setRunState(int state) {
+  public synchronized void setRunState(int state) {
     this.runState = state;
   }
 
@@ -123,22 +120,22 @@ public class JobStatus implements Writable {
    * Set the start time of the job
    * @param startTime The startTime of the job
    */
-  void setStartTime(long startTime) { this.startTime = startTime;};
+  synchronized void setStartTime(long startTime) { this.startTime = startTime;}
     
   /**
    * @return start time of the job
    */
-  public long getStartTime() { return startTime;};
+  synchronized public long getStartTime() { return startTime;}
 
   /**
    * @param user The username of the job
    */
-  void setUsername(String userName) { this.user = userName;};
+  synchronized void setUsername(String userName) { this.user = userName;}
 
   /**
    * @return the username of the job
    */
-  public String getUsername() { return this.user;};
+  public synchronized String getUsername() { return this.user;}
     
   ///////////////////////////////////////
   // Writable

+ 1 - 1
src/java/org/apache/hadoop/mapred/lib/aggregate/ValueHistogram.java

@@ -51,7 +51,7 @@ public class ValueHistogram implements ValueAggregator {
     String valStr = valCountStr;
     String countStr = "1";
     if (pos >= 0) {
-      valCountStr.substring(0, pos);
+      valStr = valCountStr.substring(0, pos);
       countStr = valCountStr.substring(pos + 1);
     }
     

+ 2 - 0
src/test/org/apache/hadoop/mapred/TestAggregates.java

@@ -106,6 +106,8 @@ public class TestAggregates extends TestCase {
     boolean success = true;
     Path outPath = new Path(OUTPUT_DIR, "part-00000");
     String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
+    System.out.println("full out data:");
+    System.out.println(outdata.toString());
     outdata = outdata.substring(0, expectedOutput.toString().length());
 
     assertEquals(expectedOutput.toString(),outdata);