Przeglądaj źródła

Merge -r 1165929:1165930 from trunk to branch-0.23 to fix MAPREDUCE-2800.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1165932 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 lat temu
rodzic
commit
e7c0ee77c6

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1201,6 +1201,9 @@ Release 0.23.0 - Unreleased
    MAPREDUCE-2687. Fix NodeManager to use the right version of
    LocalDirAllocator.getLocalPathToWrite. (mahadev & acmurthy) 
 
+   MAPREDUCE-2800. Set final progress for tasks to ensure all task information
+   is correctly logged to JobHistory. (Siddharth Seth via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1380,6 +1380,8 @@ public abstract class TaskAttemptImpl implements
       // for it
       taskAttempt.taskAttemptListener.unregister(
           taskAttempt.attemptId, taskAttempt.jvmID);
+      taskAttempt.reportedStatus.progress = 1.0f;
+      taskAttempt.updateProgressSplits();
       //send the cleanup event to containerLauncher
       taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
           taskAttempt.attemptId, 

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java

@@ -1736,6 +1736,7 @@ class MapTask extends Task {
           indexCacheList.get(0).writeToFile(
             mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
         }
+        sortPhase.complete();
         return;
       }
 
@@ -1776,6 +1777,7 @@ class MapTask extends Task {
         } finally {
           finalOut.close();
         }
+        sortPhase.complete();
         return;
       }
       {

+ 15 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
@@ -151,7 +152,7 @@ public class TestMRJobs {
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     verifySleepJobCounters(job);
-    
+    verifyTaskProgress(job);
     
     // TODO later:  add explicit "isUber()" checks of some sort (extend
     // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
@@ -173,6 +174,18 @@ public class TestMRJobs {
         .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
             && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
   }
+  
+  protected void verifyTaskProgress(Job job) throws InterruptedException,
+      IOException {
+    for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) {
+      Assert.assertTrue(0.9999f < taskReport.getProgress()
+          && 1.0001f > taskReport.getProgress());
+    }
+    for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) {
+      Assert.assertTrue(0.9999f < taskReport.getProgress()
+          && 1.0001f > taskReport.getProgress());
+    }
+  }
 
   @Test
   public void testRandomWriter() throws IOException, InterruptedException,
@@ -198,6 +211,7 @@ public class TestMRJobs {
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    
     // Make sure there are three files in the output-dir
     
     RemoteIterator<FileStatus> iterator =