Explorar o código

commit 3169900051676baa501025625f18d6d0b260e94d
Author: Hemanth Yamijala <yhemanth@yahoo-inc.com>
Date: Tue Dec 15 18:52:33 2009 +0530

MAPREDUCE:1143 from https://issues.apache.org/jira/secure/attachment/12427898/MAPRED-1143-ydist-9.patch

+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1143. Fix running task counters to be updated correctly
+ when speculative attempts are running for a TIP.
+ (Rahul Kumar Singh via yhemanth)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077078 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley %!s(int64=14) %!d(string=hai) anos
pai
achega
e4038e248e

+ 29 - 11
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -838,7 +838,8 @@ class JobInProgress {
     boolean wasComplete = tip.isComplete();
     boolean wasPending = tip.isOnlyCommitPending();
     TaskAttemptID taskid = status.getTaskID();
-    
+    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
+
     // If the TIP is already completed and the task reports as SUCCEEDED then 
     // mark the task as KILLED.
     // In case of task with no promotion the task tracker will mark the task 
@@ -934,7 +935,7 @@ class JobInProgress {
         
         // Tell the job to fail the relevant task
         failedTask(tip, taskid, status, taskTracker,
-                   wasRunning, wasComplete);
+                   wasRunning, wasComplete, wasAttemptRunning);
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
@@ -2569,8 +2570,8 @@ class JobInProgress {
    */
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, 
-                          TaskTracker taskTracker,
-                          boolean wasRunning, boolean wasComplete) {
+                          TaskTracker taskTracker, boolean wasRunning,
+                          boolean wasComplete, boolean wasAttemptRunning) {
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
@@ -2580,6 +2581,30 @@ class JobInProgress {
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
+    
+    if (wasAttemptRunning) {
+      // We are decrementing counters without looking for isRunning ,
+      // because we increment the counters when we obtain
+      // new map task attempt or reduce task attempt.We do not really check
+      // for tip being running.
+      // Whenever we obtain new task attempt following counters are incremented.
+      //      ++runningMapTasks;
+      //.........
+      //      metrics.launchMap(id);
+      // hence we are decrementing the same set.
+      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
+        if (tip.isMapTask()) {
+          runningMapTasks -= 1;
+          metrics.failedMap(taskid);
+        } else {
+          runningReduceTasks -= 1;
+          metrics.failedReduce(taskid);
+        }
+      }
+      
+      // Metering
+      meterTaskAttempt(tip, status);
+    }
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
@@ -2588,8 +2613,6 @@ class JobInProgress {
       } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
-        runningMapTasks -= 1;
-        metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
         if (!isComplete) {
@@ -2597,8 +2620,6 @@ class JobInProgress {
           failMap(tip);
         }
       } else {
-        runningReduceTasks -= 1;
-        metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
         if (!isComplete) {
@@ -2606,9 +2627,6 @@ class JobInProgress {
           failReduce(tip);
         }
       }
-      
-      // Metering
-      meterTaskAttempt(tip, status);
     }
         
     // The case when the map was complete but the task tracker went down.

+ 9 - 0
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -303,6 +303,15 @@ class TaskInProgress {
   public boolean isRunning() {
     return !activeTasks.isEmpty();
   }
+
+  /**
+   * Is this attempt currently running ?
+   * @param  taskId task attempt id.
+   * @return true if attempt taskId is running
+   */
+  boolean isAttemptRunning(TaskAttemptID taskId) {
+    return activeTasks.containsKey(taskId);
+  }
     
   TaskAttemptID getSuccessfulTaskid() {
     return successfulTaskId;